Source code for decent_bench.utils.logger
from __future__ import annotations
import logging
from abc import abstractmethod
from logging import LogRecord
from logging.handlers import QueueHandler, QueueListener
from multiprocessing.managers import SyncManager
from typing import Protocol
from rich.logging import RichHandler
# Root logger: ``logging.getLogger()`` is called without a name. Every helper in
# this module configures this same object.
LOGGER = logging.getLogger()
"""
Logger to be used across the codebase.
:meta hide-value:
"""
[docs]
class LogQueue(Protocol):
    """
    Structural type for the queue objects handed to the logging handlers.

    Any object exposing ``put_nowait`` and ``get`` with the signatures below
    satisfies this protocol.
    """

    @abstractmethod
    def put_nowait(self, item: LogRecord, /) -> None:
        """
        Enqueue *item*.

        Args:
            item: the log record to enqueue (positional-only)

        """
        ...

    @abstractmethod
    def get(self) -> LogRecord:
        """
        Dequeue an item.

        Returns:
            The dequeued log record.

        """
        ...
[docs]
def start_logger(log_level: int = logging.INFO) -> None:
    """
    Configure the logger for single-process use.

    Use this function when you don't need multiprocessing support (e.g., when calling
    `display_metrics` or `compute_metrics` directly). For multiprocessing contexts,
    use `start_log_listener` and `start_queue_logger` instead.

    If the logger already has any handler attached (for example from an earlier call
    to this function or to `start_queue_logger`), this function returns without
    changing anything; in that case *log_level* is ignored.

    Args:
        log_level: minimum level to log, e.g. :data:`logging.INFO`

    Note:
        Filtering is done at the logger level (not handler level) for consistency.
        The logger level controls what messages are processed, and the handler
        inherits this by using NOTSET (the default).

    """
    if LOGGER.hasHandlers():
        # Already configured: keep the existing handlers and level untouched.
        return

    # ``hasHandlers()`` returned False, so the logger's own handler list is empty
    # here and there is nothing to clear before installing the console handler.
    LOGGER.addHandler(RichHandler())
    LOGGER.setLevel(log_level)
[docs]
def start_log_listener(manager: SyncManager, log_level: int) -> QueueListener:
    """
    Spin up a listener thread that renders log records received through a queue.

    The queue is created by *manager*. After the listener has been started, the
    calling process's own logger is also routed through that queue via
    `start_queue_logger`.

    Args:
        manager: used to create a log queue that can be shared across processes
        log_level: minimum level to log, e.g. :data:`logging.INFO`

    Returns:
        The started `QueueListener`; use it to access the log queue and to stop the
        listener thread.

    Note:
        In the multiprocessing setup, filtering happens at the handler level rather
        than the logger level. `start_queue_logger` sets LOGGER to NOTSET so every
        message reaches the queue, and the listener's RichHandler then drops the
        ones below `log_level`.

    """
    shared_queue = manager.Queue()
    console_handler = RichHandler(level=log_level)
    # respect_handler_level=True makes the listener honour the handler's level.
    listener = QueueListener(shared_queue, console_handler, respect_handler_level=True)
    listener.start()
    start_queue_logger(listener.queue)
    return listener
[docs]
def start_queue_logger(queue: LogQueue) -> None:
    """
    Route the current process's default logger into *queue*.

    Every handler currently attached to the logger is removed and replaced by a
    single `QueueHandler` that writes to *queue*.

    Args:
        queue: destination for the log records emitted in this process

    Note:
        The LOGGER level is set to NOTSET so that every message is put on the queue,
        whatever its level. Level filtering is left to the listener side, through
        the level of its handler.

    """
    LOGGER.handlers.clear()
    forwarding_handler = QueueHandler(queue)
    LOGGER.addHandler(forwarding_handler)
    LOGGER.setLevel(logging.NOTSET)