add readme

This commit is contained in:
2026-01-07 03:21:18 +03:00
parent 0d2be70032
commit 2f189df8ee
2 changed files with 412 additions and 144 deletions

412
can_sniffer/README.md Normal file
View File

@@ -0,0 +1,412 @@
# 🚗 CAN Sniffer
[![Python](https://img.shields.io/badge/Python-3.11+-blue.svg)](https://www.python.org/downloads/)
[![License](https://img.shields.io/badge/License-MIT-green.svg)](LICENSE)
[![Code Style](https://img.shields.io/badge/Code%20Style-Black-black.svg)](https://github.com/psf/black)
[![Type Checking](https://img.shields.io/badge/Type%20Checking-mypy-blue.svg)](https://github.com/python/mypy)
> Высокопроизводительный CAN bus sniffer для Raspberry Pi с плагинной архитектурой обработчиков, поддержкой параллельного чтения нескольких шин и интеграцией с InfluxDB.
## ✨ Особенности
- 🔌 **Параллельное чтение** - одновременная работа с несколькими CAN интерфейсами
- 🧩 **Плагинная архитектура** - легко добавлять новые обработчики (Kafka, MQTT, WebSocket)
- 📊 **InfluxDB интеграция** - отправка данных в InfluxDB с retry и health-check
- 💾 **SQLite хранилище** - локальное временное хранилище с WAL режимом
-**Асинхронная обработка** - неблокирующая обработка сообщений через очередь
- 🔄 **Store-and-Forward** - надежная доставка данных с retry и backoff
- 📝 **Структурированное логирование** - детальное логирование с ротацией файлов
- ⚙️ **Гибкая конфигурация** - JSON файл + переменные окружения
- 🛡️ **Graceful shutdown** - корректное завершение работы всех компонентов
- 🎯 **Типобезопасность** - полная типизация с Pydantic
## 📋 Требования
- Python 3.11+
- Linux (для SocketCAN)
- CAN интерфейсы (например, MCP2515 на Raspberry Pi)
## 🚀 Установка
### 1. Клонирование репозитория
```bash
git clone <repository-url>
cd can_sniffer
```
### 2. Установка зависимостей
```bash
pip install -r requirements.txt
```
### 3. Настройка CAN интерфейсов
Убедитесь, что CAN интерфейсы настроены и доступны:
```bash
# Проверка доступных интерфейсов
ip link show
# Настройка интерфейса (пример)
sudo ip link set can0 type can bitrate 500000
sudo ip link set can0 up
```
### 4. Конфигурация
Скопируйте и отредактируйте `config.json`:
```json
{
"can": {
"interfaces": ["can0", "can1"],
"listen_only": true,
"bitrate": 500000,
"filters": []
},
"storage": {
"type": "sqlite",
"database_path": "can_offline.db",
"wal_mode": true,
"sync_mode": "NORMAL"
},
"influxdb": {
"enabled": true,
"url": "http://localhost:8086",
"token": "your-token",
"org": "automotive",
"bucket": "can_bus"
}
}
```
## 🎮 Использование
### Базовый запуск
```bash
cd src
python main.py
```
### Запуск как модуль
```bash
python -m can_sniffer.src.main
```
### С кастомными обработчиками
```python
from handlers import StorageHandler, InfluxDBHandler
from socket_can.message_processor import MessageProcessor
from socket_can import CANSniffer
# Создаем кастомный pipeline
handlers = [
StorageHandler(enabled=True),
InfluxDBHandler(enabled=True),
# Ваш кастомный обработчик
]
# Используем в MessageProcessor
processor = MessageProcessor(handlers=handlers)
sniffer = CANSniffer()
sniffer.start()
```
## 🏗️ Архитектура
```
┌─────────────┐
│ CAN Bus │
│ (can0/can1)│
└──────┬──────┘
┌─────────────────┐
│ CANSniffer │ ← Параллельное чтение
│ (SocketCAN) │
└──────┬──────────┘
┌─────────────────┐
│ MessageProcessor │ ← Асинхронная очередь
│ (Queue) │
└──────┬───────────┘
┌──────────────────┐
│ Handler Pipeline │
├──────────────────┤
│ • StorageHandler │ → SQLite
│ • InfluxHandler │ → InfluxDB
│ • CustomHandler │ → Kafka/MQTT/WS
└──────────────────┘
```
### Компоненты
- **CANFrame** - неизменяемая структура данных для CAN сообщений
- **CANSniffer** - параллельное чтение с нескольких CAN шин
- **MessageProcessor** - асинхронная обработка через очередь
- **BaseHandler** - интерфейс для плагинных обработчиков
- **StorageHandler** - сохранение в SQLite
- **InfluxDBHandler** - отправка в InfluxDB с retry
## 📦 Структура проекта
```
can_sniffer/
├── src/
│ ├── main.py # Точка входа
│ ├── config.py # Конфигурация (Pydantic)
│ ├── config.json # Файл конфигурации
│ ├── logger.py # Логирование
│ ├── can_frame.py # CANFrame dataclass
│ ├── socket_can/
│ │ ├── src.py # CANSniffer
│ │ └── message_processor.py # MessageProcessor
│ ├── handlers/
│ │ ├── base.py # BaseHandler
│ │ ├── storage_handler.py # SQLite handler
│ │ └── influxdb_handler.py # InfluxDB handler
│ ├── storage/
│ │ └── storage.py # SQLite storage
│ └── influxdb_handler/
│ └── influxdb_client.py # InfluxDB client
├── requirements.txt
└── README.md
```
## ⚙️ Конфигурация
### CAN настройки
```json
{
"can": {
"interfaces": ["can0", "can1"], // Список интерфейсов
"listen_only": true, // Только чтение
"bitrate": 500000, // Скорость (бит/с)
"filters": [] // SocketCAN фильтры
}
}
```
### Storage настройки
```json
{
"storage": {
"type": "sqlite",
"database_path": "can_offline.db",
"wal_mode": true, // WAL режим для производительности
"sync_mode": "NORMAL" // NORMAL/FULL/OFF
}
}
```
### InfluxDB настройки
```json
{
"influxdb": {
"enabled": true,
"url": "http://localhost:8086",
"token": "your-token",
"org": "automotive",
"bucket": "can_bus",
"batch_size": 1000, // Размер батча
"flush_interval": 5, // Интервал отправки (сек)
"max_retries": 3, // Количество повторов
"retry_backoff": 1.0, // Backoff интервал
"health_check_interval": 30 // Проверка здоровья (сек)
}
}
```
### Логирование
```json
{
"logging": {
"level": "INFO", // DEBUG/INFO/WARNING/ERROR
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"file": "can_edge.log",
"max_bytes": 10485760, // 10MB
"backup_count": 5 // Количество ротаций
}
}
```
### Переменные окружения
Конфигурацию можно переопределить через переменные окружения:
```bash
export CAN_INTERFACES="can0,can1"
export INFLUXDB_URL="http://localhost:8086"
export INFLUXDB_TOKEN="your-token"
export LOGGING_LEVEL="DEBUG"
```
## 🔌 Создание кастомного обработчика
```python
from handlers.base import BaseHandler
from can_frame import CANFrame
from typing import List, Dict, Any
class MyCustomHandler(BaseHandler):
def __init__(self, enabled: bool = True):
super().__init__(name="my_handler", enabled=enabled)
def initialize(self) -> bool:
# Инициализация (подключение к Kafka/MQTT/WebSocket)
if not self.enabled:
return False
# ... ваш код
self._initialized = True
return True
def handle(self, frame: CANFrame) -> bool:
# Обработка одного фрейма
# ... ваш код
return True
def handle_batch(self, frames: List[CANFrame]) -> int:
# Обработка батча
# ... ваш код
return len(frames)
def flush(self) -> None:
# Принудительная отправка
pass
def shutdown(self) -> None:
# Закрытие соединений
self._initialized = False
def get_stats(self) -> Dict[str, Any]:
return {
"handler": self.name,
"enabled": self.enabled,
"initialized": self._initialized
}
```
## 📊 Статистика
Приложение выводит статистику каждые 10 секунд:
```json
{
"processed_count": 12345,
"dropped_count": 0,
"queue_size": 0,
"running": true,
"handlers_count": 2,
"storage": {
"total_messages": 12345,
"database_path": "can_offline.db"
},
"influxdb": {
"enabled": true,
"sent_count": 12000,
"failed_count": 0,
"connection_status": "connected"
}
}
```
## 🐛 Отладка
### Включение DEBUG логирования
Измените в `config.json`:
```json
{
"logging": {
"level": "DEBUG"
}
}
```
### Проверка CAN интерфейсов
```bash
# Проверка статуса
ip link show can0
# Тестовая отправка (если не listen_only)
cansend can0 123#DEADBEEF
# Мониторинг трафика
candump can0
```
### Проверка InfluxDB
```bash
# Проверка соединения
curl -G http://localhost:8086/health
# Запрос данных
influx query 'from(bucket:"can_bus") |> range(start: -1h)'
```
## 🔧 Разработка
### Установка для разработки
```bash
pip install -r requirements.txt
```
### Запуск тестов
```bash
# TODO: добавить тесты
pytest tests/
```
### Форматирование кода
```bash
black src/
isort src/
```
## 📝 Лицензия
MIT License - см. [LICENSE](LICENSE) файл для деталей.
## 🤝 Вклад
Вклады приветствуются! Пожалуйста:
1. Fork проекта
2. Создайте feature branch (`git checkout -b feature/AmazingFeature`)
3. Commit изменения (`git commit -m 'Add some AmazingFeature'`)
4. Push в branch (`git push origin feature/AmazingFeature`)
5. Откройте Pull Request
## 📧 Контакты
- Issues: [GitHub Issues](https://github.com/yourusername/can_sniffer/issues)
- Discussions: [GitHub Discussions](https://github.com/yourusername/can_sniffer/discussions)
## 🙏 Благодарности
- [python-can](https://github.com/hardbyte/python-can) - библиотека для работы с CAN
- [InfluxDB](https://www.influxdata.com/) - time-series база данных
- [Pydantic](https://docs.pydantic.dev/) - валидация данных
---
⭐ Если проект полезен, поставьте звезду!

View File

@@ -1,144 +0,0 @@
"""
Пример обработчика для демонстрации плагинной архитектуры.
Этот файл показывает, как легко добавить новый обработчик (например, Kafka, MQTT, WebSocket).
"""
from typing import List, Dict, Any
from can_frame import CANFrame
from .base import BaseHandler
class ExampleHandler(BaseHandler):
"""
Пример обработчика для демонстрации.
Этот обработчик можно использовать как шаблон для создания новых:
- KafkaHandler
- MQTTHandler
- WebSocketHandler
- FileHandler
и т.д.
"""
def __init__(self, enabled: bool = True):
"""Инициализация примера обработчика."""
super().__init__(name="example", enabled=enabled)
self.processed_count = 0
self.failed_count = 0
def initialize(self) -> bool:
"""Инициализация обработчика."""
if not self.enabled:
return False
try:
# Здесь должна быть логика подключения к внешнему сервису
# Например: self.kafka_producer = KafkaProducer(...)
# Или: self.mqtt_client = mqtt.Client(...)
self._initialized = True
self.logger.info("Example handler initialized")
return True
except Exception as e:
self.logger.error(f"Failed to initialize example handler: {e}", exc_info=True)
return False
def handle(self, frame: CANFrame) -> bool:
"""Обработка одного CAN фрейма."""
if not self.enabled or not self._initialized:
return False
try:
# Здесь должна быть логика отправки одного фрейма
# Например: self.kafka_producer.send('can-topic', frame.to_dict())
# Или: self.mqtt_client.publish('can/data', frame.to_dict())
self.processed_count += 1
return True
except Exception as e:
self.logger.error(
f"Failed to handle frame: {e}",
exc_info=True,
extra={"can_id": frame.can_id_hex}
)
self.failed_count += 1
return False
def handle_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча CAN фреймов."""
if not self.enabled or not self._initialized or not frames:
return 0
try:
# Здесь должна быть логика пакетной отправки
# Например: self.kafka_producer.send_batch([...])
# Или: self.mqtt_client.publish_batch([...])
processed = 0
for frame in frames:
if self.handle(frame):
processed += 1
return processed
except Exception as e:
self.logger.error(
f"Failed to handle batch: {e}",
exc_info=True,
extra={"batch_size": len(frames)}
)
self.failed_count += len(frames)
return 0
def flush(self) -> None:
"""Принудительная отправка накопленных данных."""
if not self.enabled or not self._initialized:
return
try:
# Здесь должна быть логика принудительной отправки
# Например: self.kafka_producer.flush()
# Или: self.mqtt_client.flush()
pass
except Exception as e:
self.logger.error(f"Failed to flush: {e}", exc_info=True)
def shutdown(self) -> None:
"""Корректное завершение работы обработчика."""
if self._initialized:
try:
# Здесь должна быть логика закрытия соединений
# Например: self.kafka_producer.close()
# Или: self.mqtt_client.disconnect()
self.logger.info("Example handler closed")
except Exception as e:
self.logger.error(f"Error closing example handler: {e}", exc_info=True)
self._initialized = False
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики обработчика."""
return {
"handler": self.name,
"enabled": self.enabled,
"initialized": self._initialized,
"processed_count": self.processed_count,
"failed_count": self.failed_count
}
# Пример использования:
#
# from handlers import BaseHandler, StorageHandler, InfluxDBHandler
# from handlers.example_handler import ExampleHandler
# from socket_can.message_processor import MessageProcessor
#
# # Создаем кастомный pipeline
# handlers = [
# StorageHandler(enabled=True),
# InfluxDBHandler(enabled=True),
# ExampleHandler(enabled=True), # Новый обработчик!
# ]
#
# # Используем в MessageProcessor
# processor = MessageProcessor(handlers=handlers)
# processor.start()