Add automatic time logging.

Add: the server can now add a `_time` field to each log row when `log_server_time: bool = True` is set in `LoggerConfig`. To record the exact client time instead of the server's arrival time, include your own `_time` field when submitting a log row.
This commit is contained in:
2025-09-28 11:33:33 +08:00
parent e8266c3ee7
commit eddb879680

View File

@ -13,13 +13,14 @@ Key Architectural Features:
- **Lazy Initialization & Automatic Management**: The server process is transparently - **Lazy Initialization & Automatic Management**: The server process is transparently
started on the first log call and is gracefully shut down on program exit via started on the first log call and is gracefully shut down on program exit via
`atexit`, requiring no manual management from the user. `atexit`, requiring no manual management from the user.
- **Automatic Server-Side Timestamping**: Optionally, the server can automatically
add a timestamp to each log record, indicating the exact time of its arrival.
- **Unified Interface**: Log data from any process via the simple and consistent - **Unified Interface**: Log data from any process via the simple and consistent
`DataLogger.log({"key": "value"})` call. `DataLogger.log({"key": "value"})` call.
- **Asynchronous & Batched**: The server process handles I/O asynchronously from the - **Asynchronous & Batched**: The server process handles I/O asynchronously from the
clients, batching rows to minimize disk writes and reduce application latency. clients, batching rows to minimize disk writes and reduce application latency.
- **Schema Evolution**: Reuses the robust logic to automatically adapt the Parquet - **Schema Evolution**: Reuses the robust logic to automatically adapt the Parquet
schema if new data fields are introduced, rewriting the file to maintain a schema if new data fields are introduced.
consistent structure.
- **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch - **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch
tensors. tensors.
@ -109,6 +110,7 @@ class LoggerConfig:
flush_interval: float = 1.0 flush_interval: float = 1.0
parquet_compression: str = "snappy" parquet_compression: str = "snappy"
allow_schema_rewrite: bool = True allow_schema_rewrite: bool = True
log_server_time: bool = False
class DataLoggerServer(multiprocessing.Process): class DataLoggerServer(multiprocessing.Process):
@ -145,7 +147,7 @@ class DataLoggerServer(multiprocessing.Process):
item = self._queue.get(timeout=self._config.flush_interval) item = self._queue.get(timeout=self._config.flush_interval)
if isinstance(item, dict): if isinstance(item, dict):
self._buffer.append(self._normalize_row(item)) self._process_log_item(item)
elif item == _FLUSH_COMMAND: elif item == _FLUSH_COMMAND:
self._write_buffer_if_needed(force=True) self._write_buffer_if_needed(force=True)
self._flush_event.set() # Signal completion self._flush_event.set() # Signal completion
@ -164,6 +166,15 @@ class DataLoggerServer(multiprocessing.Process):
print(f"FATAL: DataLogger server process crashed: {e}", flush=True) print(f"FATAL: DataLogger server process crashed: {e}", flush=True)
traceback.print_exc() traceback.print_exc()
def _process_log_item(self, item: Row) -> None:
    """Timestamp (if configured), normalize, and buffer one dequeued log row.

    When ``log_server_time`` is enabled and the client did not supply its
    own ``_time`` field, the server records the time the row was dequeued.
    A client-provided ``_time`` always takes precedence.
    """
    needs_server_time = self._config.log_server_time and "_time" not in item
    if needs_server_time:
        # Server-side arrival timestamp for this row.
        # NOTE(review): naive local time — presumably intentional to match
        # client-supplied timestamps; confirm whether UTC-aware is wanted.
        item["_time"] = datetime.datetime.now()
    self._buffer.append(self._normalize_row(item))
def _write_buffer_if_needed(self, force: bool = False) -> None: def _write_buffer_if_needed(self, force: bool = False) -> None:
""" """
Determines if the buffer should be written to disk and triggers the write. Determines if the buffer should be written to disk and triggers the write.
@ -185,7 +196,7 @@ class DataLoggerServer(multiprocessing.Process):
try: try:
item = self._queue.get_nowait() item = self._queue.get_nowait()
if isinstance(item, dict): if isinstance(item, dict):
self._buffer.append(self._normalize_row(item)) self._process_log_item(item)
except queue.Empty: except queue.Empty:
break break
self._write_buffer_if_needed(force=True) self._write_buffer_if_needed(force=True)