update message_processor

This commit is contained in:
2026-01-07 03:02:04 +03:00
parent 75c7601df9
commit 681021394c
3 changed files with 250 additions and 265 deletions

View File

@@ -130,10 +130,6 @@ class LoggingConfig(BaseModel):
default=5, default=5,
description="Количество резервных копий логов" description="Количество резервных копий логов"
) )
log_can_messages: bool = Field(
default=False,
description="Логировать каждое CAN сообщение на INFO уровне (может быть очень много логов)"
)
class GeneralConfig(BaseModel): class GeneralConfig(BaseModel):
@@ -145,6 +141,14 @@ class GeneralConfig(BaseModel):
default=10000, default=10000,
description="Размер буфера для данных" description="Размер буфера для данных"
) )
batch_size: int = Field(
default=1000,
description="Размер батча для обработки сообщений"
)
batch_interval: float = Field(
default=0.1,
description="Интервал обработки батча (секунды)"
)
max_retries: int = Field( max_retries: int = Field(
default=3, default=3,
description="Максимальное количество попыток повтора" description="Максимальное количество попыток повтора"

View File

@@ -9,7 +9,9 @@ import can
import threading import threading
import time import time
from queue import Queue, Empty from queue import Queue, Empty
from typing import Optional, Tuple from typing import Optional, Tuple, Dict, Any, List
from abc import ABC, abstractmethod
from logger import get_logger from logger import get_logger
from config import config from config import config
from storage import get_storage from storage import get_storage
@@ -18,24 +20,157 @@ from influxdb_handler import get_influxdb_client
logger = get_logger(__name__) logger = get_logger(__name__)
# Тип для сериализованного сообщения (примитивные данные)
SerializedMessage = Tuple[float, str, int, int, bytes] # (timestamp, interface, can_id, dlc, data)
class MessageHandler(ABC):
"""Базовый класс для обработчиков сообщений."""
@abstractmethod
def process_batch(self, messages: List[SerializedMessage]) -> int:
"""
Обработка батча сообщений.
Args:
messages: Список сериализованных сообщений
Returns:
Количество успешно обработанных сообщений
"""
pass
@abstractmethod
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики обработчика."""
pass
class StorageHandler(MessageHandler):
"""Обработчик для сохранения в SQLite."""
def __init__(self):
"""Инициализация обработчика storage."""
self.logger = logger.getChild("storage_handler")
self.storage = None
self._init_storage()
def _init_storage(self) -> None:
"""Инициализация storage."""
try:
self.storage = get_storage()
self.logger.info("Storage handler initialized")
except Exception as e:
self.logger.error(f"Failed to initialize storage: {e}", exc_info=True)
self.storage = None
def process_batch(self, messages: List[SerializedMessage]) -> int:
"""Обработка батча сообщений для сохранения в SQLite."""
if not self.storage or not messages:
return 0
try:
# Сообщения уже в правильном формате для save_messages_batch
saved_count = self.storage.save_messages_batch(messages)
if saved_count != len(messages):
self.logger.warning(
f"Not all messages saved: {saved_count}/{len(messages)}",
extra={"batch_size": len(messages)}
)
return saved_count
except Exception as e:
self.logger.error(
f"Failed to save messages batch: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
)
return 0
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики storage."""
if self.storage:
try:
return self.storage.get_stats()
except Exception:
pass
return {"initialized": False}
class InfluxDBHandler(MessageHandler):
"""Обработчик для отправки в InfluxDB."""
def __init__(self):
"""Инициализация обработчика InfluxDB."""
self.logger = logger.getChild("influxdb_handler")
self.influxdb_client = None
self._init_influxdb()
def _init_influxdb(self) -> None:
"""Инициализация InfluxDB клиента."""
if not config.influxdb.enabled:
self.logger.info("InfluxDB is disabled in configuration")
return
try:
self.influxdb_client = get_influxdb_client()
self.logger.info("InfluxDB handler initialized")
except Exception as e:
self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True)
self.influxdb_client = None
def process_batch(self, messages: List[SerializedMessage]) -> int:
"""Обработка батча сообщений для отправки в InfluxDB."""
if not self.influxdb_client or not messages:
return 0
try:
# Конвертируем сериализованные сообщения в формат для InfluxDB
influx_messages = []
for timestamp, interface, can_id, dlc, data in messages:
influx_messages.append({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
})
if influx_messages:
return self.influxdb_client.write_messages_batch(influx_messages)
return 0
except Exception as e:
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
)
return 0
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики InfluxDB."""
if self.influxdb_client:
try:
return self.influxdb_client.get_stats()
except Exception:
pass
return {"enabled": False, "initialized": False}
class MessageProcessor: class MessageProcessor:
"""Класс для обработки и сохранения CAN сообщений с асинхронной обработкой.""" """Класс для обработки и сохранения CAN сообщений с асинхронной обработкой."""
def __init__(self, queue_size: int = 10000, log_messages: bool = False): def __init__(self, queue_size: int = 10000):
""" """
Инициализация процессора сообщений. Инициализация процессора сообщений.
Args: Args:
queue_size: Максимальный размер очереди сообщений queue_size: Максимальный размер очереди сообщений
log_messages: Логировать каждое сообщение на INFO уровне (по умолчанию False)
""" """
self.logger = logger self.logger = logger
self.storage = None
self.influxdb_client = None
self.log_messages = log_messages
# Очередь для асинхронной обработки сообщений # Очередь для асинхронной обработки сообщений
self.message_queue: Queue[Tuple[str, can.Message]] = Queue(maxsize=queue_size) # Храним сериализованные данные (примитивы) вместо объектов can.Message
self.message_queue: Queue[SerializedMessage] = Queue(maxsize=queue_size)
self.running = False self.running = False
self.processing_thread: Optional[threading.Thread] = None self.processing_thread: Optional[threading.Thread] = None
@@ -44,57 +179,48 @@ class MessageProcessor:
self.dropped_count = 0 self.dropped_count = 0
self.queue_full_warnings = 0 self.queue_full_warnings = 0
# Инициализируем хранилища # Инициализируем обработчики
self._init_storage() self.handlers: List[MessageHandler] = []
self._init_influxdb() self._init_handlers()
def _init_storage(self) -> None: def _init_handlers(self) -> None:
"""Инициализация локального хранилища (SQLite).""" """Инициализация обработчиков сообщений."""
try: # Добавляем обработчики в порядке приоритета
self.storage = get_storage() self.handlers.append(StorageHandler())
self.logger.info(
"Storage initialized",
extra={
"type": config.storage.type,
"path": config.storage.database_path,
"wal_mode": config.storage.wal_mode
}
)
except Exception as e:
self.logger.error(
f"Failed to initialize storage: {e}",
exc_info=True
)
self.storage = None
def _init_influxdb(self) -> None: if config.influxdb.enabled:
"""Инициализация клиента InfluxDB.""" self.handlers.append(InfluxDBHandler())
if not config.influxdb.enabled:
self.logger.info("InfluxDB is disabled in configuration")
return
try: self.logger.info(
self.influxdb_client = get_influxdb_client() f"Initialized {len(self.handlers)} message handlers",
self.logger.info( extra={"handlers": [type(h).__name__ for h in self.handlers]}
"InfluxDB initialized", )
extra={
"url": config.influxdb.url, def _serialize_message(self, interface: str, message: can.Message) -> SerializedMessage:
"bucket": config.influxdb.bucket, """
"org": config.influxdb.org Сериализация CAN сообщения в примитивные данные.
}
) Args:
except Exception as e: interface: Имя интерфейса
self.logger.error( message: CAN сообщение
f"Failed to initialize InfluxDB client: {e}",
exc_info=True Returns:
) Кортеж с примитивными данными (timestamp, interface, can_id, dlc, data)
self.influxdb_client = None """
return (
message.timestamp,
interface,
message.arbitration_id,
message.dlc,
message.data if message.data else b''
)
def enqueue(self, interface: str, message: can.Message) -> bool: def enqueue(self, interface: str, message: can.Message) -> bool:
""" """
Добавление сообщения в очередь для асинхронной обработки. Добавление сообщения в очередь для асинхронной обработки.
Этот метод вызывается из callback CAN чтения и должен быть быстрым. Этот метод вызывается из callback CAN чтения и должен быть быстрым.
Сериализуем сообщение сразу, чтобы не хранить объекты can.Message.
Args: Args:
interface: Имя интерфейса (например, 'can0') interface: Имя интерфейса (например, 'can0')
@@ -104,8 +230,11 @@ class MessageProcessor:
True если сообщение добавлено, False если очередь переполнена True если сообщение добавлено, False если очередь переполнена
""" """
try: try:
# Сериализуем сообщение в примитивные данные
serialized = self._serialize_message(interface, message)
# Пытаемся добавить в очередь без блокировки (non-blocking) # Пытаемся добавить в очередь без блокировки (non-blocking)
self.message_queue.put_nowait((interface, message)) self.message_queue.put_nowait(serialized)
return True return True
except: except:
# Очередь переполнена - пропускаем сообщение # Очередь переполнена - пропускаем сообщение
@@ -141,32 +270,36 @@ class MessageProcessor:
self.logger.info("Message processing loop started") self.logger.info("Message processing loop started")
# Батч для групповой обработки # Батч для групповой обработки
batch = [] batch: List[SerializedMessage] = []
batch_size = config.general.buffer_size batch_size = config.general.batch_size
batch_interval = config.general.batch_interval
last_batch_time = time.time() last_batch_time = time.time()
batch_interval = 0.1 # секунды
while self.running or not self.message_queue.empty(): while self.running or not self.message_queue.empty():
try: try:
# Получаем сообщение из очереди с таймаутом # Получаем сообщение из очереди с таймаутом
try: try:
interface, message = self.message_queue.get(timeout=0.1) serialized_message = self.message_queue.get(timeout=batch_interval)
batch.append(serialized_message)
except Empty: except Empty:
# Если очередь пуста, обрабатываем накопленный батч # Если очередь пуста, обрабатываем накопленный батч
if batch and (time.time() - last_batch_time) >= batch_interval: if batch:
self._process_batch(batch) self._process_batch(batch)
batch = [] batch = []
last_batch_time = time.time() last_batch_time = time.time()
continue continue
# Добавляем в батч
batch.append((interface, message))
# Обрабатываем батч если он заполнен или прошло достаточно времени # Обрабатываем батч если он заполнен или прошло достаточно времени
if len(batch) >= batch_size or (time.time() - last_batch_time) >= batch_interval: current_time = time.time()
should_flush = (
len(batch) >= batch_size or
(batch and (current_time - last_batch_time) >= batch_interval)
)
if should_flush:
self._process_batch(batch) self._process_batch(batch)
batch = [] batch = []
last_batch_time = time.time() last_batch_time = current_time
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
@@ -186,77 +319,45 @@ class MessageProcessor:
} }
) )
def _process_batch(self, batch: list) -> None: def _process_batch(self, batch: List[SerializedMessage]) -> None:
""" """
Обработка батча сообщений. Обработка батча сериализованных сообщений.
Args: Args:
batch: Список кортежей (interface, message) batch: Список сериализованных сообщений
""" """
if not batch: if not batch:
return return
try: try:
# Подготавливаем данные для пакетного сохранения в SQLite # Логируем батч на уровне DEBUG (если уровень DEBUG включен)
messages_to_save = [] # Это позволяет контролировать вывод через уровни логирования
if batch:
for interface, message in batch: timestamp, interface, can_id, dlc, data = batch[0]
# Логируем сообщение в зависимости от настроек self.logger.debug(
if self.log_messages: "CAN message batch processed",
self.logger.info( extra={
"CAN message", "batch_size": len(batch),
extra={ "first_message": {
"interface": interface, "interface": interface,
"can_id": hex(message.arbitration_id), "can_id": hex(can_id),
"dlc": message.dlc, "dlc": dlc,
"data": message.data.hex() if message.data else "", "data": data.hex() if data else "",
"timestamp": message.timestamp "timestamp": timestamp
} }
}
)
# Обрабатываем батч через все обработчики
for handler in self.handlers:
try:
handler.process_batch(batch)
except Exception as e:
self.logger.error(
f"Error in handler {type(handler).__name__}: {e}",
exc_info=True,
extra={"batch_size": len(batch)}
) )
else:
self.logger.debug(
"CAN message received",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
# Подготавливаем данные для сохранения
messages_to_save.append((
message.timestamp,
interface,
message.arbitration_id,
message.dlc,
message.data if message.data else b''
))
# Пакетное сохранение в SQLite
if self.storage and messages_to_save:
saved_count = self.storage.save_messages_batch(messages_to_save)
if saved_count != len(messages_to_save):
self.logger.warning(
f"Not all messages saved: {saved_count}/{len(messages_to_save)}",
extra={"batch_size": len(messages_to_save)}
)
# Отправляем в InfluxDB (если включено) - пакетно
if self.influxdb_client:
influx_messages = []
for interface, message in batch:
influx_messages.append({
"interface": interface,
"can_id": message.arbitration_id,
"dlc": message.dlc,
"data": message.data if message.data else b'',
"timestamp": message.timestamp
})
if influx_messages:
self.influxdb_client.write_messages_batch(influx_messages)
# Обновляем счетчик обработанных сообщений # Обновляем счетчик обработанных сообщений
self.processed_count += len(batch) self.processed_count += len(batch)
@@ -268,119 +369,6 @@ class MessageProcessor:
extra={"batch_size": len(batch)} extra={"batch_size": len(batch)}
) )
def _process_single(self, interface: str, message: can.Message) -> None:
"""
Обработка одного CAN сообщения.
Используется как fallback, если батчинг не применяется.
Args:
interface: Имя интерфейса
message: CAN сообщение
"""
try:
# Логируем сообщение в зависимости от настроек
if self.log_messages:
self.logger.info(
"CAN message",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
else:
self.logger.debug(
"CAN message received",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
# Сохраняем в локальное хранилище (SQLite)
self._save_to_storage(interface, message)
# Отправляем в InfluxDB (если включено)
if self.influxdb_client:
self._send_to_influxdb(interface, message)
except Exception as e:
self.logger.error(
f"Error processing message: {e}",
exc_info=True,
extra={
"interface": interface,
"can_id": hex(message.arbitration_id) if message else None
}
)
def _save_to_storage(self, interface: str, message: can.Message) -> None:
"""
Сохранение сообщения в локальное хранилище (SQLite).
Args:
interface: Имя интерфейса
message: CAN сообщение
"""
if not self.storage:
return
try:
message_id = self.storage.save_message(
interface=interface,
can_id=message.arbitration_id,
dlc=message.dlc,
data=message.data if message.data else b'',
timestamp=message.timestamp
)
if message_id is None:
self.logger.warning(
"Failed to save message to storage",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id)
}
)
except Exception as e:
self.logger.error(
f"Failed to save message to storage: {e}",
exc_info=True,
extra={"interface": interface}
)
def _send_to_influxdb(self, interface: str, message: can.Message) -> None:
"""
Отправка сообщения в InfluxDB (для одиночных сообщений).
Args:
interface: Имя интерфейса
message: CAN сообщение
"""
if not self.influxdb_client:
return
try:
self.influxdb_client.write_message(
interface=interface,
can_id=message.arbitration_id,
dlc=message.dlc,
data=message.data if message.data else b'',
timestamp=message.timestamp
)
except Exception as e:
self.logger.error(
f"Failed to send message to InfluxDB: {e}",
exc_info=True,
extra={"interface": interface}
)
def start(self) -> None: def start(self) -> None:
"""Запуск обработки сообщений в отдельном потоке.""" """Запуск обработки сообщений в отдельном потоке."""
if self.running: if self.running:
@@ -388,6 +376,13 @@ class MessageProcessor:
return return
self.running = True self.running = True
# Запускаем InfluxDB forwarder, если он есть
for handler in self.handlers:
if isinstance(handler, InfluxDBHandler) and handler.influxdb_client:
handler.influxdb_client.start()
# Запускаем поток обработки сообщений
self.processing_thread = threading.Thread( self.processing_thread = threading.Thread(
target=self._processing_loop, target=self._processing_loop,
name="MessageProcessor", name="MessageProcessor",
@@ -407,22 +402,16 @@ class MessageProcessor:
if self.processing_thread.is_alive(): if self.processing_thread.is_alive():
self.logger.warning("Processing thread did not stop gracefully") self.logger.warning("Processing thread did not stop gracefully")
# Закрываем соединения # Закрываем обработчики
if self.storage: for handler in self.handlers:
try: try:
self.storage.close() if isinstance(handler, StorageHandler) and handler.storage:
handler.storage.close()
elif isinstance(handler, InfluxDBHandler) and handler.influxdb_client:
handler.influxdb_client.close()
except Exception as e: except Exception as e:
self.logger.error( self.logger.error(
f"Error closing storage: {e}", f"Error closing handler {type(handler).__name__}: {e}",
exc_info=True
)
if self.influxdb_client:
try:
self.influxdb_client.close()
except Exception as e:
self.logger.error(
f"Error closing InfluxDB client: {e}",
exc_info=True exc_info=True
) )
@@ -443,20 +432,13 @@ class MessageProcessor:
"running": self.running "running": self.running
} }
# Добавляем статистику хранилища, если оно инициализировано # Добавляем статистику всех обработчиков
if self.storage: for handler in self.handlers:
try: try:
storage_stats = self.storage.get_stats() handler_stats = handler.get_stats()
stats["storage"] = storage_stats handler_name = type(handler).__name__.replace("Handler", "").lower()
stats[handler_name] = handler_stats
except Exception as e: except Exception as e:
self.logger.debug(f"Failed to get storage stats: {e}") self.logger.debug(f"Failed to get stats from {type(handler).__name__}: {e}")
# Добавляем статистику InfluxDB, если он инициализирован
if self.influxdb_client:
try:
influx_stats = self.influxdb_client.get_stats()
stats["influxdb"] = influx_stats
except Exception as e:
self.logger.debug(f"Failed to get InfluxDB stats: {e}")
return stats return stats

View File

@@ -179,8 +179,7 @@ class CANSniffer:
# Инициализируем MessageProcessor для автоматической обработки сообщений # Инициализируем MessageProcessor для автоматической обработки сообщений
# Используем настройку из конфигурации для логирования сообщений # Используем настройку из конфигурации для логирования сообщений
log_messages = config.logging.log_can_messages self.message_processor = MessageProcessor()
self.message_processor = MessageProcessor(log_messages=log_messages)
# Используем переданный callback или процессор по умолчанию # Используем переданный callback или процессор по умолчанию
if message_callback: if message_callback: