Add source

This commit is contained in:
2026-01-06 18:08:45 +03:00
parent 5a3b5e2908
commit edbf6277da
12 changed files with 1535 additions and 0 deletions

1
.cursorignore Normal file
View File

@@ -0,0 +1 @@
# Add directories or file patterns to ignore during indexing (e.g. foo/ or *.csv)

89
.gitignore vendored Normal file
View File

@@ -0,0 +1,89 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Virtual Environments
venv/
env/
ENV/
env.bak/
venv.bak/
.venv/
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
.DS_Store
# Environment variables
.env
.env.local
.env.*.local
# Logs
*.log
logs/
# Database files (SQLite)
*.db
*.db-shm
*.db-wal
*.sqlite
*.sqlite3
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
.hypothesis/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# Project specific
can_offline.db*
can_edge.log*
config.json
.cursor/

View File

@@ -0,0 +1,4 @@
pydantic>=2.0.0
pydantic-settings>=2.0.0
python-can>=4.0.0

357
can_sniffer/src/config.py Normal file
View File

@@ -0,0 +1,357 @@
"""
Модуль конфигурации для CAN Sniffer проекта.
Использует pydantic-settings для типобезопасной конфигурации с валидацией
и поддержкой загрузки из файла и переменных окружения.
"""
from pathlib import Path
from typing import List, Optional
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class CanConfig(BaseModel):
"""Конфигурация CAN интерфейсов."""
model_config = {"extra": "ignore"}
interfaces: List[str] = Field(
default=["can0", "can1"],
description="Список CAN интерфейсов для мониторинга"
)
listen_only: bool = Field(
default=True,
description="Режим только чтения (listen-only mode)"
)
bitrate: int = Field(
default=500000,
description="Скорость передачи CAN (бит/с)"
)
filters: List[dict] = Field(
default_factory=list,
description="Список фильтров SocketCAN: [{'can_id': 0x123, 'can_mask': 0x7FF}, ...]"
)
@field_validator('interfaces', mode='before')
@classmethod
def parse_interfaces(cls, v):
"""Парсинг интерфейсов из строки (для env переменных)."""
if isinstance(v, str):
return [item.strip() for item in v.split(',')]
return v
class StorageConfig(BaseModel):
"""Конфигурация локального хранилища (SQLite)."""
model_config = {"extra": "ignore"}
type: str = Field(
default="sqlite",
description="Тип хранилища"
)
database_path: str = Field(
default="can_offline.db",
description="Путь к файлу базы данных SQLite"
)
wal_mode: bool = Field(
default=True,
description="Включить режим WAL (Write-Ahead Logging)"
)
sync_mode: str = Field(
default="NORMAL",
description="Режим синхронизации: NORMAL, FULL, OFF"
)
class InfluxDBConfig(BaseModel):
"""Конфигурация InfluxDB."""
model_config = {"extra": "ignore"}
enabled: bool = Field(
default=True,
description="Включить отправку данных в InfluxDB"
)
url: str = Field(
default="http://localhost:8086",
description="URL сервера InfluxDB"
)
token: str = Field(
default="",
description="Токен аутентификации InfluxDB"
)
org: str = Field(
default="",
description="Организация InfluxDB"
)
bucket: str = Field(
default="can_data",
description="Имя bucket для данных"
)
batch_size: int = Field(
default=1000,
description="Размер батча для отправки данных"
)
flush_interval: int = Field(
default=5,
description="Интервал отправки батча (секунды)"
)
timeout: int = Field(
default=10,
description="Таймаут подключения (секунды)"
)
class LoggingConfig(BaseModel):
"""Конфигурация логирования."""
model_config = {"extra": "ignore"}
level: str = Field(
default="INFO",
description="Уровень логирования: DEBUG, INFO, WARNING, ERROR, CRITICAL"
)
format: str = Field(
default="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
description="Формат логов"
)
file: str = Field(
default="can_edge.log",
description="Имя файла для логов"
)
max_bytes: int = Field(
default=10485760,
description="Максимальный размер файла лога (байты)"
)
backup_count: int = Field(
default=5,
description="Количество резервных копий логов"
)
class GeneralConfig(BaseModel):
"""Общие настройки."""
model_config = {"extra": "ignore"}
buffer_size: int = Field(
default=10000,
description="Размер буфера для данных"
)
max_retries: int = Field(
default=3,
description="Максимальное количество попыток повтора"
)
retry_delay: float = Field(
default=1.0,
description="Задержка между попытками (секунды)"
)
class Config(BaseSettings):
"""Главный класс конфигурации проекта."""
model_config = SettingsConfigDict(
env_prefix="CAN_SNIFFER_",
env_nested_delimiter="__",
case_sensitive=False,
extra="ignore",
)
can: CanConfig = Field(default_factory=CanConfig)
storage: StorageConfig = Field(default_factory=StorageConfig)
influxdb: InfluxDBConfig = Field(default_factory=InfluxDBConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)
general: GeneralConfig = Field(default_factory=GeneralConfig)
@classmethod
def _find_config_file(cls) -> Optional[Path]:
"""Поиск конфигурационного файла."""
# Определяем правильный путь к корню проекта can_sniffer
# __file__ = can_sniffer/src/config.py
# parent = can_sniffer/src
# parent.parent = can_sniffer
project_root = Path(__file__).parent.parent
config_paths = [
project_root / "config.json", # can_sniffer/config.json
Path(__file__).parent / "config.json", # can_sniffer/src/config.json
Path.home() / ".can_sniffer" / "config.json",
]
for config_path in config_paths:
if config_path.exists():
return config_path
return None
def __init__(self, **kwargs):
"""Инициализация конфигурации с загрузкой из JSON файла."""
# Если kwargs пусты, пытаемся загрузить из файла
if not kwargs:
config_file = self._find_config_file()
if config_file:
import json
try:
with open(config_file, 'r', encoding='utf-8') as f:
json_data = json.load(f)
# Передаем данные из JSON в super().__init__()
# Pydantic автоматически создаст вложенные объекты CanConfig, StorageConfig и т.д.
super().__init__(**json_data)
return
except Exception as e:
# Если не удалось загрузить JSON, выводим предупреждение
import warnings
import traceback
warnings.warn(
f"Failed to load config from {config_file}: {e}\n"
f"Traceback: {traceback.format_exc()}\n"
f"Using defaults."
)
# Инициализация с переданными kwargs или defaults
super().__init__(**kwargs)
@classmethod
def load_from_file(cls, file_path: Optional[Path] = None) -> 'Config':
"""Загрузка конфигурации из указанного файла или поиск автоматически.
Args:
file_path: Путь к конфигурационному файлу. Если None, выполняется поиск.
Returns:
Экземпляр Config
"""
if file_path is None:
file_path = cls._find_config_file()
if file_path and file_path.exists():
import json
try:
with open(file_path, 'r', encoding='utf-8') as f:
json_data = json.load(f)
return cls.model_validate(json_data)
except Exception as e:
import warnings
warnings.warn(f"Failed to load config from {file_path}: {e}")
return cls()
def get(self, key_path: str, default=None):
"""Получение значения конфигурации по пути через точку.
Args:
key_path: Путь к значению через точку, например 'can.interfaces'
default: Значение по умолчанию, если ключ не найден
Returns:
Значение конфигурации или default
Example:
>>> config.get('can.interfaces')
['can0', 'can1']
"""
keys = key_path.split('.')
current = self
for key in keys:
if hasattr(current, key):
current = getattr(current, key)
elif isinstance(current, dict) and key in current:
current = current[key]
else:
return default
return current
def get_section(self, section: str):
"""Получение всей секции конфигурации.
Args:
section: Имя секции, например 'can', 'influxdb'
Returns:
Объект конфигурации секции
Example:
>>> can_config = config.get_section('can')
>>> print(can_config.interfaces)
"""
return getattr(self, section, None)
# Глобальный экземпляр конфигурации (singleton)
_config_instance: Optional[Config] = None
def get_config(reload: bool = False) -> Config:
"""Получение глобального экземпляра конфигурации.
Args:
reload: Если True, перезагружает конфигурацию из файла
Returns:
Экземпляр Config
Example:
>>> from config import get_config
>>> config = get_config()
>>> interfaces = config.can.interfaces
>>> # Перезагрузить конфигурацию после изменения файла
>>> config = get_config(reload=True)
"""
global _config_instance
if _config_instance is None or reload:
_config_instance = Config()
return _config_instance
def reload_config() -> Config:
"""Перезагрузка конфигурации из файла.
Returns:
Перезагруженный экземпляр Config
Example:
>>> from config import reload_config
>>> config = reload_config()
"""
return get_config(reload=True)
# Для обратной совместимости и удобства
# Используем прокси для автоматического доступа к актуальной конфигурации
class _ConfigProxy:
"""Прокси для глобального доступа к конфигурации с поддержкой перезагрузки."""
def __getattr__(self, name):
"""Делегирование доступа к атрибутам конфигурации."""
# Всегда получаем актуальный экземпляр конфигурации
return getattr(get_config(), name)
def reload(self):
"""Перезагрузка конфигурации из файла."""
global _config_instance
_config_instance = None # Сбрасываем singleton
return reload_config()
def __repr__(self):
"""Строковое представление прокси."""
return f"ConfigProxy({get_config()})"
# Поддержка прямого доступа к методам Config
def get(self, key_path: str, default=None):
"""Получение значения по пути."""
return get_config().get(key_path, default)
def get_section(self, section: str):
"""Получение секции конфигурации."""
return get_config().get_section(section)
# Глобальный прокси для удобного доступа
# ВАЖНО: После изменения config.json нужно вызвать config.reload() или перезапустить приложение
config = _ConfigProxy()

207
can_sniffer/src/logger.py Normal file
View File

@@ -0,0 +1,207 @@
"""
Модуль логирования для CAN Sniffer проекта.
Предоставляет централизованную систему логирования с поддержкой:
- Structured logging
- Ротации логов
- Записи в файл и консоль
- Интеграции с модулем config
"""
import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional
from config import get_config
class StructuredFormatter(logging.Formatter):
"""Форматтер для structured logging с дополнительными полями."""
def format(self, record: logging.LogRecord) -> str:
"""Форматирование записи лога с поддержкой дополнительных полей."""
# Добавляем дополнительные поля из extra, если они есть
extra_fields = {}
for key, value in record.__dict__.items():
if key not in [
'name', 'msg', 'args', 'created', 'filename', 'funcName',
'levelname', 'levelno', 'lineno', 'module', 'msecs',
'message', 'pathname', 'process', 'processName', 'relativeCreated',
'thread', 'threadName', 'exc_info', 'exc_text', 'stack_info'
]:
extra_fields[key] = value
# Форматируем основное сообщение
message = super().format(record)
# Добавляем дополнительные поля в конец сообщения
if extra_fields:
extra_str = ' | '.join(f'{k}={v}' for k, v in extra_fields.items())
message = f'{message} | {extra_str}'
return message
class Logger:
"""Класс для управления логированием проекта."""
_instance: Optional['Logger'] = None
_initialized: bool = False
def __new__(cls):
"""Singleton паттерн для единого экземпляра логгера."""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Инициализация системы логирования."""
if self._initialized:
return
self.config = get_config()
self.log_config = self.config.logging
# Настройка корневого логгера
self.root_logger = logging.getLogger('can_sniffer')
self.root_logger.setLevel(self._get_log_level())
# Очищаем существующие обработчики
self.root_logger.handlers.clear()
# Создаем форматтер
formatter = StructuredFormatter(self.log_config.format)
# Обработчик для файла с ротацией
self._setup_file_handler(formatter)
# Обработчик для консоли
self._setup_console_handler(formatter)
# Предотвращаем распространение на корневой логгер
self.root_logger.propagate = False
self._initialized = True
def _get_log_level(self) -> int:
"""Преобразование строкового уровня в числовой."""
level_map = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL,
}
return level_map.get(self.log_config.level.upper(), logging.INFO)
def _setup_file_handler(self, formatter: logging.Formatter) -> None:
"""Настройка обработчика для записи в файл с ротацией."""
log_file = Path(self.log_config.file)
# Создаем директорию для логов, если её нет
log_file.parent.mkdir(parents=True, exist_ok=True)
# Создаем RotatingFileHandler
file_handler = RotatingFileHandler(
filename=str(log_file),
maxBytes=self.log_config.max_bytes,
backupCount=self.log_config.backup_count,
encoding='utf-8',
)
file_handler.setLevel(self._get_log_level())
file_handler.setFormatter(formatter)
self.root_logger.addHandler(file_handler)
def _setup_console_handler(self, formatter: logging.Formatter) -> None:
"""Настройка обработчика для записи в консоль."""
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(self._get_log_level())
console_handler.setFormatter(formatter)
self.root_logger.addHandler(console_handler)
def get_logger(self, name: Optional[str] = None) -> logging.Logger:
"""Получение логгера для конкретного модуля.
Args:
name: Имя модуля (обычно __name__). Если None, возвращается корневой логгер.
Returns:
Экземпляр logging.Logger
Example:
>>> logger = Logger().get_logger(__name__)
>>> logger.info("Message")
"""
if name is None:
return self.root_logger
# Создаем дочерний логгер с именем модуля
child_logger = self.root_logger.getChild(name)
return child_logger
def set_level(self, level: str) -> None:
"""Изменение уровня логирования во время выполнения.
Args:
level: Новый уровень логирования (DEBUG, INFO, WARNING, ERROR, CRITICAL)
"""
level_map = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL,
}
numeric_level = level_map.get(level.upper(), logging.INFO)
self.root_logger.setLevel(numeric_level)
# Обновляем уровень для всех обработчиков
for handler in self.root_logger.handlers:
handler.setLevel(numeric_level)
def reload(self) -> None:
"""Перезагрузка конфигурации логирования.
Переинициализирует систему логирования с текущими настройками из config.
"""
self._initialized = False
# Обновляем ссылку на конфигурацию
self.config = get_config()
self.log_config = self.config.logging
# Переинициализируем
self.__init__()
# Глобальный экземпляр логгера
_logger_instance: Optional[Logger] = None
def get_logger(name: Optional[str] = None) -> logging.Logger:
"""Получение логгера для использования в модулях.
Args:
name: Имя модуля (обычно __name__). Если None, возвращается корневой логгер.
Returns:
Экземпляр logging.Logger
Example:
>>> from logger import get_logger
>>> logger = get_logger(__name__)
>>> logger.info("Application started")
>>> logger.info("CAN frame received", extra={"can_id": 0x123, "interface": "can0"})
"""
global _logger_instance
if _logger_instance is None:
_logger_instance = Logger()
return _logger_instance.get_logger(name)
# Для удобства - корневой логгер
logger = get_logger()

77
can_sniffer/src/main.py Normal file
View File

@@ -0,0 +1,77 @@
"""
Главный модуль CAN Sniffer приложения.
Только код запуска приложения. Вся логика обработки сообщений
автоматически применяется в модуле socket_can.
"""
import signal
import sys
import time
from config import config
from logger import get_logger
from socket_can import CANSniffer
# Инициализация логгера
logger = get_logger(__name__)
# Глобальная переменная для graceful shutdown
sniffer: CANSniffer = None
def signal_handler(sig, frame):
"""Обработчик сигналов для graceful shutdown."""
logger.info("Received shutdown signal, stopping gracefully...")
if sniffer:
sniffer.stop()
sys.exit(0)
def main():
"""Главная функция приложения - только запуск."""
global sniffer
# Регистрируем обработчики сигналов для graceful shutdown
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
logger.info("CAN Sniffer application starting", extra={
"interfaces": config.can.interfaces,
"bitrate": config.can.bitrate,
"listen_only": config.can.listen_only
})
logger.info("Configuration loaded", extra={
"influxdb_enabled": config.influxdb.enabled,
"influxdb_url": config.influxdb.url if config.influxdb.enabled else None,
"storage_path": config.storage.database_path
})
try:
# Создаем и запускаем CAN Sniffer
# MessageProcessor автоматически инициализируется и используется внутри CANSniffer
sniffer = CANSniffer()
sniffer.start()
logger.info("Application initialized successfully. Reading CAN messages...")
logger.info("Press Ctrl+C to stop")
# Основной цикл - периодически выводим статистику
while True:
time.sleep(10) # Выводим статистику каждые 10 секунд
stats = sniffer.get_stats()
logger.info("Statistics", extra=stats)
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
finally:
if sniffer:
sniffer.stop()
logger.info("Application stopped")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,7 @@
"""Модуль для работы с SocketCAN интерфейсами."""
from .src import CANSniffer, CANBusHandler
from .message_processor import MessageProcessor
__all__ = ['CANSniffer', 'CANBusHandler', 'MessageProcessor']

View File

@@ -0,0 +1,155 @@
"""
Модуль для обработки CAN сообщений.
Обрабатывает входящие CAN сообщения и сохраняет их в SQLite и InfluxDB.
"""
import can
from typing import Optional
from logger import get_logger
from config import config
logger = get_logger(__name__)
class MessageProcessor:
"""Класс для обработки и сохранения CAN сообщений."""
def __init__(self):
"""Инициализация процессора сообщений."""
self.logger = logger
self.storage = None
self.influxdb_client = None
# Инициализируем хранилища
self._init_storage()
self._init_influxdb()
def _init_storage(self) -> None:
"""Инициализация локального хранилища (SQLite)."""
# TODO: Инициализация SQLite хранилища
# from storage import Storage
# self.storage = Storage(config.storage)
self.logger.info(
"Storage initialization",
extra={
"type": config.storage.type,
"path": config.storage.database_path
}
)
def _init_influxdb(self) -> None:
"""Инициализация клиента InfluxDB."""
if not config.influxdb.enabled:
self.logger.info("InfluxDB is disabled in configuration")
return
# TODO: Инициализация InfluxDB клиента
# from influxdb_client import InfluxDBClient
# self.influxdb_client = InfluxDBClient(config.influxdb)
self.logger.info(
"InfluxDB initialization",
extra={
"url": config.influxdb.url,
"bucket": config.influxdb.bucket
}
)
def process(self, interface: str, message: can.Message) -> None:
"""
Обработка CAN сообщения.
Args:
interface: Имя интерфейса (например, 'can0')
message: CAN сообщение
"""
try:
# Логируем сообщение
self.logger.debug(
"CAN message received",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
# Сохраняем в локальное хранилище (SQLite)
self._save_to_storage(interface, message)
# Отправляем в InfluxDB (если включено)
if self.influxdb_client:
self._send_to_influxdb(interface, message)
except Exception as e:
self.logger.error(
f"Error processing message: {e}",
exc_info=True,
extra={
"interface": interface,
"can_id": hex(message.arbitration_id) if message else None
}
)
def _save_to_storage(self, interface: str, message: can.Message) -> None:
"""
Сохранение сообщения в локальное хранилище (SQLite).
Args:
interface: Имя интерфейса
message: CAN сообщение
"""
if not self.storage:
# TODO: Реализовать сохранение в SQLite
# self.storage.save_message(interface, message)
return
try:
# self.storage.save_message(interface, message)
pass
except Exception as e:
self.logger.error(
f"Failed to save message to storage: {e}",
exc_info=True,
extra={"interface": interface}
)
def _send_to_influxdb(self, interface: str, message: can.Message) -> None:
"""
Отправка сообщения в InfluxDB.
Args:
interface: Имя интерфейса
message: CAN сообщение
"""
if not self.influxdb_client:
return
try:
# TODO: Реализовать отправку в InfluxDB
# self.influxdb_client.write_message(interface, message)
pass
except Exception as e:
self.logger.error(
f"Failed to send message to InfluxDB: {e}",
exc_info=True,
extra={"interface": interface}
)
def shutdown(self) -> None:
"""Корректное завершение работы процессора."""
self.logger.info("Shutting down message processor...")
# Закрываем соединения
if self.storage:
# self.storage.close()
pass
if self.influxdb_client:
# self.influxdb_client.close()
pass
self.logger.info("Message processor stopped")

View File

@@ -0,0 +1,324 @@
"""
Модуль для работы с SocketCAN интерфейсами.
Предоставляет параллельное чтение CAN сообщений с нескольких интерфейсов
с поддержкой обработки ошибок, логирования и graceful shutdown.
"""
import can
import threading
import time
from typing import Callable, Dict, List, Optional
from queue import Queue, Empty
from config import config
from logger import get_logger
from .message_processor import MessageProcessor
class CANBusHandler:
"""Обработчик для одной CAN шины."""
def __init__(
self,
interface: str,
bus: can.Bus,
message_callback: Callable,
logger,
filters: Optional[List[dict]] = None
):
"""
Инициализация обработчика CAN шины.
Args:
interface: Имя интерфейса (например, 'can0')
bus: Экземпляр can.Bus
message_callback: Функция для обработки CAN сообщений
logger: Логгер для данного интерфейса
filters: Список фильтров SocketCAN
"""
self.interface = interface
self.bus = bus
self.message_callback = message_callback
self.logger = logger
self.filters = filters or []
self.running = False
self.thread: Optional[threading.Thread] = None
self.message_count = 0
self.error_count = 0
self.last_message_time: Optional[float] = None
# Применяем фильтры, если они есть
if self.filters:
self._apply_filters()
def _apply_filters(self) -> None:
"""Применение фильтров SocketCAN к шине."""
try:
# SocketCAN фильтры применяются через set_filters
# Формат: [{"can_id": 0x123, "can_mask": 0x7FF}, ...]
self.bus.set_filters(self.filters)
self.logger.info(
f"Applied {len(self.filters)} filters to {self.interface}",
extra={"filters": self.filters}
)
except Exception as e:
self.logger.warning(
f"Failed to apply filters to {self.interface}: {e}",
exc_info=True
)
def _read_loop(self) -> None:
"""Основной цикл чтения сообщений с шины."""
self.logger.info(f"Starting read loop for {self.interface}")
while self.running:
try:
# Читаем сообщение с таймаутом для возможности проверки running
message = self.bus.recv(timeout=0.1)
if message is not None:
self.message_count += 1
self.last_message_time = time.time()
# Вызываем callback для обработки сообщения
try:
self.message_callback(self.interface, message)
except Exception as e:
self.logger.error(
f"Error in message callback for {self.interface}: {e}",
exc_info=True,
extra={"can_id": hex(message.arbitration_id)}
)
self.error_count += 1
except can.CanError as e:
self.logger.error(
f"CAN error on {self.interface}: {e}",
exc_info=True
)
self.error_count += 1
# Небольшая задержка перед повторной попыткой
time.sleep(0.1)
except Exception as e:
self.logger.error(
f"Unexpected error on {self.interface}: {e}",
exc_info=True
)
self.error_count += 1
time.sleep(0.1)
self.logger.info(
f"Read loop stopped for {self.interface}",
extra={
"total_messages": self.message_count,
"total_errors": self.error_count
}
)
def start(self) -> None:
"""Запуск чтения сообщений в отдельном потоке."""
if self.running:
self.logger.warning(f"{self.interface} is already running")
return
self.running = True
self.thread = threading.Thread(
target=self._read_loop,
name=f"CAN-{self.interface}",
daemon=True
)
self.thread.start()
self.logger.info(f"Started reading from {self.interface}")
def stop(self) -> None:
"""Остановка чтения сообщений."""
if not self.running:
return
self.logger.info(f"Stopping {self.interface}...")
self.running = False
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
if self.thread.is_alive():
self.logger.warning(f"Thread for {self.interface} did not stop gracefully")
# Закрываем шину
try:
self.bus.shutdown()
self.logger.info(f"Bus {self.interface} closed")
except Exception as e:
self.logger.error(f"Error closing bus {self.interface}: {e}", exc_info=True)
def get_stats(self) -> Dict:
"""Получение статистики по обработке сообщений."""
return {
"interface": self.interface,
"message_count": self.message_count,
"error_count": self.error_count,
"last_message_time": self.last_message_time,
"running": self.running
}
class CANSniffer:
"""Класс для параллельного чтения CAN сообщений с нескольких интерфейсов."""
def __init__(self, message_callback: Optional[Callable] = None):
"""
Инициализация CAN Sniffer.
Args:
message_callback: Функция для обработки CAN сообщений.
Должна принимать (interface: str, message: can.Message)
"""
self.config = config.can
self.logger = get_logger(__name__)
# Инициализируем MessageProcessor для автоматической обработки сообщений
self.message_processor = MessageProcessor()
# Используем переданный callback или процессор по умолчанию
if message_callback:
self.message_callback = message_callback
else:
# Автоматически используем MessageProcessor
self.message_callback = self.message_processor.process
self.bus_handlers: Dict[str, CANBusHandler] = {}
self.running = False
self._init_buses()
def _init_buses(self) -> None:
"""Инициализация CAN шин из конфигурации."""
self.logger.info(
"Initializing CAN buses",
extra={
"interfaces": self.config.interfaces,
"listen_only": self.config.listen_only,
"bitrate": self.config.bitrate
}
)
for interface in self.config.interfaces:
try:
bus = self._create_bus(interface)
handler = CANBusHandler(
interface=interface,
bus=bus,
message_callback=self.message_callback,
logger=self.logger.getChild(f"bus.{interface}"),
filters=self.config.filters
)
self.bus_handlers[interface] = handler
self.logger.info(f"Initialized bus: {interface}")
except Exception as e:
self.logger.error(
f"Failed to initialize bus {interface}: {e}",
exc_info=True,
extra={"interface": interface}
)
def _create_bus(self, interface: str) -> can.Bus:
"""
Создание CAN шины для интерфейса.
Args:
interface: Имя интерфейса (например, 'can0')
Returns:
Экземпляр can.Bus
"""
bus_kwargs = {
"channel": interface,
"bustype": "socketcan",
"receive_own_messages": False,
}
# Добавляем listen-only режим, если указан в конфигурации
if self.config.listen_only:
# Для SocketCAN listen-only режим устанавливается через параметр
# В некоторых версиях python-can это может быть через receive_own_messages=False
# и отдельным параметром, но для SocketCAN обычно достаточно receive_own_messages=False
pass
try:
bus = can.interface.Bus(**bus_kwargs)
self.logger.debug(
f"Created bus for {interface}",
extra={"kwargs": bus_kwargs}
)
return bus
except can.CanError as e:
self.logger.error(
f"CAN error creating bus for {interface}: {e}",
exc_info=True
)
raise
except Exception as e:
self.logger.error(
f"Unexpected error creating bus for {interface}: {e}",
exc_info=True
)
raise
def start(self) -> None:
"""Запуск чтения со всех шин."""
if self.running:
self.logger.warning("CANSniffer is already running")
return
self.logger.info(
f"Starting CANSniffer with {len(self.bus_handlers)} buses",
extra={"interfaces": list(self.bus_handlers.keys())}
)
self.running = True
# Запускаем все обработчики параллельно
for handler in self.bus_handlers.values():
handler.start()
self.logger.info("CANSniffer started successfully")
def stop(self) -> None:
"""Остановка чтения со всех шин."""
if not self.running:
return
self.logger.info("Stopping CANSniffer...")
self.running = False
# Останавливаем все обработчики
for handler in self.bus_handlers.values():
handler.stop()
# Останавливаем процессор сообщений
self.message_processor.shutdown()
self.logger.info("CANSniffer stopped")
def get_stats(self) -> Dict:
"""Получение статистики по всем шинам."""
return {
"running": self.running,
"buses": {
interface: handler.get_stats()
for interface, handler in self.bus_handlers.items()
}
}
def __enter__(self):
"""Поддержка context manager."""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Поддержка context manager."""
self.stop()
return False

View File

@@ -0,0 +1,94 @@
"""
Пример использования модуля socket_can.
"""
import time
import signal
import sys
from socket_can import CANSniffer
from logger import get_logger
logger = get_logger(__name__)
# Глобальная переменная для graceful shutdown
sniffer = None
def message_handler(interface: str, message):
"""
Обработчик CAN сообщений.
Args:
interface: Имя интерфейса (например, 'can0')
message: CAN сообщение (can.Message)
"""
# Здесь можно добавить логику сохранения в SQLite/InfluxDB
logger.info(
"CAN message",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
def signal_handler(sig, frame):
"""Обработчик сигналов для graceful shutdown."""
logger.info("Received shutdown signal, stopping...")
if sniffer:
sniffer.stop()
sys.exit(0)
def main():
"""Пример использования CANSniffer."""
global sniffer
# Регистрируем обработчики сигналов
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Создаем sniffer с callback функцией
sniffer = CANSniffer(message_callback=message_handler)
try:
# Запускаем чтение (можно использовать context manager)
sniffer.start()
logger.info("CANSniffer started. Press Ctrl+C to stop.")
# Основной цикл - просто ждем
while True:
time.sleep(1)
# Периодически выводим статистику
stats = sniffer.get_stats()
logger.debug("Statistics", extra=stats)
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
finally:
if sniffer:
sniffer.stop()
logger.info("Application stopped")
def example_with_context_manager():
"""Пример использования с context manager."""
def message_handler(interface: str, message):
print(f"[{interface}] ID: {hex(message.arbitration_id)}, Data: {message.data.hex()}")
# Использование с context manager (автоматический start/stop)
with CANSniffer(message_callback=message_handler) as sniffer:
# Читаем сообщения в течение 10 секунд
time.sleep(10)
# После выхода из блока with, sniffer автоматически остановится
if __name__ == '__main__':
main()

55
chart.md Normal file
View File

@@ -0,0 +1,55 @@
---
id: 421a4a02-025a-4ab5-8ae3-fd52e2c738f0
---
flowchart LR
subgraph Vehicle["Автомобиль"]
OBD["OBD-II"]
CANBUS["HS-CAN\n500 kbps"]
OBD --> CANBUS
end
subgraph CANBoard["CAN-плата"]
PHY0["CAN PHY"]
PHY1["CAN PHY"]
MCP0["MCP2515\ncan0"]
MCP1["MCP2515\ncan1"]
ISO["Isolation"]
CANBUS --> PHY0
CANBUS --> PHY1
PHY0 --> MCP0
PHY1 --> MCP1
MCP0 --> ISO
MCP1 --> ISO
end
subgraph Edge["Raspberry Pi 5 (Edge)"]
SPI["SPI"]
SocketCAN["SocketCAN\nlisten-only"]
EdgeApp["Edge CAN Logger"]
LocalStore["SQLite WAL\nOffline Buffer"]
Forwarder["Store-and-Forward"]
ISO --> SPI
SPI --> SocketCAN
SocketCAN --> EdgeApp
EdgeApp --> LocalStore
LocalStore --> Forwarder
end
subgraph BackendHost["Backend Host"]
Influx["InfluxDB"]
Flask["Flask Backend"]
WS["WebSocket Server"]
end
subgraph UI["Web UI"]
Browser["Browser"]
Charts["Real-time Charts"]
end
Forwarder --> Influx
Influx --> Flask
Flask --> WS
WS --> Browser
Browser --> Charts

165
web/task_webui.md Normal file
View File

@@ -0,0 +1,165 @@
1.1 Цель
Разработать web-интерфейс реального времени для отображения CAN-данных, собираемых edge-устройством и сохраняемых в InfluxDB.
Система должна:
отображать данные в реальном времени,
работать на одном хосте с InfluxDB,
использовать Python / Flask,
использовать WebSocket для push-обновлений,
быть расширяемой под future-аналитику.
1.2 Общая архитектура
Edge (Raspberry Pi)
→ InfluxDB (write)
→ Flask Backend (read + stream)
→ Web UI (WebSocket)
InfluxDB — единый источник истины для UI
(не читаем данные напрямую с Edge).
1.3 Компоненты системы
1.3.1 Backend (Flask)
Назначение
HTTP API
WebSocket сервер
агрегация данных из InfluxDB
доставка данных в UI
Технологии
Python 3.11+
Flask
Flask-SocketIO (или websockets + ASGI)
InfluxDB Python Client
1.3.2 InfluxDB
Роль
time-series storage
буфер между Edge и UI
исторические запросы
Типы данных
CAN frames (raw)
декодированные сигналы (future)
1.3.3 Frontend (Web UI)
Назначение
визуализация CAN-данных
real-time обновление
базовая аналитика
Минимальный стек
HTML + JS
WebSocket клиент
Chart.js / ECharts (на ваш выбор)
1.4 Функциональные требования (MVP)
1.4.1 Real-time отображение
Обновление ≤ 500 мс
Push-модель (WebSocket)
Без polling
Отображать:
timestamp
CAN interface (can0 / can1)
CAN ID
DLC
DATA (hex)
frequency (msg/sec)
1.4.2 Исторический просмотр
выбор временного окна:
last 1 min / 5 min / 1 h
график:
частота сообщений
значение байтов (raw)
1.4.3 Фильтрация (UI)
по CAN ID
по интерфейсу
включение / выключение ID
Фильтрация не влияет на ingestion, только на отображение.
1.5 Нефункциональные требования
Производительность
≥ 510k msg/sec без деградации UI
batch-чтение из InfluxDB
Надёжность
UI не зависит от Edge availability
UI не ломается при временном отсутствии новых данных
Безопасность (минимум)
UI доступен только из доверенной сети
без write-доступа к InfluxDB
1.6 Поток данных (важно)
Write path
Edge → InfluxDB
Read / Stream path
InfluxDB → Flask → WebSocket → Browser
❗ Flask не принимает CAN напрямую
❗ WebSocket не ходит в InfluxDB
1.7 Ограничения (осознанные)
Flask — single logical service
Без auth (на MVP)
Без декодирования DBC (пока raw)