diff --git a/can_sniffer/src/can_frame.py b/can_sniffer/src/can_frame.py new file mode 100644 index 0000000..5791931 --- /dev/null +++ b/can_sniffer/src/can_frame.py @@ -0,0 +1,131 @@ +""" +Модуль данных для представления CAN сообщений. + +Предоставляет изолированную от библиотеки python-can структуру данных +для работы с CAN сообщениями. +""" + +from dataclasses import dataclass +from typing import Optional + + +@dataclass(frozen=True) +class CANFrame: + """ + Неизменяемая структура данных для CAN сообщения. + + Attributes: + ts_ns: Временная метка в наносекундах (Unix timestamp * 1e9) + bus: Имя шины/интерфейса (например, 'can0', 'can1') + can_id: CAN ID сообщения + is_extended: True если используется расширенный формат (29-bit ID) + dlc: Data Length Code (количество байт данных, 0-8) + data: Данные сообщения (bytes, максимум 8 байт) + """ + ts_ns: int + bus: str + can_id: int + is_extended: bool + dlc: int + data: bytes + + def __post_init__(self): + """Валидация данных после инициализации.""" + if self.dlc < 0 or self.dlc > 8: + raise ValueError(f"DLC must be between 0 and 8, got {self.dlc}") + + if len(self.data) > 8: + raise ValueError(f"Data length must be <= 8 bytes, got {len(self.data)}") + + if len(self.data) != self.dlc: + raise ValueError(f"Data length ({len(self.data)}) does not match DLC ({self.dlc})") + + if self.ts_ns < 0: + raise ValueError(f"Timestamp must be non-negative, got {self.ts_ns}") + + @property + def timestamp(self) -> float: + """Возвращает временную метку в секундах (float).""" + return self.ts_ns / 1e9 + + @property + def can_id_hex(self) -> str: + """Возвращает CAN ID в hex формате.""" + return hex(self.can_id) + + @property + def data_hex(self) -> str: + """Возвращает данные в hex формате.""" + return self.data.hex() + + def to_dict(self) -> dict: + """ + Конвертация в словарь для сериализации. + + Returns: + Словарь с данными фрейма + """ + return { + "ts_ns": self.ts_ns, + "bus": self.bus, + "can_id": self.can_id, + "is_extended": self.is_extended, + "dlc": self.dlc, + "data": self.data + } + + @classmethod + def from_can_message(cls, message, bus: str) -> 'CANFrame': + """ + Создание CANFrame из объекта can.Message (python-can). + + Args: + message: Объект can.Message из библиотеки python-can + bus: Имя шины/интерфейса + + Returns: + Экземпляр CANFrame + """ + import can + + if not isinstance(message, can.Message): + raise TypeError(f"Expected can.Message, got {type(message)}") + + # Конвертируем timestamp в наносекунды + ts_ns = int(message.timestamp * 1e9) + + # Определяем, является ли ID расширенным + is_extended = message.is_extended_id if hasattr(message, 'is_extended_id') else False + + # Получаем данные + data = message.data if message.data else b'' + + return cls( + ts_ns=ts_ns, + bus=bus, + can_id=message.arbitration_id, + is_extended=is_extended, + dlc=message.dlc, + data=data + ) + + @classmethod + def from_dict(cls, data: dict) -> 'CANFrame': + """ + Создание CANFrame из словаря. + + Args: + data: Словарь с данными фрейма + + Returns: + Экземпляр CANFrame + """ + return cls( + ts_ns=data["ts_ns"], + bus=data["bus"], + can_id=data["can_id"], + is_extended=data.get("is_extended", False), + dlc=data["dlc"], + data=data["data"] if isinstance(data["data"], bytes) else bytes(data["data"]) + ) + diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index e1ecd08..4fea44c 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -5,35 +5,31 @@ Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений. """ -import can import threading import time from queue import Queue, Empty -from typing import Optional, Tuple, Dict, Any, List +from typing import Optional, Dict, Any, List from abc import ABC, abstractmethod from logger import get_logger from config import config from storage import get_storage from influxdb_handler import get_influxdb_client +from can_frame import CANFrame logger = get_logger(__name__) -# Тип для сериализованного сообщения (примитивные данные) -SerializedMessage = Tuple[float, str, int, int, bytes] # (timestamp, interface, can_id, dlc, data) - - class MessageHandler(ABC): """Базовый класс для обработчиков сообщений.""" @abstractmethod - def process_batch(self, messages: List[SerializedMessage]) -> int: + def process_batch(self, frames: List[CANFrame]) -> int: """ - Обработка батча сообщений. + Обработка батча CAN фреймов. Args: - messages: Список сериализованных сообщений + frames: Список CANFrame объектов Returns: Количество успешно обработанных сообщений @@ -64,25 +60,35 @@ class StorageHandler(MessageHandler): self.logger.error(f"Failed to initialize storage: {e}", exc_info=True) self.storage = None - def process_batch(self, messages: List[SerializedMessage]) -> int: + def process_batch(self, frames: List[CANFrame]) -> int: """Обработка батча сообщений для сохранения в SQLite.""" - if not self.storage or not messages: + if not self.storage or not frames: return 0 try: - # Сообщения уже в правильном формате для save_messages_batch + # Конвертируем CANFrame в формат для storage + messages = [] + for frame in frames: + messages.append(( + frame.timestamp, # float timestamp в секундах + frame.bus, + frame.can_id, + frame.dlc, + frame.data + )) + saved_count = self.storage.save_messages_batch(messages) - if saved_count != len(messages): + if saved_count != len(frames): self.logger.warning( - f"Not all messages saved: {saved_count}/{len(messages)}", - extra={"batch_size": len(messages)} + f"Not all messages saved: {saved_count}/{len(frames)}", + extra={"batch_size": len(frames)} ) return saved_count except Exception as e: self.logger.error( f"Failed to save messages batch: {e}", exc_info=True, - extra={"batch_size": len(messages)} + extra={"batch_size": len(frames)} ) return 0 @@ -118,21 +124,21 @@ class InfluxDBHandler(MessageHandler): self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True) self.influxdb_client = None - def process_batch(self, messages: List[SerializedMessage]) -> int: + def process_batch(self, frames: List[CANFrame]) -> int: """Обработка батча сообщений для отправки в InfluxDB.""" - if not self.influxdb_client or not messages: + if not self.influxdb_client or not frames: return 0 try: - # Конвертируем сериализованные сообщения в формат для InfluxDB + # Конвертируем CANFrame в формат для InfluxDB influx_messages = [] - for timestamp, interface, can_id, dlc, data in messages: + for frame in frames: influx_messages.append({ - "interface": interface, - "can_id": can_id, - "dlc": dlc, - "data": data, - "timestamp": timestamp + "interface": frame.bus, + "can_id": frame.can_id, + "dlc": frame.dlc, + "data": frame.data, + "timestamp": frame.timestamp # float timestamp в секундах }) if influx_messages: @@ -142,7 +148,7 @@ class InfluxDBHandler(MessageHandler): self.logger.error( f"Failed to send messages batch to InfluxDB: {e}", exc_info=True, - extra={"batch_size": len(messages)} + extra={"batch_size": len(frames)} ) return 0 @@ -169,8 +175,8 @@ class MessageProcessor: self.logger = logger # Очередь для асинхронной обработки сообщений - # Храним сериализованные данные (примитивы) вместо объектов can.Message - self.message_queue: Queue[SerializedMessage] = Queue(maxsize=queue_size) + # Храним CANFrame объекты (неизменяемые, легковесные) + self.message_queue: Queue[CANFrame] = Queue(maxsize=queue_size) self.running = False self.processing_thread: Optional[threading.Thread] = None @@ -196,45 +202,21 @@ class MessageProcessor: extra={"handlers": [type(h).__name__ for h in self.handlers]} ) - def _serialize_message(self, interface: str, message: can.Message) -> SerializedMessage: + def enqueue(self, frame: CANFrame) -> bool: """ - Сериализация CAN сообщения в примитивные данные. - - Args: - interface: Имя интерфейса - message: CAN сообщение - - Returns: - Кортеж с примитивными данными (timestamp, interface, can_id, dlc, data) - """ - return ( - message.timestamp, - interface, - message.arbitration_id, - message.dlc, - message.data if message.data else b'' - ) - - def enqueue(self, interface: str, message: can.Message) -> bool: - """ - Добавление сообщения в очередь для асинхронной обработки. + Добавление CAN фрейма в очередь для асинхронной обработки. Этот метод вызывается из callback CAN чтения и должен быть быстрым. - Сериализуем сообщение сразу, чтобы не хранить объекты can.Message. Args: - interface: Имя интерфейса (например, 'can0') - message: CAN сообщение + frame: CANFrame объект Returns: True если сообщение добавлено, False если очередь переполнена """ try: - # Сериализуем сообщение в примитивные данные - serialized = self._serialize_message(interface, message) - # Пытаемся добавить в очередь без блокировки (non-blocking) - self.message_queue.put_nowait(serialized) + self.message_queue.put_nowait(frame) return True except: # Очередь переполнена - пропускаем сообщение @@ -252,25 +234,24 @@ class MessageProcessor: ) return False - def process(self, interface: str, message: can.Message) -> None: + def process(self, frame: CANFrame) -> None: """ - Публичный метод для обработки сообщения. + Публичный метод для обработки CAN фрейма. Используется как callback для CANSniffer. - Быстро добавляет сообщение в очередь без блокировки. + Быстро добавляет фрейм в очередь без блокировки. Args: - interface: Имя интерфейса (например, 'can0') - message: CAN сообщение + frame: CANFrame объект """ - self.enqueue(interface, message) + self.enqueue(frame) def _processing_loop(self) -> None: """Основной цикл обработки сообщений из очереди.""" self.logger.info("Message processing loop started") # Батч для групповой обработки - batch: List[SerializedMessage] = [] + batch: List[CANFrame] = [] batch_size = config.general.batch_size batch_interval = config.general.batch_interval last_batch_time = time.time() @@ -279,8 +260,8 @@ class MessageProcessor: try: # Получаем сообщение из очереди с таймаутом try: - serialized_message = self.message_queue.get(timeout=batch_interval) - batch.append(serialized_message) + frame = self.message_queue.get(timeout=batch_interval) + batch.append(frame) except Empty: # Если очередь пуста, обрабатываем накопленный батч if batch: @@ -319,12 +300,12 @@ class MessageProcessor: } ) - def _process_batch(self, batch: List[SerializedMessage]) -> None: + def _process_batch(self, batch: List[CANFrame]) -> None: """ - Обработка батча сериализованных сообщений. + Обработка батча CAN фреймов. Args: - batch: Список сериализованных сообщений + batch: Список CANFrame объектов """ if not batch: return @@ -333,17 +314,17 @@ class MessageProcessor: # Логируем батч на уровне DEBUG (если уровень DEBUG включен) # Это позволяет контролировать вывод через уровни логирования if batch: - timestamp, interface, can_id, dlc, data = batch[0] + first_frame = batch[0] self.logger.debug( "CAN message batch processed", extra={ "batch_size": len(batch), "first_message": { - "interface": interface, - "can_id": hex(can_id), - "dlc": dlc, - "data": data.hex() if data else "", - "timestamp": timestamp + "interface": first_frame.bus, + "can_id": first_frame.can_id_hex, + "dlc": first_frame.dlc, + "data": first_frame.data_hex, + "timestamp": first_frame.timestamp } } ) diff --git a/can_sniffer/src/socket_can/src.py b/can_sniffer/src/socket_can/src.py index b2b658c..ed15c64 100644 --- a/can_sniffer/src/socket_can/src.py +++ b/can_sniffer/src/socket_can/src.py @@ -13,6 +13,7 @@ from queue import Queue, Empty from config import config from logger import get_logger +from can_frame import CANFrame from .message_processor import MessageProcessor @@ -23,7 +24,7 @@ class CANBusHandler: self, interface: str, bus: can.Bus, - message_callback: Callable, + message_callback: Callable[[CANFrame], None], logger, filters: Optional[List[dict]] = None ): @@ -33,7 +34,7 @@ class CANBusHandler: Args: interface: Имя интерфейса (например, 'can0') bus: Экземпляр can.Bus - message_callback: Функция для обработки CAN сообщений + message_callback: Функция для обработки CAN сообщений (принимает CANFrame) logger: Логгер для данного интерфейса filters: Список фильтров SocketCAN """ @@ -81,14 +82,26 @@ class CANBusHandler: self.message_count += 1 self.last_message_time = time.time() + # Конвертируем can.Message в CANFrame + try: + frame = CANFrame.from_can_message(message, self.interface) + except Exception as e: + self.logger.error( + f"Failed to convert message to CANFrame for {self.interface}: {e}", + exc_info=True, + extra={"can_id": hex(message.arbitration_id) if message else None} + ) + self.error_count += 1 + continue + # Вызываем callback для обработки сообщения try: - self.message_callback(self.interface, message) + self.message_callback(frame) except Exception as e: self.logger.error( f"Error in message callback for {self.interface}: {e}", exc_info=True, - extra={"can_id": hex(message.arbitration_id)} + extra={"can_id": frame.can_id_hex} ) self.error_count += 1 @@ -166,13 +179,13 @@ class CANBusHandler: class CANSniffer: """Класс для параллельного чтения CAN сообщений с нескольких интерфейсов.""" - def __init__(self, message_callback: Optional[Callable] = None): + def __init__(self, message_callback: Optional[Callable[[CANFrame], None]] = None): """ Инициализация CAN Sniffer. Args: message_callback: Функция для обработки CAN сообщений. - Должна принимать (interface: str, message: can.Message) + Должна принимать CANFrame объект """ self.config = config.can self.logger = get_logger(__name__)