Source code for decent_bench.utils.logger

from __future__ import annotations

import logging
from abc import abstractmethod
from logging import LogRecord
from logging.handlers import QueueHandler, QueueListener
from multiprocessing.managers import SyncManager
from typing import Protocol

from rich.logging import RichHandler

LOGGER: logging.Logger = logging.getLogger()
"""
Shared logger for the whole codebase.

``logging.getLogger()`` is called without a name, so this is the root logger.

:meta hide-value:
"""


class LogQueue(Protocol):
    """
    Minimal structural interface for queue-like objects used by logging handlers.

    Only the two operations declared here are required: ``put_nowait`` to
    enqueue a record and ``get`` to retrieve one.
    """

    @abstractmethod
    def put_nowait(self, item: LogRecord, /) -> None:
        """Put *item* (positional-only) on the queue."""
        ...

    @abstractmethod
    def get(self) -> LogRecord:
        """Get an item from the queue."""
        ...
def start_logger(log_level: int = logging.INFO) -> None:
    """
    Set up the logger for use within a single process.

    Intended for cases where multiprocessing support is not needed (e.g., when
    calling `display_metrics` or `compute_metrics` directly). In multiprocessing
    contexts, use `start_log_listener` and `start_queue_logger` instead.

    If the logger already has handlers, this function returns without changing
    anything, so an existing configuration (including its level) is kept.

    Args:
        log_level: minimum level to log, e.g. :data:`logging.INFO`

    Note:
        Filtering is done at the logger level (not handler level) for consistency.
        The logger level decides which messages are processed; the handler is
        created with its default level (NOTSET) and therefore does not filter further.

    """
    if not LOGGER.hasHandlers():
        # Nothing configured yet: install a single rich console handler.
        console_handler = RichHandler()
        LOGGER.handlers.clear()
        LOGGER.addHandler(console_handler)
        LOGGER.setLevel(log_level)
def start_log_listener(manager: SyncManager, log_level: int) -> QueueListener:
    """
    Launch a listener thread that receives log messages through a queue.

    The calling process's logger is also redirected to the same queue via
    `start_queue_logger`, so its own messages go through the listener too.

    Args:
        manager: used to create a log queue that can be shared across processes
        log_level: minimum level to log, e.g. :data:`logging.INFO`

    Returns:
        `QueueListener` which can be used to access the log queue and to stop the listener thread

    Note:
        For multiprocessing, filtering is done at the handler level (not logger level).
        Worker processes use `start_queue_logger`, which sets LOGGER to NOTSET so that
        every message is sent to the queue. The RichHandler attached to the listener
        then filters messages based on `log_level`.

    """
    shared_queue = manager.Queue()
    console_handler = RichHandler(level=log_level)
    # respect_handler_level=True makes the listener honour the handler's level,
    # which is where the log_level filtering takes place.
    listener = QueueListener(shared_queue, console_handler, respect_handler_level=True)
    listener.start()
    start_queue_logger(listener.queue)
    return listener
def start_queue_logger(queue: LogQueue) -> None:
    """
    Route the current process's default logger into *queue*.

    Any handlers already attached to the logger are dropped and replaced by a
    single `QueueHandler` writing to *queue*.

    Note:
        The LOGGER level is set to NOTSET so that all messages are sent to the queue
        regardless of level. Filtering happens on the listener side via the
        handler's level setting.

    """
    queue_handler = QueueHandler(queue)
    LOGGER.handlers.clear()
    LOGGER.addHandler(queue_handler)
    LOGGER.setLevel(logging.NOTSET)