From 8d1551f2f6c82f7fc40b6c9ae386bbc89d52ac8e Mon Sep 17 00:00:00 2001
From: Sunday
Date: Sat, 27 Sep 2025 11:42:27 +0800
Subject: [PATCH] Add DataLogger main code.

---
 DataLogger.py | 493 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 493 insertions(+)
 create mode 100644 DataLogger.py

diff --git a/DataLogger.py b/DataLogger.py
new file mode 100644
index 0000000..53c2c79
--- /dev/null
+++ b/DataLogger.py
@@ -0,0 +1,493 @@
+"""
+Asynchronous, batched, and schema-evolving Parquet logger.
+
+This module provides the `DataLogger`, a high-performance logger for structured
+data, designed for applications like machine learning experiments, simulations,
+or any scenario requiring efficient serialization of row-based data.
+
+Key Features:
+- **Unified Interface**: Log data via a simple `DataLogger.log({"key": "value"})` call.
+- **Asynchronous & Batched**: A dedicated background thread handles I/O,
+  batching rows to minimize disk writes and reduce application latency.
+- **Schema Evolution**: Automatically adapts the Parquet schema if new data fields
+  are introduced, rewriting the file to maintain a consistent structure.
+- **Singleton Pattern**: A global singleton instance is managed automatically,
+  providing a convenient, fire-and-forget logging experience.
+- **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch
+  tensors, converting them to Parquet-compatible formats.
+- **Robust & Thread-Safe**: Designed for use in multi-threaded environments.
+
+Basic Usage:
+-------------
+.. code-block:: python
+
+    from DataLogger import DataLogger
+
+    # The first call creates and configures the singleton logger.
+    # A timestamped filename is generated by default.
+    DataLogger.log({"step": 0, "loss": 10.5, "accuracy": 0.5})
+    DataLogger.log({"step": 1, "loss": 9.8, "accuracy": 0.55})
+
+    # For the singleton, data is automatically flushed and saved on program exit.
+    # No explicit `close()` call is required for this simple case.
+
+Advanced Usage (Instance-Based):
+---------------------------------
+.. code-block:: python
+
+    from DataLogger import DataLogger, LoggerConfig
+
+    config = LoggerConfig(batch_size=512, flush_interval=5.0)
+    with DataLogger("my_experiment.parquet", config=config) as logger:
+        for i in range(1000):
+            logger.submit({"value": i})
+    # The `with` statement ensures flush and close on exit.
+"""
+
+from __future__ import annotations
+
+import datetime
+import atexit
+import os
+import queue
+import threading
+import time
+import traceback
+import typing as t
+from dataclasses import dataclass
+from pathlib import Path
+
+# Third-party libraries are imported with runtime checks to provide clear
+# error messages if they are not installed.
+try:
+    import numpy as np
+except ImportError:
+    np = None  # type: ignore
+
+try:
+    import pandas as pd
+except ImportError:
+    raise ImportError(
+        "pandas is required for DataLogger. Install with `pip install pandas`."
+    )
+
+try:
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+except ImportError:
+    raise ImportError(
+        "pyarrow is required for DataLogger. Install with `pip install pyarrow`."
+    )
+
+try:
+    import torch
+except ImportError:
+    torch = None  # type: ignore
+
+# Type alias for a single row of data.
+Row = t.Dict[str, t.Any] + + +@dataclass +class LoggerConfig: + """Configuration for the DataLogger's writer behavior.""" + + batch_size: int = 1024 + """Number of rows to accumulate before writing a batch to the Parquet file.""" + + flush_interval: float = 1.0 + """Maximum time in seconds to wait before flushing the buffer, even if + `batch_size` is not reached.""" + + parquet_compression: str = "snappy" + """Compression codec to use for the Parquet file. + Common options: 'snappy', 'gzip', 'brotli', 'none'.""" + + allow_schema_rewrite: bool = True + """If True, the logger will automatically rewrite the entire Parquet file to + accommodate new columns. If False, it will raise an error.""" + + +class DataLogger: + """ + An asynchronous, batched logger that writes data to a Parquet file. + + This class manages a background thread to handle file I/O, allowing the + calling application to log data with minimal blocking. It supports schema + evolution, making it robust to changes in data structure over time. + """ + + _singleton: t.Optional["DataLogger"] = None + _singleton_lock = threading.Lock() + + # --- Public API --- + + @classmethod + def get_instance( + cls, + path: t.Optional[t.Union[str, Path]] = None, + config: t.Optional[LoggerConfig] = None, + ) -> "DataLogger": + """ + Get or create the global singleton instance of the DataLogger. + + The first time this method is called, it creates a new `DataLogger` + instance and registers a cleanup function via `atexit` to ensure + `close()` is called automatically upon program termination. + + Subsequent calls will ignore the arguments and return the existing + instance. + + Args: + path: The file path for the log file. If None, a timestamped + filename like 'log_YYYYMMDD-HHMMSS.parquet' is created in the + current working directory. + config: A `LoggerConfig` object to configure the writer's behavior. + If None, default settings are used. + + Returns: + The singleton `DataLogger` instance. + """ + if cls._singleton is None: + with cls._singleton_lock: + if cls._singleton is None: + # Create the singleton instance. + instance = cls(path, config) + # Register its close method to be called at program exit. + # This ensures data is saved even if the user forgets to call close(). + atexit.register(instance.close) + cls._singleton = instance + return cls._singleton + + @classmethod + def log(cls, row: Row) -> None: + """ + Log a data row using the singleton instance. + + This is a convenience method that lazily initializes the singleton on + its first call. The operation is non-blocking; the data is placed in + an internal queue to be processed by the background writer thread. + + Args: + row: A dictionary representing a single row of data, where keys + are column names and values are the data points. + """ + instance = cls.get_instance() + instance.submit(row) + + def __init__( + self, + path: t.Optional[t.Union[str, Path]] = None, + config: t.Optional[LoggerConfig] = None, + ): + """ + Initialize a DataLogger instance. + + Args: + path: The file path for the log file. If None, a timestamped + filename is automatically generated. + config: A `LoggerConfig` object. If None, default settings are used. 
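+
+        Example (a minimal sketch; the path and settings shown are arbitrary):
+
+        .. code-block:: python
+
+            config = LoggerConfig(batch_size=256, flush_interval=2.0)
+            with DataLogger("run_01.parquet", config=config) as logger:
+                logger.submit({"step": 0, "loss": 1.0})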
+ """ + self.path = self._resolve_path(path) + self._config = config or LoggerConfig() + + # Internal state for the writer thread + self._queue: queue.Queue[t.Optional[Row]] = queue.Queue() + self._stop_event = threading.Event() + self._flush_event = threading.Event() + self._writer_thread: t.Optional[threading.Thread] = None + self._writer_lock = threading.RLock() # Protects writer and schema + + # Parquet-specific state, managed exclusively by the writer thread + self._parquet_writer: t.Optional[pq.ParquetWriter] = None + self._schema: t.Optional[pa.Schema] = None + self._buffer: t.List[Row] = [] + + self._start_writer_thread() + + def submit(self, row: Row) -> None: + """ + Submit a data row to be written asynchronously by the logger instance. + + Args: + row: A dictionary representing a single row of data. + + Raises: + TypeError: If the provided row is not a dictionary. + RuntimeError: If the logger has already been closed. + """ + if self._stop_event.is_set(): + raise RuntimeError("Logger has been closed and cannot accept new data.") + if not isinstance(row, dict): + raise TypeError(f"Expected a dict for a row, but got {type(row)}.") + + normalized_row = self._normalize_row(row) + self._queue.put(normalized_row) + + def flush(self, timeout: float = 10.0) -> None: + """ + Block until all currently queued and buffered data is written to disk. + + Args: + timeout: Maximum time in seconds to wait for the flush to complete. + """ + if self._writer_thread is None or not self._writer_thread.is_alive(): + return + + self._flush_event.clear() + self._queue.put(None) # Sentinel to trigger a flush + self._flush_event.wait(timeout) + + def close(self, timeout: float = 10.0) -> None: + """ + Flush all remaining data and shut down the background writer thread. + + This method is idempotent and thread-safe. It is designed to be + called explicitly, via a `with` statement, or automatically at program + exit. + + Args: + timeout: Maximum time in seconds to wait for the writer thread + to finish. + """ + if self._stop_event.is_set(): + return + + self._stop_event.set() + self._queue.put(None) # Wake up the writer thread if it's blocking. + + # Do not join the writer thread from itself, which would cause a deadlock. + if self._writer_thread and threading.current_thread() != self._writer_thread: + self._writer_thread.join(timeout) + + def __enter__(self) -> "DataLogger": + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Ensures the logger is closed upon exiting a `with` block.""" + self.close() + + def __del__(self): + """Ensures data is flushed when the logger object is destroyed.""" + self.close() + + # --- Internal Methods --- + + def _resolve_path(self, path: t.Optional[t.Union[str, Path]]) -> Path: + """Determine the final output path for the log file.""" + if path is None: + timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + filename = f"log_{timestamp}.parquet" + return Path.cwd() / filename + + resolved_path = Path(path) + if resolved_path.suffix == "": + resolved_path = resolved_path.with_suffix(".parquet") + return resolved_path + + def _start_writer_thread(self) -> None: + """Initialize and start the background writer thread.""" + if self._writer_thread is not None: + return + thread_name = f"DataLoggerWriter-{self.path.name}" + self._writer_thread = threading.Thread( + target=self._writer_loop, name=thread_name, daemon=True + ) + self._writer_thread.start() + + def _writer_loop(self) -> None: + """ + The main loop for the background writer thread. 
+
+        This loop continuously pulls data from the queue, batches it, and
+        writes it to the Parquet file. It handles flush signals, stop events,
+        and schema evolution.
+        """
+        try:
+            while not self._stop_event.is_set():
+                try:
+                    # Block until an item is available or the flush interval times out.
+                    item = self._queue.get(timeout=self._config.flush_interval)
+                except queue.Empty:
+                    # Timeout occurred, treat as a periodic flush signal.
+                    item = None
+
+                if item is not None:
+                    self._buffer.append(item)
+
+                buffer_size = len(self._buffer)
+                is_flush_signal = item is None
+                is_batch_full = buffer_size >= self._config.batch_size
+                is_shutting_down = self._stop_event.is_set()
+
+                if self._buffer and (
+                    is_flush_signal or is_batch_full or is_shutting_down
+                ):
+                    self._write_batch(self._buffer)
+                    self._buffer.clear()
+
+                if is_flush_signal:
+                    self._flush_event.set()  # Signal that a flush completed
+
+            # Final drain of the queue and buffer after the stop event is set.
+            self._drain_remaining()
+
+        except Exception as e:
+            print(f"FATAL: DataLogger writer thread crashed: {e}", flush=True)
+            traceback.print_exc()
+        finally:
+            # This block ensures that the Parquet writer is always closed
+            # when the writer thread exits, for any reason.
+            with self._writer_lock:
+                if self._parquet_writer:
+                    try:
+                        self._parquet_writer.close()
+                    except Exception as e:
+                        print(
+                            f"ERROR: Exception while closing Parquet writer: {e}",
+                            flush=True,
+                        )
+                    self._parquet_writer = None
+
+    def _drain_remaining(self) -> None:
+        """Process all remaining items in the queue and buffer during shutdown."""
+        while True:
+            try:
+                item = self._queue.get_nowait()
+                if item:
+                    self._buffer.append(item)
+            except queue.Empty:
+                break
+        if self._buffer:
+            self._write_batch(self._buffer)
+            self._buffer.clear()
+
+    def _write_batch(self, rows: t.List[Row]) -> None:
+        """
+        Convert a list of rows into a Parquet table and write it to the file.
+
+        This method handles schema creation, validation, and evolution.
+        It is always executed within the writer thread.
+        """
+        if not rows:
+            return
+
+        try:
+            with self._writer_lock:
+                df = pd.DataFrame(rows)
+                # Ensure a consistent column order for schema stability.
+                df = df.reindex(sorted(df.columns), axis=1)
+                new_table = pa.Table.from_pandas(df, preserve_index=False)
+
+                if self.path.exists():
+                    # File exists, need to append or evolve schema
+                    existing_table = pq.read_table(self.path)
+                    existing_schema = existing_table.schema
+
+                    if existing_schema.equals(new_table.schema):
+                        # Schema matches, append the data
+                        combined_table = pa.concat_tables([existing_table, new_table])
+                    else:
+                        # Schema evolution needed
+                        if not self._config.allow_schema_rewrite:
+                            raise RuntimeError(
+                                "Schema mismatch detected, and rewriting is disabled. "
+                                f"Existing schema: {existing_schema}, New schema: {new_table.schema}"
+                            )
+                        print(
+                            f"INFO: Schema evolution detected. Rewriting {self.path}...",
+                            flush=True,
+                        )
+                        # Combine with schema evolution
+                        combined_df = pd.concat(
+                            [existing_table.to_pandas(), new_table.to_pandas()],
+                            ignore_index=True,
+                            sort=False,
+                        )
+                        combined_df = combined_df.reindex(
+                            sorted(combined_df.columns), axis=1
+                        )
+                        combined_table = pa.Table.from_pandas(
+                            combined_df, preserve_index=False
+                        )
+                else:
+                    # New file
+                    self.path.parent.mkdir(parents=True, exist_ok=True)
+                    combined_table = new_table
+
+                # Write the combined table atomically
+                temp_path = self.path.with_suffix(f"{self.path.suffix}.tmp")
+                pq.write_table(
+                    combined_table,
+                    temp_path,
+                    compression=self._config.parquet_compression,
+                )
+                os.replace(temp_path, self.path)
+
+                # Update our schema tracking
+                self._schema = combined_table.schema
+
+        except Exception as e:
+            print(f"ERROR: Failed to write batch to {self.path}: {e}", flush=True)
+            traceback.print_exc()
+
+    def _rewrite_with_new_schema(self, new_table: pa.Table) -> None:
+        """
+        Rewrite the entire Parquet file to accommodate an evolved schema.
+
+        This is a potentially expensive operation as it reads the entire
+        existing file into memory.
+
+        Args:
+            new_table: The new batch of data with a different schema.
+        """
+        print(f"INFO: Schema evolution detected. Rewriting {self.path}...", flush=True)
+
+        # Close the current writer before reading the file.
+        if self._parquet_writer:
+            self._parquet_writer.close()
+
+        # Read existing data, combine with new data, and create a unified table.
+        existing_table = pq.read_table(self.path)
+        combined_df = pd.concat(
+            [existing_table.to_pandas(), new_table.to_pandas()],
+            ignore_index=True,
+            sort=False,
+        )
+        # Re-sort columns for the new unified schema.
+        combined_df = combined_df.reindex(sorted(combined_df.columns), axis=1)
+        final_table = pa.Table.from_pandas(combined_df, preserve_index=False)
+        self._schema = final_table.schema
+
+        # Atomically replace the old file with the new one.
+        temp_path = self.path.with_suffix(f"{self.path.suffix}.tmp")
+        pq.write_table(
+            final_table, temp_path, compression=self._config.parquet_compression
+        )
+        os.replace(temp_path, self.path)
+
+        # Re-initialize the writer with the new schema for subsequent writes.
+        self._parquet_writer = pq.ParquetWriter(
+            self.path, self._schema, compression=self._config.parquet_compression
+        )
+
+    def _normalize_row(self, row: Row) -> Row:
+        """
+        Sanitize all values in a row for Parquet compatibility.
+        """
+        return {key: self._normalize_value(value) for key, value in row.items()}
+
+    def _normalize_value(self, value: t.Any) -> t.Any:
+        """
+        Convert a single value to a Parquet-friendly format.
+        - NumPy arrays and Torch tensors are converted to nested lists.
+        - Other types are passed through for pandas to handle.
+        """
+        if value is None:
+            return None
+        if np and isinstance(value, np.ndarray):
+            return value.tolist()
+        if torch and isinstance(value, torch.Tensor):
+            return value.detach().cpu().numpy().tolist()
+        # return value.detach().cpu().numpy()
+        return value
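
Usage sketch (illustrative): the snippet below logs rows whose schema gains a
new column mid-run, then reads the file back with pandas to confirm the logger
evolved the schema. The path `demo.parquet` and the config values are
arbitrary placeholders.

    import pandas as pd

    from DataLogger import DataLogger, LoggerConfig

    with DataLogger("demo.parquet", config=LoggerConfig(batch_size=1)) as logger:
        logger.submit({"step": 0, "loss": 0.9})
        # A new field appears; the logger rewrites the file with the evolved
        # schema if the first row has already been flushed.
        logger.submit({"step": 1, "loss": 0.7, "accuracy": 0.42})

    # Both rows are present; 'accuracy' is NaN for the first row.
    print(pd.read_parquet("demo.parquet"))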