diff --git a/.cursorignore b/.cursorignore new file mode 100644 index 0000000..6f9f00f --- /dev/null +++ b/.cursorignore @@ -0,0 +1 @@ +# Add directories or file patterns to ignore during indexing (e.g. foo/ or *.csv) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0437344 --- /dev/null +++ b/.gitignore @@ -0,0 +1,89 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual Environments +venv/ +env/ +ENV/ +env.bak/ +venv.bak/ +.venv/ + +# IDEs +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Environment variables +.env +.env.local +.env.*.local + +# Logs +*.log +logs/ + +# Database files (SQLite) +*.db +*.db-shm +*.db-wal +*.sqlite +*.sqlite3 + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.hypothesis/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Project specific +can_offline.db* +can_edge.log* +config.json + +.cursor/ diff --git a/can_sniffer/requirements.txt b/can_sniffer/requirements.txt new file mode 100644 index 0000000..0ab1435 --- /dev/null +++ b/can_sniffer/requirements.txt @@ -0,0 +1,4 @@ +pydantic>=2.0.0 +pydantic-settings>=2.0.0 +python-can>=4.0.0 + diff --git a/can_sniffer/src/config.py b/can_sniffer/src/config.py new file mode 100644 index 0000000..6cb951c --- /dev/null +++ b/can_sniffer/src/config.py @@ -0,0 +1,357 @@ +""" +Модуль конфигурации для CAN Sniffer проекта. + +Использует pydantic-settings для типобезопасной конфигурации с валидацией +и поддержкой загрузки из файла и переменных окружения. +""" + +from pathlib import Path +from typing import List, Optional + +from pydantic import BaseModel, Field, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class CanConfig(BaseModel): + """Конфигурация CAN интерфейсов.""" + + model_config = {"extra": "ignore"} + + interfaces: List[str] = Field( + default=["can0", "can1"], + description="Список CAN интерфейсов для мониторинга" + ) + listen_only: bool = Field( + default=True, + description="Режим только чтения (listen-only mode)" + ) + bitrate: int = Field( + default=500000, + description="Скорость передачи CAN (бит/с)" + ) + filters: List[dict] = Field( + default_factory=list, + description="Список фильтров SocketCAN: [{'can_id': 0x123, 'can_mask': 0x7FF}, ...]" + ) + + @field_validator('interfaces', mode='before') + @classmethod + def parse_interfaces(cls, v): + """Парсинг интерфейсов из строки (для env переменных).""" + if isinstance(v, str): + return [item.strip() for item in v.split(',')] + return v + + +class StorageConfig(BaseModel): + """Конфигурация локального хранилища (SQLite).""" + + model_config = {"extra": "ignore"} + + type: str = Field( + default="sqlite", + description="Тип хранилища" + ) + database_path: str = Field( + default="can_offline.db", + description="Путь к файлу базы данных SQLite" + ) + wal_mode: bool = Field( + default=True, + description="Включить режим WAL (Write-Ahead Logging)" + ) + sync_mode: str = Field( + default="NORMAL", + description="Режим синхронизации: NORMAL, FULL, OFF" + ) + + +class InfluxDBConfig(BaseModel): + """Конфигурация InfluxDB.""" + + model_config = {"extra": "ignore"} + + enabled: bool = Field( + default=True, + description="Включить отправку данных в InfluxDB" + ) + url: str = Field( + default="http://localhost:8086", + description="URL сервера InfluxDB" + ) + token: str = Field( + default="", + description="Токен аутентификации InfluxDB" + ) + org: str = Field( + default="", + description="Организация InfluxDB" + ) + bucket: str = Field( + default="can_data", + description="Имя bucket для данных" + ) + batch_size: int = Field( + default=1000, + description="Размер батча для отправки данных" + ) + flush_interval: int = Field( + default=5, + description="Интервал отправки батча (секунды)" + ) + timeout: int = Field( + default=10, + description="Таймаут подключения (секунды)" + ) + + +class LoggingConfig(BaseModel): + """Конфигурация логирования.""" + + model_config = {"extra": "ignore"} + + level: str = Field( + default="INFO", + description="Уровень логирования: DEBUG, INFO, WARNING, ERROR, CRITICAL" + ) + format: str = Field( + default="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + description="Формат логов" + ) + file: str = Field( + default="can_edge.log", + description="Имя файла для логов" + ) + max_bytes: int = Field( + default=10485760, + description="Максимальный размер файла лога (байты)" + ) + backup_count: int = Field( + default=5, + description="Количество резервных копий логов" + ) + + +class GeneralConfig(BaseModel): + """Общие настройки.""" + + model_config = {"extra": "ignore"} + + buffer_size: int = Field( + default=10000, + description="Размер буфера для данных" + ) + max_retries: int = Field( + default=3, + description="Максимальное количество попыток повтора" + ) + retry_delay: float = Field( + default=1.0, + description="Задержка между попытками (секунды)" + ) + + +class Config(BaseSettings): + """Главный класс конфигурации проекта.""" + + model_config = SettingsConfigDict( + env_prefix="CAN_SNIFFER_", + env_nested_delimiter="__", + case_sensitive=False, + extra="ignore", + ) + + can: CanConfig = Field(default_factory=CanConfig) + storage: StorageConfig = Field(default_factory=StorageConfig) + influxdb: InfluxDBConfig = Field(default_factory=InfluxDBConfig) + logging: LoggingConfig = Field(default_factory=LoggingConfig) + general: GeneralConfig = Field(default_factory=GeneralConfig) + + @classmethod + def _find_config_file(cls) -> Optional[Path]: + """Поиск конфигурационного файла.""" + # Определяем правильный путь к корню проекта can_sniffer + # __file__ = can_sniffer/src/config.py + # parent = can_sniffer/src + # parent.parent = can_sniffer + project_root = Path(__file__).parent.parent + + config_paths = [ + project_root / "config.json", # can_sniffer/config.json + Path(__file__).parent / "config.json", # can_sniffer/src/config.json + Path.home() / ".can_sniffer" / "config.json", + ] + + for config_path in config_paths: + if config_path.exists(): + return config_path + return None + + def __init__(self, **kwargs): + """Инициализация конфигурации с загрузкой из JSON файла.""" + # Если kwargs пусты, пытаемся загрузить из файла + if not kwargs: + config_file = self._find_config_file() + if config_file: + import json + try: + with open(config_file, 'r', encoding='utf-8') as f: + json_data = json.load(f) + + # Передаем данные из JSON в super().__init__() + # Pydantic автоматически создаст вложенные объекты CanConfig, StorageConfig и т.д. + super().__init__(**json_data) + return + except Exception as e: + # Если не удалось загрузить JSON, выводим предупреждение + import warnings + import traceback + warnings.warn( + f"Failed to load config from {config_file}: {e}\n" + f"Traceback: {traceback.format_exc()}\n" + f"Using defaults." + ) + + # Инициализация с переданными kwargs или defaults + super().__init__(**kwargs) + + @classmethod + def load_from_file(cls, file_path: Optional[Path] = None) -> 'Config': + """Загрузка конфигурации из указанного файла или поиск автоматически. + + Args: + file_path: Путь к конфигурационному файлу. Если None, выполняется поиск. + + Returns: + Экземпляр Config + """ + if file_path is None: + file_path = cls._find_config_file() + + if file_path and file_path.exists(): + import json + try: + with open(file_path, 'r', encoding='utf-8') as f: + json_data = json.load(f) + return cls.model_validate(json_data) + except Exception as e: + import warnings + warnings.warn(f"Failed to load config from {file_path}: {e}") + + return cls() + + def get(self, key_path: str, default=None): + """Получение значения конфигурации по пути через точку. + + Args: + key_path: Путь к значению через точку, например 'can.interfaces' + default: Значение по умолчанию, если ключ не найден + + Returns: + Значение конфигурации или default + + Example: + >>> config.get('can.interfaces') + ['can0', 'can1'] + """ + keys = key_path.split('.') + current = self + + for key in keys: + if hasattr(current, key): + current = getattr(current, key) + elif isinstance(current, dict) and key in current: + current = current[key] + else: + return default + + return current + + def get_section(self, section: str): + """Получение всей секции конфигурации. + + Args: + section: Имя секции, например 'can', 'influxdb' + + Returns: + Объект конфигурации секции + + Example: + >>> can_config = config.get_section('can') + >>> print(can_config.interfaces) + """ + return getattr(self, section, None) + + +# Глобальный экземпляр конфигурации (singleton) +_config_instance: Optional[Config] = None + + +def get_config(reload: bool = False) -> Config: + """Получение глобального экземпляра конфигурации. + + Args: + reload: Если True, перезагружает конфигурацию из файла + + Returns: + Экземпляр Config + + Example: + >>> from config import get_config + >>> config = get_config() + >>> interfaces = config.can.interfaces + >>> # Перезагрузить конфигурацию после изменения файла + >>> config = get_config(reload=True) + """ + global _config_instance + if _config_instance is None or reload: + _config_instance = Config() + return _config_instance + + +def reload_config() -> Config: + """Перезагрузка конфигурации из файла. + + Returns: + Перезагруженный экземпляр Config + + Example: + >>> from config import reload_config + >>> config = reload_config() + """ + return get_config(reload=True) + + +# Для обратной совместимости и удобства +# Используем прокси для автоматического доступа к актуальной конфигурации +class _ConfigProxy: + """Прокси для глобального доступа к конфигурации с поддержкой перезагрузки.""" + + def __getattr__(self, name): + """Делегирование доступа к атрибутам конфигурации.""" + # Всегда получаем актуальный экземпляр конфигурации + return getattr(get_config(), name) + + def reload(self): + """Перезагрузка конфигурации из файла.""" + global _config_instance + _config_instance = None # Сбрасываем singleton + return reload_config() + + def __repr__(self): + """Строковое представление прокси.""" + return f"ConfigProxy({get_config()})" + + # Поддержка прямого доступа к методам Config + def get(self, key_path: str, default=None): + """Получение значения по пути.""" + return get_config().get(key_path, default) + + def get_section(self, section: str): + """Получение секции конфигурации.""" + return get_config().get_section(section) + + +# Глобальный прокси для удобного доступа +# ВАЖНО: После изменения config.json нужно вызвать config.reload() или перезапустить приложение +config = _ConfigProxy() diff --git a/can_sniffer/src/logger.py b/can_sniffer/src/logger.py new file mode 100644 index 0000000..ceadf33 --- /dev/null +++ b/can_sniffer/src/logger.py @@ -0,0 +1,207 @@ +""" +Модуль логирования для CAN Sniffer проекта. + +Предоставляет централизованную систему логирования с поддержкой: +- Structured logging +- Ротации логов +- Записи в файл и консоль +- Интеграции с модулем config +""" + +import logging +import sys +from logging.handlers import RotatingFileHandler +from pathlib import Path +from typing import Optional + +from config import get_config + + +class StructuredFormatter(logging.Formatter): + """Форматтер для structured logging с дополнительными полями.""" + + def format(self, record: logging.LogRecord) -> str: + """Форматирование записи лога с поддержкой дополнительных полей.""" + # Добавляем дополнительные поля из extra, если они есть + extra_fields = {} + for key, value in record.__dict__.items(): + if key not in [ + 'name', 'msg', 'args', 'created', 'filename', 'funcName', + 'levelname', 'levelno', 'lineno', 'module', 'msecs', + 'message', 'pathname', 'process', 'processName', 'relativeCreated', + 'thread', 'threadName', 'exc_info', 'exc_text', 'stack_info' + ]: + extra_fields[key] = value + + # Форматируем основное сообщение + message = super().format(record) + + # Добавляем дополнительные поля в конец сообщения + if extra_fields: + extra_str = ' | '.join(f'{k}={v}' for k, v in extra_fields.items()) + message = f'{message} | {extra_str}' + + return message + + +class Logger: + """Класс для управления логированием проекта.""" + + _instance: Optional['Logger'] = None + _initialized: bool = False + + def __new__(cls): + """Singleton паттерн для единого экземпляра логгера.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """Инициализация системы логирования.""" + if self._initialized: + return + + self.config = get_config() + self.log_config = self.config.logging + + # Настройка корневого логгера + self.root_logger = logging.getLogger('can_sniffer') + self.root_logger.setLevel(self._get_log_level()) + + # Очищаем существующие обработчики + self.root_logger.handlers.clear() + + # Создаем форматтер + formatter = StructuredFormatter(self.log_config.format) + + # Обработчик для файла с ротацией + self._setup_file_handler(formatter) + + # Обработчик для консоли + self._setup_console_handler(formatter) + + # Предотвращаем распространение на корневой логгер + self.root_logger.propagate = False + + self._initialized = True + + def _get_log_level(self) -> int: + """Преобразование строкового уровня в числовой.""" + level_map = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'WARNING': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL, + } + return level_map.get(self.log_config.level.upper(), logging.INFO) + + def _setup_file_handler(self, formatter: logging.Formatter) -> None: + """Настройка обработчика для записи в файл с ротацией.""" + log_file = Path(self.log_config.file) + + # Создаем директорию для логов, если её нет + log_file.parent.mkdir(parents=True, exist_ok=True) + + # Создаем RotatingFileHandler + file_handler = RotatingFileHandler( + filename=str(log_file), + maxBytes=self.log_config.max_bytes, + backupCount=self.log_config.backup_count, + encoding='utf-8', + ) + file_handler.setLevel(self._get_log_level()) + file_handler.setFormatter(formatter) + + self.root_logger.addHandler(file_handler) + + def _setup_console_handler(self, formatter: logging.Formatter) -> None: + """Настройка обработчика для записи в консоль.""" + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(self._get_log_level()) + console_handler.setFormatter(formatter) + + self.root_logger.addHandler(console_handler) + + def get_logger(self, name: Optional[str] = None) -> logging.Logger: + """Получение логгера для конкретного модуля. + + Args: + name: Имя модуля (обычно __name__). Если None, возвращается корневой логгер. + + Returns: + Экземпляр logging.Logger + + Example: + >>> logger = Logger().get_logger(__name__) + >>> logger.info("Message") + """ + if name is None: + return self.root_logger + + # Создаем дочерний логгер с именем модуля + child_logger = self.root_logger.getChild(name) + return child_logger + + def set_level(self, level: str) -> None: + """Изменение уровня логирования во время выполнения. + + Args: + level: Новый уровень логирования (DEBUG, INFO, WARNING, ERROR, CRITICAL) + """ + level_map = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'WARNING': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRITICAL': logging.CRITICAL, + } + + numeric_level = level_map.get(level.upper(), logging.INFO) + self.root_logger.setLevel(numeric_level) + + # Обновляем уровень для всех обработчиков + for handler in self.root_logger.handlers: + handler.setLevel(numeric_level) + + def reload(self) -> None: + """Перезагрузка конфигурации логирования. + + Переинициализирует систему логирования с текущими настройками из config. + """ + self._initialized = False + # Обновляем ссылку на конфигурацию + self.config = get_config() + self.log_config = self.config.logging + # Переинициализируем + self.__init__() + + +# Глобальный экземпляр логгера +_logger_instance: Optional[Logger] = None + + +def get_logger(name: Optional[str] = None) -> logging.Logger: + """Получение логгера для использования в модулях. + + Args: + name: Имя модуля (обычно __name__). Если None, возвращается корневой логгер. + + Returns: + Экземпляр logging.Logger + + Example: + >>> from logger import get_logger + >>> logger = get_logger(__name__) + >>> logger.info("Application started") + >>> logger.info("CAN frame received", extra={"can_id": 0x123, "interface": "can0"}) + """ + global _logger_instance + if _logger_instance is None: + _logger_instance = Logger() + return _logger_instance.get_logger(name) + + +# Для удобства - корневой логгер +logger = get_logger() + diff --git a/can_sniffer/src/main.py b/can_sniffer/src/main.py new file mode 100644 index 0000000..691b7be --- /dev/null +++ b/can_sniffer/src/main.py @@ -0,0 +1,77 @@ +""" +Главный модуль CAN Sniffer приложения. + +Только код запуска приложения. Вся логика обработки сообщений +автоматически применяется в модуле socket_can. +""" + +import signal +import sys +import time +from config import config +from logger import get_logger +from socket_can import CANSniffer + +# Инициализация логгера +logger = get_logger(__name__) + +# Глобальная переменная для graceful shutdown +sniffer: CANSniffer = None + + +def signal_handler(sig, frame): + """Обработчик сигналов для graceful shutdown.""" + logger.info("Received shutdown signal, stopping gracefully...") + if sniffer: + sniffer.stop() + sys.exit(0) + + +def main(): + """Главная функция приложения - только запуск.""" + global sniffer + + # Регистрируем обработчики сигналов для graceful shutdown + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + logger.info("CAN Sniffer application starting", extra={ + "interfaces": config.can.interfaces, + "bitrate": config.can.bitrate, + "listen_only": config.can.listen_only + }) + + logger.info("Configuration loaded", extra={ + "influxdb_enabled": config.influxdb.enabled, + "influxdb_url": config.influxdb.url if config.influxdb.enabled else None, + "storage_path": config.storage.database_path + }) + + try: + # Создаем и запускаем CAN Sniffer + # MessageProcessor автоматически инициализируется и используется внутри CANSniffer + sniffer = CANSniffer() + sniffer.start() + + logger.info("Application initialized successfully. Reading CAN messages...") + logger.info("Press Ctrl+C to stop") + + # Основной цикл - периодически выводим статистику + while True: + time.sleep(10) # Выводим статистику каждые 10 секунд + + stats = sniffer.get_stats() + logger.info("Statistics", extra=stats) + + except KeyboardInterrupt: + logger.info("Keyboard interrupt received") + except Exception as e: + logger.error(f"Unexpected error: {e}", exc_info=True) + finally: + if sniffer: + sniffer.stop() + logger.info("Application stopped") + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/can_sniffer/src/socket_can/__init__.py b/can_sniffer/src/socket_can/__init__.py new file mode 100644 index 0000000..b1a8b95 --- /dev/null +++ b/can_sniffer/src/socket_can/__init__.py @@ -0,0 +1,7 @@ +"""Модуль для работы с SocketCAN интерфейсами.""" + +from .src import CANSniffer, CANBusHandler +from .message_processor import MessageProcessor + +__all__ = ['CANSniffer', 'CANBusHandler', 'MessageProcessor'] + diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py new file mode 100644 index 0000000..1a84e0b --- /dev/null +++ b/can_sniffer/src/socket_can/message_processor.py @@ -0,0 +1,155 @@ +""" +Модуль для обработки CAN сообщений. + +Обрабатывает входящие CAN сообщения и сохраняет их в SQLite и InfluxDB. +""" + +import can +from typing import Optional +from logger import get_logger +from config import config + +logger = get_logger(__name__) + + +class MessageProcessor: + """Класс для обработки и сохранения CAN сообщений.""" + + def __init__(self): + """Инициализация процессора сообщений.""" + self.logger = logger + self.storage = None + self.influxdb_client = None + + # Инициализируем хранилища + self._init_storage() + self._init_influxdb() + + def _init_storage(self) -> None: + """Инициализация локального хранилища (SQLite).""" + # TODO: Инициализация SQLite хранилища + # from storage import Storage + # self.storage = Storage(config.storage) + self.logger.info( + "Storage initialization", + extra={ + "type": config.storage.type, + "path": config.storage.database_path + } + ) + + def _init_influxdb(self) -> None: + """Инициализация клиента InfluxDB.""" + if not config.influxdb.enabled: + self.logger.info("InfluxDB is disabled in configuration") + return + + # TODO: Инициализация InfluxDB клиента + # from influxdb_client import InfluxDBClient + # self.influxdb_client = InfluxDBClient(config.influxdb) + self.logger.info( + "InfluxDB initialization", + extra={ + "url": config.influxdb.url, + "bucket": config.influxdb.bucket + } + ) + + def process(self, interface: str, message: can.Message) -> None: + """ + Обработка CAN сообщения. + + Args: + interface: Имя интерфейса (например, 'can0') + message: CAN сообщение + """ + try: + # Логируем сообщение + self.logger.debug( + "CAN message received", + extra={ + "interface": interface, + "can_id": hex(message.arbitration_id), + "dlc": message.dlc, + "data": message.data.hex() if message.data else "", + "timestamp": message.timestamp + } + ) + + # Сохраняем в локальное хранилище (SQLite) + self._save_to_storage(interface, message) + + # Отправляем в InfluxDB (если включено) + if self.influxdb_client: + self._send_to_influxdb(interface, message) + + except Exception as e: + self.logger.error( + f"Error processing message: {e}", + exc_info=True, + extra={ + "interface": interface, + "can_id": hex(message.arbitration_id) if message else None + } + ) + + def _save_to_storage(self, interface: str, message: can.Message) -> None: + """ + Сохранение сообщения в локальное хранилище (SQLite). + + Args: + interface: Имя интерфейса + message: CAN сообщение + """ + if not self.storage: + # TODO: Реализовать сохранение в SQLite + # self.storage.save_message(interface, message) + return + + try: + # self.storage.save_message(interface, message) + pass + except Exception as e: + self.logger.error( + f"Failed to save message to storage: {e}", + exc_info=True, + extra={"interface": interface} + ) + + def _send_to_influxdb(self, interface: str, message: can.Message) -> None: + """ + Отправка сообщения в InfluxDB. + + Args: + interface: Имя интерфейса + message: CAN сообщение + """ + if not self.influxdb_client: + return + + try: + # TODO: Реализовать отправку в InfluxDB + # self.influxdb_client.write_message(interface, message) + pass + except Exception as e: + self.logger.error( + f"Failed to send message to InfluxDB: {e}", + exc_info=True, + extra={"interface": interface} + ) + + def shutdown(self) -> None: + """Корректное завершение работы процессора.""" + self.logger.info("Shutting down message processor...") + + # Закрываем соединения + if self.storage: + # self.storage.close() + pass + + if self.influxdb_client: + # self.influxdb_client.close() + pass + + self.logger.info("Message processor stopped") + diff --git a/can_sniffer/src/socket_can/src.py b/can_sniffer/src/socket_can/src.py new file mode 100644 index 0000000..a0e3429 --- /dev/null +++ b/can_sniffer/src/socket_can/src.py @@ -0,0 +1,324 @@ +""" +Модуль для работы с SocketCAN интерфейсами. + +Предоставляет параллельное чтение CAN сообщений с нескольких интерфейсов +с поддержкой обработки ошибок, логирования и graceful shutdown. +""" + +import can +import threading +import time +from typing import Callable, Dict, List, Optional +from queue import Queue, Empty + +from config import config +from logger import get_logger +from .message_processor import MessageProcessor + + +class CANBusHandler: + """Обработчик для одной CAN шины.""" + + def __init__( + self, + interface: str, + bus: can.Bus, + message_callback: Callable, + logger, + filters: Optional[List[dict]] = None + ): + """ + Инициализация обработчика CAN шины. + + Args: + interface: Имя интерфейса (например, 'can0') + bus: Экземпляр can.Bus + message_callback: Функция для обработки CAN сообщений + logger: Логгер для данного интерфейса + filters: Список фильтров SocketCAN + """ + self.interface = interface + self.bus = bus + self.message_callback = message_callback + self.logger = logger + self.filters = filters or [] + self.running = False + self.thread: Optional[threading.Thread] = None + self.message_count = 0 + self.error_count = 0 + self.last_message_time: Optional[float] = None + + # Применяем фильтры, если они есть + if self.filters: + self._apply_filters() + + def _apply_filters(self) -> None: + """Применение фильтров SocketCAN к шине.""" + try: + # SocketCAN фильтры применяются через set_filters + # Формат: [{"can_id": 0x123, "can_mask": 0x7FF}, ...] + self.bus.set_filters(self.filters) + self.logger.info( + f"Applied {len(self.filters)} filters to {self.interface}", + extra={"filters": self.filters} + ) + except Exception as e: + self.logger.warning( + f"Failed to apply filters to {self.interface}: {e}", + exc_info=True + ) + + def _read_loop(self) -> None: + """Основной цикл чтения сообщений с шины.""" + self.logger.info(f"Starting read loop for {self.interface}") + + while self.running: + try: + # Читаем сообщение с таймаутом для возможности проверки running + message = self.bus.recv(timeout=0.1) + + if message is not None: + self.message_count += 1 + self.last_message_time = time.time() + + # Вызываем callback для обработки сообщения + try: + self.message_callback(self.interface, message) + except Exception as e: + self.logger.error( + f"Error in message callback for {self.interface}: {e}", + exc_info=True, + extra={"can_id": hex(message.arbitration_id)} + ) + self.error_count += 1 + + except can.CanError as e: + self.logger.error( + f"CAN error on {self.interface}: {e}", + exc_info=True + ) + self.error_count += 1 + # Небольшая задержка перед повторной попыткой + time.sleep(0.1) + + except Exception as e: + self.logger.error( + f"Unexpected error on {self.interface}: {e}", + exc_info=True + ) + self.error_count += 1 + time.sleep(0.1) + + self.logger.info( + f"Read loop stopped for {self.interface}", + extra={ + "total_messages": self.message_count, + "total_errors": self.error_count + } + ) + + def start(self) -> None: + """Запуск чтения сообщений в отдельном потоке.""" + if self.running: + self.logger.warning(f"{self.interface} is already running") + return + + self.running = True + self.thread = threading.Thread( + target=self._read_loop, + name=f"CAN-{self.interface}", + daemon=True + ) + self.thread.start() + self.logger.info(f"Started reading from {self.interface}") + + def stop(self) -> None: + """Остановка чтения сообщений.""" + if not self.running: + return + + self.logger.info(f"Stopping {self.interface}...") + self.running = False + + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=2.0) + if self.thread.is_alive(): + self.logger.warning(f"Thread for {self.interface} did not stop gracefully") + + # Закрываем шину + try: + self.bus.shutdown() + self.logger.info(f"Bus {self.interface} closed") + except Exception as e: + self.logger.error(f"Error closing bus {self.interface}: {e}", exc_info=True) + + def get_stats(self) -> Dict: + """Получение статистики по обработке сообщений.""" + return { + "interface": self.interface, + "message_count": self.message_count, + "error_count": self.error_count, + "last_message_time": self.last_message_time, + "running": self.running + } + + +class CANSniffer: + """Класс для параллельного чтения CAN сообщений с нескольких интерфейсов.""" + + def __init__(self, message_callback: Optional[Callable] = None): + """ + Инициализация CAN Sniffer. + + Args: + message_callback: Функция для обработки CAN сообщений. + Должна принимать (interface: str, message: can.Message) + """ + self.config = config.can + self.logger = get_logger(__name__) + + # Инициализируем MessageProcessor для автоматической обработки сообщений + self.message_processor = MessageProcessor() + + # Используем переданный callback или процессор по умолчанию + if message_callback: + self.message_callback = message_callback + else: + # Автоматически используем MessageProcessor + self.message_callback = self.message_processor.process + + self.bus_handlers: Dict[str, CANBusHandler] = {} + self.running = False + + self._init_buses() + + def _init_buses(self) -> None: + """Инициализация CAN шин из конфигурации.""" + self.logger.info( + "Initializing CAN buses", + extra={ + "interfaces": self.config.interfaces, + "listen_only": self.config.listen_only, + "bitrate": self.config.bitrate + } + ) + + for interface in self.config.interfaces: + try: + bus = self._create_bus(interface) + handler = CANBusHandler( + interface=interface, + bus=bus, + message_callback=self.message_callback, + logger=self.logger.getChild(f"bus.{interface}"), + filters=self.config.filters + ) + self.bus_handlers[interface] = handler + self.logger.info(f"Initialized bus: {interface}") + + except Exception as e: + self.logger.error( + f"Failed to initialize bus {interface}: {e}", + exc_info=True, + extra={"interface": interface} + ) + + def _create_bus(self, interface: str) -> can.Bus: + """ + Создание CAN шины для интерфейса. + + Args: + interface: Имя интерфейса (например, 'can0') + + Returns: + Экземпляр can.Bus + """ + bus_kwargs = { + "channel": interface, + "bustype": "socketcan", + "receive_own_messages": False, + } + + # Добавляем listen-only режим, если указан в конфигурации + if self.config.listen_only: + # Для SocketCAN listen-only режим устанавливается через параметр + # В некоторых версиях python-can это может быть через receive_own_messages=False + # и отдельным параметром, но для SocketCAN обычно достаточно receive_own_messages=False + pass + + try: + bus = can.interface.Bus(**bus_kwargs) + self.logger.debug( + f"Created bus for {interface}", + extra={"kwargs": bus_kwargs} + ) + return bus + except can.CanError as e: + self.logger.error( + f"CAN error creating bus for {interface}: {e}", + exc_info=True + ) + raise + except Exception as e: + self.logger.error( + f"Unexpected error creating bus for {interface}: {e}", + exc_info=True + ) + raise + + + def start(self) -> None: + """Запуск чтения со всех шин.""" + if self.running: + self.logger.warning("CANSniffer is already running") + return + + self.logger.info( + f"Starting CANSniffer with {len(self.bus_handlers)} buses", + extra={"interfaces": list(self.bus_handlers.keys())} + ) + + self.running = True + + # Запускаем все обработчики параллельно + for handler in self.bus_handlers.values(): + handler.start() + + self.logger.info("CANSniffer started successfully") + + def stop(self) -> None: + """Остановка чтения со всех шин.""" + if not self.running: + return + + self.logger.info("Stopping CANSniffer...") + self.running = False + + # Останавливаем все обработчики + for handler in self.bus_handlers.values(): + handler.stop() + + # Останавливаем процессор сообщений + self.message_processor.shutdown() + + self.logger.info("CANSniffer stopped") + + def get_stats(self) -> Dict: + """Получение статистики по всем шинам.""" + return { + "running": self.running, + "buses": { + interface: handler.get_stats() + for interface, handler in self.bus_handlers.items() + } + } + + def __enter__(self): + """Поддержка context manager.""" + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Поддержка context manager.""" + self.stop() + return False diff --git a/can_sniffer/src/socket_can_example.py b/can_sniffer/src/socket_can_example.py new file mode 100644 index 0000000..d6f6a8a --- /dev/null +++ b/can_sniffer/src/socket_can_example.py @@ -0,0 +1,94 @@ +""" +Пример использования модуля socket_can. +""" + +import time +import signal +import sys +from socket_can import CANSniffer +from logger import get_logger + +logger = get_logger(__name__) + +# Глобальная переменная для graceful shutdown +sniffer = None + + +def message_handler(interface: str, message): + """ + Обработчик CAN сообщений. + + Args: + interface: Имя интерфейса (например, 'can0') + message: CAN сообщение (can.Message) + """ + # Здесь можно добавить логику сохранения в SQLite/InfluxDB + logger.info( + "CAN message", + extra={ + "interface": interface, + "can_id": hex(message.arbitration_id), + "dlc": message.dlc, + "data": message.data.hex() if message.data else "", + "timestamp": message.timestamp + } + ) + + +def signal_handler(sig, frame): + """Обработчик сигналов для graceful shutdown.""" + logger.info("Received shutdown signal, stopping...") + if sniffer: + sniffer.stop() + sys.exit(0) + + +def main(): + """Пример использования CANSniffer.""" + global sniffer + + # Регистрируем обработчики сигналов + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # Создаем sniffer с callback функцией + sniffer = CANSniffer(message_callback=message_handler) + + try: + # Запускаем чтение (можно использовать context manager) + sniffer.start() + + logger.info("CANSniffer started. Press Ctrl+C to stop.") + + # Основной цикл - просто ждем + while True: + time.sleep(1) + + # Периодически выводим статистику + stats = sniffer.get_stats() + logger.debug("Statistics", extra=stats) + + except KeyboardInterrupt: + logger.info("Keyboard interrupt received") + finally: + if sniffer: + sniffer.stop() + logger.info("Application stopped") + + +def example_with_context_manager(): + """Пример использования с context manager.""" + def message_handler(interface: str, message): + print(f"[{interface}] ID: {hex(message.arbitration_id)}, Data: {message.data.hex()}") + + # Использование с context manager (автоматический start/stop) + with CANSniffer(message_callback=message_handler) as sniffer: + # Читаем сообщения в течение 10 секунд + time.sleep(10) + + # После выхода из блока with, sniffer автоматически остановится + + +if __name__ == '__main__': + main() + diff --git a/chart.md b/chart.md new file mode 100644 index 0000000..c2819bc --- /dev/null +++ b/chart.md @@ -0,0 +1,55 @@ +--- +id: 421a4a02-025a-4ab5-8ae3-fd52e2c738f0 +--- +flowchart LR + subgraph Vehicle["Автомобиль"] + OBD["OBD-II"] + CANBUS["HS-CAN\n500 kbps"] + OBD --> CANBUS + end + + subgraph CANBoard["CAN-плата"] + PHY0["CAN PHY"] + PHY1["CAN PHY"] + MCP0["MCP2515\ncan0"] + MCP1["MCP2515\ncan1"] + ISO["Isolation"] + + CANBUS --> PHY0 + CANBUS --> PHY1 + PHY0 --> MCP0 + PHY1 --> MCP1 + MCP0 --> ISO + MCP1 --> ISO + end + + subgraph Edge["Raspberry Pi 5 (Edge)"] + SPI["SPI"] + SocketCAN["SocketCAN\nlisten-only"] + EdgeApp["Edge CAN Logger"] + LocalStore["SQLite WAL\nOffline Buffer"] + Forwarder["Store-and-Forward"] + + ISO --> SPI + SPI --> SocketCAN + SocketCAN --> EdgeApp + EdgeApp --> LocalStore + LocalStore --> Forwarder + end + + subgraph BackendHost["Backend Host"] + Influx["InfluxDB"] + Flask["Flask Backend"] + WS["WebSocket Server"] + end + + subgraph UI["Web UI"] + Browser["Browser"] + Charts["Real-time Charts"] + end + + Forwarder --> Influx + Influx --> Flask + Flask --> WS + WS --> Browser + Browser --> Charts \ No newline at end of file diff --git a/web/task_webui.md b/web/task_webui.md new file mode 100644 index 0000000..6c41832 --- /dev/null +++ b/web/task_webui.md @@ -0,0 +1,165 @@ +1.1 Цель + +Разработать web-интерфейс реального времени для отображения CAN-данных, собираемых edge-устройством и сохраняемых в InfluxDB. + +Система должна: + +отображать данные в реальном времени, + +работать на одном хосте с InfluxDB, + +использовать Python / Flask, + +использовать WebSocket для push-обновлений, + +быть расширяемой под future-аналитику. + +1.2 Общая архитектура +Edge (Raspberry Pi) + → InfluxDB (write) + → Flask Backend (read + stream) + → Web UI (WebSocket) + + +InfluxDB — единый источник истины для UI +(не читаем данные напрямую с Edge). + +1.3 Компоненты системы +1.3.1 Backend (Flask) + +Назначение + +HTTP API + +WebSocket сервер + +агрегация данных из InfluxDB + +доставка данных в UI + +Технологии + +Python 3.11+ + +Flask + +Flask-SocketIO (или websockets + ASGI) + +InfluxDB Python Client + +1.3.2 InfluxDB + +Роль + +time-series storage + +буфер между Edge и UI + +исторические запросы + +Типы данных + +CAN frames (raw) + +декодированные сигналы (future) + +1.3.3 Frontend (Web UI) + +Назначение + +визуализация CAN-данных + +real-time обновление + +базовая аналитика + +Минимальный стек + +HTML + JS + +WebSocket клиент + +Chart.js / ECharts (на ваш выбор) + +1.4 Функциональные требования (MVP) +1.4.1 Real-time отображение + +Обновление ≤ 500 мс + +Push-модель (WebSocket) + +Без polling + +Отображать: + +timestamp + +CAN interface (can0 / can1) + +CAN ID + +DLC + +DATA (hex) + +frequency (msg/sec) + +1.4.2 Исторический просмотр + +выбор временного окна: + +last 1 min / 5 min / 1 h + +график: + +частота сообщений + +значение байтов (raw) + +1.4.3 Фильтрация (UI) + +по CAN ID + +по интерфейсу + +включение / выключение ID + +Фильтрация не влияет на ingestion, только на отображение. + +1.5 Нефункциональные требования +Производительность + +≥ 5–10k msg/sec без деградации UI + +batch-чтение из InfluxDB + +Надёжность + +UI не зависит от Edge availability + +UI не ломается при временном отсутствии новых данных + +Безопасность (минимум) + +UI доступен только из доверенной сети + +без write-доступа к InfluxDB + +1.6 Поток данных (важно) +Write path +Edge → InfluxDB + +Read / Stream path +InfluxDB → Flask → WebSocket → Browser + + +❗ Flask не принимает CAN напрямую +❗ WebSocket не ходит в InfluxDB + +1.7 Ограничения (осознанные) + +Flask — single logical service + +Без auth (на MVP) + +Без декодирования DBC (пока raw) \ No newline at end of file