add CANFrame model

This commit is contained in:
2026-01-07 03:13:29 +03:00
parent cd89e87e77
commit adb2cf89cd
3 changed files with 205 additions and 80 deletions

View File

@@ -0,0 +1,131 @@
"""
Модуль данных для представления CAN сообщений.
Предоставляет изолированную от библиотеки python-can структуру данных
для работы с CAN сообщениями.
"""
from dataclasses import dataclass
from typing import Optional
@dataclass(frozen=True)
class CANFrame:
"""
Неизменяемая структура данных для CAN сообщения.
Attributes:
ts_ns: Временная метка в наносекундах (Unix timestamp * 1e9)
bus: Имя шины/интерфейса (например, 'can0', 'can1')
can_id: CAN ID сообщения
is_extended: True если используется расширенный формат (29-bit ID)
dlc: Data Length Code (количество байт данных, 0-8)
data: Данные сообщения (bytes, максимум 8 байт)
"""
ts_ns: int
bus: str
can_id: int
is_extended: bool
dlc: int
data: bytes
def __post_init__(self):
"""Валидация данных после инициализации."""
if self.dlc < 0 or self.dlc > 8:
raise ValueError(f"DLC must be between 0 and 8, got {self.dlc}")
if len(self.data) > 8:
raise ValueError(f"Data length must be <= 8 bytes, got {len(self.data)}")
if len(self.data) != self.dlc:
raise ValueError(f"Data length ({len(self.data)}) does not match DLC ({self.dlc})")
if self.ts_ns < 0:
raise ValueError(f"Timestamp must be non-negative, got {self.ts_ns}")
@property
def timestamp(self) -> float:
"""Возвращает временную метку в секундах (float)."""
return self.ts_ns / 1e9
@property
def can_id_hex(self) -> str:
"""Возвращает CAN ID в hex формате."""
return hex(self.can_id)
@property
def data_hex(self) -> str:
"""Возвращает данные в hex формате."""
return self.data.hex()
def to_dict(self) -> dict:
"""
Конвертация в словарь для сериализации.
Returns:
Словарь с данными фрейма
"""
return {
"ts_ns": self.ts_ns,
"bus": self.bus,
"can_id": self.can_id,
"is_extended": self.is_extended,
"dlc": self.dlc,
"data": self.data
}
@classmethod
def from_can_message(cls, message, bus: str) -> 'CANFrame':
"""
Создание CANFrame из объекта can.Message (python-can).
Args:
message: Объект can.Message из библиотеки python-can
bus: Имя шины/интерфейса
Returns:
Экземпляр CANFrame
"""
import can
if not isinstance(message, can.Message):
raise TypeError(f"Expected can.Message, got {type(message)}")
# Конвертируем timestamp в наносекунды
ts_ns = int(message.timestamp * 1e9)
# Определяем, является ли ID расширенным
is_extended = message.is_extended_id if hasattr(message, 'is_extended_id') else False
# Получаем данные
data = message.data if message.data else b''
return cls(
ts_ns=ts_ns,
bus=bus,
can_id=message.arbitration_id,
is_extended=is_extended,
dlc=message.dlc,
data=data
)
@classmethod
def from_dict(cls, data: dict) -> 'CANFrame':
"""
Создание CANFrame из словаря.
Args:
data: Словарь с данными фрейма
Returns:
Экземпляр CANFrame
"""
return cls(
ts_ns=data["ts_ns"],
bus=data["bus"],
can_id=data["can_id"],
is_extended=data.get("is_extended", False),
dlc=data["dlc"],
data=data["data"] if isinstance(data["data"], bytes) else bytes(data["data"])
)

View File

@@ -5,35 +5,31 @@
Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений.
"""
import can
import threading
import time
from queue import Queue, Empty
from typing import Optional, Tuple, Dict, Any, List
from typing import Optional, Dict, Any, List
from abc import ABC, abstractmethod
from logger import get_logger
from config import config
from storage import get_storage
from influxdb_handler import get_influxdb_client
from can_frame import CANFrame
logger = get_logger(__name__)
# Тип для сериализованного сообщения (примитивные данные)
SerializedMessage = Tuple[float, str, int, int, bytes] # (timestamp, interface, can_id, dlc, data)
class MessageHandler(ABC):
"""Базовый класс для обработчиков сообщений."""
@abstractmethod
def process_batch(self, messages: List[SerializedMessage]) -> int:
def process_batch(self, frames: List[CANFrame]) -> int:
"""
Обработка батча сообщений.
Обработка батча CAN фреймов.
Args:
messages: Список сериализованных сообщений
frames: Список CANFrame объектов
Returns:
Количество успешно обработанных сообщений
@@ -64,25 +60,35 @@ class StorageHandler(MessageHandler):
self.logger.error(f"Failed to initialize storage: {e}", exc_info=True)
self.storage = None
def process_batch(self, messages: List[SerializedMessage]) -> int:
def process_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча сообщений для сохранения в SQLite."""
if not self.storage or not messages:
if not self.storage or not frames:
return 0
try:
# Сообщения уже в правильном формате для save_messages_batch
# Конвертируем CANFrame в формат для storage
messages = []
for frame in frames:
messages.append((
frame.timestamp, # float timestamp в секундах
frame.bus,
frame.can_id,
frame.dlc,
frame.data
))
saved_count = self.storage.save_messages_batch(messages)
if saved_count != len(messages):
if saved_count != len(frames):
self.logger.warning(
f"Not all messages saved: {saved_count}/{len(messages)}",
extra={"batch_size": len(messages)}
f"Not all messages saved: {saved_count}/{len(frames)}",
extra={"batch_size": len(frames)}
)
return saved_count
except Exception as e:
self.logger.error(
f"Failed to save messages batch: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
extra={"batch_size": len(frames)}
)
return 0
@@ -118,21 +124,21 @@ class InfluxDBHandler(MessageHandler):
self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True)
self.influxdb_client = None
def process_batch(self, messages: List[SerializedMessage]) -> int:
def process_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча сообщений для отправки в InfluxDB."""
if not self.influxdb_client or not messages:
if not self.influxdb_client or not frames:
return 0
try:
# Конвертируем сериализованные сообщения в формат для InfluxDB
# Конвертируем CANFrame в формат для InfluxDB
influx_messages = []
for timestamp, interface, can_id, dlc, data in messages:
for frame in frames:
influx_messages.append({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
"interface": frame.bus,
"can_id": frame.can_id,
"dlc": frame.dlc,
"data": frame.data,
"timestamp": frame.timestamp # float timestamp в секундах
})
if influx_messages:
@@ -142,7 +148,7 @@ class InfluxDBHandler(MessageHandler):
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
extra={"batch_size": len(frames)}
)
return 0
@@ -169,8 +175,8 @@ class MessageProcessor:
self.logger = logger
# Очередь для асинхронной обработки сообщений
# Храним сериализованные данные (примитивы) вместо объектов can.Message
self.message_queue: Queue[SerializedMessage] = Queue(maxsize=queue_size)
# Храним CANFrame объекты (неизменяемые, легковесные)
self.message_queue: Queue[CANFrame] = Queue(maxsize=queue_size)
self.running = False
self.processing_thread: Optional[threading.Thread] = None
@@ -196,45 +202,21 @@ class MessageProcessor:
extra={"handlers": [type(h).__name__ for h in self.handlers]}
)
def _serialize_message(self, interface: str, message: can.Message) -> SerializedMessage:
def enqueue(self, frame: CANFrame) -> bool:
"""
Сериализация CAN сообщения в примитивные данные.
Args:
interface: Имя интерфейса
message: CAN сообщение
Returns:
Кортеж с примитивными данными (timestamp, interface, can_id, dlc, data)
"""
return (
message.timestamp,
interface,
message.arbitration_id,
message.dlc,
message.data if message.data else b''
)
def enqueue(self, interface: str, message: can.Message) -> bool:
"""
Добавление сообщения в очередь для асинхронной обработки.
Добавление CAN фрейма в очередь для асинхронной обработки.
Этот метод вызывается из callback CAN чтения и должен быть быстрым.
Сериализуем сообщение сразу, чтобы не хранить объекты can.Message.
Args:
interface: Имя интерфейса (например, 'can0')
message: CAN сообщение
frame: CANFrame объект
Returns:
True если сообщение добавлено, False если очередь переполнена
"""
try:
# Сериализуем сообщение в примитивные данные
serialized = self._serialize_message(interface, message)
# Пытаемся добавить в очередь без блокировки (non-blocking)
self.message_queue.put_nowait(serialized)
self.message_queue.put_nowait(frame)
return True
except:
# Очередь переполнена - пропускаем сообщение
@@ -252,25 +234,24 @@ class MessageProcessor:
)
return False
def process(self, interface: str, message: can.Message) -> None:
def process(self, frame: CANFrame) -> None:
"""
Публичный метод для обработки сообщения.
Публичный метод для обработки CAN фрейма.
Используется как callback для CANSniffer.
Быстро добавляет сообщение в очередь без блокировки.
Быстро добавляет фрейм в очередь без блокировки.
Args:
interface: Имя интерфейса (например, 'can0')
message: CAN сообщение
frame: CANFrame объект
"""
self.enqueue(interface, message)
self.enqueue(frame)
def _processing_loop(self) -> None:
"""Основной цикл обработки сообщений из очереди."""
self.logger.info("Message processing loop started")
# Батч для групповой обработки
batch: List[SerializedMessage] = []
batch: List[CANFrame] = []
batch_size = config.general.batch_size
batch_interval = config.general.batch_interval
last_batch_time = time.time()
@@ -279,8 +260,8 @@ class MessageProcessor:
try:
# Получаем сообщение из очереди с таймаутом
try:
serialized_message = self.message_queue.get(timeout=batch_interval)
batch.append(serialized_message)
frame = self.message_queue.get(timeout=batch_interval)
batch.append(frame)
except Empty:
# Если очередь пуста, обрабатываем накопленный батч
if batch:
@@ -319,12 +300,12 @@ class MessageProcessor:
}
)
def _process_batch(self, batch: List[SerializedMessage]) -> None:
def _process_batch(self, batch: List[CANFrame]) -> None:
"""
Обработка батча сериализованных сообщений.
Обработка батча CAN фреймов.
Args:
batch: Список сериализованных сообщений
batch: Список CANFrame объектов
"""
if not batch:
return
@@ -333,17 +314,17 @@ class MessageProcessor:
# Логируем батч на уровне DEBUG (если уровень DEBUG включен)
# Это позволяет контролировать вывод через уровни логирования
if batch:
timestamp, interface, can_id, dlc, data = batch[0]
first_frame = batch[0]
self.logger.debug(
"CAN message batch processed",
extra={
"batch_size": len(batch),
"first_message": {
"interface": interface,
"can_id": hex(can_id),
"dlc": dlc,
"data": data.hex() if data else "",
"timestamp": timestamp
"interface": first_frame.bus,
"can_id": first_frame.can_id_hex,
"dlc": first_frame.dlc,
"data": first_frame.data_hex,
"timestamp": first_frame.timestamp
}
}
)

View File

@@ -13,6 +13,7 @@ from queue import Queue, Empty
from config import config
from logger import get_logger
from can_frame import CANFrame
from .message_processor import MessageProcessor
@@ -23,7 +24,7 @@ class CANBusHandler:
self,
interface: str,
bus: can.Bus,
message_callback: Callable,
message_callback: Callable[[CANFrame], None],
logger,
filters: Optional[List[dict]] = None
):
@@ -33,7 +34,7 @@ class CANBusHandler:
Args:
interface: Имя интерфейса (например, 'can0')
bus: Экземпляр can.Bus
message_callback: Функция для обработки CAN сообщений
message_callback: Функция для обработки CAN сообщений (принимает CANFrame)
logger: Логгер для данного интерфейса
filters: Список фильтров SocketCAN
"""
@@ -81,14 +82,26 @@ class CANBusHandler:
self.message_count += 1
self.last_message_time = time.time()
# Конвертируем can.Message в CANFrame
try:
frame = CANFrame.from_can_message(message, self.interface)
except Exception as e:
self.logger.error(
f"Failed to convert message to CANFrame for {self.interface}: {e}",
exc_info=True,
extra={"can_id": hex(message.arbitration_id) if message else None}
)
self.error_count += 1
continue
# Вызываем callback для обработки сообщения
try:
self.message_callback(self.interface, message)
self.message_callback(frame)
except Exception as e:
self.logger.error(
f"Error in message callback for {self.interface}: {e}",
exc_info=True,
extra={"can_id": hex(message.arbitration_id)}
extra={"can_id": frame.can_id_hex}
)
self.error_count += 1
@@ -166,13 +179,13 @@ class CANBusHandler:
class CANSniffer:
"""Класс для параллельного чтения CAN сообщений с нескольких интерфейсов."""
def __init__(self, message_callback: Optional[Callable] = None):
def __init__(self, message_callback: Optional[Callable[[CANFrame], None]] = None):
"""
Инициализация CAN Sniffer.
Args:
message_callback: Функция для обработки CAN сообщений.
Должна принимать (interface: str, message: can.Message)
Должна принимать CANFrame объект
"""
self.config = config.can
self.logger = get_logger(__name__)