Refactor: the old design behaved badly under multiprocessing. The framework now uses a client-server pattern so that it is compatible with multi-process usage, such as vLLM.
"""
|
|
Process-safe, asynchronous, batched, and schema-evolving Parquet logger.
|
|
|
|
This module provides the `DataLogger`, a high-performance logger for structured
|
|
data, redesigned with a client-server architecture to be fully compatible with
|
|
multi-process applications (e.g., using `multiprocessing` or libraries like vLLM).
|
|
|
|
Key Architectural Features:
|
|
- **Client-Server Model**: A single, dedicated server process manages all file I/O,
|
|
preventing race conditions and data loss from child processes.
|
|
- **Process-Safe**: All processes (main, children) act as clients, sending data
|
|
via a managed, shared queue, ensuring centralized and ordered logging.
|
|
- **Lazy Initialization & Automatic Management**: The server process is transparently
|
|
started on the first log call and is gracefully shut down on program exit via
|
|
`atexit`, requiring no manual management from the user.
|
|
- **Unified Interface**: Log data from any process via the simple and consistent
|
|
`DataLogger.log({"key": "value"})` call.
|
|
- **Asynchronous & Batched**: The server process handles I/O asynchronously from the
|
|
clients, batching rows to minimize disk writes and reduce application latency.
|
|
- **Schema Evolution**: Reuses the robust logic to automatically adapt the Parquet
|
|
schema if new data fields are introduced, rewriting the file to maintain a
|
|
consistent structure.
|
|
- **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch
|
|
tensors.
|
|
|
|
Basic Usage:
|
|
-------------
|
|
.. code-block:: python
|
|
|
|
from data_logger import DataLogger
|
|
import multiprocessing
|
|
|
|
# Optional: Configure the logger at the start of your application.
|
|
# If not called, defaults will be used on the first .log() call.
|
|
DataLogger.initialize("my_experiment.parquet")
|
|
|
|
def worker_function(worker_id):
|
|
for i in range(5):
|
|
# Log from a child process. It's that simple.
|
|
DataLogger.log({"worker_id": worker_id, "step": i, "value": i * 100})
|
|
time.sleep(0.1)
|
|
|
|
# All calls to DataLogger.log() from any process will be sent
|
|
# to the central logging server.
|
|
DataLogger.log({"main_process_event": "starting workers"})
|
|
|
|
processes = [multiprocessing.Process(target=worker_function, args=(i,)) for i in range(3)]
|
|
for p in processes:
|
|
p.start()
|
|
for p in processes:
|
|
p.join()
|
|
|
|
DataLogger.log({"main_process_event": "workers finished"})
|
|
|
|
# Data is automatically flushed and the server is shut down when the main
|
|
# program exits. For explicit control, you can call:
|
|
# DataLogger.close()
|
|
"""
|
|
|
|

from __future__ import annotations

import atexit
import datetime
import multiprocessing
import multiprocessing.managers
import multiprocessing.synchronize
import os
import queue
import threading
import time
import traceback
import typing as t
from dataclasses import dataclass
from pathlib import Path

# Third-party libraries are imported with runtime checks
try:
    import numpy as np
except ImportError:
    np = None

try:
    import pandas as pd
except ImportError:
    raise ImportError("pandas is required. Install with `pip install pandas`.")

try:
    import pyarrow as pa
    import pyarrow.parquet as pq
except ImportError:
    raise ImportError("pyarrow is required. Install with `pip install pyarrow`.")

try:
    import torch
except ImportError:
    torch = None

# --- Type Definitions and Constants ---
Row = t.Dict[str, t.Any]
# Special command objects sent through the queue, distinct from data rows (dicts).
_FLUSH_COMMAND = "__FLUSH__"
_SHUTDOWN_COMMAND = "__SHUTDOWN__"


@dataclass
class LoggerConfig:
    """Configuration for the DataLogger's writer behavior."""

    batch_size: int = 1024
    flush_interval: float = 1.0
    parquet_compression: str = "snappy"
    allow_schema_rewrite: bool = True
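
# A minimal configuration sketch (illustrative values chosen for this example,
# not recommended defaults; "zstd" stands in for any pyarrow-supported codec):
#
#     config = LoggerConfig(batch_size=256, flush_interval=0.5, parquet_compression="zstd")
#     DataLogger.initialize("metrics.parquet", config=config)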


class DataLoggerServer(multiprocessing.Process):
    """
    The server process responsible for all file I/O operations.

    This process runs a loop that consumes data from a shared queue, batches it,
    and writes to a Parquet file. It is the single source of truth for the log file,
    ensuring that writes are serialized and schema evolution is handled correctly.
    It is designed to be managed by the `DataLogger` facade and not instantiated directly.
    """

    def __init__(
        self,
        log_queue: multiprocessing.Queue,
        path: Path,
        config: LoggerConfig,
        flush_event: multiprocessing.synchronize.Event,
    ):
        super().__init__(daemon=True, name=f"DataLoggerServer-{path.name}")
        self._queue = log_queue
        self.path = path
        self._config = config
        self._flush_event = flush_event
        self._buffer: t.List[Row] = []

    def run(self) -> None:
        """The main loop of the server process."""
        try:
            should_stop = False
            while not should_stop:
                try:
                    # Block until an item is available or the flush interval times out.
                    item = self._queue.get(timeout=self._config.flush_interval)

                    if isinstance(item, dict):
                        self._buffer.append(self._normalize_row(item))
                    elif item == _FLUSH_COMMAND:
                        self._write_buffer_if_needed(force=True)
                        self._flush_event.set()  # Signal completion
                    elif item == _SHUTDOWN_COMMAND:
                        should_stop = True
                        self._drain_and_write()

                except queue.Empty:
                    # Timeout occurred, treat as a periodic flush signal.
                    self._write_buffer_if_needed(force=False)

            # Final write upon graceful shutdown
            self._write_buffer_if_needed(force=True)

        except Exception as e:
            print(f"FATAL: DataLogger server process crashed: {e}", flush=True)
            traceback.print_exc()

    def _write_buffer_if_needed(self, force: bool = False) -> None:
        """
        Determines if the buffer should be written to disk and triggers the write.

        Args:
            force: If True, writes the buffer regardless of size.
                Used for flush commands, timeouts, and shutdown.
        """
        buffer_size = len(self._buffer)
        is_batch_full = buffer_size >= self._config.batch_size

        if self._buffer and (force or is_batch_full):
            self._write_batch(self._buffer)
            self._buffer.clear()

    def _drain_and_write(self) -> None:
        """Process all remaining items in the queue and buffer during shutdown."""
        while True:
            try:
                item = self._queue.get_nowait()
                if isinstance(item, dict):
                    self._buffer.append(self._normalize_row(item))
            except queue.Empty:
                break
        self._write_buffer_if_needed(force=True)

    def _write_batch(self, rows: t.List[Row]) -> None:
        """
        Converts rows to a Parquet table and writes it, handling schema evolution.

        This method encapsulates the core file I/O logic. It reads the existing
        file, merges data, and atomically overwrites it. This strategy is robust
        for schema changes, although it has performance implications for very
        large files. This logic is preserved from the original implementation.
        """
        if not rows:
            return

        try:
            df = pd.DataFrame(rows)
            # Ensure a consistent column order for schema stability.
            df = df.reindex(sorted(df.columns), axis=1)
            new_table = pa.Table.from_pandas(df, preserve_index=False)

            combined_table: pa.Table
            if self.path.exists():
                existing_table = pq.read_table(self.path)
                if existing_table.schema.equals(new_table.schema):
                    combined_table = pa.concat_tables([existing_table, new_table])
                else:
                    # Schema evolution is needed.
                    if not self._config.allow_schema_rewrite:
                        raise RuntimeError(
                            "Schema mismatch detected, and rewriting is disabled."
                        )
                    print(
                        f"INFO: Schema evolution detected. Rewriting {self.path}...",
                        flush=True,
                    )
                    combined_df = pd.concat(
                        [existing_table.to_pandas(), new_table.to_pandas()],
                        ignore_index=True,
                        sort=False,
                    )
                    combined_df = combined_df.reindex(
                        sorted(combined_df.columns), axis=1
                    )
                    combined_table = pa.Table.from_pandas(
                        combined_df, preserve_index=False
                    )
            else:
                # This is a new file.
                self.path.parent.mkdir(parents=True, exist_ok=True)
                combined_table = new_table

            # Write atomically by using a temporary file.
            temp_path = self.path.with_suffix(f"{self.path.suffix}.tmp")
            pq.write_table(
                combined_table,
                temp_path,
                compression=self._config.parquet_compression,
            )
            os.replace(temp_path, self.path)

        except Exception as e:
            print(f"ERROR: Failed to write batch to {self.path}: {e}", flush=True)
            traceback.print_exc()

    def _normalize_row(self, row: Row) -> Row:
        """Sanitize all values in a row for Parquet compatibility."""
        return {key: self._normalize_value(value) for key, value in row.items()}

    def _normalize_value(self, value: t.Any) -> t.Any:
        """Convert a single value to a Parquet-friendly format."""
        if value is None:
            return None
        if np and isinstance(value, np.ndarray):
            return value.tolist()
        if torch and isinstance(value, torch.Tensor):
            return value.detach().cpu().numpy().tolist()
        return value
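
    # Illustrative normalization (the inputs below are assumptions, shown for clarity):
    #   {"arr": np.arange(3), "w": torch.ones(2)} -> {"arr": [0, 1, 2], "w": [1.0, 1.0]}
    # Arrays and tensors are stored as plain lists so pyarrow can serialize them.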


class DataLogger:
    """
    A process-safe facade for logging structured data to a Parquet file.

    This class manages a singleton server process in the background and provides
    a simple, unified API (`.log()`) for all processes in an application.
    """

    _manager: t.ClassVar[t.Optional[multiprocessing.managers.SyncManager]] = None
    _log_queue: t.ClassVar[t.Optional[multiprocessing.Queue]] = None
    _server_process: t.ClassVar[t.Optional[DataLoggerServer]] = None
    _flush_event: t.ClassVar[t.Optional[multiprocessing.synchronize.Event]] = None
    _lock = threading.Lock()  # Use a thread lock for initializing class-level resources

    @classmethod
    def initialize(
        cls,
        path: t.Optional[t.Union[str, Path]] = None,
        config: t.Optional[LoggerConfig] = None,
    ) -> None:
        """
        Explicitly initialize and start the logging server.

        This is optional. If not called, the server will be started automatically
        with default settings upon the first call to `log()`.

        Args:
            path: Path to the log file. If None, a timestamped name is generated.
            config: Configuration for the logger's behavior.
        """
        with cls._lock:
            if cls._server_process is not None:
                print(
                    "WARNING: DataLogger already initialized. Ignoring subsequent call.",
                    flush=True,
                )
                return
            cls._start_server(path, config)

    @classmethod
    def log(cls, row: Row) -> None:
        """
        Log a data row from any process.

        The first call to this method will automatically start the background
        logging server if it hasn't been started already. The operation is
        non-blocking; the data is placed in a process-safe queue.

        Args:
            row: A dictionary representing a single row of data.

        Raises:
            TypeError: If the provided row is not a dictionary.
        """
        if not isinstance(row, dict):
            raise TypeError(f"Expected a dict for a row, but got {type(row)}.")

        with cls._lock:
            if cls._server_process is None:
                # Lazy initialization with default parameters
                cls._start_server()

        if cls._log_queue:
            try:
                cls._log_queue.put(row)
            except Exception as e:
                # Can happen if the manager process dies unexpectedly
                print(f"ERROR: Failed to queue log message: {e}", flush=True)

    @classmethod
    def flush(cls, timeout: float = 10.0) -> None:
        """
        Block until all currently queued data is written to disk.

        Args:
            timeout: Maximum time in seconds to wait for the flush to complete.
        """
        if (
            cls._server_process is None
            or cls._log_queue is None
            or cls._flush_event is None
        ):
            return

        cls._flush_event.clear()
        cls._log_queue.put(_FLUSH_COMMAND)
        cls._flush_event.wait(timeout)
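
    # A typical pattern (illustrative; the file name is an assumption) for reading
    # the log mid-run from the main process:
    #
    #     DataLogger.flush()  # wait for queued rows to reach disk
    #     df = pd.read_parquet("my_experiment.parquet")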

    @classmethod
    def close(cls, timeout: float = 10.0) -> None:
        """
        Flush all data and gracefully shut down the logging server.

        This is automatically registered with `atexit` and usually does not
        need to be called manually.

        Args:
            timeout: Maximum time to wait for the server process to join.
        """
        with cls._lock:
            if cls._server_process is None or not cls._server_process.is_alive():
                return

            print("INFO: Shutting down DataLogger server...", flush=True)
            if cls._log_queue:
                cls._log_queue.put(_SHUTDOWN_COMMAND)

            cls._server_process.join(timeout)
            if cls._server_process.is_alive():
                print(
                    "WARNING: DataLogger server did not shut down cleanly. Terminating.",
                    flush=True,
                )
                cls._server_process.terminate()

            if cls._manager:
                cls._manager.shutdown()

            cls._server_process = None
            cls._log_queue = None
            cls._flush_event = None
            cls._manager = None
            print("INFO: DataLogger shutdown complete.", flush=True)

    @classmethod
    def _start_server(
        cls,
        path: t.Optional[t.Union[str, Path]] = None,
        config: t.Optional[LoggerConfig] = None,
    ) -> None:
        """Internal method to create and start the server process. Must be called within a lock."""
        resolved_path = cls._resolve_path(path)
        print(f"INFO: Starting DataLogger server for -> {resolved_path}", flush=True)

        # A manager handles shared state between processes
        cls._manager = multiprocessing.Manager()
        cls._log_queue = cls._manager.Queue()
        cls._flush_event = cls._manager.Event()

        cls._server_process = DataLoggerServer(
            log_queue=cls._log_queue,
            path=resolved_path,
            config=config or LoggerConfig(),
            flush_event=cls._flush_event,
        )
        cls._server_process.start()

        # Register the cleanup function to be called on program exit
        atexit.register(cls.close)

    @staticmethod
    def _resolve_path(path: t.Optional[t.Union[str, Path]]) -> Path:
        """Determine the final output path for the log file."""
        if path is None:
            timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
            filename = f"log_{timestamp}.parquet"
            return Path.cwd() / filename

        resolved_path = Path(path)
        if resolved_path.suffix == "":
            resolved_path = resolved_path.with_suffix(".parquet")
        return resolved_path
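

if __name__ == "__main__":
    # Illustrative smoke test (a sketch added for documentation purposes, not part
    # of the original module). The file name "demo_log.parquet" and the config
    # values below are arbitrary assumptions.
    DataLogger.initialize(
        "demo_log.parquet",
        LoggerConfig(batch_size=8, flush_interval=0.2),
    )
    for step in range(10):
        DataLogger.log({"step": step, "value": step * 100})
    DataLogger.flush()  # block until the rows above are on disk
    print(pd.read_parquet("demo_log.parquet"))
    DataLogger.close()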