DataLogger/DataLogger.py
eddb879680 Add automatic time logging.
Add: the server can append a `_time` field to each log row when `log_server_time: bool = True` is set in LoggerConfig. To record the exact client time instead of the server time, include your own `_time` field when submitting a log row.
2025-09-28 11:33:33 +08:00
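
A minimal sketch of the new option in use (import path and file name are illustrative; `DataLogger`, `LoggerConfig`, and the `_time` behavior are defined in the file below):

    import datetime
    from DataLogger.DataLogger import DataLogger, LoggerConfig

    # Server-side timestamps: the server stamps each row with `_time` on arrival.
    DataLogger.initialize("run.parquet", LoggerConfig(log_server_time=True))
    DataLogger.log({"step": 1, "loss": 0.42})

    # Exact client time: rows that already contain `_time` are not overwritten.
    DataLogger.log({"step": 2, "loss": 0.40, "_time": datetime.datetime.now()})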


"""
Process-safe, asynchronous, batched, and schema-evolving Parquet logger.
This module provides the `DataLogger`, a high-performance logger for structured
data, redesigned with a client-server architecture to be fully compatible with
multi-process applications (e.g., using `multiprocessing` or libraries like vLLM).
Key Architectural Features:
- **Client-Server Model**: A single, dedicated server process manages all file I/O,
preventing race conditions and data loss from child processes.
- **Process-Safe**: All processes (main, children) act as clients, sending data
via a managed, shared queue, ensuring centralized and ordered logging.
- **Lazy Initialization & Automatic Management**: The server process is transparently
started on the first log call and is gracefully shut down on program exit via
`atexit`, requiring no manual management from the user.
- **Automatic Server-Side Timestamping**: Optionally, the server can automatically
add a timestamp to each log record, indicating the exact time of its arrival.
- **Unified Interface**: Log data from any process via the simple and consistent
`DataLogger.log({"key": "value"})` call.
- **Asynchronous & Batched**: The server process handles I/O asynchronously from the
clients, batching rows to minimize disk writes and reduce application latency.
- **Schema Evolution**: Reuses the robust logic to automatically adapt the Parquet
schema if new data fields are introduced.
- **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch
tensors.
Basic Usage:
-------------
.. code-block:: python
from data_logger import DataLogger
import multiprocessing
# Optional: Configure the logger at the start of your application.
# If not called, defaults will be used on the first .log() call.
DataLogger.initialize("my_experiment.parquet")
def worker_function(worker_id):
for i in range(5):
# Log from a child process. It's that simple.
DataLogger.log({"worker_id": worker_id, "step": i, "value": i * 100})
time.sleep(0.1)
# All calls to DataLogger.log() from any process will be sent
# to the central logging server.
DataLogger.log({"main_process_event": "starting workers"})
processes = [multiprocessing.Process(target=worker_function, args=(i,)) for i in range(3)]
for p in processes:
p.start()
for p in processes:
p.join()
DataLogger.log({"main_process_event": "workers finished"})
# Data is automatically flushed and the server is shut down when the main
# program exits. For explicit control, you can call:
# DataLogger.close()
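
Server-Side Timestamps:
-----------------------

A minimal sketch of the optional server-side timestamping (the file name and the
non-default settings shown are illustrative):

.. code-block:: python

    import datetime

    from data_logger import DataLogger, LoggerConfig

    # Enable server-side arrival timestamps.
    config = LoggerConfig(log_server_time=True, batch_size=256, flush_interval=0.5)
    DataLogger.initialize("timed_run.parquet", config)

    # The server stamps this row with a `_time` field when it is dequeued.
    DataLogger.log({"event": "checkpoint_saved"})

    # A row that already carries `_time` is left untouched, so an exact
    # client-side time can be recorded instead.
    DataLogger.log({"event": "client_timed", "_time": datetime.datetime.now()})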
"""
from __future__ import annotations
import atexit
import datetime
import multiprocessing
import multiprocessing.managers
import multiprocessing.synchronize
import os
import queue
import threading
import time
import traceback
import typing as t
from dataclasses import dataclass
from pathlib import Path
# Third-party libraries are imported with runtime checks
try:
    import numpy as np
except ImportError:
    np = None

try:
    import pandas as pd
except ImportError:
    raise ImportError("pandas is required. Install with `pip install pandas`.")

try:
    import pyarrow as pa
    import pyarrow.parquet as pq
except ImportError:
    raise ImportError("pyarrow is required. Install with `pip install pyarrow`.")

try:
    import torch
except ImportError:
    torch = None
# --- Type Definitions and Constants ---
Row = t.Dict[str, t.Any]
# Special command objects to be sent through the queue, distinct from data rows (dicts)
_FLUSH_COMMAND = "__FLUSH__"
_SHUTDOWN_COMMAND = "__SHUTDOWN__"


@dataclass
class LoggerConfig:
    """Configuration for the DataLogger's writer behavior."""

    batch_size: int = 1024  # Number of buffered rows that triggers a write.
    flush_interval: float = 1.0  # Queue-poll timeout (seconds); also the periodic flush cadence.
    parquet_compression: str = "snappy"  # Compression codec passed to pyarrow.
    allow_schema_rewrite: bool = True  # If False, a schema mismatch raises instead of rewriting the file.
    log_server_time: bool = False  # If True, add a `_time` arrival timestamp to rows that lack one.


class DataLoggerServer(multiprocessing.Process):
    """
    The server process responsible for all file I/O operations.

    This process runs a loop that consumes data from a shared queue, batches it,
    and writes to a Parquet file. It is the single source of truth for the log file,
    ensuring that writes are serialized and schema evolution is handled correctly.

    It is designed to be managed by the `DataLogger` facade and not instantiated directly.
    """

    def __init__(
        self,
        log_queue: multiprocessing.Queue,
        path: Path,
        config: LoggerConfig,
        flush_event: multiprocessing.synchronize.Event,
    ):
        super().__init__(daemon=True, name=f"DataLoggerServer-{path.name}")
        self._queue = log_queue
        self.path = path
        self._config = config
        self._flush_event = flush_event
        self._buffer: t.List[Row] = []

    def run(self) -> None:
        """The main loop of the server process."""
        try:
            should_stop = False
            while not should_stop:
                try:
                    # Block until an item is available or the flush interval times out.
                    item = self._queue.get(timeout=self._config.flush_interval)
                    if isinstance(item, dict):
                        self._process_log_item(item)
                    elif item == _FLUSH_COMMAND:
                        self._write_buffer_if_needed(force=True)
                        self._flush_event.set()  # Signal completion
                    elif item == _SHUTDOWN_COMMAND:
                        should_stop = True
                        self._drain_and_write()
                except queue.Empty:
                    # Timeout occurred, treat as a periodic flush signal.
                    self._write_buffer_if_needed(force=False)
            # Final write upon graceful shutdown
            self._write_buffer_if_needed(force=True)
        except Exception as e:
            print(f"FATAL: DataLogger server process crashed: {e}", flush=True)
            traceback.print_exc()

    def _process_log_item(self, item: Row) -> None:
        """Processes a single log item by timestamping, normalizing, and buffering it."""
        if self._config.log_server_time and "_time" not in item:
            # Add the server-side timestamp when the item is dequeued.
            item["_time"] = datetime.datetime.now()
        normalized_row = self._normalize_row(item)
        self._buffer.append(normalized_row)

    def _write_buffer_if_needed(self, force: bool = False) -> None:
        """
        Determines if the buffer should be written to disk and triggers the write.

        Args:
            force: If True, write the buffer regardless of size (used for explicit
                flush commands and shutdown). If False, write only when the batch
                is full (used for periodic timeout checks).
        """
        buffer_size = len(self._buffer)
        is_batch_full = buffer_size >= self._config.batch_size
        if self._buffer and (force or is_batch_full):
            self._write_batch(self._buffer)
            self._buffer.clear()

    def _drain_and_write(self) -> None:
        """Process all remaining items in the queue and buffer during shutdown."""
        while True:
            try:
                item = self._queue.get_nowait()
                if isinstance(item, dict):
                    self._process_log_item(item)
            except queue.Empty:
                break
        self._write_buffer_if_needed(force=True)

    def _write_batch(self, rows: t.List[Row]) -> None:
        """
        Converts rows to a Parquet table and writes it, handling schema evolution.

        This method encapsulates the core file I/O logic. It reads the existing
        file, merges data, and atomically overwrites it. This strategy is robust
        for schema changes, although it has performance implications for very
        large files. This logic is preserved from the original implementation.
        """
        if not rows:
            return
        try:
            df = pd.DataFrame(rows)
            # Ensure a consistent column order for schema stability.
            df = df.reindex(sorted(df.columns), axis=1)
            new_table = pa.Table.from_pandas(df, preserve_index=False)
            combined_table: pa.Table
            if self.path.exists():
                existing_table = pq.read_table(self.path)
                if existing_table.schema.equals(new_table.schema):
                    combined_table = pa.concat_tables([existing_table, new_table])
                else:
                    # Schema evolution is needed
                    if not self._config.allow_schema_rewrite:
                        raise RuntimeError(
                            "Schema mismatch detected, and rewriting is disabled."
                        )
                    print(
                        f"INFO: Schema evolution detected. Rewriting {self.path}...",
                        flush=True,
                    )
                    combined_df = pd.concat(
                        [existing_table.to_pandas(), new_table.to_pandas()],
                        ignore_index=True,
                        sort=False,
                    )
                    combined_df = combined_df.reindex(
                        sorted(combined_df.columns), axis=1
                    )
                    combined_table = pa.Table.from_pandas(
                        combined_df, preserve_index=False
                    )
            else:
                # This is a new file
                self.path.parent.mkdir(parents=True, exist_ok=True)
                combined_table = new_table
            # Write atomically by using a temporary file
            temp_path = self.path.with_suffix(f"{self.path.suffix}.tmp")
            pq.write_table(
                combined_table,
                temp_path,
                compression=self._config.parquet_compression,
            )
            os.replace(temp_path, self.path)
        except Exception as e:
            print(f"ERROR: Failed to write batch to {self.path}: {e}", flush=True)
            traceback.print_exc()

    def _normalize_row(self, row: Row) -> Row:
        """Sanitize all values in a row for Parquet compatibility."""
        return {key: self._normalize_value(value) for key, value in row.items()}

    def _normalize_value(self, value: t.Any) -> t.Any:
        """Convert a single value to a Parquet-friendly format."""
        if value is None:
            return None
        if np and isinstance(value, np.ndarray):
            return value.tolist()
        if torch and isinstance(value, torch.Tensor):
            return value.detach().cpu().numpy().tolist()
        return value


class DataLogger:
    """
    A process-safe facade for logging structured data to a Parquet file.

    This class manages a singleton server process in the background and provides
    a simple, unified API (`.log()`) for all processes in an application.
    """

    _manager: t.ClassVar[t.Optional[multiprocessing.managers.SyncManager]] = None
    _log_queue: t.ClassVar[t.Optional[multiprocessing.Queue]] = None
    _server_process: t.ClassVar[t.Optional[DataLoggerServer]] = None
    _flush_event: t.ClassVar[t.Optional[multiprocessing.synchronize.Event]] = None
    _lock = threading.Lock()  # Use a thread-lock for initializing class-level resources

    @classmethod
    def initialize(
        cls,
        path: t.Optional[t.Union[str, Path]] = None,
        config: t.Optional[LoggerConfig] = None,
    ) -> None:
        """
        Explicitly initialize and start the logging server.

        This is optional. If not called, the server will be started automatically
        with default settings upon the first call to `log()`.

        Args:
            path: Path to the log file. If None, a timestamped name is generated.
            config: Configuration for the logger's behavior.
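
        Example:
            The path and settings shown here are illustrative, not defaults.

            .. code-block:: python

                DataLogger.initialize(
                    "runs/experiment.parquet",
                    LoggerConfig(batch_size=256, log_server_time=True),
                )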
"""
with cls._lock:
if cls._server_process is not None:
print(
"WARNING: DataLogger already initialized. Ignoring subsequent call.",
flush=True,
)
return
cls._start_server(path, config)

    @classmethod
    def log(cls, row: Row) -> None:
        """
        Log a data row from any process.

        The first call to this method will automatically start the background
        logging server if it hasn't been started already. The operation is
        non-blocking; the data is placed in a process-safe queue.

        Args:
            row: A dictionary representing a single row of data.

        Raises:
            TypeError: If the provided row is not a dictionary.
        """
        if not isinstance(row, dict):
            raise TypeError(f"Expected a dict for a row, but got {type(row)}.")
        with cls._lock:
            if cls._server_process is None:
                # Lazy initialization with default parameters
                cls._start_server()
        if cls._log_queue:
            try:
                cls._log_queue.put(row)
            except Exception as e:
                # Can happen if the manager process dies unexpectedly
                print(f"ERROR: Failed to queue log message: {e}", flush=True)

    @classmethod
    def flush(cls, timeout: float = 10.0) -> None:
        """
        Block until all currently queued data is written to disk.

        Args:
            timeout: Maximum time in seconds to wait for the flush to complete.
        """
        if (
            cls._server_process is None
            or cls._log_queue is None
            or cls._flush_event is None
        ):
            return
        cls._flush_event.clear()
        cls._log_queue.put(_FLUSH_COMMAND)
        cls._flush_event.wait(timeout)

    @classmethod
    def close(cls, timeout: float = 10.0) -> None:
        """
        Flush all data and gracefully shut down the logging server.

        This is automatically registered with `atexit` and usually does not
        need to be called manually.

        Args:
            timeout: Maximum time to wait for the server process to join.
        """
        with cls._lock:
            if cls._server_process is None or not cls._server_process.is_alive():
                return
            print("INFO: Shutting down DataLogger server...", flush=True)
            if cls._log_queue:
                cls._log_queue.put(_SHUTDOWN_COMMAND)
            cls._server_process.join(timeout)
            if cls._server_process.is_alive():
                print(
                    "WARNING: DataLogger server did not shut down cleanly. Terminating.",
                    flush=True,
                )
                cls._server_process.terminate()
            if cls._manager:
                cls._manager.shutdown()
            cls._server_process = None
            cls._log_queue = None
            cls._manager = None
            print("INFO: DataLogger shutdown complete.", flush=True)

    @classmethod
    def _start_server(
        cls,
        path: t.Optional[t.Union[str, Path]] = None,
        config: t.Optional[LoggerConfig] = None,
    ) -> None:
        """Internal method to create and start the server process. Must be called within a lock."""
        resolved_path = cls._resolve_path(path)
        print(f"INFO: Starting DataLogger server for -> {resolved_path}", flush=True)
        # A manager handles shared state between processes
        cls._manager = multiprocessing.Manager()
        cls._log_queue = cls._manager.Queue()
        cls._flush_event = cls._manager.Event()
        cls._server_process = DataLoggerServer(
            log_queue=cls._log_queue,
            path=resolved_path,
            config=config or LoggerConfig(),
            flush_event=cls._flush_event,
        )
        cls._server_process.start()
        # Register the cleanup function to be called on program exit
        atexit.register(cls.close)

    @staticmethod
    def _resolve_path(path: t.Optional[t.Union[str, Path]]) -> Path:
        """Determine the final output path for the log file."""
        if path is None:
            timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
            filename = f"log_{timestamp}.parquet"
            return Path.cwd() / filename
        resolved_path = Path(path)
        if resolved_path.suffix == "":
            resolved_path = resolved_path.with_suffix(".parquet")
        return resolved_path