🚗 CAN Sniffer
Высокопроизводительный CAN bus sniffer для Raspberry Pi с плагинной архитектурой обработчиков, поддержкой параллельного чтения нескольких шин и интеграцией с InfluxDB.
✨ Особенности
- 🔌 Параллельное чтение - одновременная работа с несколькими CAN интерфейсами
- 🧩 Плагинная архитектура - легко добавлять новые обработчики (Kafka, MQTT, WebSocket)
- 📊 InfluxDB интеграция - отправка данных в InfluxDB с retry и health-check
- 💾 SQLite хранилище - локальное временное хранилище с WAL режимом
- ⚡ Асинхронная обработка - неблокирующая обработка сообщений через очередь
- 🔄 Store-and-Forward - надежная доставка данных с retry и backoff
- 📝 Структурированное логирование - детальное логирование с ротацией файлов
- ⚙️ Гибкая конфигурация - JSON файл + переменные окружения
- 🛡️ Graceful shutdown - корректное завершение работы всех компонентов
- 🎯 Типобезопасность - полная типизация с Pydantic
📋 Требования
- Python 3.11+
- Linux (для SocketCAN)
- CAN интерфейсы (например, MCP2515 на Raspberry Pi)
🚀 Установка
1. Клонирование репозитория
git clone <repository-url>
cd can_sniffer
2. Установка зависимостей
pip install -r requirements.txt
3. Настройка CAN интерфейсов
Убедитесь, что CAN интерфейсы настроены и доступны:
# Проверка доступных интерфейсов
ip link show
# Настройка интерфейса (пример)
sudo ip link set can0 type can bitrate 500000
sudo ip link set can0 up
4. Конфигурация
Скопируйте и отредактируйте config.json:
{
"can": {
"interfaces": ["can0", "can1"],
"listen_only": true,
"bitrate": 500000,
"filters": []
},
"storage": {
"type": "sqlite",
"database_path": "can_offline.db",
"wal_mode": true,
"sync_mode": "NORMAL"
},
"influxdb": {
"enabled": true,
"url": "http://localhost:8086",
"token": "your-token",
"org": "automotive",
"bucket": "can_bus"
}
}
🎮 Использование
Базовый запуск
cd src
python main.py
Запуск как модуль
python -m can_sniffer.src.main
С кастомными обработчиками
from handlers import StorageHandler, InfluxDBHandler
from socket_can.message_processor import MessageProcessor
from socket_can import CANSniffer
# Создаем кастомный pipeline
handlers = [
StorageHandler(enabled=True),
InfluxDBHandler(enabled=True),
# Ваш кастомный обработчик
]
# Используем в MessageProcessor
processor = MessageProcessor(handlers=handlers)
sniffer = CANSniffer()
sniffer.start()
🏗️ Архитектура
┌─────────────┐
│ CAN Bus │
│ (can0/can1)│
└──────┬──────┘
│
▼
┌─────────────────┐
│ CANSniffer │ ← Параллельное чтение
│ (SocketCAN) │
└──────┬──────────┘
│
▼
┌─────────────────┐
│ MessageProcessor │ ← Асинхронная очередь
│ (Queue) │
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Handler Pipeline │
├──────────────────┤
│ • StorageHandler │ → SQLite
│ • InfluxHandler │ → InfluxDB
│ • CustomHandler │ → Kafka/MQTT/WS
└──────────────────┘
Компоненты
- CANFrame - неизменяемая структура данных для CAN сообщений
- CANSniffer - параллельное чтение с нескольких CAN шин
- MessageProcessor - асинхронная обработка через очередь
- BaseHandler - интерфейс для плагинных обработчиков
- StorageHandler - сохранение в SQLite
- InfluxDBHandler - отправка в InfluxDB с retry
📦 Структура проекта
can_sniffer/
├── src/
│ ├── main.py # Точка входа
│ ├── config.py # Конфигурация (Pydantic)
│ ├── config.json # Файл конфигурации
│ ├── logger.py # Логирование
│ ├── can_frame.py # CANFrame dataclass
│ ├── socket_can/
│ │ ├── src.py # CANSniffer
│ │ └── message_processor.py # MessageProcessor
│ ├── handlers/
│ │ ├── base.py # BaseHandler
│ │ ├── storage_handler.py # SQLite handler
│ │ └── influxdb_handler.py # InfluxDB handler
│ ├── storage/
│ │ └── storage.py # SQLite storage
│ └── influxdb_handler/
│ └── influxdb_client.py # InfluxDB client
├── requirements.txt
└── README.md
⚙️ Конфигурация
CAN настройки
{
"can": {
"interfaces": ["can0", "can1"], // Список интерфейсов
"listen_only": true, // Только чтение
"bitrate": 500000, // Скорость (бит/с)
"filters": [] // SocketCAN фильтры
}
}
Storage настройки
{
"storage": {
"type": "sqlite",
"database_path": "can_offline.db",
"wal_mode": true, // WAL режим для производительности
"sync_mode": "NORMAL" // NORMAL/FULL/OFF
}
}
InfluxDB настройки
{
"influxdb": {
"enabled": true,
"url": "http://localhost:8086",
"token": "your-token",
"org": "automotive",
"bucket": "can_bus",
"batch_size": 1000, // Размер батча
"flush_interval": 5, // Интервал отправки (сек)
"max_retries": 3, // Количество повторов
"retry_backoff": 1.0, // Backoff интервал
"health_check_interval": 30 // Проверка здоровья (сек)
}
}
Логирование
{
"logging": {
"level": "INFO", // DEBUG/INFO/WARNING/ERROR
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"file": "can_edge.log",
"max_bytes": 10485760, // 10MB
"backup_count": 5 // Количество ротаций
}
}
Переменные окружения
Конфигурацию можно переопределить через переменные окружения:
export CAN_INTERFACES="can0,can1"
export INFLUXDB_URL="http://localhost:8086"
export INFLUXDB_TOKEN="your-token"
export LOGGING_LEVEL="DEBUG"
🔌 Создание кастомного обработчика
from handlers.base import BaseHandler
from can_frame import CANFrame
from typing import List, Dict, Any
class MyCustomHandler(BaseHandler):
def __init__(self, enabled: bool = True):
super().__init__(name="my_handler", enabled=enabled)
def initialize(self) -> bool:
# Инициализация (подключение к Kafka/MQTT/WebSocket)
if not self.enabled:
return False
# ... ваш код
self._initialized = True
return True
def handle(self, frame: CANFrame) -> bool:
# Обработка одного фрейма
# ... ваш код
return True
def handle_batch(self, frames: List[CANFrame]) -> int:
# Обработка батча
# ... ваш код
return len(frames)
def flush(self) -> None:
# Принудительная отправка
pass
def shutdown(self) -> None:
# Закрытие соединений
self._initialized = False
def get_stats(self) -> Dict[str, Any]:
return {
"handler": self.name,
"enabled": self.enabled,
"initialized": self._initialized
}
📊 Статистика
Приложение выводит статистику каждые 10 секунд:
{
"processed_count": 12345,
"dropped_count": 0,
"queue_size": 0,
"running": true,
"handlers_count": 2,
"storage": {
"total_messages": 12345,
"database_path": "can_offline.db"
},
"influxdb": {
"enabled": true,
"sent_count": 12000,
"failed_count": 0,
"connection_status": "connected"
}
}
🐛 Отладка
Включение DEBUG логирования
Измените в config.json:
{
"logging": {
"level": "DEBUG"
}
}
Проверка CAN интерфейсов
# Проверка статуса
ip link show can0
# Тестовая отправка (если не listen_only)
cansend can0 123#DEADBEEF
# Мониторинг трафика
candump can0
Проверка InfluxDB
# Проверка соединения
curl -G http://localhost:8086/health
# Запрос данных
influx query 'from(bucket:"can_bus") |> range(start: -1h)'
🔧 Разработка
Установка для разработки
pip install -r requirements.txt
Запуск тестов
# TODO: добавить тесты
pytest tests/
Форматирование кода
black src/
isort src/
📝 Лицензия
MIT License - см. LICENSE файл для деталей.
🤝 Вклад
Вклады приветствуются! Пожалуйста:
- Fork проекта
- Создайте feature branch (
git checkout -b feature/AmazingFeature) - Commit изменения (
git commit -m 'Add some AmazingFeature') - Push в branch (
git push origin feature/AmazingFeature) - Откройте Pull Request
📧 Контакты
- Issues: GitHub Issues
- Discussions: GitHub Discussions
🙏 Благодарности
- python-can - библиотека для работы с CAN
- InfluxDB - time-series база данных
- Pydantic - валидация данных
⭐ Если проект полезен, поставьте звезду!