From 1d8dc815f50328bc121a623ccb7d6a56a16818f1 Mon Sep 17 00:00:00 2001 From: qsethuk Date: Tue, 6 Jan 2026 18:17:37 +0300 Subject: [PATCH] Add queue to can --- .../src/socket_can/message_processor.py | 191 +++++++++++++++++- can_sniffer/src/socket_can/src.py | 6 +- 2 files changed, 188 insertions(+), 9 deletions(-) diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index 1a84e0b..b50af9b 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -2,10 +2,14 @@ Модуль для обработки CAN сообщений. Обрабатывает входящие CAN сообщения и сохраняет их в SQLite и InfluxDB. +Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений. """ import can -from typing import Optional +import threading +import time +from queue import Queue, Empty +from typing import Optional, Tuple from logger import get_logger from config import config @@ -13,14 +17,29 @@ logger = get_logger(__name__) class MessageProcessor: - """Класс для обработки и сохранения CAN сообщений.""" + """Класс для обработки и сохранения CAN сообщений с асинхронной обработкой.""" - def __init__(self): - """Инициализация процессора сообщений.""" + def __init__(self, queue_size: int = 10000): + """ + Инициализация процессора сообщений. + + Args: + queue_size: Максимальный размер очереди сообщений + """ self.logger = logger self.storage = None self.influxdb_client = None + # Очередь для асинхронной обработки сообщений + self.message_queue: Queue[Tuple[str, can.Message]] = Queue(maxsize=queue_size) + self.running = False + self.processing_thread: Optional[threading.Thread] = None + + # Статистика + self.processed_count = 0 + self.dropped_count = 0 + self.queue_full_warnings = 0 + # Инициализируем хранилища self._init_storage() self._init_influxdb() @@ -55,16 +74,136 @@ class MessageProcessor: } ) + def enqueue(self, interface: str, message: can.Message) -> bool: + """ + Добавление сообщения в очередь для асинхронной обработки. + + Этот метод вызывается из callback CAN чтения и должен быть быстрым. + + Args: + interface: Имя интерфейса (например, 'can0') + message: CAN сообщение + + Returns: + True если сообщение добавлено, False если очередь переполнена + """ + try: + # Пытаемся добавить в очередь без блокировки (non-blocking) + self.message_queue.put_nowait((interface, message)) + return True + except: + # Очередь переполнена - пропускаем сообщение + self.dropped_count += 1 + self.queue_full_warnings += 1 + + # Логируем предупреждение периодически (не каждое сообщение) + if self.queue_full_warnings % 1000 == 0: + self.logger.warning( + f"Message queue full, dropped {self.dropped_count} messages", + extra={ + "dropped_count": self.dropped_count, + "queue_size": self.message_queue.qsize() + } + ) + return False + def process(self, interface: str, message: can.Message) -> None: """ - Обработка CAN сообщения. + Публичный метод для обработки сообщения. + + Используется как callback для CANSniffer. + Быстро добавляет сообщение в очередь без блокировки. Args: interface: Имя интерфейса (например, 'can0') message: CAN сообщение """ + self.enqueue(interface, message) + + def _processing_loop(self) -> None: + """Основной цикл обработки сообщений из очереди.""" + self.logger.info("Message processing loop started") + + # Батч для групповой обработки + batch = [] + batch_size = config.general.buffer_size + last_batch_time = time.time() + batch_interval = 0.1 # секунды + + while self.running or not self.message_queue.empty(): + try: + # Получаем сообщение из очереди с таймаутом + try: + interface, message = self.message_queue.get(timeout=0.1) + except Empty: + # Если очередь пуста, обрабатываем накопленный батч + if batch and (time.time() - last_batch_time) >= batch_interval: + self._process_batch(batch) + batch = [] + last_batch_time = time.time() + continue + + # Добавляем в батч + batch.append((interface, message)) + + # Обрабатываем батч если он заполнен или прошло достаточно времени + if len(batch) >= batch_size or (time.time() - last_batch_time) >= batch_interval: + self._process_batch(batch) + batch = [] + last_batch_time = time.time() + + except Exception as e: + self.logger.error( + f"Error in processing loop: {e}", + exc_info=True + ) + + # Обрабатываем оставшиеся сообщения в батче + if batch: + self._process_batch(batch) + + self.logger.info( + "Message processing loop stopped", + extra={ + "processed_count": self.processed_count, + "dropped_count": self.dropped_count + } + ) + + def _process_batch(self, batch: list) -> None: + """ + Обработка батча сообщений. + + Args: + batch: Список кортежей (interface, message) + """ + if not batch: + return + try: - # Логируем сообщение + # Обрабатываем каждое сообщение в батче + for interface, message in batch: + self._process_single(interface, message) + self.processed_count += 1 + + except Exception as e: + self.logger.error( + f"Error processing batch: {e}", + exc_info=True, + extra={"batch_size": len(batch)} + ) + + def _process_single(self, interface: str, message: can.Message) -> None: + """ + Обработка одного CAN сообщения. + + Args: + interface: Имя интерфейса + message: CAN сообщение + """ + try: + # Логируем только на DEBUG уровне (не блокируем) + # DEBUG логирование может быть отключено в production self.logger.debug( "CAN message received", extra={ @@ -138,9 +277,31 @@ class MessageProcessor: extra={"interface": interface} ) + def start(self) -> None: + """Запуск обработки сообщений в отдельном потоке.""" + if self.running: + self.logger.warning("Message processor is already running") + return + + self.running = True + self.processing_thread = threading.Thread( + target=self._processing_loop, + name="MessageProcessor", + daemon=True + ) + self.processing_thread.start() + self.logger.info("Message processor started") + def shutdown(self) -> None: """Корректное завершение работы процессора.""" self.logger.info("Shutting down message processor...") + self.running = False + + # Ждем завершения потока обработки + if self.processing_thread and self.processing_thread.is_alive(): + self.processing_thread.join(timeout=5.0) + if self.processing_thread.is_alive(): + self.logger.warning("Processing thread did not stop gracefully") # Закрываем соединения if self.storage: @@ -151,5 +312,19 @@ class MessageProcessor: # self.influxdb_client.close() pass - self.logger.info("Message processor stopped") - + self.logger.info( + "Message processor stopped", + extra={ + "processed_count": self.processed_count, + "dropped_count": self.dropped_count + } + ) + + def get_stats(self) -> dict: + """Получение статистики процессора.""" + return { + "processed_count": self.processed_count, + "dropped_count": self.dropped_count, + "queue_size": self.message_queue.qsize(), + "running": self.running + } diff --git a/can_sniffer/src/socket_can/src.py b/can_sniffer/src/socket_can/src.py index a0e3429..160cd4d 100644 --- a/can_sniffer/src/socket_can/src.py +++ b/can_sniffer/src/socket_can/src.py @@ -185,7 +185,8 @@ class CANSniffer: self.message_callback = message_callback else: # Автоматически используем MessageProcessor - self.message_callback = self.message_processor.process + # Метод enqueue быстрый и не блокирует чтение CAN + self.message_callback = self.message_processor.enqueue self.bus_handlers: Dict[str, CANBusHandler] = {} self.running = False @@ -278,6 +279,9 @@ class CANSniffer: extra={"interfaces": list(self.bus_handlers.keys())} ) + # Запускаем процессор сообщений первым + self.message_processor.start() + self.running = True # Запускаем все обработчики параллельно