From eddb8796805696f72936112679e70fbdc4afa338 Mon Sep 17 00:00:00 2001
From: Sunday
Date: Sun, 28 Sep 2025 11:33:33 +0800
Subject: [PATCH] Add automatic time logging

The server can now add a `_time` field to each log row when
`log_server_time: bool = True` is set in LoggerConfig. To record the exact
client time instead of the server time, include your own time field when
submitting a log row.
---
 DataLogger.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/DataLogger.py b/DataLogger.py
index 5851c03..ec434bd 100644
--- a/DataLogger.py
+++ b/DataLogger.py
@@ -13,13 +13,14 @@ Key Architectural Features:
 - **Lazy Initialization & Automatic Management**: The server process is
   transparently started on the first log call and is gracefully shut down on
   program exit via `atexit`, requiring no manual management from the user.
+- **Automatic Server-Side Timestamping**: Optionally, the server can automatically
+  add a timestamp to each log record, indicating the exact time of its arrival.
 - **Unified Interface**: Log data from any process via the simple and consistent
   `DataLogger.log({"key": "value"})` call.
 - **Asynchronous & Batched**: The server process handles I/O asynchronously from
   the clients, batching rows to minimize disk writes and reduce application latency.
 - **Schema Evolution**: Reuses the robust logic to automatically adapt the Parquet
-  schema if new data fields are introduced, rewriting the file to maintain a
-  consistent structure.
+  schema if new data fields are introduced.
 - **Type Handling**: Natively handles Python primitives, NumPy arrays, and PyTorch
   tensors.
 
@@ -109,6 +110,7 @@ class LoggerConfig:
     flush_interval: float = 1.0
     parquet_compression: str = "snappy"
     allow_schema_rewrite: bool = True
+    log_server_time: bool = False
 
 
 class DataLoggerServer(multiprocessing.Process):
@@ -145,7 +147,7 @@ class DataLoggerServer(multiprocessing.Process):
                 item = self._queue.get(timeout=self._config.flush_interval)
 
                 if isinstance(item, dict):
-                    self._buffer.append(self._normalize_row(item))
+                    self._process_log_item(item)
                 elif item == _FLUSH_COMMAND:
                     self._write_buffer_if_needed(force=True)
                     self._flush_event.set()  # Signal completion
@@ -164,6 +166,15 @@ class DataLoggerServer(multiprocessing.Process):
             print(f"FATAL: DataLogger server process crashed: {e}", flush=True)
             traceback.print_exc()
 
+    def _process_log_item(self, item: Row) -> None:
+        """Processes a single log item by timestamping, normalizing, and buffering it."""
+        if self._config.log_server_time and "_time" not in item:
+            # Add the server-side timestamp when the item is dequeued.
+            item["_time"] = datetime.datetime.now()
+
+        normalized_row = self._normalize_row(item)
+        self._buffer.append(normalized_row)
+
     def _write_buffer_if_needed(self, force: bool = False) -> None:
         """
         Determines if the buffer should be written to disk and triggers the write.
@@ -185,7 +196,7 @@ class DataLoggerServer(multiprocessing.Process):
             try:
                 item = self._queue.get_nowait()
                 if isinstance(item, dict):
-                    self._buffer.append(self._normalize_row(item))
+                    self._process_log_item(item)
             except queue.Empty:
                 break
         self._write_buffer_if_needed(force=True)
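
For reviewers, a minimal client-side usage sketch of the new option. `LoggerConfig`, `log_server_time`, the `_time` field, and `DataLogger.log(...)` come from this patch; the `DataLogger.configure(...)` entry point is a hypothetical placeholder, since how a `LoggerConfig` reaches the server process is not shown in this diff.

```python
import datetime

# Assumes both classes are importable from the DataLogger module touched by this patch.
from DataLogger import DataLogger, LoggerConfig

# Hypothetical configuration step: the patch only adds the `log_server_time`
# field to LoggerConfig; the actual way the config is supplied may differ.
DataLogger.configure(LoggerConfig(log_server_time=True))

# Server-side timestamping: the server fills in `_time` when it dequeues the row.
DataLogger.log({"loss": 0.42})

# Client-side timestamping: supplying `_time` yourself wins, because the server
# only adds the field when the key is absent from the submitted row.
DataLogger.log({"loss": 0.42, "_time": datetime.datetime.now()})
```

Note that the server-side timestamp reflects when the row is taken off the queue, not when the client called `log`, so rows logged during a flush backlog can carry slightly later times than their true submission order suggests; supplying a client-side `_time` avoids that skew.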