Update README
This commit is contained in:
@@ -1,287 +1,382 @@
|
||||
# 🚗 CAN Sniffer
|
||||
# CAN Sniffer
|
||||
|
||||
[](https://www.python.org/downloads/)
|
||||
[](LICENSE)
|
||||
[](https://github.com/psf/black)
|
||||
[](https://github.com/python/mypy)
|
||||
|
||||
> Высокопроизводительный CAN bus sniffer для Raspberry Pi с плагинной архитектурой обработчиков, поддержкой параллельного чтения нескольких шин и интеграцией с InfluxDB.
|
||||
High-performance CAN bus sniffer for Raspberry Pi with offline-first architecture, plugin-based handler system, and store-and-forward synchronization to PostgreSQL.
|
||||
|
||||
## ✨ Особенности
|
||||
## Features
|
||||
|
||||
- 🔌 **Параллельное чтение** - одновременная работа с несколькими CAN интерфейсами
|
||||
- 🧩 **Плагинная архитектура** - легко добавлять новые обработчики (Kafka, MQTT, WebSocket)
|
||||
- 📊 **InfluxDB интеграция** - отправка данных в InfluxDB с retry и health-check
|
||||
- 💾 **SQLite хранилище** - локальное временное хранилище с WAL режимом
|
||||
- ⚡ **Асинхронная обработка** - неблокирующая обработка сообщений через очередь
|
||||
- 🔄 **Store-and-Forward** - надежная доставка данных с retry и backoff
|
||||
- 📝 **Структурированное логирование** - детальное логирование с ротацией файлов
|
||||
- ⚙️ **Гибкая конфигурация** - JSON файл + переменные окружения
|
||||
- 🛡️ **Graceful shutdown** - корректное завершение работы всех компонентов
|
||||
- 🎯 **Типобезопасность** - полная типизация с Pydantic
|
||||
- **Parallel CAN Reading** - Simultaneous capture from multiple CAN interfaces with dedicated threads
|
||||
- **Offline-First Architecture** - All messages stored locally in SQLite; PostgreSQL sync is secondary
|
||||
- **Plugin Handler System** - Easily extensible with custom handlers (Kafka, MQTT, WebSocket, etc.)
|
||||
- **Store-and-Forward Sync** - Reliable delivery to PostgreSQL with retry, backoff, and automatic recovery
|
||||
- **Backpressure Management** - Adaptive read delays prevent queue overflow under high load
|
||||
- **Flipper Zero Integration** - UART-based status reporting to Flipper Zero device
|
||||
- **Structured Logging** - Detailed logs with ANSI colors, rotation, and CAN frame formatting
|
||||
- **Flexible Configuration** - JSON config with environment variable overrides
|
||||
- **Graceful Shutdown** - Proper cleanup of all threads and connections on SIGINT/SIGTERM
|
||||
- **Type Safety** - Full type hints with Pydantic validation
|
||||
|
||||
## 📋 Требования
|
||||
## Requirements
|
||||
|
||||
- Python 3.11+
|
||||
- Linux (для SocketCAN)
|
||||
- CAN интерфейсы (например, MCP2515 на Raspberry Pi)
|
||||
- Linux with SocketCAN support
|
||||
- CAN interface hardware (MCP2515, MCP251xFD, or similar)
|
||||
- SQLite (bundled with Python)
|
||||
- PostgreSQL 12+ (optional, for real-time sync)
|
||||
- Flipper Zero with UART (optional, for status display)
|
||||
|
||||
## 🚀 Установка
|
||||
## Installation
|
||||
|
||||
### 1. Клонирование репозитория
|
||||
### 1. Clone the Repository
|
||||
|
||||
```bash
|
||||
git clone <repository-url>
|
||||
cd can_sniffer
|
||||
```
|
||||
|
||||
### 2. Установка зависимостей
|
||||
### 2. Create Virtual Environment
|
||||
|
||||
```bash
|
||||
python -m venv venv
|
||||
source venv/bin/activate # Linux/macOS
|
||||
```
|
||||
|
||||
### 3. Install Dependencies
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
### 3. Настройка CAN интерфейсов
|
||||
|
||||
Убедитесь, что CAN интерфейсы настроены и доступны:
|
||||
### 4. Configure CAN Interfaces
|
||||
|
||||
```bash
|
||||
# Проверка доступных интерфейсов
|
||||
# Check available interfaces
|
||||
ip link show
|
||||
|
||||
# Настройка интерфейса (пример для 1Mbps)
|
||||
# Configure interface (example for 1Mbps)
|
||||
sudo ip link set can0 type can bitrate 1000000
|
||||
sudo ip link set can0 up
|
||||
|
||||
# For dual-channel setup
|
||||
sudo ip link set can1 type can bitrate 1000000
|
||||
sudo ip link set can1 up
|
||||
|
||||
# Verify interface is operational
|
||||
candump can0
|
||||
```
|
||||
|
||||
### 4. Конфигурация
|
||||
### 5. Configure Application
|
||||
|
||||
Скопируйте и отредактируйте `config.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"can": {
|
||||
"interfaces": ["can0", "can1"],
|
||||
"listen_only": true,
|
||||
"bitrate": 500000,
|
||||
"filters": []
|
||||
},
|
||||
"storage": {
|
||||
"type": "sqlite",
|
||||
"database_path": "can_offline.db",
|
||||
"wal_mode": true,
|
||||
"sync_mode": "NORMAL"
|
||||
},
|
||||
"influxdb": {
|
||||
"enabled": true,
|
||||
"url": "http://localhost:8086",
|
||||
"token": "your-token",
|
||||
"org": "automotive",
|
||||
"bucket": "can_bus"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Deployment (Raspberry Pi 5)
|
||||
|
||||
Для автоматического запуска на Raspberry Pi 5 с 2-CH CAN HAT используйте скрипты из `deploy/`:
|
||||
Copy and edit `src/config.json`:
|
||||
|
||||
```bash
|
||||
# Установка с автозапуском через systemd
|
||||
cd deploy
|
||||
sudo ./install.sh
|
||||
cp src/config.json.example src/config.json
|
||||
# Edit configuration as needed
|
||||
```
|
||||
|
||||
Установщик автоматически:
|
||||
- Настроит CAN интерфейсы при загрузке (`can-setup.service`)
|
||||
- Запустит sniffer как systemd сервис (`can-sniffer.service`)
|
||||
- Создаст директории для данных и логов в `/opt/can_sniffer/`
|
||||
## Usage
|
||||
|
||||
Подробная документация: [deploy/README.md](deploy/README.md)
|
||||
|
||||
```bash
|
||||
# Управление сервисом
|
||||
sudo systemctl status can-sniffer
|
||||
sudo journalctl -u can-sniffer -f
|
||||
|
||||
# Диагностика
|
||||
sudo ./deploy/diagnose.sh
|
||||
```
|
||||
|
||||
## Использование
|
||||
|
||||
### Базовый запуск
|
||||
### Basic Run
|
||||
|
||||
```bash
|
||||
cd src
|
||||
python main.py
|
||||
```
|
||||
|
||||
### Запуск как модуль
|
||||
### Run as Module
|
||||
|
||||
```bash
|
||||
python -m can_sniffer.src.main
|
||||
```
|
||||
|
||||
### С кастомными обработчиками
|
||||
### Run with Debug Logging
|
||||
|
||||
```python
|
||||
from handlers import StorageHandler, InfluxDBHandler
|
||||
from socket_can.message_processor import MessageProcessor
|
||||
from socket_can import CANSniffer
|
||||
|
||||
# Создаем кастомный pipeline
|
||||
handlers = [
|
||||
StorageHandler(enabled=True),
|
||||
InfluxDBHandler(enabled=True),
|
||||
# Ваш кастомный обработчик
|
||||
]
|
||||
|
||||
# Используем в MessageProcessor
|
||||
processor = MessageProcessor(handlers=handlers)
|
||||
sniffer = CANSniffer()
|
||||
sniffer.start()
|
||||
```bash
|
||||
CAN_SNIFFER_LOGGING__LEVEL=DEBUG python main.py
|
||||
```
|
||||
|
||||
## 🏗️ Архитектура
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────┐
|
||||
│ CAN Bus │
|
||||
│ (can0/can1)│
|
||||
└──────┬──────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ CANSniffer │ ← Параллельное чтение
|
||||
│ (SocketCAN) │
|
||||
└──────┬──────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────┐
|
||||
│ MessageProcessor │ ← Асинхронная очередь
|
||||
│ (Queue) │
|
||||
└──────┬───────────┘
|
||||
│
|
||||
▼
|
||||
┌──────────────────┐
|
||||
│ Handler Pipeline │
|
||||
├──────────────────┤
|
||||
│ • StorageHandler │ → SQLite
|
||||
│ • InfluxHandler │ → InfluxDB
|
||||
│ • CustomHandler │ → Kafka/MQTT/WS
|
||||
└──────────────────┘
|
||||
Physical CAN Bus (can0, can1)
|
||||
│
|
||||
├──► [CANBusHandler-can0] (Thread)
|
||||
│ │ • Reads via bus.recv()
|
||||
│ │ • Creates CANFrame objects
|
||||
│ │ • Monitors queue backpressure
|
||||
│ └──► enqueue()
|
||||
│
|
||||
├──► [CANBusHandler-can1] (Thread)
|
||||
│ └──► enqueue()
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────┐
|
||||
│ MessageProcessor Queue (100K) │
|
||||
│ • Non-blocking enqueue │
|
||||
│ • Batch accumulation (10K / 0.1s) │
|
||||
└────────────────┬────────────────────────┘
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────────┐
|
||||
│ Handler Pipeline │
|
||||
├─────────────────────────────────────────┤
|
||||
│ StorageHandler ──► SQLite (offline) │
|
||||
│ • Always enabled │
|
||||
│ • WAL mode, batch inserts │
|
||||
│ • Sets processed=0 for sync │
|
||||
├─────────────────────────────────────────┤
|
||||
│ PostgreSQLHandler ──► PostgreSQL │
|
||||
│ • Optional (config-controlled) │
|
||||
│ • Async queue with separate thread │
|
||||
│ • Syncs unprocessed from SQLite │
|
||||
│ • Retry with exponential backoff │
|
||||
├─────────────────────────────────────────┤
|
||||
│ FlipperHandler ──► UART (/dev/ttyAMA0) │
|
||||
│ • Optional status display │
|
||||
│ • Handshake protocol │
|
||||
│ • Periodic stats transmission │
|
||||
└─────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Компоненты
|
||||
### Key Components
|
||||
|
||||
- **CANFrame** - неизменяемая структура данных для CAN сообщений
|
||||
- **CANSniffer** - параллельное чтение с нескольких CAN шин
|
||||
- **MessageProcessor** - асинхронная обработка через очередь
|
||||
- **BaseHandler** - интерфейс для плагинных обработчиков
|
||||
- **StorageHandler** - сохранение в SQLite
|
||||
- **InfluxDBHandler** - отправка в InfluxDB с retry
|
||||
| Component | File | Description |
|
||||
|-----------|------|-------------|
|
||||
| **CANFrame** | `can_frame.py` | Immutable dataclass for CAN messages |
|
||||
| **CANSniffer** | `socket_can/src.py` | Orchestrates parallel CAN reading |
|
||||
| **CANBusHandler** | `socket_can/src.py` | Per-interface reader with backpressure |
|
||||
| **MessageProcessor** | `socket_can/message_processor.py` | Async queue and handler pipeline |
|
||||
| **BaseHandler** | `handlers/base.py` | Abstract interface for plugins |
|
||||
| **StorageHandler** | `handlers/storage_handler.py` | SQLite persistence |
|
||||
| **PostgreSQLHandler** | `handlers/postgresql_handler.py` | PostgreSQL forwarding |
|
||||
| **FlipperHandler** | `handlers/flipper_handler.py` | Flipper Zero UART |
|
||||
| **Storage** | `storage/storage.py` | SQLite singleton with WAL |
|
||||
| **PostgreSQLClient** | `postgresql_handler/postgresql_client.py` | Connection pool and sync |
|
||||
| **Config** | `config.py` | Pydantic configuration |
|
||||
| **Logger** | `logger.py` | Structured logging system |
|
||||
|
||||
## 📦 Структура проекта
|
||||
## Project Structure
|
||||
|
||||
```
|
||||
can_sniffer/
|
||||
├── src/
|
||||
│ ├── main.py # Точка входа
|
||||
│ ├── config.py # Конфигурация (Pydantic)
|
||||
│ ├── config.json # Файл конфигурации
|
||||
│ ├── logger.py # Логирование
|
||||
│ ├── can_frame.py # CANFrame dataclass
|
||||
│ ├── main.py # Entry point, signal handling
|
||||
│ ├── config.py # Pydantic configuration
|
||||
│ ├── config.json # Runtime configuration
|
||||
│ ├── can_frame.py # CANFrame dataclass
|
||||
│ ├── logger.py # Structured logging
|
||||
│ ├── socket_can/
|
||||
│ │ ├── src.py # CANSniffer
|
||||
│ │ └── message_processor.py # MessageProcessor
|
||||
│ │ ├── __init__.py
|
||||
│ │ ├── src.py # CANSniffer, CANBusHandler
|
||||
│ │ └── message_processor.py # MessageProcessor
|
||||
│ ├── handlers/
|
||||
│ │ ├── base.py # BaseHandler
|
||||
│ │ ├── storage_handler.py # SQLite handler
|
||||
│ │ └── influxdb_handler.py # InfluxDB handler
|
||||
│ │ ├── __init__.py
|
||||
│ │ ├── base.py # BaseHandler interface
|
||||
│ │ ├── storage_handler.py # SQLite handler
|
||||
│ │ ├── postgresql_handler.py # PostgreSQL handler
|
||||
│ │ └── flipper_handler.py # Flipper Zero handler
|
||||
│ ├── storage/
|
||||
│ │ └── storage.py # SQLite storage
|
||||
│ └── influxdb_handler/
|
||||
│ └── influxdb_client.py # InfluxDB client
|
||||
│ │ ├── __init__.py
|
||||
│ │ └── storage.py # SQLite singleton
|
||||
│ └── postgresql_handler/
|
||||
│ ├── __init__.py
|
||||
│ └── postgresql_client.py # PostgreSQL client
|
||||
├── deploy/
|
||||
│ ├── README.md # Deployment documentation
|
||||
│ ├── install.sh # Installation script
|
||||
│ ├── diagnose.sh # Diagnostic script
|
||||
│ └── config.production.json # Production config example
|
||||
├── requirements.txt
|
||||
└── README.md
|
||||
```
|
||||
|
||||
## ⚙️ Конфигурация
|
||||
## Configuration
|
||||
|
||||
### CAN настройки
|
||||
Configuration is loaded from (in order of precedence):
|
||||
1. `src/config.json`
|
||||
2. `~/.can_sniffer/config.json`
|
||||
3. Environment variables (override)
|
||||
|
||||
### Full Configuration Reference
|
||||
|
||||
```json
|
||||
{
|
||||
"can": {
|
||||
"interfaces": ["can0", "can1"], // Список интерфейсов
|
||||
"listen_only": true, // Только чтение
|
||||
"bitrate": 500000, // Скорость (бит/с)
|
||||
"filters": [] // SocketCAN фильтры
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Storage настройки
|
||||
|
||||
```json
|
||||
{
|
||||
"interfaces": ["can0", "can1"],
|
||||
"listen_only": true,
|
||||
"bitrate": 1000000,
|
||||
"filters": []
|
||||
},
|
||||
"storage": {
|
||||
"type": "sqlite",
|
||||
"database_path": "can_offline.db",
|
||||
"wal_mode": true, // WAL режим для производительности
|
||||
"sync_mode": "NORMAL" // NORMAL/FULL/OFF
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### InfluxDB настройки
|
||||
|
||||
```json
|
||||
{
|
||||
"influxdb": {
|
||||
"wal_mode": true,
|
||||
"sync_mode": "NORMAL",
|
||||
"retention_days": 7
|
||||
},
|
||||
"postgresql": {
|
||||
"enabled": true,
|
||||
"url": "http://localhost:8086",
|
||||
"token": "your-token",
|
||||
"org": "automotive",
|
||||
"bucket": "can_bus",
|
||||
"batch_size": 1000, // Размер батча
|
||||
"flush_interval": 5, // Интервал отправки (сек)
|
||||
"max_retries": 3, // Количество повторов
|
||||
"retry_backoff": 1.0, // Backoff интервал
|
||||
"health_check_interval": 30 // Проверка здоровья (сек)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Логирование
|
||||
|
||||
```json
|
||||
{
|
||||
"host": "localhost",
|
||||
"port": 5432,
|
||||
"database": "canbus",
|
||||
"user": "canbus",
|
||||
"password": "your-password",
|
||||
"batch_size": 10000,
|
||||
"flush_interval": 5,
|
||||
"max_retries": 3,
|
||||
"retry_backoff": 1.0,
|
||||
"connection_pool_size": 5,
|
||||
"connection_timeout": 10,
|
||||
"sync_interval": 30.0
|
||||
},
|
||||
"flipper": {
|
||||
"enabled": false,
|
||||
"device": "/dev/ttyAMA0",
|
||||
"baudrate": 115200,
|
||||
"send_interval": 1.0
|
||||
},
|
||||
"logging": {
|
||||
"level": "INFO", // DEBUG/INFO/WARNING/ERROR
|
||||
"level": "INFO",
|
||||
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
"file": "can_edge.log",
|
||||
"max_bytes": 10485760, // 10MB
|
||||
"backup_count": 5 // Количество ротаций
|
||||
"max_bytes": 10485760,
|
||||
"backup_count": 5
|
||||
},
|
||||
"general": {
|
||||
"buffer_size": 100000,
|
||||
"batch_size": 10000,
|
||||
"batch_interval": 0.1,
|
||||
"max_retries": 3,
|
||||
"retry_delay": 1.0
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Переменные окружения
|
||||
### Configuration Sections
|
||||
|
||||
Конфигурацию можно переопределить через переменные окружения:
|
||||
#### CAN Settings
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|-----------|------|---------|-------------|
|
||||
| `interfaces` | list | `["can0"]` | CAN interfaces to monitor |
|
||||
| `listen_only` | bool | `true` | Read-only mode (no frame transmission) |
|
||||
| `bitrate` | int | `500000` | Bus speed (must match physical bus) |
|
||||
| `filters` | list | `[]` | SocketCAN filters for selective capture |
|
||||
|
||||
#### Storage Settings (SQLite)
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|-----------|------|---------|-------------|
|
||||
| `database_path` | str | `can_offline.db` | SQLite database file path |
|
||||
| `wal_mode` | bool | `true` | Enable WAL for better concurrency |
|
||||
| `sync_mode` | str | `NORMAL` | SQLite synchronous mode (NORMAL/FULL/OFF) |
|
||||
| `retention_days` | int | `7` | Auto-cleanup older messages |
|
||||
|
||||
#### PostgreSQL Settings
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|-----------|------|---------|-------------|
|
||||
| `enabled` | bool | `false` | Enable PostgreSQL forwarding |
|
||||
| `host` | str | `localhost` | PostgreSQL server address |
|
||||
| `port` | int | `5432` | PostgreSQL server port |
|
||||
| `database` | str | `canbus` | Database name |
|
||||
| `user` | str | `canbus` | Database user |
|
||||
| `password` | str | - | Database password |
|
||||
| `batch_size` | int | `1000` | Messages per batch insert |
|
||||
| `flush_interval` | int | `5` | Seconds between flushes |
|
||||
| `max_retries` | int | `3` | Retry attempts on failure |
|
||||
| `retry_backoff` | float | `1.0` | Exponential backoff base |
|
||||
| `connection_pool_size` | int | `5` | Connection pool max size |
|
||||
| `connection_timeout` | int | `10` | Connection timeout seconds |
|
||||
| `sync_interval` | float | `30.0` | SQLite sync interval seconds |
|
||||
|
||||
#### Flipper Zero Settings
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|-----------|------|---------|-------------|
|
||||
| `enabled` | bool | `false` | Enable Flipper Zero UART |
|
||||
| `device` | str | `/dev/ttyAMA0` | Serial device path |
|
||||
| `baudrate` | int | `115200` | Serial baud rate |
|
||||
| `send_interval` | float | `1.0` | Stats transmission interval |
|
||||
|
||||
#### Logging Settings
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|-----------|------|---------|-------------|
|
||||
| `level` | str | `INFO` | Log level (DEBUG/INFO/WARNING/ERROR) |
|
||||
| `format` | str | - | Log message format |
|
||||
| `file` | str | `can_edge.log` | Log file path |
|
||||
| `max_bytes` | int | `10485760` | Max log file size (10MB) |
|
||||
| `backup_count` | int | `5` | Number of rotated log files |
|
||||
|
||||
#### General Settings
|
||||
|
||||
| Parameter | Type | Default | Description |
|
||||
|-----------|------|---------|-------------|
|
||||
| `buffer_size` | int | `100000` | Message queue capacity |
|
||||
| `batch_size` | int | `10000` | Handler batch size |
|
||||
| `batch_interval` | float | `0.1` | Batch accumulation time |
|
||||
| `max_retries` | int | `3` | General retry attempts |
|
||||
| `retry_delay` | float | `1.0` | Retry delay seconds |
|
||||
|
||||
### Environment Variables
|
||||
|
||||
Override any configuration using the `CAN_SNIFFER_` prefix with `__` for nested keys:
|
||||
|
||||
```bash
|
||||
export CAN_INTERFACES="can0,can1"
|
||||
export INFLUXDB_URL="http://localhost:8086"
|
||||
export INFLUXDB_TOKEN="your-token"
|
||||
export LOGGING_LEVEL="DEBUG"
|
||||
# CAN configuration
|
||||
export CAN_SNIFFER_CAN__INTERFACES="can0,can1"
|
||||
export CAN_SNIFFER_CAN__BITRATE="1000000"
|
||||
|
||||
# PostgreSQL configuration
|
||||
export CAN_SNIFFER_POSTGRESQL__ENABLED="true"
|
||||
export CAN_SNIFFER_POSTGRESQL__HOST="192.168.1.100"
|
||||
export CAN_SNIFFER_POSTGRESQL__PASSWORD="secret"
|
||||
|
||||
# Logging level
|
||||
export CAN_SNIFFER_LOGGING__LEVEL="DEBUG"
|
||||
|
||||
# Flipper Zero
|
||||
export CAN_SNIFFER_FLIPPER__ENABLED="true"
|
||||
export CAN_SNIFFER_FLIPPER__DEVICE="/dev/ttyUSB0"
|
||||
```
|
||||
|
||||
## 🔌 Создание кастомного обработчика
|
||||
## Database Schema
|
||||
|
||||
### SQLite (Local Storage)
|
||||
|
||||
```sql
|
||||
CREATE TABLE can_messages (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
timestamp REAL NOT NULL, -- Unix timestamp with microseconds
|
||||
interface TEXT NOT NULL, -- CAN interface name
|
||||
can_id INTEGER NOT NULL, -- CAN identifier
|
||||
can_id_hex TEXT NOT NULL, -- Hex representation for queries
|
||||
is_extended INTEGER NOT NULL, -- Extended frame flag (0/1)
|
||||
dlc INTEGER NOT NULL, -- Data length code
|
||||
data BLOB NOT NULL, -- Raw payload
|
||||
data_hex TEXT NOT NULL, -- Hex representation
|
||||
processed INTEGER DEFAULT 0, -- Sync flag for PostgreSQL
|
||||
created_at REAL DEFAULT (strftime('%s', 'now'))
|
||||
);
|
||||
|
||||
-- Performance indices
|
||||
CREATE INDEX idx_timestamp ON can_messages(timestamp);
|
||||
CREATE INDEX idx_interface ON can_messages(interface);
|
||||
CREATE INDEX idx_can_id ON can_messages(can_id);
|
||||
CREATE INDEX idx_processed ON can_messages(processed);
|
||||
CREATE INDEX idx_timestamp_interface ON can_messages(timestamp, interface);
|
||||
```
|
||||
|
||||
### PostgreSQL (Remote Storage)
|
||||
|
||||
The same schema is used for PostgreSQL. The `processed` flag in SQLite tracks which messages have been synced.
|
||||
|
||||
## Creating Custom Handlers
|
||||
|
||||
Implement the `BaseHandler` interface to create custom processors:
|
||||
|
||||
```python
|
||||
from handlers.base import BaseHandler
|
||||
@@ -291,72 +386,230 @@ from typing import List, Dict, Any
|
||||
class MyCustomHandler(BaseHandler):
|
||||
def __init__(self, enabled: bool = True):
|
||||
super().__init__(name="my_handler", enabled=enabled)
|
||||
|
||||
self._connection = None
|
||||
|
||||
def initialize(self) -> bool:
|
||||
# Инициализация (подключение к Kafka/MQTT/WebSocket)
|
||||
"""Initialize resources (connections, buffers, etc.)"""
|
||||
if not self.enabled:
|
||||
return False
|
||||
# ... ваш код
|
||||
self._initialized = True
|
||||
return True
|
||||
|
||||
|
||||
try:
|
||||
# Your initialization code
|
||||
self._connection = self._connect()
|
||||
self._initialized = True
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.error(f"Init failed: {e}")
|
||||
return False
|
||||
|
||||
def handle(self, frame: CANFrame) -> bool:
|
||||
# Обработка одного фрейма
|
||||
# ... ваш код
|
||||
"""Process a single CAN frame"""
|
||||
# Your single-frame processing
|
||||
return True
|
||||
|
||||
|
||||
def handle_batch(self, frames: List[CANFrame]) -> int:
|
||||
# Обработка батча
|
||||
# ... ваш код
|
||||
return len(frames)
|
||||
|
||||
"""Process a batch of frames (optimized)"""
|
||||
if not self._initialized:
|
||||
return 0
|
||||
|
||||
try:
|
||||
# Your batch processing code
|
||||
for frame in frames:
|
||||
self._send(frame)
|
||||
return len(frames)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Batch failed: {e}")
|
||||
return 0
|
||||
|
||||
def flush(self) -> None:
|
||||
# Принудительная отправка
|
||||
pass
|
||||
|
||||
"""Force send any buffered data"""
|
||||
if self._connection:
|
||||
self._connection.flush()
|
||||
|
||||
def shutdown(self) -> None:
|
||||
# Закрытие соединений
|
||||
"""Clean up resources"""
|
||||
if self._connection:
|
||||
self._connection.close()
|
||||
self._initialized = False
|
||||
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""Return handler statistics"""
|
||||
return {
|
||||
"handler": self.name,
|
||||
"enabled": self.enabled,
|
||||
"initialized": self._initialized
|
||||
"initialized": self._initialized,
|
||||
"sent_count": self._sent_count
|
||||
}
|
||||
```
|
||||
|
||||
## 📊 Статистика
|
||||
### Registering Custom Handlers
|
||||
|
||||
Приложение выводит статистику каждые 10 секунд:
|
||||
Add your handler to the MessageProcessor in `main.py`:
|
||||
|
||||
```python
|
||||
from handlers.my_handler import MyCustomHandler
|
||||
|
||||
# In MessageProcessor initialization
|
||||
handlers = [
|
||||
StorageHandler(enabled=True),
|
||||
PostgreSQLHandler(enabled=config.postgresql.enabled),
|
||||
MyCustomHandler(enabled=True), # Your custom handler
|
||||
]
|
||||
```
|
||||
|
||||
## Flipper Zero Integration
|
||||
|
||||
The Flipper Zero handler provides real-time status display via UART.
|
||||
|
||||
### Protocol
|
||||
|
||||
```
|
||||
Flipper → RPi: INIT:flipper\n (Handshake request)
|
||||
RPi → Flipper: ACK:rpi5,ip=x.x.x.x\n (Acknowledgment)
|
||||
RPi → Flipper: STATS:ip=...,total=...,pending=...,processed=...\n
|
||||
Flipper → RPi: STOP:flipper\n (Stop request)
|
||||
```
|
||||
|
||||
### Hardware Setup
|
||||
|
||||
1. Connect Flipper Zero UART to Raspberry Pi GPIO:
|
||||
- Flipper TX → RPi RX (GPIO 15 / Pin 10)
|
||||
- Flipper RX → RPi TX (GPIO 14 / Pin 8)
|
||||
- GND → GND
|
||||
|
||||
2. Disable serial console on RPi:
|
||||
```bash
|
||||
sudo raspi-config
|
||||
# Interface Options → Serial Port → No (login shell) → Yes (hardware)
|
||||
```
|
||||
|
||||
3. Enable in config:
|
||||
```json
|
||||
{
|
||||
"flipper": {
|
||||
"enabled": true,
|
||||
"device": "/dev/ttyAMA0",
|
||||
"baudrate": 115200
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Deployment
|
||||
|
||||
### Raspberry Pi 5 with systemd
|
||||
|
||||
For production deployment on Raspberry Pi 5 with 2-CH CAN HAT:
|
||||
|
||||
```bash
|
||||
cd deploy
|
||||
sudo ./install.sh
|
||||
```
|
||||
|
||||
This creates two systemd services:
|
||||
- `can-setup.service` - Configures CAN interfaces at boot
|
||||
- `can-sniffer.service` - Main application service
|
||||
|
||||
### Service Management
|
||||
|
||||
```bash
|
||||
# Check status
|
||||
sudo systemctl status can-sniffer
|
||||
|
||||
# View logs
|
||||
sudo journalctl -u can-sniffer -f
|
||||
|
||||
# Restart service
|
||||
sudo systemctl restart can-sniffer
|
||||
|
||||
# Run diagnostics
|
||||
sudo ./deploy/diagnose.sh
|
||||
```
|
||||
|
||||
See [deploy/README.md](deploy/README.md) for detailed deployment instructions.
|
||||
|
||||
## Statistics Output
|
||||
|
||||
The application logs statistics every 10 seconds:
|
||||
|
||||
```json
|
||||
{
|
||||
"processed_count": 12345,
|
||||
"dropped_count": 0,
|
||||
"queue_size": 0,
|
||||
"running": true,
|
||||
"handlers_count": 2,
|
||||
"storage": {
|
||||
"total_messages": 12345,
|
||||
"database_path": "can_offline.db"
|
||||
"sniffer": {
|
||||
"can0": {
|
||||
"messages": 15234,
|
||||
"errors": 0,
|
||||
"running": true
|
||||
},
|
||||
"can1": {
|
||||
"messages": 8721,
|
||||
"errors": 0,
|
||||
"running": true
|
||||
}
|
||||
},
|
||||
"influxdb": {
|
||||
"processor": {
|
||||
"processed_count": 23955,
|
||||
"dropped_count": 0,
|
||||
"queue_size": 142,
|
||||
"queue_full_count": 0
|
||||
},
|
||||
"storage": {
|
||||
"total_messages": 23955,
|
||||
"database_size_mb": 12.4
|
||||
},
|
||||
"postgresql": {
|
||||
"enabled": true,
|
||||
"sent_count": 12000,
|
||||
"failed_count": 0,
|
||||
"connection_status": "connected"
|
||||
"status": "CONNECTED",
|
||||
"sent_count": 23800,
|
||||
"pending_sync": 155,
|
||||
"failed_count": 0
|
||||
},
|
||||
"flipper": {
|
||||
"enabled": true,
|
||||
"connected": true,
|
||||
"stats_sent": 24
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 🐛 Отладка
|
||||
## Troubleshooting
|
||||
|
||||
### Включение DEBUG логирования
|
||||
### CAN Interface Issues
|
||||
|
||||
Измените в `config.json`:
|
||||
```bash
|
||||
# Check interface status
|
||||
ip -details link show can0
|
||||
|
||||
```json
|
||||
# Monitor raw traffic
|
||||
candump can0
|
||||
|
||||
# Check for errors
|
||||
dmesg | grep -i can
|
||||
|
||||
# Reset interface
|
||||
sudo ip link set can0 down
|
||||
sudo ip link set can0 type can bitrate 1000000
|
||||
sudo ip link set can0 up
|
||||
```
|
||||
|
||||
### PostgreSQL Connection Issues
|
||||
|
||||
```bash
|
||||
# Test connection
|
||||
psql -h <host> -p <port> -U <user> -d <database>
|
||||
|
||||
# Check pending sync count in SQLite
|
||||
sqlite3 can_offline.db "SELECT COUNT(*) FROM can_messages WHERE processed=0"
|
||||
|
||||
# Force sync (by restarting service)
|
||||
sudo systemctl restart can-sniffer
|
||||
```
|
||||
|
||||
### Debug Logging
|
||||
|
||||
```bash
|
||||
# Enable debug via environment
|
||||
CAN_SNIFFER_LOGGING__LEVEL=DEBUG python main.py
|
||||
|
||||
# Or edit config.json
|
||||
{
|
||||
"logging": {
|
||||
"level": "DEBUG"
|
||||
@@ -364,77 +617,79 @@ class MyCustomHandler(BaseHandler):
|
||||
}
|
||||
```
|
||||
|
||||
### Проверка CAN интерфейсов
|
||||
### Queue Overflow
|
||||
|
||||
```bash
|
||||
# Проверка статуса
|
||||
ip link show can0
|
||||
If you see `dropped_count > 0` or `queue_full_count > 0`:
|
||||
|
||||
# Тестовая отправка (если не listen_only)
|
||||
cansend can0 123#DEADBEEF
|
||||
1. Increase buffer size:
|
||||
```json
|
||||
{ "general": { "buffer_size": 500000 } }
|
||||
```
|
||||
|
||||
# Мониторинг трафика
|
||||
candump can0
|
||||
```
|
||||
2. Increase batch size for faster processing:
|
||||
```json
|
||||
{ "general": { "batch_size": 50000 } }
|
||||
```
|
||||
|
||||
### Проверка InfluxDB
|
||||
3. Check handler performance (PostgreSQL latency, disk I/O)
|
||||
|
||||
```bash
|
||||
# Проверка соединения
|
||||
curl -G http://localhost:8086/health
|
||||
## Development
|
||||
|
||||
# Запрос данных
|
||||
influx query 'from(bucket:"can_bus") |> range(start: -1h)'
|
||||
```
|
||||
|
||||
## 🔧 Разработка
|
||||
|
||||
### Установка для разработки
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
### Запуск тестов
|
||||
|
||||
```bash
|
||||
# TODO: добавить тесты
|
||||
pytest tests/
|
||||
```
|
||||
|
||||
### Форматирование кода
|
||||
### Code Formatting
|
||||
|
||||
```bash
|
||||
black src/
|
||||
isort src/
|
||||
```
|
||||
|
||||
## 📝 Лицензия
|
||||
### Type Checking
|
||||
|
||||
MIT License - см. [LICENSE](LICENSE) файл для деталей.
|
||||
```bash
|
||||
mypy src/
|
||||
```
|
||||
|
||||
## 🤝 Вклад
|
||||
### Running Tests
|
||||
|
||||
Вклады приветствуются! Пожалуйста:
|
||||
```bash
|
||||
pytest tests/
|
||||
```
|
||||
|
||||
1. Fork проекта
|
||||
2. Создайте feature branch (`git checkout -b feature/AmazingFeature`)
|
||||
3. Commit изменения (`git commit -m 'Add some AmazingFeature'`)
|
||||
4. Push в branch (`git push origin feature/AmazingFeature`)
|
||||
5. Откройте Pull Request
|
||||
## Design Constraints
|
||||
|
||||
## 📧 Контакты
|
||||
These constraints are intentional and should be preserved:
|
||||
|
||||
- Issues: [GitHub Issues](https://github.com/yourusername/can_sniffer/issues)
|
||||
- Discussions: [GitHub Discussions](https://github.com/yourusername/can_sniffer/discussions)
|
||||
1. **Read-Only CAN** - No `send()` calls; this is a passive sniffer
|
||||
2. **Offline-First** - SQLite is always written; PostgreSQL is optional
|
||||
3. **Non-Blocking** - No blocking operations in the CAN read loop
|
||||
4. **Single-Process** - No multiprocessing; threads only
|
||||
5. **Graceful Shutdown** - All threads properly terminated
|
||||
|
||||
## 🙏 Благодарности
|
||||
## Dependencies
|
||||
|
||||
- [python-can](https://github.com/hardbyte/python-can) - библиотека для работы с CAN
|
||||
- [InfluxDB](https://www.influxdata.com/) - time-series база данных
|
||||
- [Pydantic](https://docs.pydantic.dev/) - валидация данных
|
||||
| Package | Version | Purpose |
|
||||
|---------|---------|---------|
|
||||
| pydantic | >=2.0.0 | Configuration validation |
|
||||
| pydantic-settings | >=2.0.0 | Settings management |
|
||||
| python-can | >=4.0.0 | SocketCAN interface |
|
||||
| psycopg2-binary | >=2.9.0 | PostgreSQL client |
|
||||
| pyserial | >=3.5 | Flipper Zero UART |
|
||||
|
||||
---
|
||||
## License
|
||||
|
||||
⭐ Если проект полезен, поставьте звезду!
|
||||
MIT License - see [LICENSE](LICENSE) file for details.
|
||||
|
||||
## Contributing
|
||||
|
||||
Contributions are welcome! Please:
|
||||
|
||||
1. Fork the repository
|
||||
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
|
||||
3. Commit changes (`git commit -m 'Add amazing feature'`)
|
||||
4. Push to branch (`git push origin feature/amazing-feature`)
|
||||
5. Open a Pull Request
|
||||
|
||||
## Acknowledgments
|
||||
|
||||
- [python-can](https://github.com/hardbyte/python-can) - CAN bus library
|
||||
- [Pydantic](https://docs.pydantic.dev/) - Data validation
|
||||
- [psycopg2](https://www.psycopg.org/) - PostgreSQL adapter
|
||||
|
||||
Reference in New Issue
Block a user