diff --git a/can_sniffer/src/handlers/__init__.py b/can_sniffer/src/handlers/__init__.py new file mode 100644 index 0000000..66c3639 --- /dev/null +++ b/can_sniffer/src/handlers/__init__.py @@ -0,0 +1,16 @@ +""" +Модуль обработчиков CAN сообщений. + +Предоставляет плагинную архитектуру для обработки CAN фреймов. +""" + +from .base import BaseHandler +from .storage_handler import StorageHandler +from .influxdb_handler import InfluxDBHandler + +__all__ = [ + 'BaseHandler', + 'StorageHandler', + 'InfluxDBHandler', +] + diff --git a/can_sniffer/src/handlers/base.py b/can_sniffer/src/handlers/base.py new file mode 100644 index 0000000..a361728 --- /dev/null +++ b/can_sniffer/src/handlers/base.py @@ -0,0 +1,113 @@ +""" +Базовый класс для обработчиков CAN сообщений. + +Предоставляет интерфейс для плагинной архитектуры обработки CAN фреймов. +""" + +from abc import ABC, abstractmethod +from typing import List, Dict, Any, Optional +from can_frame import CANFrame +from logger import get_logger + + +class BaseHandler(ABC): + """ + Базовый класс для всех обработчиков CAN сообщений. + + Каждый обработчик реализует pipeline для обработки CAN фреймов. + Обработчики могут быть отключены, включены и конфигурированы независимо. + """ + + def __init__(self, name: str, enabled: bool = True): + """ + Инициализация обработчика. + + Args: + name: Имя обработчика (для логирования и идентификации) + enabled: Включен ли обработчик (по умолчанию True) + """ + self.name = name + self.enabled = enabled + self.logger = get_logger(f"{__name__}.{name}") + self._initialized = False + + @abstractmethod + def initialize(self) -> bool: + """ + Инициализация обработчика. + + Вызывается один раз при создании обработчика. + Здесь должна быть логика подключения к внешним сервисам и т.д. + + Returns: + True если инициализация успешна, False иначе + """ + pass + + @abstractmethod + def handle(self, frame: CANFrame) -> bool: + """ + Обработка одного CAN фрейма. + + Args: + frame: CANFrame для обработки + + Returns: + True если обработка успешна, False иначе + """ + pass + + @abstractmethod + def handle_batch(self, frames: List[CANFrame]) -> int: + """ + Обработка батча CAN фреймов. + + Может быть более эффективной, чем обработка по одному. + Если обработчик не поддерживает батчинг, может просто вызывать handle() для каждого. + + Args: + frames: Список CANFrame для обработки + + Returns: + Количество успешно обработанных фреймов + """ + pass + + @abstractmethod + def flush(self) -> None: + """ + Принудительная отправка накопленных данных. + + Вызывается периодически или при завершении работы. + Обработчики могут накапливать данные для батчевой отправки. + """ + pass + + @abstractmethod + def shutdown(self) -> None: + """ + Корректное завершение работы обработчика. + + Вызывается при остановке приложения. + Здесь должна быть логика закрытия соединений, сохранения данных и т.д. + """ + pass + + @abstractmethod + def get_stats(self) -> Dict[str, Any]: + """ + Получение статистики обработчика. + + Returns: + Словарь со статистикой (обработано, ошибок, очередь и т.д.) + """ + pass + + def is_enabled(self) -> bool: + """Проверка, включен ли обработчик.""" + return self.enabled + + def is_initialized(self) -> bool: + """Проверка, инициализирован ли обработчик.""" + return self._initialized + diff --git a/can_sniffer/src/handlers/example_handler.py b/can_sniffer/src/handlers/example_handler.py new file mode 100644 index 0000000..b677766 --- /dev/null +++ b/can_sniffer/src/handlers/example_handler.py @@ -0,0 +1,144 @@ +""" +Пример обработчика для демонстрации плагинной архитектуры. + +Этот файл показывает, как легко добавить новый обработчик (например, Kafka, MQTT, WebSocket). +""" + +from typing import List, Dict, Any +from can_frame import CANFrame +from .base import BaseHandler + + +class ExampleHandler(BaseHandler): + """ + Пример обработчика для демонстрации. + + Этот обработчик можно использовать как шаблон для создания новых: + - KafkaHandler + - MQTTHandler + - WebSocketHandler + - FileHandler + и т.д. + """ + + def __init__(self, enabled: bool = True): + """Инициализация примера обработчика.""" + super().__init__(name="example", enabled=enabled) + self.processed_count = 0 + self.failed_count = 0 + + def initialize(self) -> bool: + """Инициализация обработчика.""" + if not self.enabled: + return False + + try: + # Здесь должна быть логика подключения к внешнему сервису + # Например: self.kafka_producer = KafkaProducer(...) + # Или: self.mqtt_client = mqtt.Client(...) + self._initialized = True + self.logger.info("Example handler initialized") + return True + except Exception as e: + self.logger.error(f"Failed to initialize example handler: {e}", exc_info=True) + return False + + def handle(self, frame: CANFrame) -> bool: + """Обработка одного CAN фрейма.""" + if not self.enabled or not self._initialized: + return False + + try: + # Здесь должна быть логика отправки одного фрейма + # Например: self.kafka_producer.send('can-topic', frame.to_dict()) + # Или: self.mqtt_client.publish('can/data', frame.to_dict()) + + self.processed_count += 1 + return True + except Exception as e: + self.logger.error( + f"Failed to handle frame: {e}", + exc_info=True, + extra={"can_id": frame.can_id_hex} + ) + self.failed_count += 1 + return False + + def handle_batch(self, frames: List[CANFrame]) -> int: + """Обработка батча CAN фреймов.""" + if not self.enabled or not self._initialized or not frames: + return 0 + + try: + # Здесь должна быть логика пакетной отправки + # Например: self.kafka_producer.send_batch([...]) + # Или: self.mqtt_client.publish_batch([...]) + + processed = 0 + for frame in frames: + if self.handle(frame): + processed += 1 + + return processed + except Exception as e: + self.logger.error( + f"Failed to handle batch: {e}", + exc_info=True, + extra={"batch_size": len(frames)} + ) + self.failed_count += len(frames) + return 0 + + def flush(self) -> None: + """Принудительная отправка накопленных данных.""" + if not self.enabled or not self._initialized: + return + + try: + # Здесь должна быть логика принудительной отправки + # Например: self.kafka_producer.flush() + # Или: self.mqtt_client.flush() + pass + except Exception as e: + self.logger.error(f"Failed to flush: {e}", exc_info=True) + + def shutdown(self) -> None: + """Корректное завершение работы обработчика.""" + if self._initialized: + try: + # Здесь должна быть логика закрытия соединений + # Например: self.kafka_producer.close() + # Или: self.mqtt_client.disconnect() + self.logger.info("Example handler closed") + except Exception as e: + self.logger.error(f"Error closing example handler: {e}", exc_info=True) + self._initialized = False + + def get_stats(self) -> Dict[str, Any]: + """Получение статистики обработчика.""" + return { + "handler": self.name, + "enabled": self.enabled, + "initialized": self._initialized, + "processed_count": self.processed_count, + "failed_count": self.failed_count + } + + +# Пример использования: +# +# from handlers import BaseHandler, StorageHandler, InfluxDBHandler +# from handlers.example_handler import ExampleHandler +# from socket_can.message_processor import MessageProcessor +# +# # Создаем кастомный pipeline +# handlers = [ +# StorageHandler(enabled=True), +# InfluxDBHandler(enabled=True), +# ExampleHandler(enabled=True), # Новый обработчик! +# ] +# +# # Используем в MessageProcessor +# processor = MessageProcessor(handlers=handlers) +# processor.start() + diff --git a/can_sniffer/src/handlers/influxdb_handler.py b/can_sniffer/src/handlers/influxdb_handler.py new file mode 100644 index 0000000..1cb83e7 --- /dev/null +++ b/can_sniffer/src/handlers/influxdb_handler.py @@ -0,0 +1,132 @@ +""" +Обработчик для отправки CAN сообщений в InfluxDB. +""" + +from typing import List, Dict, Any, Optional +from can_frame import CANFrame +from .base import BaseHandler +from influxdb_handler import get_influxdb_client +from config import config + + +class InfluxDBHandler(BaseHandler): + """Обработчик для отправки в InfluxDB.""" + + def __init__(self, enabled: Optional[bool] = None): + """ + Инициализация обработчика InfluxDB. + + Args: + enabled: Включен ли обработчик. Если None, берется из config.influxdb.enabled + """ + super().__init__( + name="influxdb", + enabled=enabled if enabled is not None else config.influxdb.enabled + ) + self.influxdb_client = None + + def initialize(self) -> bool: + """Инициализация InfluxDB клиента.""" + if not self.enabled: + return False + + try: + self.influxdb_client = get_influxdb_client() + self._initialized = True + self.logger.info("InfluxDB handler initialized") + return True + except Exception as e: + self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True) + self.influxdb_client = None + return False + + def handle(self, frame: CANFrame) -> bool: + """Обработка одного CAN фрейма.""" + if not self.enabled or not self._initialized or not self.influxdb_client: + return False + + try: + return self.influxdb_client.write_message( + interface=frame.bus, + can_id=frame.can_id, + dlc=frame.dlc, + data=frame.data, + timestamp=frame.timestamp + ) + except Exception as e: + self.logger.error( + f"Failed to send frame to InfluxDB: {e}", + exc_info=True, + extra={"can_id": frame.can_id_hex} + ) + return False + + def handle_batch(self, frames: List[CANFrame]) -> int: + """Обработка батча CAN фреймов.""" + if not self.enabled or not self._initialized or not self.influxdb_client or not frames: + return 0 + + try: + # Конвертируем CANFrame в формат для InfluxDB + influx_messages = [] + for frame in frames: + influx_messages.append({ + "interface": frame.bus, + "can_id": frame.can_id, + "dlc": frame.dlc, + "data": frame.data, + "timestamp": frame.timestamp # float timestamp в секундах + }) + + if influx_messages: + return self.influxdb_client.write_messages_batch(influx_messages) + return 0 + except Exception as e: + self.logger.error( + f"Failed to send frames batch to InfluxDB: {e}", + exc_info=True, + extra={"batch_size": len(frames)} + ) + return 0 + + def flush(self) -> None: + """Принудительная отправка накопленных данных.""" + # InfluxDB forwarder сам управляет flush через свой цикл + # Но можно вызвать явный flush если нужно + pass + + def shutdown(self) -> None: + """Корректное завершение работы обработчика.""" + if self.influxdb_client: + try: + self.influxdb_client.close() + self.logger.info("InfluxDB handler closed") + except Exception as e: + self.logger.error(f"Error closing InfluxDB: {e}", exc_info=True) + self._initialized = False + + def get_stats(self) -> Dict[str, Any]: + """Получение статистики обработчика.""" + if self.influxdb_client: + try: + stats = self.influxdb_client.get_stats() + stats["handler"] = self.name + stats["enabled"] = self.enabled + stats["initialized"] = self._initialized + return stats + except Exception: + pass + return { + "handler": self.name, + "enabled": self.enabled, + "initialized": self._initialized + } + + def start(self) -> None: + """Запуск InfluxDB forwarder (если используется).""" + if self.influxdb_client: + try: + self.influxdb_client.start() + except Exception as e: + self.logger.error(f"Failed to start InfluxDB forwarder: {e}", exc_info=True) + diff --git a/can_sniffer/src/handlers/storage_handler.py b/can_sniffer/src/handlers/storage_handler.py new file mode 100644 index 0000000..08fe573 --- /dev/null +++ b/can_sniffer/src/handlers/storage_handler.py @@ -0,0 +1,119 @@ +""" +Обработчик для сохранения CAN сообщений в SQLite. +""" + +from typing import List, Dict, Any +from can_frame import CANFrame +from .base import BaseHandler +from storage import get_storage + + +class StorageHandler(BaseHandler): + """Обработчик для сохранения в SQLite.""" + + def __init__(self, enabled: bool = True): + """Инициализация обработчика storage.""" + super().__init__(name="storage", enabled=enabled) + self.storage = None + + def initialize(self) -> bool: + """Инициализация storage.""" + if not self.enabled: + return False + + try: + self.storage = get_storage() + self._initialized = True + self.logger.info("Storage handler initialized") + return True + except Exception as e: + self.logger.error(f"Failed to initialize storage: {e}", exc_info=True) + self.storage = None + return False + + def handle(self, frame: CANFrame) -> bool: + """Обработка одного CAN фрейма.""" + if not self.enabled or not self._initialized or not self.storage: + return False + + try: + message_id = self.storage.save_message( + interface=frame.bus, + can_id=frame.can_id, + dlc=frame.dlc, + data=frame.data, + timestamp=frame.timestamp + ) + return message_id is not None + except Exception as e: + self.logger.error( + f"Failed to save frame: {e}", + exc_info=True, + extra={"can_id": frame.can_id_hex} + ) + return False + + def handle_batch(self, frames: List[CANFrame]) -> int: + """Обработка батча CAN фреймов.""" + if not self.enabled or not self._initialized or not self.storage or not frames: + return 0 + + try: + # Конвертируем CANFrame в формат для storage + messages = [] + for frame in frames: + messages.append(( + frame.timestamp, # float timestamp в секундах + frame.bus, + frame.can_id, + frame.dlc, + frame.data + )) + + saved_count = self.storage.save_messages_batch(messages) + if saved_count != len(frames): + self.logger.warning( + f"Not all frames saved: {saved_count}/{len(frames)}", + extra={"batch_size": len(frames)} + ) + return saved_count + except Exception as e: + self.logger.error( + f"Failed to save frames batch: {e}", + exc_info=True, + extra={"batch_size": len(frames)} + ) + return 0 + + def flush(self) -> None: + """Принудительная отправка накопленных данных.""" + # SQLite не требует явного flush, данные сохраняются сразу + pass + + def shutdown(self) -> None: + """Корректное завершение работы обработчика.""" + if self.storage: + try: + self.storage.close() + self.logger.info("Storage handler closed") + except Exception as e: + self.logger.error(f"Error closing storage: {e}", exc_info=True) + self._initialized = False + + def get_stats(self) -> Dict[str, Any]: + """Получение статистики обработчика.""" + if self.storage: + try: + stats = self.storage.get_stats() + stats["handler"] = self.name + stats["enabled"] = self.enabled + stats["initialized"] = self._initialized + return stats + except Exception: + pass + return { + "handler": self.name, + "enabled": self.enabled, + "initialized": self._initialized + } + diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index 4fea44c..12c3e43 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -1,7 +1,7 @@ """ Модуль для обработки CAN сообщений. -Обрабатывает входящие CAN сообщения и сохраняет их в SQLite и InfluxDB. +Обрабатывает входящие CAN сообщения через pipeline обработчиков. Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений. """ @@ -9,167 +9,29 @@ import threading import time from queue import Queue, Empty from typing import Optional, Dict, Any, List -from abc import ABC, abstractmethod from logger import get_logger from config import config -from storage import get_storage -from influxdb_handler import get_influxdb_client from can_frame import CANFrame +from handlers import BaseHandler, StorageHandler, InfluxDBHandler logger = get_logger(__name__) -class MessageHandler(ABC): - """Базовый класс для обработчиков сообщений.""" - - @abstractmethod - def process_batch(self, frames: List[CANFrame]) -> int: - """ - Обработка батча CAN фреймов. - - Args: - frames: Список CANFrame объектов - - Returns: - Количество успешно обработанных сообщений - """ - pass - - @abstractmethod - def get_stats(self) -> Dict[str, Any]: - """Получение статистики обработчика.""" - pass - - -class StorageHandler(MessageHandler): - """Обработчик для сохранения в SQLite.""" - - def __init__(self): - """Инициализация обработчика storage.""" - self.logger = logger.getChild("storage_handler") - self.storage = None - self._init_storage() - - def _init_storage(self) -> None: - """Инициализация storage.""" - try: - self.storage = get_storage() - self.logger.info("Storage handler initialized") - except Exception as e: - self.logger.error(f"Failed to initialize storage: {e}", exc_info=True) - self.storage = None - - def process_batch(self, frames: List[CANFrame]) -> int: - """Обработка батча сообщений для сохранения в SQLite.""" - if not self.storage or not frames: - return 0 - - try: - # Конвертируем CANFrame в формат для storage - messages = [] - for frame in frames: - messages.append(( - frame.timestamp, # float timestamp в секундах - frame.bus, - frame.can_id, - frame.dlc, - frame.data - )) - - saved_count = self.storage.save_messages_batch(messages) - if saved_count != len(frames): - self.logger.warning( - f"Not all messages saved: {saved_count}/{len(frames)}", - extra={"batch_size": len(frames)} - ) - return saved_count - except Exception as e: - self.logger.error( - f"Failed to save messages batch: {e}", - exc_info=True, - extra={"batch_size": len(frames)} - ) - return 0 - - def get_stats(self) -> Dict[str, Any]: - """Получение статистики storage.""" - if self.storage: - try: - return self.storage.get_stats() - except Exception: - pass - return {"initialized": False} - - -class InfluxDBHandler(MessageHandler): - """Обработчик для отправки в InfluxDB.""" - - def __init__(self): - """Инициализация обработчика InfluxDB.""" - self.logger = logger.getChild("influxdb_handler") - self.influxdb_client = None - self._init_influxdb() - - def _init_influxdb(self) -> None: - """Инициализация InfluxDB клиента.""" - if not config.influxdb.enabled: - self.logger.info("InfluxDB is disabled in configuration") - return - - try: - self.influxdb_client = get_influxdb_client() - self.logger.info("InfluxDB handler initialized") - except Exception as e: - self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True) - self.influxdb_client = None - - def process_batch(self, frames: List[CANFrame]) -> int: - """Обработка батча сообщений для отправки в InfluxDB.""" - if not self.influxdb_client or not frames: - return 0 - - try: - # Конвертируем CANFrame в формат для InfluxDB - influx_messages = [] - for frame in frames: - influx_messages.append({ - "interface": frame.bus, - "can_id": frame.can_id, - "dlc": frame.dlc, - "data": frame.data, - "timestamp": frame.timestamp # float timestamp в секундах - }) - - if influx_messages: - return self.influxdb_client.write_messages_batch(influx_messages) - return 0 - except Exception as e: - self.logger.error( - f"Failed to send messages batch to InfluxDB: {e}", - exc_info=True, - extra={"batch_size": len(frames)} - ) - return 0 - - def get_stats(self) -> Dict[str, Any]: - """Получение статистики InfluxDB.""" - if self.influxdb_client: - try: - return self.influxdb_client.get_stats() - except Exception: - pass - return {"enabled": False, "initialized": False} - - class MessageProcessor: - """Класс для обработки и сохранения CAN сообщений с асинхронной обработкой.""" + """ + Класс для обработки и сохранения CAN сообщений с асинхронной обработкой. - def __init__(self, queue_size: int = 10000): + Использует плагинную архитектуру обработчиков (pipeline). + Каждый обработчик реализует интерфейс BaseHandler. + """ + + def __init__(self, handlers: Optional[List[BaseHandler]] = None, queue_size: int = 10000): """ Инициализация процессора сообщений. Args: + handlers: Список обработчиков для pipeline. Если None, создаются по умолчанию. queue_size: Максимальный размер очереди сообщений """ self.logger = logger @@ -186,20 +48,59 @@ class MessageProcessor: self.queue_full_warnings = 0 # Инициализируем обработчики - self.handlers: List[MessageHandler] = [] - self._init_handlers() - - def _init_handlers(self) -> None: - """Инициализация обработчиков сообщений.""" - # Добавляем обработчики в порядке приоритета - self.handlers.append(StorageHandler()) + if handlers is None: + handlers = self._create_default_handlers() - if config.influxdb.enabled: - self.handlers.append(InfluxDBHandler()) + self.handlers: List[BaseHandler] = [] + self._init_handlers(handlers) + + def _create_default_handlers(self) -> List[BaseHandler]: + """ + Создание обработчиков по умолчанию из конфигурации. + + Returns: + Список обработчиков + """ + handlers = [] + + # Storage handler всегда включен + handlers.append(StorageHandler(enabled=True)) + + # InfluxDB handler зависит от конфигурации + handlers.append(InfluxDBHandler(enabled=None)) # None = из config + + return handlers + + def _init_handlers(self, handlers: List[BaseHandler]) -> None: + """ + Инициализация обработчиков. + + Args: + handlers: Список обработчиков для инициализации + """ + for handler in handlers: + if handler.is_enabled(): + try: + if handler.initialize(): + self.handlers.append(handler) + self.logger.info( + f"Handler '{handler.name}' initialized successfully" + ) + else: + self.logger.warning( + f"Handler '{handler.name}' initialization failed" + ) + except Exception as e: + self.logger.error( + f"Error initializing handler '{handler.name}': {e}", + exc_info=True + ) + else: + self.logger.debug(f"Handler '{handler.name}' is disabled") self.logger.info( - f"Initialized {len(self.handlers)} message handlers", - extra={"handlers": [type(h).__name__ for h in self.handlers]} + f"Initialized {len(self.handlers)}/{len(handlers)} handlers", + extra={"handlers": [h.name for h in self.handlers]} ) def enqueue(self, frame: CANFrame) -> bool: @@ -255,6 +156,8 @@ class MessageProcessor: batch_size = config.general.batch_size batch_interval = config.general.batch_interval last_batch_time = time.time() + last_flush_time = time.time() + flush_interval = 5.0 # Периодический flush обработчиков while self.running or not self.message_queue.empty(): try: @@ -282,6 +185,11 @@ class MessageProcessor: batch = [] last_batch_time = current_time + # Периодический flush обработчиков + if (current_time - last_flush_time) >= flush_interval: + self._flush_handlers() + last_flush_time = current_time + except Exception as e: self.logger.error( f"Error in processing loop: {e}", @@ -292,6 +200,9 @@ class MessageProcessor: if batch: self._process_batch(batch) + # Финальный flush всех обработчиков + self._flush_handlers() + self.logger.info( "Message processing loop stopped", extra={ @@ -302,7 +213,7 @@ class MessageProcessor: def _process_batch(self, batch: List[CANFrame]) -> None: """ - Обработка батча CAN фреймов. + Обработка батча CAN фреймов через pipeline обработчиков. Args: batch: Список CANFrame объектов @@ -312,7 +223,6 @@ class MessageProcessor: try: # Логируем батч на уровне DEBUG (если уровень DEBUG включен) - # Это позволяет контролировать вывод через уровни логирования if batch: first_frame = batch[0] self.logger.debug( @@ -329,13 +239,16 @@ class MessageProcessor: } ) - # Обрабатываем батч через все обработчики + # Обрабатываем батч через все обработчики (pipeline) for handler in self.handlers: + if not handler.is_enabled() or not handler.is_initialized(): + continue + try: - handler.process_batch(batch) + handler.handle_batch(batch) except Exception as e: self.logger.error( - f"Error in handler {type(handler).__name__}: {e}", + f"Error in handler '{handler.name}': {e}", exc_info=True, extra={"batch_size": len(batch)} ) @@ -350,6 +263,18 @@ class MessageProcessor: extra={"batch_size": len(batch)} ) + def _flush_handlers(self) -> None: + """Принудительный flush всех обработчиков.""" + for handler in self.handlers: + if handler.is_enabled() and handler.is_initialized(): + try: + handler.flush() + except Exception as e: + self.logger.error( + f"Error flushing handler '{handler.name}': {e}", + exc_info=True + ) + def start(self) -> None: """Запуск обработки сообщений в отдельном потоке.""" if self.running: @@ -358,10 +283,16 @@ class MessageProcessor: self.running = True - # Запускаем InfluxDB forwarder, если он есть + # Запускаем специальные обработчики (например, InfluxDB forwarder) for handler in self.handlers: - if isinstance(handler, InfluxDBHandler) and handler.influxdb_client: - handler.influxdb_client.start() + if isinstance(handler, InfluxDBHandler) and handler.is_initialized(): + try: + handler.start() + except Exception as e: + self.logger.error( + f"Failed to start handler '{handler.name}': {e}", + exc_info=True + ) # Запускаем поток обработки сообщений self.processing_thread = threading.Thread( @@ -383,16 +314,13 @@ class MessageProcessor: if self.processing_thread.is_alive(): self.logger.warning("Processing thread did not stop gracefully") - # Закрываем обработчики + # Закрываем все обработчики for handler in self.handlers: try: - if isinstance(handler, StorageHandler) and handler.storage: - handler.storage.close() - elif isinstance(handler, InfluxDBHandler) and handler.influxdb_client: - handler.influxdb_client.close() + handler.shutdown() except Exception as e: self.logger.error( - f"Error closing handler {type(handler).__name__}: {e}", + f"Error shutting down handler '{handler.name}': {e}", exc_info=True ) @@ -410,16 +338,16 @@ class MessageProcessor: "processed_count": self.processed_count, "dropped_count": self.dropped_count, "queue_size": self.message_queue.qsize(), - "running": self.running + "running": self.running, + "handlers_count": len(self.handlers) } # Добавляем статистику всех обработчиков for handler in self.handlers: try: handler_stats = handler.get_stats() - handler_name = type(handler).__name__.replace("Handler", "").lower() - stats[handler_name] = handler_stats + stats[handler.name] = handler_stats except Exception as e: - self.logger.debug(f"Failed to get stats from {type(handler).__name__}: {e}") + self.logger.debug(f"Failed to get stats from handler '{handler.name}': {e}") return stats