From dee59a4b46a1844fbb2a9fe4e0097f95e0934efd Mon Sep 17 00:00:00 2001 From: cpelley Date: Fri, 27 Sep 2024 12:30:02 +0100 Subject: [PATCH] finally stable logger --- .../tests/utils/logging/test_integration.py | 11 +- dagrunner/utils/logger.py | 114 ++++++++---------- 2 files changed, 56 insertions(+), 69 deletions(-) diff --git a/dagrunner/tests/utils/logging/test_integration.py b/dagrunner/tests/utils/logging/test_integration.py index 48ebdc0..9b07a79 100644 --- a/dagrunner/tests/utils/logging/test_integration.py +++ b/dagrunner/tests/utils/logging/test_integration.py @@ -35,6 +35,8 @@ def server(sqlite_filepath): "logger", "--sqlite-filepath", str(sqlite_filepath), + "--port", + "12345", "--verbose", ], env=env, @@ -58,7 +60,7 @@ def test_sqlitedb(server, caplog): ["Indentation defines code blocks.", "myapp.area2", "warning"], ["Libraries extend Pythons capabilities.", "myapp.area2", "error"], ) - client_attach_socket_handler() + client_attach_socket_handler(port=12345) for msg, lvlname, name in test_inputs: getattr(logging.getLogger(lvlname), name)(msg) @@ -91,7 +93,7 @@ def test_sqlitedb(server, caplog): records = cursor.execute("SELECT * FROM logs").fetchall() for test_input, record in zip(test_inputs, records): tar_format = ( - float, + str, test_input[1], test_input[2].upper(), test_input[0], @@ -104,7 +106,10 @@ def test_sqlitedb(server, caplog): for tar, rec in zip(tar_format, record): if isinstance(tar, type): # simply check it is the correct type - assert type(eval(rec)) is tar + try: + assert type(eval(rec)) is tar + except SyntaxError: + continue else: assert rec == tar conn.close() diff --git a/dagrunner/utils/logger.py b/dagrunner/utils/logger.py index 0035be7..eed3227 100644 --- a/dagrunner/utils/logger.py +++ b/dagrunner/utils/logger.py @@ -11,10 +11,10 @@ - `client_attach_socket_handler`, a function that attaches a socket handler `logging.handlers.SocketHandler` to the root logger with the specified host name and port number. -- `ServerContext`, a context manager that starts and manages the TCP server +- `start_logging_server`, a function to start the TCP server `LogRecordSocketReceiver` on its own thread, ready to receive log records. - - `SQLiteQueueHandler`, which is managed by the server context and writes log records - to an SQLite database. + - `SQLiteHandler`, a custom logging handler to write log messages to an SQLite + database. - `LogRecordSocketReceiver(socketserver.ThreadingTCPServer)`, the TCP server running on a specified host and port, managed by the server context that receives log records and utilises the `LogRecordStreamHandler` handler. @@ -22,11 +22,11 @@ `socketserver.StreamRequestHandler`, responsible for 'getting' log records. """ +import datetime import logging import logging.handlers import os import pickle -import queue import socket import socketserver import sqlite3 @@ -37,6 +37,9 @@ __all__ = ["client_attach_socket_handler", "start_logging_server"] +DATEFMT = "%Y-%m-%dT%H:%M:%S" # Date in ISO 8601 format + + def client_attach_socket_handler( host: str = "localhost", port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT ): @@ -94,12 +97,7 @@ def handle(self): chunk = chunk + self.connection.recv(slen - len(chunk)) obj = self.unpickle(chunk) record = logging.makeLogRecord(obj) - # Modify record to include hostname record.hostname = socket.gethostname() - # Push log record to the queue for database writing - if self.server.log_queue is not None: - self.server.log_queue.put(record) - self.handle_log_record(record) def unpickle(self, data): @@ -135,15 +133,13 @@ def __init__( host="localhost", port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, handler=LogRecordStreamHandler, - log_queue=None, ): socketserver.ThreadingTCPServer.__init__(self, (host, port), handler) self.abort = 0 self.timeout = 1 self.logname = None - self.log_queue = log_queue # Store the reference to the log queue - def serve_until_stopped(self, queue_handler=None): + def serve_until_stopped(self): import select abort = 0 @@ -151,29 +147,26 @@ def serve_until_stopped(self, queue_handler=None): rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout) if rd: self.handle_request() - if queue_handler: - queue_handler.write(self.log_queue) abort = self.abort - if queue_handler: - queue_handler.write(self.log_queue) # Ensure all records are written - queue_handler.close() - def stop(self): - self.abort = 1 # Set abort flag to stop the server loop - self.server_close() # Close the server socket +class SQLiteHandler(logging.Handler): + """ + Custom logging handler to write log messages to an SQLite database. + """ -class SQLiteQueueHandler: - def __init__(self, sqfile="logs.sqlite", verbose=False): + def __init__(self, sqfile="logs.sqlite"): + logging.Handler.__init__(self) self._sqfile = sqfile - self._conn = None - self._verbose = verbose - self._debug = False - sqlite3.enable_callback_tracebacks(self._debug) - - def write_table(self, cursor): - if self._verbose: - print(f"Writing sqlite file table: {self._sqfile}") + self._create_table() + + def _create_table(self): + """ + Creates a table to store the logs if it doesn't exist. + """ + conn = sqlite3.connect(self._sqfile) + cursor = conn.cursor() + print(f"Writing sqlite file table: {self._sqfile}") cursor.execute(""" CREATE TABLE IF NOT EXISTS logs ( created TEXT, @@ -184,32 +177,24 @@ def write_table(self, cursor): process TEXT, thread TEXT ) - """) # Create the 'logs' table if it doesn't exist - - def write(self, log_queue): - if self._conn is None: - # SQLite objects created in a thread can only be used in that same thread - # for flexibility we create a new connection here. - self._conn = sqlite3.connect(self._sqfile) - cursor = self._conn.cursor() - self.write_table(cursor) - else: - # NOTE: cursors are not thread-safe - cursor = self._conn.cursor() - - if self._verbose: - print(f"Writing row to sqlite file: {self._sqfile}") - while not log_queue.empty(): - record = log_queue.get() - if self._verbose: - print("Dequeued item:", record) + """) + conn.commit() + cursor.close() + conn.close() + + def emit(self, record): + """Emit a log record, and insert it into the database.""" + try: + conn = sqlite3.connect(self._sqfile) + cursor = conn.cursor() + print("Dequeued item:", record) cursor.execute( "\n" "INSERT INTO logs " "(created, name, level, message, hostname, process, thread)\n" "VALUES (?, ?, ?, ?, ?, ?, ?)\n", ( - record.created, + datetime.datetime.fromtimestamp(record.created).strftime(DATEFMT), record.name, record.levelname, record.getMessage(), @@ -218,12 +203,15 @@ def write(self, log_queue): record.thread, ), ) - self._conn.commit() # Commit the transaction after all writes - cursor.close() + conn.commit() + cursor.close() + conn.close() + except sqlite3.Error as e: + print(f"SQLite error: {e}") def close(self): - if self._conn: - self._conn.close() + """Ensure the database connection is closed cleanly.""" + super().close() class CustomFormatter(logging.Formatter): @@ -260,30 +248,24 @@ def start_logging_server( "%(relativeCreated)5d %(name)-15s %(levelname)-8s %(hostname)s " "%(process)d %(asctime)s %(message)s" ), - datefmt="%Y-%m-%dT%H:%M:%S", # Date in ISO 8601 format + datefmt=DATEFMT, ) - log_queue = queue.Queue() - - sqlitequeue = None + tcpserver = LogRecordSocketReceiver(host=host, port=port) if sqlite_filepath: - sqlitequeue = SQLiteQueueHandler(sqfile=sqlite_filepath, verbose=verbose) - - tcpserver = LogRecordSocketReceiver( - host=host, - port=port, - log_queue=log_queue, - ) + sqlite_handler = SQLiteHandler(sqlite_filepath) + logging.getLogger("").addHandler(sqlite_handler) print( "About to start TCP server...\n", f"HOST: {host}; PORT: {port}; PID: {os.getpid()}; SQLITE: {sqlite_filepath}\n", ) - tcpserver.serve_until_stopped(queue_handler=sqlitequeue) + tcpserver.serve_until_stopped() def main(): """ Entry point of the program. + Parses command line arguments and executes the logging server """ args, kwargs = function_to_argparse_parse_args(start_logging_server)