diff --git a/DataLogger.py b/DataLogger.py
new file mode 100644
index 0000000..ec434bd
--- /dev/null
+++ b/DataLogger.py
@@ -0,0 +1,441 @@
+"""
+Process-safe, asynchronous, batched, and schema-evolving Parquet logger.
+
+This module provides the `DataLogger`, a high-performance logger for structured
+data, redesigned with a client-server architecture to be fully compatible with
+multi-process applications (e.g., using `multiprocessing` or libraries like vLLM).
+
+Key Architectural Features:
+- **Client-Server Model**: A single, dedicated server process manages all file I/O,
+  preventing race conditions and data loss from child processes.
+- **Process-Safe**: All processes (main and children) act as clients, sending data
+  via a managed, shared queue, ensuring centralized and ordered logging.
+- **Lazy Initialization & Automatic Management**: The server process is transparently
+  started on the first log call and is gracefully shut down on program exit via
+  `atexit`, requiring no manual management from the user.
+- **Automatic Server-Side Timestamping**: Optionally, the server can add a timestamp
+  to each log record, indicating the exact time of its arrival.
+- **Unified Interface**: Log data from any process via the simple and consistent
+  `DataLogger.log({"key": "value"})` call.
+- **Asynchronous & Batched**: The server process handles I/O asynchronously from the
+  clients, batching rows to minimize disk writes and reduce application latency.
+- **Schema Evolution**: Reuses the original implementation's robust logic to
+  automatically adapt the Parquet schema when new data fields are introduced.
+- **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch
+  tensors.
+
+Basic Usage:
+-------------
+.. code-block:: python
+
+    import multiprocessing
+    import time
+
+    from DataLogger import DataLogger
+
+    # Optional: Configure the logger at the start of your application.
+    # If not called, defaults will be used on the first .log() call.
+    DataLogger.initialize("my_experiment.parquet")
+
+    def worker_function(worker_id):
+        for i in range(5):
+            # Log from a child process. It's that simple.
+            DataLogger.log({"worker_id": worker_id, "step": i, "value": i * 100})
+            time.sleep(0.1)
+
+    # All calls to DataLogger.log() from any process will be sent
+    # to the central logging server.
+    DataLogger.log({"main_process_event": "starting workers"})
+
+    processes = [multiprocessing.Process(target=worker_function, args=(i,)) for i in range(3)]
+    for p in processes:
+        p.start()
+    for p in processes:
+        p.join()
+
+    DataLogger.log({"main_process_event": "workers finished"})
+
+    # Data is automatically flushed and the server is shut down when the main
+    # program exits. For explicit control, you can call:
+    # DataLogger.close()
+"""
+
+from __future__ import annotations
+
+import atexit
+import datetime
+import multiprocessing
+import multiprocessing.synchronize
+import os
+import queue
+import threading
+import time
+import traceback
+import typing as t
+from dataclasses import dataclass
+from pathlib import Path
+
+# Third-party libraries are imported with runtime checks
+try:
+    import numpy as np
+except ImportError:
+    np = None
+
+try:
+    import pandas as pd
+except ImportError:
+    raise ImportError("pandas is required. Install with `pip install pandas`.")
+
+try:
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+except ImportError:
+    raise ImportError("pyarrow is required. Install with `pip install pyarrow`.")
+
+try:
+    import torch
+except ImportError:
+    torch = None
+
+# --- Type Definitions and Constants ---
+Row = t.Dict[str, t.Any]
+# Special command objects to be sent through the queue, distinct from data rows (dicts)
+_FLUSH_COMMAND = "__FLUSH__"
+_SHUTDOWN_COMMAND = "__SHUTDOWN__"
+
+
+@dataclass
+class LoggerConfig:
+    """Configuration for the DataLogger's writer behavior."""
+
+    batch_size: int = 1024
+    flush_interval: float = 1.0
+    parquet_compression: str = "snappy"
+    allow_schema_rewrite: bool = True
+    log_server_time: bool = False
+
+
+class DataLoggerServer(multiprocessing.Process):
+    """
+    The server process responsible for all file I/O operations.
+
+    This process runs a loop that consumes data from a shared queue, batches it,
+    and writes to a Parquet file. It is the single source of truth for the log file,
+    ensuring that writes are serialized and schema evolution is handled correctly.
+    It is designed to be managed by the `DataLogger` facade and not instantiated directly.
+    """
+
+    def __init__(
+        self,
+        log_queue: multiprocessing.Queue,
+        path: Path,
+        config: LoggerConfig,
+        flush_event: multiprocessing.synchronize.Event,
+    ):
+        super().__init__(daemon=True, name=f"DataLoggerServer-{path.name}")
+        self._queue = log_queue
+        self.path = path
+        self._config = config
+        self._flush_event = flush_event
+        self._buffer: t.List[Row] = []
+
+    def run(self) -> None:
+        """The main loop of the server process."""
+        try:
+            should_stop = False
+            while not should_stop:
+                try:
+                    # Block until an item is available or the flush interval times out.
+                    item = self._queue.get(timeout=self._config.flush_interval)
+
+                    if isinstance(item, dict):
+                        self._process_log_item(item)
+                    elif item == _FLUSH_COMMAND:
+                        self._write_buffer_if_needed(force=True)
+                        self._flush_event.set()  # Signal completion
+                    elif item == _SHUTDOWN_COMMAND:
+                        should_stop = True
+                        self._drain_and_write()
+
+                except queue.Empty:
+                    # Timeout occurred: periodically flush whatever has accumulated.
+                    self._write_buffer_if_needed(force=True)
+
+            # Final write upon graceful shutdown
+            self._write_buffer_if_needed(force=True)
+
+        except Exception as e:
+            print(f"FATAL: DataLogger server process crashed: {e}", flush=True)
+            traceback.print_exc()
+
+    def _process_log_item(self, item: Row) -> None:
+        """Processes a single log item by timestamping, normalizing, and buffering it."""
+        if self._config.log_server_time and "_time" not in item:
+            # Add the server-side timestamp when the item is dequeued.
+            item["_time"] = datetime.datetime.now()
+
+        normalized_row = self._normalize_row(item)
+        self._buffer.append(normalized_row)
+        # Write as soon as a full batch has accumulated.
+        self._write_buffer_if_needed()
+
+    def _write_buffer_if_needed(self, force: bool = False) -> None:
+        """
+        Determines if the buffer should be written to disk and triggers the write.
+
+        Args:
+            force: If True, writes the buffer regardless of size.
+                Used for flush commands, timeouts, and shutdown.
+        """
+        buffer_size = len(self._buffer)
+        is_batch_full = buffer_size >= self._config.batch_size
+
+        if self._buffer and (force or is_batch_full):
+            self._write_batch(self._buffer)
+            self._buffer.clear()
+
+    def _drain_and_write(self) -> None:
+        """Process all remaining items in the queue and buffer during shutdown."""
+        while True:
+            try:
+                item = self._queue.get_nowait()
+                if isinstance(item, dict):
+                    self._process_log_item(item)
+            except queue.Empty:
+                break
+        self._write_buffer_if_needed(force=True)
+
+    def _write_batch(self, rows: t.List[Row]) -> None:
+        """
+        Converts rows to a Parquet table and writes it, handling schema evolution.
+
+        This method encapsulates the core file I/O logic. It reads the existing
+        file, merges data, and atomically overwrites it. This strategy is robust
+        for schema changes, although it has performance implications for very
+        large files. This logic is preserved from the original implementation.
+        """
+        if not rows:
+            return
+
+        try:
+            df = pd.DataFrame(rows)
+            # Ensure a consistent column order for schema stability.
+            df = df.reindex(sorted(df.columns), axis=1)
+            new_table = pa.Table.from_pandas(df, preserve_index=False)
+
+            combined_table: pa.Table
+            if self.path.exists():
+                existing_table = pq.read_table(self.path)
+                if existing_table.schema.equals(new_table.schema):
+                    combined_table = pa.concat_tables([existing_table, new_table])
+                else:
+                    # Schema evolution is needed
+                    if not self._config.allow_schema_rewrite:
+                        raise RuntimeError(
+                            "Schema mismatch detected, and rewriting is disabled."
+                        )
+                    print(
+                        f"INFO: Schema evolution detected. Rewriting {self.path}...",
+                        flush=True,
+                    )
+                    combined_df = pd.concat(
+                        [existing_table.to_pandas(), new_table.to_pandas()],
+                        ignore_index=True,
+                        sort=False,
+                    )
+                    combined_df = combined_df.reindex(
+                        sorted(combined_df.columns), axis=1
+                    )
+                    combined_table = pa.Table.from_pandas(
+                        combined_df, preserve_index=False
+                    )
+            else:
+                # This is a new file
+                self.path.parent.mkdir(parents=True, exist_ok=True)
+                combined_table = new_table
+
+            # Write atomically by using a temporary file
+            temp_path = self.path.with_suffix(f"{self.path.suffix}.tmp")
+            pq.write_table(
+                combined_table,
+                temp_path,
+                compression=self._config.parquet_compression,
+            )
+            os.replace(temp_path, self.path)
+
+        except Exception as e:
+            print(f"ERROR: Failed to write batch to {self.path}: {e}", flush=True)
+            traceback.print_exc()
+
+    def _normalize_row(self, row: Row) -> Row:
+        """Sanitize all values in a row for Parquet compatibility."""
+        return {key: self._normalize_value(value) for key, value in row.items()}
+
+    def _normalize_value(self, value: t.Any) -> t.Any:
+        """Convert a single value to a Parquet-friendly format."""
+        if value is None:
+            return None
+        if np and isinstance(value, np.ndarray):
+            return value.tolist()
+        if torch and isinstance(value, torch.Tensor):
+            return value.detach().cpu().numpy().tolist()
+        return value
+
+
+class DataLogger:
+    """
+    A process-safe facade for logging structured data to a Parquet file.
+
+    This class manages a singleton server process in the background and provides
+    a simple, unified API (`.log()`) for all processes in an application.
+    """
+
+    _manager: t.ClassVar[t.Optional[multiprocessing.managers.SyncManager]] = None
+    _log_queue: t.ClassVar[t.Optional[multiprocessing.Queue]] = None
+    _server_process: t.ClassVar[t.Optional[DataLoggerServer]] = None
+    _flush_event: t.ClassVar[t.Optional[multiprocessing.synchronize.Event]] = None
+    _lock = threading.Lock()  # Use a thread-lock for initializing class-level resources
+
+    @classmethod
+    def initialize(
+        cls,
+        path: t.Optional[t.Union[str, Path]] = None,
+        config: t.Optional[LoggerConfig] = None,
+    ) -> None:
+        """
+        Explicitly initialize and start the logging server.
+
+        This is optional. If not called, the server will be started automatically
+        with default settings upon the first call to `log()`.
+
+        Args:
+            path: Path to the log file. If None, a timestamped name is generated.
+            config: Configuration for the logger's behavior.
+        """
+        with cls._lock:
+            if cls._server_process is not None:
+                print(
+                    "WARNING: DataLogger already initialized. Ignoring subsequent call.",
+                    flush=True,
+                )
+                return
+            cls._start_server(path, config)
+
+    @classmethod
+    def log(cls, row: Row) -> None:
+        """
+        Log a data row from any process.
+
+        The first call to this method will automatically start the background
+        logging server if it hasn't been started already. The operation is
+        non-blocking; the data is placed in a process-safe queue.
+
+        Args:
+            row: A dictionary representing a single row of data.
+
+        Raises:
+            TypeError: If the provided row is not a dictionary.
+        """
+        if not isinstance(row, dict):
+            raise TypeError(f"Expected a dict for a row, but got {type(row)}.")
+
+        with cls._lock:
+            if cls._server_process is None:
+                # Lazy initialization with default parameters
+                cls._start_server()
+
+        if cls._log_queue:
+            try:
+                cls._log_queue.put(row)
+            except Exception as e:
+                # Can happen if the manager process dies unexpectedly
+                print(f"ERROR: Failed to queue log message: {e}", flush=True)
+
+    @classmethod
+    def flush(cls, timeout: float = 10.0) -> None:
+        """
+        Block until all currently queued data is written to disk.
+
+        Args:
+            timeout: Maximum time in seconds to wait for the flush to complete.
+        """
+        if (
+            cls._server_process is None
+            or cls._log_queue is None
+            or cls._flush_event is None
+        ):
+            return
+
+        cls._flush_event.clear()
+        cls._log_queue.put(_FLUSH_COMMAND)
+        cls._flush_event.wait(timeout)
+
+    @classmethod
+    def close(cls, timeout: float = 10.0) -> None:
+        """
+        Flush all data and gracefully shut down the logging server.
+
+        This is automatically registered with `atexit` and usually does not
+        need to be called manually.
+
+        Args:
+            timeout: Maximum time to wait for the server process to join.
+        """
+        with cls._lock:
+            if cls._server_process is None or not cls._server_process.is_alive():
+                return
+
+            print("INFO: Shutting down DataLogger server...", flush=True)
+            if cls._log_queue:
+                cls._log_queue.put(_SHUTDOWN_COMMAND)
+
+            cls._server_process.join(timeout)
+            if cls._server_process.is_alive():
+                print(
+                    "WARNING: DataLogger server did not shut down cleanly. Terminating.",
+                    flush=True,
+                )
+                cls._server_process.terminate()
+
+            if cls._manager:
+                cls._manager.shutdown()
+
+            cls._server_process = None
+            cls._log_queue = None
+            cls._manager = None
+            print("INFO: DataLogger shutdown complete.", flush=True)
+
+    @classmethod
+    def _start_server(
+        cls,
+        path: t.Optional[t.Union[str, Path]] = None,
+        config: t.Optional[LoggerConfig] = None,
+    ) -> None:
+        """Internal method to create and start the server process.
+
+        Must be called within a lock.
+        """
+        resolved_path = cls._resolve_path(path)
+        print(f"INFO: Starting DataLogger server for -> {resolved_path}", flush=True)
+
+        # A manager handles shared state between processes
+        cls._manager = multiprocessing.Manager()
+        cls._log_queue = cls._manager.Queue()
+        cls._flush_event = cls._manager.Event()
+
+        cls._server_process = DataLoggerServer(
+            log_queue=cls._log_queue,
+            path=resolved_path,
+            config=config or LoggerConfig(),
+            flush_event=cls._flush_event,
+        )
+        cls._server_process.start()
+
+        # Register the cleanup function to be called on program exit
+        atexit.register(cls.close)
+
+    @staticmethod
+    def _resolve_path(path: t.Optional[t.Union[str, Path]]) -> Path:
+        """Determine the final output path for the log file."""
+        if path is None:
+            timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
+            filename = f"log_{timestamp}.parquet"
+            return Path.cwd() / filename
+
+        resolved_path = Path(path)
+        if resolved_path.suffix == "":
+            resolved_path = resolved_path.with_suffix(".parquet")
+        return resolved_path
diff --git a/test_arrow.py b/test_arrow.py
new file mode 100644
index 0000000..416cdb4
--- /dev/null
+++ b/test_arrow.py
@@ -0,0 +1,18 @@
+# %%
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+# %%
+batch = pa.RecordBatch.from_pandas(
+    pd.DataFrame({"colA": [1, 2, 3, 4], "colB": ["AAA", "BB", "CCCC", "D"]})
+)
+batch
+
+# %%
+batch.to_pandas()
+
+# %%
+
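
For a quick manual check of the client API and writer settings, something along the following lines can be run from a scratch script. This is a minimal sketch, not part of the patch: it assumes `DataLogger.py` from this diff is importable, and the file name `demo_config.parquet` and the chosen `LoggerConfig` values are purely illustrative.

.. code-block:: python

    from DataLogger import DataLogger, LoggerConfig

    # Small batches plus server-side arrival timestamps for easier debugging.
    DataLogger.initialize(
        "demo_config.parquet",
        config=LoggerConfig(batch_size=64, flush_interval=0.5, log_server_time=True),
    )

    for step in range(200):
        DataLogger.log({"step": step, "loss": 1.0 / (step + 1)})

    DataLogger.flush()   # block until everything queued so far is on disk
    DataLogger.close()   # optional; the atexit hook performs the same shutdown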
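
The schema-evolution path in `_write_batch` can be exercised directly: after a flush has put the first batch on disk, logging a row that introduces a new key triggers a rewrite of the existing file, and rows written before the new column appeared come back as nulls. A sketch, assuming pandas is installed and with an illustrative file name:

.. code-block:: python

    import pandas as pd

    from DataLogger import DataLogger

    DataLogger.initialize("evolving.parquet")
    DataLogger.log({"step": 0, "loss": 0.9})
    DataLogger.flush()                                     # first batch written with columns: loss, step
    DataLogger.log({"step": 1, "loss": 0.7, "lr": 1e-3})   # new 'lr' key -> schema rewrite on the next write
    DataLogger.close()

    df = pd.read_parquet("evolving.parquet")
    print(df)   # 'lr' is null for the first row; columns are stored in sorted order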
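
Because `_normalize_value` converts NumPy arrays (and PyTorch tensors) to plain Python lists before buffering, array-valued fields round-trip as list columns. A small sketch assuming NumPy is available; the file name is again illustrative:

.. code-block:: python

    import numpy as np
    import pandas as pd

    from DataLogger import DataLogger

    DataLogger.initialize("arrays.parquet")
    DataLogger.log({"step": 0, "embedding": np.arange(4, dtype=np.float32)})
    DataLogger.close()

    # The stored value is a list column, e.g. [0.0, 1.0, 2.0, 3.0]
    print(pd.read_parquet("arrays.parquet")["embedding"].iloc[0])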