Init commit.

utils/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
from .logger import DataLogger, LoggerConfig

utils/logger.py (new file, 493 lines)
@@ -0,0 +1,493 @@
"""
|
||||
Asynchronous, batched, and schema-evolving Parquet logger.
|
||||
|
||||
This module provides the `DataLogger`, a high-performance logger for structured
|
||||
data, designed for applications like machine learning experiments, simulations,
|
||||
or any scenario requiring efficient serialization of row-based data.
|
||||
|
||||
Key Features:
|
||||
- **Unified Interface**: Log data via a simple `DataLogger.log({"key": "value"})` call.
|
||||
- **Asynchronous & Batched**: A dedicated background thread handles I/O,
|
||||
batching rows to minimize disk writes and reduce application latency.
|
||||
- **Schema Evolution**: Automatically adapts the Parquet schema if new data fields
|
||||
are introduced, rewriting the file to maintain a consistent structure.
|
||||
- **Singleton Pattern**: A global singleton instance is managed automatically,
|
||||
providing a convenient, fire-and-forget logging experience.
|
||||
- **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch
|
||||
tensors, converting them to Parquet-compatible formats.
|
||||
- **Robust & Thread-Safe**: Designed for use in multi-threaded environments.
|
||||
|
||||
Basic Usage:
|
||||
-------------
|
||||
.. code-block:: python
|
||||
|
||||
from logger.data_logger import DataLogger
|
||||
|
||||
# The first call creates and configures the singleton logger.
|
||||
# A timestamped filename is generated by default.
|
||||
DataLogger.log({"step": 0, "loss": 10.5, "accuracy": 0.5})
|
||||
DataLogger.log({"step": 1, "loss": 9.8, "accuracy": 0.55})
|
||||
|
||||
# For the singleton, data is automatically flushed and saved on program exit.
|
||||
# No explicit `close()` call is required for this simple case.
|
||||
|
||||
Advanced Usage (Instance-Based):
|
||||
---------------------------------
|
||||
.. code-block:: python
|
||||
|
||||
from logger.data_logger import DataLogger, LoggerConfig
|
||||
|
||||
config = LoggerConfig(batch_size=512, flush_interval=5.0)
|
||||
with DataLogger("my_experiment.parquet", config=config) as logger:
|
||||
for i in range(1000):
|
||||
logger.submit({"value": i})
|
||||
# The `with` statement ensures flush and close on exit.
|
||||
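
Array Logging (illustrative sketch):
------------------------------------
The example below is a sketch of the type handling described above: NumPy
arrays (and PyTorch tensors) are converted to nested lists before being
written. The field names used here are arbitrary, and reading the file back
with `pandas.read_parquet` is shown only as a quick sanity check.

.. code-block:: python

    import numpy as np
    import pandas as pd

    from utils import DataLogger

    DataLogger.log({"step": 2, "weights": np.zeros((2, 2))})

    # Block until everything queued so far has been written to disk.
    DataLogger.get_instance().flush()

    # Read the log back for inspection.
    df = pd.read_parquet(DataLogger.get_instance().path)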
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import atexit
|
||||
import os
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import typing as t
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
# Third-party libraries are imported with runtime checks to provide clear
|
||||
# error messages if they are not installed.
|
||||
try:
|
||||
import numpy as np
|
||||
except ImportError:
|
||||
np = None # type: ignore
|
||||
|
||||
try:
|
||||
import pandas as pd
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"pandas is required for DataLogger. Install with `pip install pandas`."
|
||||
)
|
||||
|
||||
try:
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"pyarrow is required for DataLogger. Install with `pip install pyarrow`."
|
||||
)
|
||||
|
||||
try:
|
||||
import torch
|
||||
except ImportError:
|
||||
torch = None # type: ignore
|
||||
|
||||
# Type alias for a single row of data.
|
||||
Row = t.Dict[str, t.Any]
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoggerConfig:
|
||||
"""Configuration for the DataLogger's writer behavior."""
|
||||
|
||||
batch_size: int = 1024
|
||||
"""Number of rows to accumulate before writing a batch to the Parquet file."""
|
||||
|
||||
flush_interval: float = 1.0
|
||||
"""Maximum time in seconds to wait before flushing the buffer, even if
|
||||
`batch_size` is not reached."""
|
||||
|
||||
parquet_compression: str = "snappy"
|
||||
"""Compression codec to use for the Parquet file.
|
||||
Common options: 'snappy', 'gzip', 'brotli', 'none'."""
|
||||
|
||||
allow_schema_rewrite: bool = True
|
||||
"""If True, the logger will automatically rewrite the entire Parquet file to
|
||||
accommodate new columns. If False, it will raise an error."""
|
||||
|
||||
|
||||
class DataLogger:
|
||||
"""
|
||||
An asynchronous, batched logger that writes data to a Parquet file.
|
||||
|
||||
This class manages a background thread to handle file I/O, allowing the
|
||||
calling application to log data with minimal blocking. It supports schema
|
||||
evolution, making it robust to changes in data structure over time.
|
||||
"""
|
||||
|
||||
_singleton: t.Optional["DataLogger"] = None
|
||||
_singleton_lock = threading.Lock()
|
||||
|
||||
# --- Public API ---
|
||||
|
||||
@classmethod
|
||||
def get_instance(
|
||||
cls,
|
||||
path: t.Optional[t.Union[str, Path]] = None,
|
||||
config: t.Optional[LoggerConfig] = None,
|
||||
) -> "DataLogger":
|
||||
"""
|
||||
Get or create the global singleton instance of the DataLogger.
|
||||
|
||||
The first time this method is called, it creates a new `DataLogger`
|
||||
instance and registers a cleanup function via `atexit` to ensure
|
||||
`close()` is called automatically upon program termination.
|
||||
|
||||
Subsequent calls will ignore the arguments and return the existing
|
||||
instance.
|
||||
|
||||
Args:
|
||||
path: The file path for the log file. If None, a timestamped
|
||||
filename like 'log_YYYYMMDD-HHMMSS.parquet' is created in the
|
||||
current working directory.
|
||||
config: A `LoggerConfig` object to configure the writer's behavior.
|
||||
If None, default settings are used.
|
||||
|
||||
Returns:
|
||||
The singleton `DataLogger` instance.
|
||||
"""
|
||||
if cls._singleton is None:
|
||||
with cls._singleton_lock:
|
||||
if cls._singleton is None:
|
||||
# Create the singleton instance.
|
||||
instance = cls(path, config)
|
||||
# Register its close method to be called at program exit.
|
||||
# This ensures data is saved even if the user forgets to call close().
|
||||
atexit.register(instance.close)
|
||||
cls._singleton = instance
|
||||
return cls._singleton
|
||||
|
||||
@classmethod
|
||||
def log(cls, row: Row) -> None:
|
||||
"""
|
||||
Log a data row using the singleton instance.
|
||||
|
||||
This is a convenience method that lazily initializes the singleton on
|
||||
its first call. The operation is non-blocking; the data is placed in
|
||||
an internal queue to be processed by the background writer thread.
|
||||
|
||||
Args:
|
||||
row: A dictionary representing a single row of data, where keys
|
||||
are column names and values are the data points.
|
||||
"""
|
||||
instance = cls.get_instance()
|
||||
instance.submit(row)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: t.Optional[t.Union[str, Path]] = None,
|
||||
config: t.Optional[LoggerConfig] = None,
|
||||
):
|
||||
"""
|
||||
Initialize a DataLogger instance.
|
||||
|
||||
Args:
|
||||
path: The file path for the log file. If None, a timestamped
|
||||
filename is automatically generated.
|
||||
config: A `LoggerConfig` object. If None, default settings are used.
|
||||
"""
|
||||
self.path = self._resolve_path(path)
|
||||
self._config = config or LoggerConfig()
|
||||
|
||||
# Internal state for the writer thread
|
||||
self._queue: queue.Queue[t.Optional[Row]] = queue.Queue()
|
||||
self._stop_event = threading.Event()
|
||||
self._flush_event = threading.Event()
|
||||
self._writer_thread: t.Optional[threading.Thread] = None
|
||||
self._writer_lock = threading.RLock() # Protects writer and schema
|
||||
|
||||
# Parquet-specific state, managed exclusively by the writer thread
|
||||
self._parquet_writer: t.Optional[pq.ParquetWriter] = None
|
||||
self._schema: t.Optional[pa.Schema] = None
|
||||
self._buffer: t.List[Row] = []
|
||||
|
||||
self._start_writer_thread()
|
||||
|
||||
def submit(self, row: Row) -> None:
|
||||
"""
|
||||
Submit a data row to be written asynchronously by the logger instance.
|
||||
|
||||
Args:
|
||||
row: A dictionary representing a single row of data.
|
||||
|
||||
Raises:
|
||||
TypeError: If the provided row is not a dictionary.
|
||||
RuntimeError: If the logger has already been closed.
|
||||
"""
|
||||
if self._stop_event.is_set():
|
||||
raise RuntimeError("Logger has been closed and cannot accept new data.")
|
||||
if not isinstance(row, dict):
|
||||
raise TypeError(f"Expected a dict for a row, but got {type(row)}.")
|
||||
|
||||
normalized_row = self._normalize_row(row)
|
||||
self._queue.put(normalized_row)
|
||||
|
||||
def flush(self, timeout: float = 10.0) -> None:
|
||||
"""
|
||||
Block until all currently queued and buffered data is written to disk.
|
||||
|
||||
Args:
|
||||
timeout: Maximum time in seconds to wait for the flush to complete.
|
||||
"""
|
||||
if self._writer_thread is None or not self._writer_thread.is_alive():
|
||||
return
|
||||
|
||||
self._flush_event.clear()
|
||||
self._queue.put(None) # Sentinel to trigger a flush
|
||||
self._flush_event.wait(timeout)
|
||||
|
||||
def close(self, timeout: float = 10.0) -> None:
|
||||
"""
|
||||
Flush all remaining data and shut down the background writer thread.
|
||||
|
||||
This method is idempotent and thread-safe. It is designed to be
|
||||
called explicitly, via a `with` statement, or automatically at program
|
||||
exit.
|
||||
|
||||
Args:
|
||||
timeout: Maximum time in seconds to wait for the writer thread
|
||||
to finish.
|
||||
"""
|
||||
if self._stop_event.is_set():
|
||||
return
|
||||
|
||||
self._stop_event.set()
|
||||
self._queue.put(None) # Wake up the writer thread if it's blocking.
|
||||
|
||||
# Do not join the writer thread from itself, which would cause a deadlock.
|
||||
if self._writer_thread and threading.current_thread() != self._writer_thread:
|
||||
self._writer_thread.join(timeout)
|
||||
|
||||
def __enter__(self) -> "DataLogger":
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
"""Ensures the logger is closed upon exiting a `with` block."""
|
||||
self.close()
|
||||
|
||||
def __del__(self):
|
||||
"""Ensures data is flushed when the logger object is destroyed."""
|
||||
self.close()
|
||||
|
||||
# --- Internal Methods ---
|
||||
|
||||
def _resolve_path(self, path: t.Optional[t.Union[str, Path]]) -> Path:
|
||||
"""Determine the final output path for the log file."""
|
||||
if path is None:
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
filename = f"log_{timestamp}.parquet"
|
||||
return Path.cwd() / filename
|
||||
|
||||
resolved_path = Path(path)
|
||||
if resolved_path.suffix == "":
|
||||
resolved_path = resolved_path.with_suffix(".parquet")
|
||||
return resolved_path
|
||||
|
||||
def _start_writer_thread(self) -> None:
|
||||
"""Initialize and start the background writer thread."""
|
||||
if self._writer_thread is not None:
|
||||
return
|
||||
thread_name = f"DataLoggerWriter-{self.path.name}"
|
||||
self._writer_thread = threading.Thread(
|
||||
target=self._writer_loop, name=thread_name, daemon=True
|
||||
)
|
||||
self._writer_thread.start()
|
||||
|
||||
def _writer_loop(self) -> None:
|
||||
"""
|
||||
The main loop for the background writer thread.
|
||||
|
||||
This loop continuously pulls data from the queue, batches it, and
|
||||
writes it to the Parquet file. It handles flush signals, stop events,
|
||||
and schema evolution.
|
||||
"""
|
||||
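        # Queue protocol: a `None` item is a sentinel. `flush()` and `close()`
        # enqueue it to wake this loop immediately, and a `queue.Empty` timeout
        # below is treated the same way, so buffered rows are written at least
        # every `flush_interval` seconds.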
        try:
            while not self._stop_event.is_set():
                try:
                    # Block until an item is available or the flush interval times out.
                    item = self._queue.get(timeout=self._config.flush_interval)
                except queue.Empty:
                    # Timeout occurred, treat as a periodic flush signal.
                    item = None

                if item is not None:
                    self._buffer.append(item)

                buffer_size = len(self._buffer)
                is_flush_signal = item is None
                is_batch_full = buffer_size >= self._config.batch_size
                is_shutting_down = self._stop_event.is_set()

                if self._buffer and (
                    is_flush_signal or is_batch_full or is_shutting_down
                ):
                    self._write_batch(self._buffer)
                    self._buffer.clear()

                if is_flush_signal:
                    self._flush_event.set()  # Signal that a flush completed

            # Final drain of the queue and buffer after the stop event is set.
            self._drain_remaining()

        except Exception as e:
            print(f"FATAL: DataLogger writer thread crashed: {e}", flush=True)
            traceback.print_exc()
        finally:
            # This block ensures that the Parquet writer is always closed
            # when the writer thread exits, for any reason.
            with self._writer_lock:
                if self._parquet_writer:
                    try:
                        self._parquet_writer.close()
                    except Exception as e:
                        print(
                            f"ERROR: Exception while closing Parquet writer: {e}",
                            flush=True,
                        )
                    self._parquet_writer = None

    def _drain_remaining(self) -> None:
        """Process all remaining items in the queue and buffer during shutdown."""
        while True:
            try:
                item = self._queue.get_nowait()
                if item is not None:
                    self._buffer.append(item)
            except queue.Empty:
                break
        if self._buffer:
            self._write_batch(self._buffer)
            self._buffer.clear()

    def _write_batch(self, rows: t.List[Row]) -> None:
        """
        Convert a list of rows into a Parquet table and write it to the file.

        This method handles schema creation, validation, and evolution.
        It is always executed within the writer thread.
        """
        if not rows:
            return
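
        # Write strategy: the existing file is read in full, concatenated with
        # the new batch, written to a temporary file, and atomically swapped
        # into place with os.replace(). This keeps the on-disk file consistent,
        # but the cost of each batch grows with the size of the file.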
        try:
            with self._writer_lock:
                df = pd.DataFrame(rows)
                # Ensure a consistent column order for schema stability.
                df = df.reindex(sorted(df.columns), axis=1)
                new_table = pa.Table.from_pandas(df, preserve_index=False)

                if self.path.exists():
                    # File exists, need to append or evolve schema
                    existing_table = pq.read_table(self.path)
                    existing_schema = existing_table.schema

                    if existing_schema.equals(new_table.schema):
                        # Schema matches, append the data
                        combined_table = pa.concat_tables([existing_table, new_table])
                    else:
                        # Schema evolution needed
                        if not self._config.allow_schema_rewrite:
                            raise RuntimeError(
                                "Schema mismatch detected, and rewriting is disabled. "
                                f"Existing schema: {existing_schema}, New schema: {new_table.schema}"
                            )
                        print(
                            f"INFO: Schema evolution detected. Rewriting {self.path}...",
                            flush=True,
                        )
                        # Combine with schema evolution
                        combined_df = pd.concat(
                            [existing_table.to_pandas(), new_table.to_pandas()],
                            ignore_index=True,
                            sort=False,
                        )
                        combined_df = combined_df.reindex(
                            sorted(combined_df.columns), axis=1
                        )
                        combined_table = pa.Table.from_pandas(
                            combined_df, preserve_index=False
                        )
                else:
                    # New file
                    self.path.parent.mkdir(parents=True, exist_ok=True)
                    combined_table = new_table

                # Write the combined table atomically
                temp_path = self.path.with_suffix(f"{self.path.suffix}.tmp")
                pq.write_table(
                    combined_table,
                    temp_path,
                    compression=self._config.parquet_compression,
                )
                os.replace(temp_path, self.path)

                # Update our schema tracking
                self._schema = combined_table.schema

        except Exception as e:
            print(f"ERROR: Failed to write batch to {self.path}: {e}", flush=True)
            traceback.print_exc()
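
    # NOTE: `_rewrite_with_new_schema` below is not invoked by the current
    # `_write_batch`, which performs the schema rewrite inline.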
    def _rewrite_with_new_schema(self, new_table: pa.Table) -> None:
        """
        Rewrite the entire Parquet file to accommodate an evolved schema.

        This is a potentially expensive operation as it reads the entire
        existing file into memory.

        Args:
            new_table: The new batch of data with a different schema.
        """
        print(f"INFO: Schema evolution detected. Rewriting {self.path}...", flush=True)

        # Close the current writer before reading the file.
        if self._parquet_writer:
            self._parquet_writer.close()

        # Read existing data, combine with new data, and create a unified table.
        existing_table = pq.read_table(self.path)
        combined_df = pd.concat(
            [existing_table.to_pandas(), new_table.to_pandas()],
            ignore_index=True,
            sort=False,
        )
        # Re-sort columns for the new unified schema.
        combined_df = combined_df.reindex(sorted(combined_df.columns), axis=1)
        final_table = pa.Table.from_pandas(combined_df, preserve_index=False)
        self._schema = final_table.schema

        # Atomically replace the old file with the new one.
        temp_path = self.path.with_suffix(f"{self.path.suffix}.tmp")
        pq.write_table(
            final_table, temp_path, compression=self._config.parquet_compression
        )
        os.replace(temp_path, self.path)

        # Re-initialize the writer with the new schema for subsequent writes.
        self._parquet_writer = pq.ParquetWriter(
            self.path, self._schema, compression=self._config.parquet_compression
        )

    def _normalize_row(self, row: Row) -> Row:
        """
        Sanitize all values in a row for Parquet compatibility.
        """
        return {key: self._normalize_value(value) for key, value in row.items()}

    def _normalize_value(self, value: t.Any) -> t.Any:
        """
        Convert a single value to a Parquet-friendly format.
        - NumPy arrays and Torch tensors are converted to nested lists.
        - Other types are passed through for pandas to handle.
        """
        if value is None:
            return None
        if np and isinstance(value, np.ndarray):
            return value.tolist()
        if torch and isinstance(value, torch.Tensor):
            return value.detach().cpu().numpy().tolist()
        return value