diff --git a/can_sniffer/src/config.py b/can_sniffer/src/config.py index fc8e474..03c0ae5 100644 --- a/can_sniffer/src/config.py +++ b/can_sniffer/src/config.py @@ -130,10 +130,6 @@ class LoggingConfig(BaseModel): default=5, description="Количество резервных копий логов" ) - log_can_messages: bool = Field( - default=False, - description="Логировать каждое CAN сообщение на INFO уровне (может быть очень много логов)" - ) class GeneralConfig(BaseModel): @@ -145,6 +141,14 @@ class GeneralConfig(BaseModel): default=10000, description="Размер буфера для данных" ) + batch_size: int = Field( + default=1000, + description="Размер батча для обработки сообщений" + ) + batch_interval: float = Field( + default=0.1, + description="Интервал обработки батча (секунды)" + ) max_retries: int = Field( default=3, description="Максимальное количество попыток повтора" diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index 27ac725..e1ecd08 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -9,7 +9,9 @@ import can import threading import time from queue import Queue, Empty -from typing import Optional, Tuple +from typing import Optional, Tuple, Dict, Any, List +from abc import ABC, abstractmethod + from logger import get_logger from config import config from storage import get_storage @@ -18,24 +20,157 @@ from influxdb_handler import get_influxdb_client logger = get_logger(__name__) +# Тип для сериализованного сообщения (примитивные данные) +SerializedMessage = Tuple[float, str, int, int, bytes] # (timestamp, interface, can_id, dlc, data) + + +class MessageHandler(ABC): + """Базовый класс для обработчиков сообщений.""" + + @abstractmethod + def process_batch(self, messages: List[SerializedMessage]) -> int: + """ + Обработка батча сообщений. + + Args: + messages: Список сериализованных сообщений + + Returns: + Количество успешно обработанных сообщений + """ + pass + + @abstractmethod + def get_stats(self) -> Dict[str, Any]: + """Получение статистики обработчика.""" + pass + + +class StorageHandler(MessageHandler): + """Обработчик для сохранения в SQLite.""" + + def __init__(self): + """Инициализация обработчика storage.""" + self.logger = logger.getChild("storage_handler") + self.storage = None + self._init_storage() + + def _init_storage(self) -> None: + """Инициализация storage.""" + try: + self.storage = get_storage() + self.logger.info("Storage handler initialized") + except Exception as e: + self.logger.error(f"Failed to initialize storage: {e}", exc_info=True) + self.storage = None + + def process_batch(self, messages: List[SerializedMessage]) -> int: + """Обработка батча сообщений для сохранения в SQLite.""" + if not self.storage or not messages: + return 0 + + try: + # Сообщения уже в правильном формате для save_messages_batch + saved_count = self.storage.save_messages_batch(messages) + if saved_count != len(messages): + self.logger.warning( + f"Not all messages saved: {saved_count}/{len(messages)}", + extra={"batch_size": len(messages)} + ) + return saved_count + except Exception as e: + self.logger.error( + f"Failed to save messages batch: {e}", + exc_info=True, + extra={"batch_size": len(messages)} + ) + return 0 + + def get_stats(self) -> Dict[str, Any]: + """Получение статистики storage.""" + if self.storage: + try: + return self.storage.get_stats() + except Exception: + pass + return {"initialized": False} + + +class InfluxDBHandler(MessageHandler): + """Обработчик для отправки в InfluxDB.""" + + def __init__(self): + """Инициализация обработчика InfluxDB.""" + self.logger = logger.getChild("influxdb_handler") + self.influxdb_client = None + self._init_influxdb() + + def _init_influxdb(self) -> None: + """Инициализация InfluxDB клиента.""" + if not config.influxdb.enabled: + self.logger.info("InfluxDB is disabled in configuration") + return + + try: + self.influxdb_client = get_influxdb_client() + self.logger.info("InfluxDB handler initialized") + except Exception as e: + self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True) + self.influxdb_client = None + + def process_batch(self, messages: List[SerializedMessage]) -> int: + """Обработка батча сообщений для отправки в InfluxDB.""" + if not self.influxdb_client or not messages: + return 0 + + try: + # Конвертируем сериализованные сообщения в формат для InfluxDB + influx_messages = [] + for timestamp, interface, can_id, dlc, data in messages: + influx_messages.append({ + "interface": interface, + "can_id": can_id, + "dlc": dlc, + "data": data, + "timestamp": timestamp + }) + + if influx_messages: + return self.influxdb_client.write_messages_batch(influx_messages) + return 0 + except Exception as e: + self.logger.error( + f"Failed to send messages batch to InfluxDB: {e}", + exc_info=True, + extra={"batch_size": len(messages)} + ) + return 0 + + def get_stats(self) -> Dict[str, Any]: + """Получение статистики InfluxDB.""" + if self.influxdb_client: + try: + return self.influxdb_client.get_stats() + except Exception: + pass + return {"enabled": False, "initialized": False} + + class MessageProcessor: """Класс для обработки и сохранения CAN сообщений с асинхронной обработкой.""" - def __init__(self, queue_size: int = 10000, log_messages: bool = False): + def __init__(self, queue_size: int = 10000): """ Инициализация процессора сообщений. Args: queue_size: Максимальный размер очереди сообщений - log_messages: Логировать каждое сообщение на INFO уровне (по умолчанию False) """ self.logger = logger - self.storage = None - self.influxdb_client = None - self.log_messages = log_messages # Очередь для асинхронной обработки сообщений - self.message_queue: Queue[Tuple[str, can.Message]] = Queue(maxsize=queue_size) + # Храним сериализованные данные (примитивы) вместо объектов can.Message + self.message_queue: Queue[SerializedMessage] = Queue(maxsize=queue_size) self.running = False self.processing_thread: Optional[threading.Thread] = None @@ -44,57 +179,48 @@ class MessageProcessor: self.dropped_count = 0 self.queue_full_warnings = 0 - # Инициализируем хранилища - self._init_storage() - self._init_influxdb() + # Инициализируем обработчики + self.handlers: List[MessageHandler] = [] + self._init_handlers() - def _init_storage(self) -> None: - """Инициализация локального хранилища (SQLite).""" - try: - self.storage = get_storage() - self.logger.info( - "Storage initialized", - extra={ - "type": config.storage.type, - "path": config.storage.database_path, - "wal_mode": config.storage.wal_mode - } - ) - except Exception as e: - self.logger.error( - f"Failed to initialize storage: {e}", - exc_info=True - ) - self.storage = None - - def _init_influxdb(self) -> None: - """Инициализация клиента InfluxDB.""" - if not config.influxdb.enabled: - self.logger.info("InfluxDB is disabled in configuration") - return + def _init_handlers(self) -> None: + """Инициализация обработчиков сообщений.""" + # Добавляем обработчики в порядке приоритета + self.handlers.append(StorageHandler()) - try: - self.influxdb_client = get_influxdb_client() - self.logger.info( - "InfluxDB initialized", - extra={ - "url": config.influxdb.url, - "bucket": config.influxdb.bucket, - "org": config.influxdb.org - } - ) - except Exception as e: - self.logger.error( - f"Failed to initialize InfluxDB client: {e}", - exc_info=True - ) - self.influxdb_client = None + if config.influxdb.enabled: + self.handlers.append(InfluxDBHandler()) + + self.logger.info( + f"Initialized {len(self.handlers)} message handlers", + extra={"handlers": [type(h).__name__ for h in self.handlers]} + ) + + def _serialize_message(self, interface: str, message: can.Message) -> SerializedMessage: + """ + Сериализация CAN сообщения в примитивные данные. + + Args: + interface: Имя интерфейса + message: CAN сообщение + + Returns: + Кортеж с примитивными данными (timestamp, interface, can_id, dlc, data) + """ + return ( + message.timestamp, + interface, + message.arbitration_id, + message.dlc, + message.data if message.data else b'' + ) def enqueue(self, interface: str, message: can.Message) -> bool: """ Добавление сообщения в очередь для асинхронной обработки. Этот метод вызывается из callback CAN чтения и должен быть быстрым. + Сериализуем сообщение сразу, чтобы не хранить объекты can.Message. Args: interface: Имя интерфейса (например, 'can0') @@ -104,8 +230,11 @@ class MessageProcessor: True если сообщение добавлено, False если очередь переполнена """ try: + # Сериализуем сообщение в примитивные данные + serialized = self._serialize_message(interface, message) + # Пытаемся добавить в очередь без блокировки (non-blocking) - self.message_queue.put_nowait((interface, message)) + self.message_queue.put_nowait(serialized) return True except: # Очередь переполнена - пропускаем сообщение @@ -141,32 +270,36 @@ class MessageProcessor: self.logger.info("Message processing loop started") # Батч для групповой обработки - batch = [] - batch_size = config.general.buffer_size + batch: List[SerializedMessage] = [] + batch_size = config.general.batch_size + batch_interval = config.general.batch_interval last_batch_time = time.time() - batch_interval = 0.1 # секунды while self.running or not self.message_queue.empty(): try: # Получаем сообщение из очереди с таймаутом try: - interface, message = self.message_queue.get(timeout=0.1) + serialized_message = self.message_queue.get(timeout=batch_interval) + batch.append(serialized_message) except Empty: # Если очередь пуста, обрабатываем накопленный батч - if batch and (time.time() - last_batch_time) >= batch_interval: + if batch: self._process_batch(batch) batch = [] last_batch_time = time.time() continue - # Добавляем в батч - batch.append((interface, message)) - # Обрабатываем батч если он заполнен или прошло достаточно времени - if len(batch) >= batch_size or (time.time() - last_batch_time) >= batch_interval: + current_time = time.time() + should_flush = ( + len(batch) >= batch_size or + (batch and (current_time - last_batch_time) >= batch_interval) + ) + + if should_flush: self._process_batch(batch) batch = [] - last_batch_time = time.time() + last_batch_time = current_time except Exception as e: self.logger.error( @@ -186,78 +319,46 @@ class MessageProcessor: } ) - def _process_batch(self, batch: list) -> None: + def _process_batch(self, batch: List[SerializedMessage]) -> None: """ - Обработка батча сообщений. + Обработка батча сериализованных сообщений. Args: - batch: Список кортежей (interface, message) + batch: Список сериализованных сообщений """ if not batch: return try: - # Подготавливаем данные для пакетного сохранения в SQLite - messages_to_save = [] - - for interface, message in batch: - # Логируем сообщение в зависимости от настроек - if self.log_messages: - self.logger.info( - "CAN message", - extra={ + # Логируем батч на уровне DEBUG (если уровень DEBUG включен) + # Это позволяет контролировать вывод через уровни логирования + if batch: + timestamp, interface, can_id, dlc, data = batch[0] + self.logger.debug( + "CAN message batch processed", + extra={ + "batch_size": len(batch), + "first_message": { "interface": interface, - "can_id": hex(message.arbitration_id), - "dlc": message.dlc, - "data": message.data.hex() if message.data else "", - "timestamp": message.timestamp + "can_id": hex(can_id), + "dlc": dlc, + "data": data.hex() if data else "", + "timestamp": timestamp } - ) - else: - self.logger.debug( - "CAN message received", - extra={ - "interface": interface, - "can_id": hex(message.arbitration_id), - "dlc": message.dlc, - "data": message.data.hex() if message.data else "", - "timestamp": message.timestamp - } - ) - - # Подготавливаем данные для сохранения - messages_to_save.append(( - message.timestamp, - interface, - message.arbitration_id, - message.dlc, - message.data if message.data else b'' - )) + } + ) - # Пакетное сохранение в SQLite - if self.storage and messages_to_save: - saved_count = self.storage.save_messages_batch(messages_to_save) - if saved_count != len(messages_to_save): - self.logger.warning( - f"Not all messages saved: {saved_count}/{len(messages_to_save)}", - extra={"batch_size": len(messages_to_save)} + # Обрабатываем батч через все обработчики + for handler in self.handlers: + try: + handler.process_batch(batch) + except Exception as e: + self.logger.error( + f"Error in handler {type(handler).__name__}: {e}", + exc_info=True, + extra={"batch_size": len(batch)} ) - # Отправляем в InfluxDB (если включено) - пакетно - if self.influxdb_client: - influx_messages = [] - for interface, message in batch: - influx_messages.append({ - "interface": interface, - "can_id": message.arbitration_id, - "dlc": message.dlc, - "data": message.data if message.data else b'', - "timestamp": message.timestamp - }) - - if influx_messages: - self.influxdb_client.write_messages_batch(influx_messages) - # Обновляем счетчик обработанных сообщений self.processed_count += len(batch) @@ -268,119 +369,6 @@ class MessageProcessor: extra={"batch_size": len(batch)} ) - def _process_single(self, interface: str, message: can.Message) -> None: - """ - Обработка одного CAN сообщения. - - Используется как fallback, если батчинг не применяется. - - Args: - interface: Имя интерфейса - message: CAN сообщение - """ - try: - # Логируем сообщение в зависимости от настроек - if self.log_messages: - self.logger.info( - "CAN message", - extra={ - "interface": interface, - "can_id": hex(message.arbitration_id), - "dlc": message.dlc, - "data": message.data.hex() if message.data else "", - "timestamp": message.timestamp - } - ) - else: - self.logger.debug( - "CAN message received", - extra={ - "interface": interface, - "can_id": hex(message.arbitration_id), - "dlc": message.dlc, - "data": message.data.hex() if message.data else "", - "timestamp": message.timestamp - } - ) - - # Сохраняем в локальное хранилище (SQLite) - self._save_to_storage(interface, message) - - # Отправляем в InfluxDB (если включено) - if self.influxdb_client: - self._send_to_influxdb(interface, message) - - except Exception as e: - self.logger.error( - f"Error processing message: {e}", - exc_info=True, - extra={ - "interface": interface, - "can_id": hex(message.arbitration_id) if message else None - } - ) - - def _save_to_storage(self, interface: str, message: can.Message) -> None: - """ - Сохранение сообщения в локальное хранилище (SQLite). - - Args: - interface: Имя интерфейса - message: CAN сообщение - """ - if not self.storage: - return - - try: - message_id = self.storage.save_message( - interface=interface, - can_id=message.arbitration_id, - dlc=message.dlc, - data=message.data if message.data else b'', - timestamp=message.timestamp - ) - - if message_id is None: - self.logger.warning( - "Failed to save message to storage", - extra={ - "interface": interface, - "can_id": hex(message.arbitration_id) - } - ) - except Exception as e: - self.logger.error( - f"Failed to save message to storage: {e}", - exc_info=True, - extra={"interface": interface} - ) - - def _send_to_influxdb(self, interface: str, message: can.Message) -> None: - """ - Отправка сообщения в InfluxDB (для одиночных сообщений). - - Args: - interface: Имя интерфейса - message: CAN сообщение - """ - if not self.influxdb_client: - return - - try: - self.influxdb_client.write_message( - interface=interface, - can_id=message.arbitration_id, - dlc=message.dlc, - data=message.data if message.data else b'', - timestamp=message.timestamp - ) - except Exception as e: - self.logger.error( - f"Failed to send message to InfluxDB: {e}", - exc_info=True, - extra={"interface": interface} - ) - def start(self) -> None: """Запуск обработки сообщений в отдельном потоке.""" if self.running: @@ -388,6 +376,13 @@ class MessageProcessor: return self.running = True + + # Запускаем InfluxDB forwarder, если он есть + for handler in self.handlers: + if isinstance(handler, InfluxDBHandler) and handler.influxdb_client: + handler.influxdb_client.start() + + # Запускаем поток обработки сообщений self.processing_thread = threading.Thread( target=self._processing_loop, name="MessageProcessor", @@ -407,22 +402,16 @@ class MessageProcessor: if self.processing_thread.is_alive(): self.logger.warning("Processing thread did not stop gracefully") - # Закрываем соединения - if self.storage: + # Закрываем обработчики + for handler in self.handlers: try: - self.storage.close() + if isinstance(handler, StorageHandler) and handler.storage: + handler.storage.close() + elif isinstance(handler, InfluxDBHandler) and handler.influxdb_client: + handler.influxdb_client.close() except Exception as e: self.logger.error( - f"Error closing storage: {e}", - exc_info=True - ) - - if self.influxdb_client: - try: - self.influxdb_client.close() - except Exception as e: - self.logger.error( - f"Error closing InfluxDB client: {e}", + f"Error closing handler {type(handler).__name__}: {e}", exc_info=True ) @@ -443,20 +432,13 @@ class MessageProcessor: "running": self.running } - # Добавляем статистику хранилища, если оно инициализировано - if self.storage: + # Добавляем статистику всех обработчиков + for handler in self.handlers: try: - storage_stats = self.storage.get_stats() - stats["storage"] = storage_stats + handler_stats = handler.get_stats() + handler_name = type(handler).__name__.replace("Handler", "").lower() + stats[handler_name] = handler_stats except Exception as e: - self.logger.debug(f"Failed to get storage stats: {e}") - - # Добавляем статистику InfluxDB, если он инициализирован - if self.influxdb_client: - try: - influx_stats = self.influxdb_client.get_stats() - stats["influxdb"] = influx_stats - except Exception as e: - self.logger.debug(f"Failed to get InfluxDB stats: {e}") + self.logger.debug(f"Failed to get stats from {type(handler).__name__}: {e}") return stats diff --git a/can_sniffer/src/socket_can/src.py b/can_sniffer/src/socket_can/src.py index 78dcc52..b2b658c 100644 --- a/can_sniffer/src/socket_can/src.py +++ b/can_sniffer/src/socket_can/src.py @@ -179,8 +179,7 @@ class CANSniffer: # Инициализируем MessageProcessor для автоматической обработки сообщений # Используем настройку из конфигурации для логирования сообщений - log_messages = config.logging.log_can_messages - self.message_processor = MessageProcessor(log_messages=log_messages) + self.message_processor = MessageProcessor() # Используем переданный callback или процессор по умолчанию if message_callback: