From 2f189df8ee95315355ead2f62ae6098c2ea7b0be Mon Sep 17 00:00:00 2001 From: qsethuk Date: Wed, 7 Jan 2026 03:21:18 +0300 Subject: [PATCH] add readme --- can_sniffer/README.md | 412 ++++++++++++++++++++ can_sniffer/src/handlers/example_handler.py | 144 ------- 2 files changed, 412 insertions(+), 144 deletions(-) create mode 100644 can_sniffer/README.md delete mode 100644 can_sniffer/src/handlers/example_handler.py diff --git a/can_sniffer/README.md b/can_sniffer/README.md new file mode 100644 index 0000000..2980ff8 --- /dev/null +++ b/can_sniffer/README.md @@ -0,0 +1,412 @@ +# 🚗 CAN Sniffer + +[![Python](https://img.shields.io/badge/Python-3.11+-blue.svg)](https://www.python.org/downloads/) +[![License](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE) +[![Code Style](https://img.shields.io/badge/Code%20Style-Black-black.svg)](https://github.com/psf/black) +[![Type Checking](https://img.shields.io/badge/Type%20Checking-mypy-blue.svg)](https://github.com/python/mypy) + +> Высокопроизводительный CAN bus sniffer для Raspberry Pi с плагинной архитектурой обработчиков, поддержкой параллельного чтения нескольких шин и интеграцией с InfluxDB. + +## ✨ Особенности + +- 🔌 **Параллельное чтение** - одновременная работа с несколькими CAN интерфейсами +- 🧩 **Плагинная архитектура** - легко добавлять новые обработчики (Kafka, MQTT, WebSocket) +- 📊 **InfluxDB интеграция** - отправка данных в InfluxDB с retry и health-check +- 💾 **SQLite хранилище** - локальное временное хранилище с WAL режимом +- ⚡ **Асинхронная обработка** - неблокирующая обработка сообщений через очередь +- 🔄 **Store-and-Forward** - надежная доставка данных с retry и backoff +- 📝 **Структурированное логирование** - детальное логирование с ротацией файлов +- ⚙️ **Гибкая конфигурация** - JSON файл + переменные окружения +- 🛡️ **Graceful shutdown** - корректное завершение работы всех компонентов +- 🎯 **Типобезопасность** - полная типизация с Pydantic + +## 📋 Требования + +- Python 3.11+ +- Linux (для SocketCAN) +- CAN интерфейсы (например, MCP2515 на Raspberry Pi) + +## 🚀 Установка + +### 1. Клонирование репозитория + +```bash +git clone +cd can_sniffer +``` + +### 2. Установка зависимостей + +```bash +pip install -r requirements.txt +``` + +### 3. Настройка CAN интерфейсов + +Убедитесь, что CAN интерфейсы настроены и доступны: + +```bash +# Проверка доступных интерфейсов +ip link show + +# Настройка интерфейса (пример) +sudo ip link set can0 type can bitrate 500000 +sudo ip link set can0 up +``` + +### 4. Конфигурация + +Скопируйте и отредактируйте `config.json`: + +```json +{ + "can": { + "interfaces": ["can0", "can1"], + "listen_only": true, + "bitrate": 500000, + "filters": [] + }, + "storage": { + "type": "sqlite", + "database_path": "can_offline.db", + "wal_mode": true, + "sync_mode": "NORMAL" + }, + "influxdb": { + "enabled": true, + "url": "http://localhost:8086", + "token": "your-token", + "org": "automotive", + "bucket": "can_bus" + } +} +``` + +## 🎮 Использование + +### Базовый запуск + +```bash +cd src +python main.py +``` + +### Запуск как модуль + +```bash +python -m can_sniffer.src.main +``` + +### С кастомными обработчиками + +```python +from handlers import StorageHandler, InfluxDBHandler +from socket_can.message_processor import MessageProcessor +from socket_can import CANSniffer + +# Создаем кастомный pipeline +handlers = [ + StorageHandler(enabled=True), + InfluxDBHandler(enabled=True), + # Ваш кастомный обработчик +] + +# Используем в MessageProcessor +processor = MessageProcessor(handlers=handlers) +sniffer = CANSniffer() +sniffer.start() +``` + +## 🏗️ Архитектура + +``` +┌─────────────┐ +│ CAN Bus │ +│ (can0/can1)│ +└──────┬──────┘ + │ + ▼ +┌─────────────────┐ +│ CANSniffer │ ← Параллельное чтение +│ (SocketCAN) │ +└──────┬──────────┘ + │ + ▼ +┌─────────────────┐ +│ MessageProcessor │ ← Асинхронная очередь +│ (Queue) │ +└──────┬───────────┘ + │ + ▼ +┌──────────────────┐ +│ Handler Pipeline │ +├──────────────────┤ +│ • StorageHandler │ → SQLite +│ • InfluxHandler │ → InfluxDB +│ • CustomHandler │ → Kafka/MQTT/WS +└──────────────────┘ +``` + +### Компоненты + +- **CANFrame** - неизменяемая структура данных для CAN сообщений +- **CANSniffer** - параллельное чтение с нескольких CAN шин +- **MessageProcessor** - асинхронная обработка через очередь +- **BaseHandler** - интерфейс для плагинных обработчиков +- **StorageHandler** - сохранение в SQLite +- **InfluxDBHandler** - отправка в InfluxDB с retry + +## 📦 Структура проекта + +``` +can_sniffer/ +├── src/ +│ ├── main.py # Точка входа +│ ├── config.py # Конфигурация (Pydantic) +│ ├── config.json # Файл конфигурации +│ ├── logger.py # Логирование +│ ├── can_frame.py # CANFrame dataclass +│ ├── socket_can/ +│ │ ├── src.py # CANSniffer +│ │ └── message_processor.py # MessageProcessor +│ ├── handlers/ +│ │ ├── base.py # BaseHandler +│ │ ├── storage_handler.py # SQLite handler +│ │ └── influxdb_handler.py # InfluxDB handler +│ ├── storage/ +│ │ └── storage.py # SQLite storage +│ └── influxdb_handler/ +│ └── influxdb_client.py # InfluxDB client +├── requirements.txt +└── README.md +``` + +## ⚙️ Конфигурация + +### CAN настройки + +```json +{ + "can": { + "interfaces": ["can0", "can1"], // Список интерфейсов + "listen_only": true, // Только чтение + "bitrate": 500000, // Скорость (бит/с) + "filters": [] // SocketCAN фильтры + } +} +``` + +### Storage настройки + +```json +{ + "storage": { + "type": "sqlite", + "database_path": "can_offline.db", + "wal_mode": true, // WAL режим для производительности + "sync_mode": "NORMAL" // NORMAL/FULL/OFF + } +} +``` + +### InfluxDB настройки + +```json +{ + "influxdb": { + "enabled": true, + "url": "http://localhost:8086", + "token": "your-token", + "org": "automotive", + "bucket": "can_bus", + "batch_size": 1000, // Размер батча + "flush_interval": 5, // Интервал отправки (сек) + "max_retries": 3, // Количество повторов + "retry_backoff": 1.0, // Backoff интервал + "health_check_interval": 30 // Проверка здоровья (сек) + } +} +``` + +### Логирование + +```json +{ + "logging": { + "level": "INFO", // DEBUG/INFO/WARNING/ERROR + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s", + "file": "can_edge.log", + "max_bytes": 10485760, // 10MB + "backup_count": 5 // Количество ротаций + } +} +``` + +### Переменные окружения + +Конфигурацию можно переопределить через переменные окружения: + +```bash +export CAN_INTERFACES="can0,can1" +export INFLUXDB_URL="http://localhost:8086" +export INFLUXDB_TOKEN="your-token" +export LOGGING_LEVEL="DEBUG" +``` + +## 🔌 Создание кастомного обработчика + +```python +from handlers.base import BaseHandler +from can_frame import CANFrame +from typing import List, Dict, Any + +class MyCustomHandler(BaseHandler): + def __init__(self, enabled: bool = True): + super().__init__(name="my_handler", enabled=enabled) + + def initialize(self) -> bool: + # Инициализация (подключение к Kafka/MQTT/WebSocket) + if not self.enabled: + return False + # ... ваш код + self._initialized = True + return True + + def handle(self, frame: CANFrame) -> bool: + # Обработка одного фрейма + # ... ваш код + return True + + def handle_batch(self, frames: List[CANFrame]) -> int: + # Обработка батча + # ... ваш код + return len(frames) + + def flush(self) -> None: + # Принудительная отправка + pass + + def shutdown(self) -> None: + # Закрытие соединений + self._initialized = False + + def get_stats(self) -> Dict[str, Any]: + return { + "handler": self.name, + "enabled": self.enabled, + "initialized": self._initialized + } +``` + +## 📊 Статистика + +Приложение выводит статистику каждые 10 секунд: + +```json +{ + "processed_count": 12345, + "dropped_count": 0, + "queue_size": 0, + "running": true, + "handlers_count": 2, + "storage": { + "total_messages": 12345, + "database_path": "can_offline.db" + }, + "influxdb": { + "enabled": true, + "sent_count": 12000, + "failed_count": 0, + "connection_status": "connected" + } +} +``` + +## 🐛 Отладка + +### Включение DEBUG логирования + +Измените в `config.json`: + +```json +{ + "logging": { + "level": "DEBUG" + } +} +``` + +### Проверка CAN интерфейсов + +```bash +# Проверка статуса +ip link show can0 + +# Тестовая отправка (если не listen_only) +cansend can0 123#DEADBEEF + +# Мониторинг трафика +candump can0 +``` + +### Проверка InfluxDB + +```bash +# Проверка соединения +curl -G http://localhost:8086/health + +# Запрос данных +influx query 'from(bucket:"can_bus") |> range(start: -1h)' +``` + +## 🔧 Разработка + +### Установка для разработки + +```bash +pip install -r requirements.txt +``` + +### Запуск тестов + +```bash +# TODO: добавить тесты +pytest tests/ +``` + +### Форматирование кода + +```bash +black src/ +isort src/ +``` + +## 📝 Лицензия + +MIT License - см. [LICENSE](LICENSE) файл для деталей. + +## 🤝 Вклад + +Вклады приветствуются! Пожалуйста: + +1. Fork проекта +2. Создайте feature branch (`git checkout -b feature/AmazingFeature`) +3. Commit изменения (`git commit -m 'Add some AmazingFeature'`) +4. Push в branch (`git push origin feature/AmazingFeature`) +5. Откройте Pull Request + +## 📧 Контакты + +- Issues: [GitHub Issues](https://github.com/yourusername/can_sniffer/issues) +- Discussions: [GitHub Discussions](https://github.com/yourusername/can_sniffer/discussions) + +## 🙏 Благодарности + +- [python-can](https://github.com/hardbyte/python-can) - библиотека для работы с CAN +- [InfluxDB](https://www.influxdata.com/) - time-series база данных +- [Pydantic](https://docs.pydantic.dev/) - валидация данных + +--- + +⭐ Если проект полезен, поставьте звезду! + diff --git a/can_sniffer/src/handlers/example_handler.py b/can_sniffer/src/handlers/example_handler.py deleted file mode 100644 index b677766..0000000 --- a/can_sniffer/src/handlers/example_handler.py +++ /dev/null @@ -1,144 +0,0 @@ -""" -Пример обработчика для демонстрации плагинной архитектуры. - -Этот файл показывает, как легко добавить новый обработчик (например, Kafka, MQTT, WebSocket). -""" - -from typing import List, Dict, Any -from can_frame import CANFrame -from .base import BaseHandler - - -class ExampleHandler(BaseHandler): - """ - Пример обработчика для демонстрации. - - Этот обработчик можно использовать как шаблон для создания новых: - - KafkaHandler - - MQTTHandler - - WebSocketHandler - - FileHandler - и т.д. - """ - - def __init__(self, enabled: bool = True): - """Инициализация примера обработчика.""" - super().__init__(name="example", enabled=enabled) - self.processed_count = 0 - self.failed_count = 0 - - def initialize(self) -> bool: - """Инициализация обработчика.""" - if not self.enabled: - return False - - try: - # Здесь должна быть логика подключения к внешнему сервису - # Например: self.kafka_producer = KafkaProducer(...) - # Или: self.mqtt_client = mqtt.Client(...) - self._initialized = True - self.logger.info("Example handler initialized") - return True - except Exception as e: - self.logger.error(f"Failed to initialize example handler: {e}", exc_info=True) - return False - - def handle(self, frame: CANFrame) -> bool: - """Обработка одного CAN фрейма.""" - if not self.enabled or not self._initialized: - return False - - try: - # Здесь должна быть логика отправки одного фрейма - # Например: self.kafka_producer.send('can-topic', frame.to_dict()) - # Или: self.mqtt_client.publish('can/data', frame.to_dict()) - - self.processed_count += 1 - return True - except Exception as e: - self.logger.error( - f"Failed to handle frame: {e}", - exc_info=True, - extra={"can_id": frame.can_id_hex} - ) - self.failed_count += 1 - return False - - def handle_batch(self, frames: List[CANFrame]) -> int: - """Обработка батча CAN фреймов.""" - if not self.enabled or not self._initialized or not frames: - return 0 - - try: - # Здесь должна быть логика пакетной отправки - # Например: self.kafka_producer.send_batch([...]) - # Или: self.mqtt_client.publish_batch([...]) - - processed = 0 - for frame in frames: - if self.handle(frame): - processed += 1 - - return processed - except Exception as e: - self.logger.error( - f"Failed to handle batch: {e}", - exc_info=True, - extra={"batch_size": len(frames)} - ) - self.failed_count += len(frames) - return 0 - - def flush(self) -> None: - """Принудительная отправка накопленных данных.""" - if not self.enabled or not self._initialized: - return - - try: - # Здесь должна быть логика принудительной отправки - # Например: self.kafka_producer.flush() - # Или: self.mqtt_client.flush() - pass - except Exception as e: - self.logger.error(f"Failed to flush: {e}", exc_info=True) - - def shutdown(self) -> None: - """Корректное завершение работы обработчика.""" - if self._initialized: - try: - # Здесь должна быть логика закрытия соединений - # Например: self.kafka_producer.close() - # Или: self.mqtt_client.disconnect() - self.logger.info("Example handler closed") - except Exception as e: - self.logger.error(f"Error closing example handler: {e}", exc_info=True) - self._initialized = False - - def get_stats(self) -> Dict[str, Any]: - """Получение статистики обработчика.""" - return { - "handler": self.name, - "enabled": self.enabled, - "initialized": self._initialized, - "processed_count": self.processed_count, - "failed_count": self.failed_count - } - - -# Пример использования: -# -# from handlers import BaseHandler, StorageHandler, InfluxDBHandler -# from handlers.example_handler import ExampleHandler -# from socket_can.message_processor import MessageProcessor -# -# # Создаем кастомный pipeline -# handlers = [ -# StorageHandler(enabled=True), -# InfluxDBHandler(enabled=True), -# ExampleHandler(enabled=True), # Новый обработчик! -# ] -# -# # Используем в MessageProcessor -# processor = MessageProcessor(handlers=handlers) -# processor.start() -