Add queue to can

This commit is contained in:
2026-01-06 18:17:37 +03:00
parent edbf6277da
commit 1d8dc815f5
2 changed files with 188 additions and 9 deletions

View File

@@ -2,10 +2,14 @@
Модуль для обработки CAN сообщений.
Обрабатывает входящие CAN сообщения и сохраняет их в SQLite и InfluxDB.
Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений.
"""
import can
from typing import Optional
import threading
import time
from queue import Queue, Empty
from typing import Optional, Tuple
from logger import get_logger
from config import config
@@ -13,14 +17,29 @@ logger = get_logger(__name__)
class MessageProcessor:
"""Класс для обработки и сохранения CAN сообщений."""
"""Класс для обработки и сохранения CAN сообщений с асинхронной обработкой."""
def __init__(self):
"""Инициализация процессора сообщений."""
def __init__(self, queue_size: int = 10000):
"""
Инициализация процессора сообщений.
Args:
queue_size: Максимальный размер очереди сообщений
"""
self.logger = logger
self.storage = None
self.influxdb_client = None
# Очередь для асинхронной обработки сообщений
self.message_queue: Queue[Tuple[str, can.Message]] = Queue(maxsize=queue_size)
self.running = False
self.processing_thread: Optional[threading.Thread] = None
# Статистика
self.processed_count = 0
self.dropped_count = 0
self.queue_full_warnings = 0
# Инициализируем хранилища
self._init_storage()
self._init_influxdb()
@@ -55,16 +74,136 @@ class MessageProcessor:
}
)
def enqueue(self, interface: str, message: can.Message) -> bool:
"""
Добавление сообщения в очередь для асинхронной обработки.
Этот метод вызывается из callback CAN чтения и должен быть быстрым.
Args:
interface: Имя интерфейса (например, 'can0')
message: CAN сообщение
Returns:
True если сообщение добавлено, False если очередь переполнена
"""
try:
# Пытаемся добавить в очередь без блокировки (non-blocking)
self.message_queue.put_nowait((interface, message))
return True
except:
# Очередь переполнена - пропускаем сообщение
self.dropped_count += 1
self.queue_full_warnings += 1
# Логируем предупреждение периодически (не каждое сообщение)
if self.queue_full_warnings % 1000 == 0:
self.logger.warning(
f"Message queue full, dropped {self.dropped_count} messages",
extra={
"dropped_count": self.dropped_count,
"queue_size": self.message_queue.qsize()
}
)
return False
def process(self, interface: str, message: can.Message) -> None:
"""
Обработка CAN сообщения.
Публичный метод для обработки сообщения.
Используется как callback для CANSniffer.
Быстро добавляет сообщение в очередь без блокировки.
Args:
interface: Имя интерфейса (например, 'can0')
message: CAN сообщение
"""
self.enqueue(interface, message)
def _processing_loop(self) -> None:
"""Основной цикл обработки сообщений из очереди."""
self.logger.info("Message processing loop started")
# Батч для групповой обработки
batch = []
batch_size = config.general.buffer_size
last_batch_time = time.time()
batch_interval = 0.1 # секунды
while self.running or not self.message_queue.empty():
try:
# Получаем сообщение из очереди с таймаутом
try:
interface, message = self.message_queue.get(timeout=0.1)
except Empty:
# Если очередь пуста, обрабатываем накопленный батч
if batch and (time.time() - last_batch_time) >= batch_interval:
self._process_batch(batch)
batch = []
last_batch_time = time.time()
continue
# Добавляем в батч
batch.append((interface, message))
# Обрабатываем батч если он заполнен или прошло достаточно времени
if len(batch) >= batch_size or (time.time() - last_batch_time) >= batch_interval:
self._process_batch(batch)
batch = []
last_batch_time = time.time()
except Exception as e:
self.logger.error(
f"Error in processing loop: {e}",
exc_info=True
)
# Обрабатываем оставшиеся сообщения в батче
if batch:
self._process_batch(batch)
self.logger.info(
"Message processing loop stopped",
extra={
"processed_count": self.processed_count,
"dropped_count": self.dropped_count
}
)
def _process_batch(self, batch: list) -> None:
"""
Обработка батча сообщений.
Args:
batch: Список кортежей (interface, message)
"""
if not batch:
return
try:
# Логируем сообщение
# Обрабатываем каждое сообщение в батче
for interface, message in batch:
self._process_single(interface, message)
self.processed_count += 1
except Exception as e:
self.logger.error(
f"Error processing batch: {e}",
exc_info=True,
extra={"batch_size": len(batch)}
)
def _process_single(self, interface: str, message: can.Message) -> None:
"""
Обработка одного CAN сообщения.
Args:
interface: Имя интерфейса
message: CAN сообщение
"""
try:
# Логируем только на DEBUG уровне (не блокируем)
# DEBUG логирование может быть отключено в production
self.logger.debug(
"CAN message received",
extra={
@@ -138,9 +277,31 @@ class MessageProcessor:
extra={"interface": interface}
)
def start(self) -> None:
"""Запуск обработки сообщений в отдельном потоке."""
if self.running:
self.logger.warning("Message processor is already running")
return
self.running = True
self.processing_thread = threading.Thread(
target=self._processing_loop,
name="MessageProcessor",
daemon=True
)
self.processing_thread.start()
self.logger.info("Message processor started")
def shutdown(self) -> None:
"""Корректное завершение работы процессора."""
self.logger.info("Shutting down message processor...")
self.running = False
# Ждем завершения потока обработки
if self.processing_thread and self.processing_thread.is_alive():
self.processing_thread.join(timeout=5.0)
if self.processing_thread.is_alive():
self.logger.warning("Processing thread did not stop gracefully")
# Закрываем соединения
if self.storage:
@@ -151,5 +312,19 @@ class MessageProcessor:
# self.influxdb_client.close()
pass
self.logger.info("Message processor stopped")
self.logger.info(
"Message processor stopped",
extra={
"processed_count": self.processed_count,
"dropped_count": self.dropped_count
}
)
def get_stats(self) -> dict:
"""Получение статистики процессора."""
return {
"processed_count": self.processed_count,
"dropped_count": self.dropped_count,
"queue_size": self.message_queue.qsize(),
"running": self.running
}

View File

@@ -185,7 +185,8 @@ class CANSniffer:
self.message_callback = message_callback
else:
# Автоматически используем MessageProcessor
self.message_callback = self.message_processor.process
# Метод enqueue быстрый и не блокирует чтение CAN
self.message_callback = self.message_processor.enqueue
self.bus_handlers: Dict[str, CANBusHandler] = {}
self.running = False
@@ -278,6 +279,9 @@ class CANSniffer:
extra={"interfaces": list(self.bus_handlers.keys())}
)
# Запускаем процессор сообщений первым
self.message_processor.start()
self.running = True
# Запускаем все обработчики параллельно