🚗 CAN Sniffer
Высокопроизводительный CAN bus sniffer для Raspberry Pi с плагинной архитектурой обработчиков, поддержкой параллельного чтения нескольких шин и интеграцией с InfluxDB.
✨ Особенности
- 🔌 Параллельное чтение - одновременная работа с несколькими CAN интерфейсами
- 🧩 Плагинная архитектура - легко добавлять новые обработчики (Kafka, MQTT, WebSocket)
- 📊 InfluxDB интеграция - отправка данных в InfluxDB с retry и health-check
- 💾 SQLite хранилище - локальное временное хранилище с WAL режимом
- ⚡ Асинхронная обработка - неблокирующая обработка сообщений через очередь
- 🔄 Store-and-Forward - надежная доставка данных с retry и backoff
- 📝 Структурированное логирование - детальное логирование с ротацией файлов
- ⚙️ Гибкая конфигурация - JSON файл + переменные окружения
- 🛡️ Graceful shutdown - корректное завершение работы всех компонентов
- 🎯 Типобезопасность - полная типизация с Pydantic
📋 Требования
- Python 3.11+
- Linux (для SocketCAN)
- CAN интерфейсы (например, MCP2515 на Raspberry Pi)
🚀 Установка
1. Клонирование репозитория
git clone <repository-url>
cd can_sniffer
2. Установка зависимостей
pip install -r requirements.txt
3. Настройка CAN интерфейсов
Убедитесь, что CAN интерфейсы настроены и доступны:
# Проверка доступных интерфейсов
ip link show
# Настройка интерфейса (пример для 1Mbps)
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
sudo ip link set can1 type can bitrate 1000000
sudo ip link set can1 up
4. Конфигурация
Скопируйте и отредактируйте config.json:
{
"can": {
"interfaces": ["can0", "can1"],
"listen_only": true,
"bitrate": 500000,
"filters": []
},
"storage": {
"type": "sqlite",
"database_path": "can_offline.db",
"wal_mode": true,
"sync_mode": "NORMAL"
},
"influxdb": {
"enabled": true,
"url": "http://localhost:8086",
"token": "your-token",
"org": "automotive",
"bucket": "can_bus"
}
}
Deployment (Raspberry Pi 5)
Для автоматического запуска на Raspberry Pi 5 с 2-CH CAN HAT используйте скрипты из deploy/:
# Установка с автозапуском через systemd
cd deploy
sudo ./install.sh
Установщик автоматически:
- Настроит CAN интерфейсы при загрузке (
can-setup.service) - Запустит sniffer как systemd сервис (
can-sniffer.service) - Создаст директории для данных и логов в
/opt/can_sniffer/
Подробная документация: deploy/README.md
# Управление сервисом
sudo systemctl status can-sniffer
sudo journalctl -u can-sniffer -f
# Диагностика
sudo ./deploy/diagnose.sh
Использование
Базовый запуск
cd src
python main.py
Запуск как модуль
python -m can_sniffer.src.main
С кастомными обработчиками
from handlers import StorageHandler, InfluxDBHandler
from socket_can.message_processor import MessageProcessor
from socket_can import CANSniffer
# Создаем кастомный pipeline
handlers = [
StorageHandler(enabled=True),
InfluxDBHandler(enabled=True),
# Ваш кастомный обработчик
]
# Используем в MessageProcessor
processor = MessageProcessor(handlers=handlers)
sniffer = CANSniffer()
sniffer.start()
🏗️ Архитектура
┌─────────────┐
│ CAN Bus │
│ (can0/can1)│
└──────┬──────┘
│
▼
┌─────────────────┐
│ CANSniffer │ ← Параллельное чтение
│ (SocketCAN) │
└──────┬──────────┘
│
▼
┌─────────────────┐
│ MessageProcessor │ ← Асинхронная очередь
│ (Queue) │
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Handler Pipeline │
├──────────────────┤
│ • StorageHandler │ → SQLite
│ • InfluxHandler │ → InfluxDB
│ • CustomHandler │ → Kafka/MQTT/WS
└──────────────────┘
Компоненты
- CANFrame - неизменяемая структура данных для CAN сообщений
- CANSniffer - параллельное чтение с нескольких CAN шин
- MessageProcessor - асинхронная обработка через очередь
- BaseHandler - интерфейс для плагинных обработчиков
- StorageHandler - сохранение в SQLite
- InfluxDBHandler - отправка в InfluxDB с retry
📦 Структура проекта
can_sniffer/
├── src/
│ ├── main.py # Точка входа
│ ├── config.py # Конфигурация (Pydantic)
│ ├── config.json # Файл конфигурации
│ ├── logger.py # Логирование
│ ├── can_frame.py # CANFrame dataclass
│ ├── socket_can/
│ │ ├── src.py # CANSniffer
│ │ └── message_processor.py # MessageProcessor
│ ├── handlers/
│ │ ├── base.py # BaseHandler
│ │ ├── storage_handler.py # SQLite handler
│ │ └── influxdb_handler.py # InfluxDB handler
│ ├── storage/
│ │ └── storage.py # SQLite storage
│ └── influxdb_handler/
│ └── influxdb_client.py # InfluxDB client
├── requirements.txt
└── README.md
⚙️ Конфигурация
CAN настройки
{
"can": {
"interfaces": ["can0", "can1"], // Список интерфейсов
"listen_only": true, // Только чтение
"bitrate": 500000, // Скорость (бит/с)
"filters": [] // SocketCAN фильтры
}
}
Storage настройки
{
"storage": {
"type": "sqlite",
"database_path": "can_offline.db",
"wal_mode": true, // WAL режим для производительности
"sync_mode": "NORMAL" // NORMAL/FULL/OFF
}
}
InfluxDB настройки
{
"influxdb": {
"enabled": true,
"url": "http://localhost:8086",
"token": "your-token",
"org": "automotive",
"bucket": "can_bus",
"batch_size": 1000, // Размер батча
"flush_interval": 5, // Интервал отправки (сек)
"max_retries": 3, // Количество повторов
"retry_backoff": 1.0, // Backoff интервал
"health_check_interval": 30 // Проверка здоровья (сек)
}
}
Логирование
{
"logging": {
"level": "INFO", // DEBUG/INFO/WARNING/ERROR
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"file": "can_edge.log",
"max_bytes": 10485760, // 10MB
"backup_count": 5 // Количество ротаций
}
}
Переменные окружения
Конфигурацию можно переопределить через переменные окружения:
export CAN_INTERFACES="can0,can1"
export INFLUXDB_URL="http://localhost:8086"
export INFLUXDB_TOKEN="your-token"
export LOGGING_LEVEL="DEBUG"
🔌 Создание кастомного обработчика
from handlers.base import BaseHandler
from can_frame import CANFrame
from typing import List, Dict, Any
class MyCustomHandler(BaseHandler):
def __init__(self, enabled: bool = True):
super().__init__(name="my_handler", enabled=enabled)
def initialize(self) -> bool:
# Инициализация (подключение к Kafka/MQTT/WebSocket)
if not self.enabled:
return False
# ... ваш код
self._initialized = True
return True
def handle(self, frame: CANFrame) -> bool:
# Обработка одного фрейма
# ... ваш код
return True
def handle_batch(self, frames: List[CANFrame]) -> int:
# Обработка батча
# ... ваш код
return len(frames)
def flush(self) -> None:
# Принудительная отправка
pass
def shutdown(self) -> None:
# Закрытие соединений
self._initialized = False
def get_stats(self) -> Dict[str, Any]:
return {
"handler": self.name,
"enabled": self.enabled,
"initialized": self._initialized
}
📊 Статистика
Приложение выводит статистику каждые 10 секунд:
{
"processed_count": 12345,
"dropped_count": 0,
"queue_size": 0,
"running": true,
"handlers_count": 2,
"storage": {
"total_messages": 12345,
"database_path": "can_offline.db"
},
"influxdb": {
"enabled": true,
"sent_count": 12000,
"failed_count": 0,
"connection_status": "connected"
}
}
🐛 Отладка
Включение DEBUG логирования
Измените в config.json:
{
"logging": {
"level": "DEBUG"
}
}
Проверка CAN интерфейсов
# Проверка статуса
ip link show can0
# Тестовая отправка (если не listen_only)
cansend can0 123#DEADBEEF
# Мониторинг трафика
candump can0
Проверка InfluxDB
# Проверка соединения
curl -G http://localhost:8086/health
# Запрос данных
influx query 'from(bucket:"can_bus") |> range(start: -1h)'
🔧 Разработка
Установка для разработки
pip install -r requirements.txt
Запуск тестов
# TODO: добавить тесты
pytest tests/
Форматирование кода
black src/
isort src/
📝 Лицензия
MIT License - см. LICENSE файл для деталей.
🤝 Вклад
Вклады приветствуются! Пожалуйста:
- Fork проекта
- Создайте feature branch (
git checkout -b feature/AmazingFeature) - Commit изменения (
git commit -m 'Add some AmazingFeature') - Push в branch (
git push origin feature/AmazingFeature) - Откройте Pull Request
📧 Контакты
- Issues: GitHub Issues
- Discussions: GitHub Discussions
🙏 Благодарности
- python-can - библиотека для работы с CAN
- InfluxDB - time-series база данных
- Pydantic - валидация данных
⭐ Если проект полезен, поставьте звезду!