CAN Sniffer

High-performance CAN bus sniffer for Raspberry Pi with an offline-first architecture, a plugin-based handler system, and store-and-forward synchronization to PostgreSQL.

Features

  • Parallel CAN Reading - Simultaneous capture from multiple CAN interfaces with dedicated threads
  • Offline-First Architecture - All messages stored locally in SQLite; PostgreSQL sync is secondary
  • Plugin Handler System - Easily extensible with custom handlers (Kafka, MQTT, WebSocket, etc.)
  • Store-and-Forward Sync - Reliable delivery to PostgreSQL with retry, backoff, and automatic recovery
  • Backpressure Management - Adaptive read delays prevent queue overflow under high load
  • Flipper Zero Integration - UART-based status reporting to Flipper Zero device
  • Structured Logging - Detailed logs with ANSI colors, rotation, and CAN frame formatting
  • Flexible Configuration - JSON config with environment variable overrides
  • Graceful Shutdown - Proper cleanup of all threads and connections on SIGINT/SIGTERM
  • Type Safety - Full type hints with Pydantic validation

Requirements

  • Python 3.11+
  • Linux with SocketCAN support
  • CAN interface hardware (MCP2515, MCP251xFD, or similar)
  • SQLite (bundled with Python)
  • PostgreSQL 12+ (optional, for real-time sync)
  • Flipper Zero with UART (optional, for status display)

Installation

1. Clone the Repository

git clone <repository-url>
cd can_sniffer

2. Create Virtual Environment

python -m venv venv
source venv/bin/activate  # Linux/macOS

3. Install Dependencies

pip install -r requirements.txt

4. Configure CAN Interfaces

# Check available interfaces
ip link show

# Configure interface (example for 1Mbps)
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up

# For dual-channel setup
sudo ip link set can1 type can bitrate 1000000
sudo ip link set can1 up

# Verify interface is operational (candump is part of the can-utils package)
candump can0

5. Configure Application

Copy the example configuration to src/config.json and edit it:

cp src/config.json.example src/config.json
# Edit configuration as needed

Usage

Basic Run

cd src
python main.py

Run as Module

python -m can_sniffer.src.main

Run with Debug Logging

CAN_SNIFFER_LOGGING__LEVEL=DEBUG python main.py

Architecture

Physical CAN Bus (can0, can1)
         │
         ├──► [CANBusHandler-can0] (Thread)
         │    │ • Reads via bus.recv()
         │    │ • Creates CANFrame objects
         │    │ • Monitors queue backpressure
         │    └──► enqueue()
         │
         ├──► [CANBusHandler-can1] (Thread)
         │    └──► enqueue()
         │
         ▼
┌─────────────────────────────────────────┐
│     MessageProcessor Queue (100K)       │
│     • Non-blocking enqueue              │
│     • Batch accumulation (10K / 0.1s)   │
└────────────────┬────────────────────────┘
                 │
                 ▼
┌─────────────────────────────────────────┐
│         Handler Pipeline                │
├─────────────────────────────────────────┤
│ StorageHandler ──► SQLite (offline)     │
│     • Always enabled                    │
│     • WAL mode, batch inserts           │
│     • Sets processed=0 for sync         │
├─────────────────────────────────────────┤
│ PostgreSQLHandler ──► PostgreSQL        │
│     • Optional (config-controlled)      │
│     • Async queue with separate thread  │
│     • Syncs unprocessed from SQLite     │
│     • Retry with exponential backoff    │
├─────────────────────────────────────────┤
│ FlipperHandler ──► UART (/dev/ttyAMA0)  │
│     • Optional status display           │
│     • Handshake protocol                │
│     • Periodic stats transmission       │
└─────────────────────────────────────────┘
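
The queue stage in the diagram can be illustrated with a minimal standard-library sketch. The class name BatchQueue and its methods are hypothetical and are not taken from message_processor.py; only the numbers (100K capacity, 10K batch, 0.1 s interval) come from the diagram.

```python
import queue
import time
from typing import Any, List


class BatchQueue:
    """Bounded queue with non-blocking enqueue and size/time-based batching."""

    def __init__(self, buffer_size: int = 100_000,
                 batch_size: int = 10_000,
                 batch_interval: float = 0.1) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=buffer_size)
        self.batch_size = batch_size
        self.batch_interval = batch_interval
        self.dropped_count = 0

    def enqueue(self, item: Any) -> bool:
        """Never block the reader thread; a full queue drops the item."""
        try:
            self._queue.put_nowait(item)
            return True
        except queue.Full:
            self.dropped_count += 1
            return False

    def next_batch(self) -> List[Any]:
        """Collect items until batch_size is reached or batch_interval elapses."""
        batch: List[Any] = []
        deadline = time.monotonic() + self.batch_interval
        while len(batch) < self.batch_size:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                batch.append(self._queue.get(timeout=remaining))
            except queue.Empty:
                break
        return batch
```

A consumer thread would call next_batch() in a loop and hand each non-empty batch to the handler pipeline, while each reader thread only ever calls enqueue().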

Key Components

| Component | File | Description |
| --- | --- | --- |
| CANFrame | can_frame.py | Immutable dataclass for CAN messages |
| CANSniffer | socket_can/src.py | Orchestrates parallel CAN reading |
| CANBusHandler | socket_can/src.py | Per-interface reader with backpressure |
| MessageProcessor | socket_can/message_processor.py | Async queue and handler pipeline |
| BaseHandler | handlers/base.py | Abstract interface for plugins |
| StorageHandler | handlers/storage_handler.py | SQLite persistence |
| PostgreSQLHandler | handlers/postgresql_handler.py | PostgreSQL forwarding |
| FlipperHandler | handlers/flipper_handler.py | Flipper Zero UART |
| Storage | storage/storage.py | SQLite singleton with WAL |
| PostgreSQLClient | postgresql_handler/postgresql_client.py | Connection pool and sync |
| Config | config.py | Pydantic configuration |
| Logger | logger.py | Structured logging system |
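
As an illustration of what an immutable CAN message dataclass can look like, here is a minimal sketch. It is not the contents of can_frame.py: the field and property names are assumptions chosen to mirror the SQLite columns listed under Database Schema.

```python
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class CANFrame:
    """Immutable CAN message; field names are assumed, not copied from the repo."""
    timestamp: float
    interface: str
    can_id: int
    data: bytes
    is_extended: bool = False

    @property
    def dlc(self) -> int:
        # For classic CAN the DLC equals the payload length (0-8 bytes).
        return len(self.data)

    @property
    def can_id_hex(self) -> str:
        # 29-bit extended IDs need 8 hex digits, 11-bit standard IDs need 3.
        width = 8 if self.is_extended else 3
        return f"{self.can_id:0{width}X}"

    @property
    def data_hex(self) -> str:
        return self.data.hex().upper()
```

Because the dataclass is frozen, assigning to a field after construction raises FrozenInstanceError, which makes frames safe to share between reader threads and handlers.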

Project Structure

can_sniffer/
├── src/
│   ├── main.py                           # Entry point, signal handling
│   ├── config.py                         # Pydantic configuration
│   ├── config.json                       # Runtime configuration
│   ├── can_frame.py                      # CANFrame dataclass
│   ├── logger.py                         # Structured logging
│   ├── socket_can/
│   │   ├── __init__.py
│   │   ├── src.py                        # CANSniffer, CANBusHandler
│   │   └── message_processor.py          # MessageProcessor
│   ├── handlers/
│   │   ├── __init__.py
│   │   ├── base.py                       # BaseHandler interface
│   │   ├── storage_handler.py            # SQLite handler
│   │   ├── postgresql_handler.py         # PostgreSQL handler
│   │   └── flipper_handler.py            # Flipper Zero handler
│   ├── storage/
│   │   ├── __init__.py
│   │   └── storage.py                    # SQLite singleton
│   └── postgresql_handler/
│       ├── __init__.py
│       └── postgresql_client.py          # PostgreSQL client
├── deploy/
│   ├── README.md                         # Deployment documentation
│   ├── install.sh                        # Installation script
│   ├── diagnose.sh                       # Diagnostic script
│   └── config.production.json            # Production config example
├── requirements.txt
└── README.md

Configuration

Configuration is loaded from the following sources, in this order; environment variables override values from the files:

  1. src/config.json
  2. ~/.can_sniffer/config.json
  3. Environment variables (override)

Full Configuration Reference

{
  "can": {
    "interfaces": ["can0", "can1"],
    "listen_only": true,
    "bitrate": 1000000,
    "filters": []
  },
  "storage": {
    "database_path": "can_offline.db",
    "wal_mode": true,
    "sync_mode": "NORMAL",
    "retention_days": 7
  },
  "postgresql": {
    "enabled": true,
    "host": "localhost",
    "port": 5432,
    "database": "canbus",
    "user": "canbus",
    "password": "your-password",
    "batch_size": 10000,
    "flush_interval": 5,
    "max_retries": 3,
    "retry_backoff": 1.0,
    "connection_pool_size": 5,
    "connection_timeout": 10,
    "sync_interval": 30.0
  },
  "flipper": {
    "enabled": false,
    "device": "/dev/ttyAMA0",
    "baudrate": 115200,
    "send_interval": 1.0
  },
  "logging": {
    "level": "INFO",
    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    "file": "can_edge.log",
    "max_bytes": 10485760,
    "backup_count": 5
  },
  "general": {
    "buffer_size": 100000,
    "batch_size": 10000,
    "batch_interval": 0.1,
    "max_retries": 3,
    "retry_delay": 1.0
  }
}

Configuration Sections

CAN Settings

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| interfaces | list | ["can0"] | CAN interfaces to monitor |
| listen_only | bool | true | Read-only mode (no frame transmission) |
| bitrate | int | 500000 | Bus speed (must match physical bus) |
| filters | list | [] | SocketCAN filters for selective capture |
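
SocketCAN filters accept a frame when the received ID and the filter ID agree on every bit set in the mask. The sketch below shows that rule in plain Python. The dictionary keys can_id and can_mask follow the python-can filter convention; whether config.json uses exactly this shape for filters is an assumption.

```python
from typing import Dict, List


def matches(can_id: int, flt: Dict[str, int]) -> bool:
    """SocketCAN rule: (received_id & mask) == (filter_id & mask)."""
    return (can_id & flt["can_mask"]) == (flt["can_id"] & flt["can_mask"])


def accepted(can_id: int, filters: List[Dict[str, int]]) -> bool:
    """An empty filter list accepts everything; otherwise any match suffices."""
    if not filters:
        return True
    return any(matches(can_id, flt) for flt in filters)
```

For example, the filter {"can_id": 0x100, "can_mask": 0x7F0} accepts the sixteen standard IDs 0x100 through 0x10F and rejects 0x110.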

Storage Settings (SQLite)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| database_path | str | can_offline.db | SQLite database file path |
| wal_mode | bool | true | Enable WAL for better concurrency |
| sync_mode | str | NORMAL | SQLite synchronous mode (NORMAL/FULL/OFF) |
| retention_days | int | 7 | Auto-cleanup older messages |
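
A rough sketch of how these storage settings could map onto SQLite, using only the standard sqlite3 module. The function names open_storage and purge_old are hypothetical, and applying the retention window to the timestamp column is an assumption; storage/storage.py may do this differently.

```python
import sqlite3
import time
from typing import Optional


def open_storage(path: str, wal_mode: bool = True,
                 sync_mode: str = "NORMAL") -> sqlite3.Connection:
    """Open the database and apply the wal_mode / sync_mode settings."""
    if sync_mode not in {"OFF", "NORMAL", "FULL"}:
        raise ValueError(f"unsupported sync_mode: {sync_mode}")
    conn = sqlite3.connect(path)
    if wal_mode:
        # WAL lets readers proceed while a writer is active (file databases only).
        conn.execute("PRAGMA journal_mode=WAL")
    # sync_mode is validated above, so formatting it into the PRAGMA is safe.
    conn.execute(f"PRAGMA synchronous={sync_mode}")
    return conn


def purge_old(conn: sqlite3.Connection, retention_days: int,
              now: Optional[float] = None) -> int:
    """Delete messages older than retention_days; returns the number removed."""
    current = time.time() if now is None else now
    cutoff = current - retention_days * 86400
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            "DELETE FROM can_messages WHERE timestamp < ?", (cutoff,)
        )
    return cur.rowcount
```

With retention_days set to 7, a row is removed once its timestamp is more than 604800 seconds older than the current time.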

PostgreSQL Settings

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | false | Enable PostgreSQL forwarding |
| host | str | localhost | PostgreSQL server address |
| port | int | 5432 | PostgreSQL server port |
| database | str | canbus | Database name |
| user | str | canbus | Database user |
| password | str | - | Database password |
| batch_size | int | 1000 | Messages per batch insert |
| flush_interval | int | 5 | Seconds between flushes |
| max_retries | int | 3 | Retry attempts on failure |
| retry_backoff | float | 1.0 | Exponential backoff base |
| connection_pool_size | int | 5 | Connection pool max size |
| connection_timeout | int | 10 | Connection timeout seconds |
| sync_interval | float | 30.0 | SQLite sync interval seconds |
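
To make max_retries and retry_backoff concrete, here is one common way to implement retry with exponential backoff. The exact formula used by the PostgreSQL client is not documented here, so the doubling schedule (retry_backoff * 2^attempt) and the helper names are assumptions.

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def backoff_delays(max_retries: int = 3, retry_backoff: float = 1.0) -> List[float]:
    """Delay before each retry, doubling every time (assumed schedule)."""
    return [retry_backoff * (2 ** attempt) for attempt in range(max_retries)]


def call_with_retry(operation: Callable[[], T],
                    max_retries: int = 3,
                    retry_backoff: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Run operation once, then retry up to max_retries times on failure."""
    last_error: Optional[Exception] = None
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception as exc:
            last_error = exc
            if attempt == max_retries:
                break
            sleep(retry_backoff * (2 ** attempt))
    assert last_error is not None
    raise last_error
```

With the defaults from the table, a failing batch insert would be retried after 1, 2, and 4 seconds before the error is raised to the caller. Passing a custom sleep function keeps the helper easy to test.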

Flipper Zero Settings

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| enabled | bool | false | Enable Flipper Zero UART |
| device | str | /dev/ttyAMA0 | Serial device path |
| baudrate | int | 115200 | Serial baud rate |
| send_interval | float | 1.0 | Stats transmission interval |

Logging Settings

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| level | str | INFO | Log level (DEBUG/INFO/WARNING/ERROR) |
| format | str | - | Log message format |
| file | str | can_edge.log | Log file path |
| max_bytes | int | 10485760 | Max log file size (10MB) |
| backup_count | int | 5 | Number of rotated log files |

General Settings

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| buffer_size | int | 100000 | Message queue capacity |
| batch_size | int | 10000 | Handler batch size |
| batch_interval | float | 0.1 | Batch accumulation time |
| max_retries | int | 3 | General retry attempts |
| retry_delay | float | 1.0 | Retry delay seconds |

Environment Variables

Override any configuration using the CAN_SNIFFER_ prefix with __ for nested keys:

# CAN configuration
export CAN_SNIFFER_CAN__INTERFACES="can0,can1"
export CAN_SNIFFER_CAN__BITRATE="1000000"

# PostgreSQL configuration
export CAN_SNIFFER_POSTGRESQL__ENABLED="true"
export CAN_SNIFFER_POSTGRESQL__HOST="192.168.1.100"
export CAN_SNIFFER_POSTGRESQL__PASSWORD="secret"

# Logging level
export CAN_SNIFFER_LOGGING__LEVEL="DEBUG"

# Flipper Zero
export CAN_SNIFFER_FLIPPER__ENABLED="true"
export CAN_SNIFFER_FLIPPER__DEVICE="/dev/ttyUSB0"
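
The naming convention can be illustrated with a small standard-library sketch that turns prefixed variables into a nested dictionary and merges it over the file configuration. The project lists pydantic-settings for settings management, so this is only an illustration of the convention, not the code in config.py; env_overrides and deep_merge are hypothetical helpers.

```python
import copy
from typing import Any, Dict, Mapping

PREFIX = "CAN_SNIFFER_"


def env_overrides(environ: Mapping[str, str]) -> Dict[str, Any]:
    """Turn CAN_SNIFFER_SECTION__KEY=value into {"section": {"key": "value"}}."""
    result: Dict[str, Any] = {}
    for name, value in environ.items():
        if not name.startswith(PREFIX):
            continue
        path = name[len(PREFIX):].lower().split("__")
        node = result
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return result


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of base with override applied section by section."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged
```

Values stay strings in this sketch; converting "1000000" to an integer or "true" to a boolean is left to the validation layer. In the application you would pass os.environ as the environ argument.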

Database Schema

SQLite (Local Storage)

CREATE TABLE can_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp REAL NOT NULL,           -- Unix timestamp with microseconds
    interface TEXT NOT NULL,           -- CAN interface name
    can_id INTEGER NOT NULL,           -- CAN identifier
    can_id_hex TEXT NOT NULL,          -- Hex representation for queries
    is_extended INTEGER NOT NULL,      -- Extended frame flag (0/1)
    dlc INTEGER NOT NULL,              -- Data length code
    data BLOB NOT NULL,                -- Raw payload
    data_hex TEXT NOT NULL,            -- Hex representation
    processed INTEGER DEFAULT 0,       -- Sync flag for PostgreSQL
    created_at REAL DEFAULT (strftime('%s', 'now'))
);

-- Performance indices
CREATE INDEX idx_timestamp ON can_messages(timestamp);
CREATE INDEX idx_interface ON can_messages(interface);
CREATE INDEX idx_can_id ON can_messages(can_id);
CREATE INDEX idx_processed ON can_messages(processed);
CREATE INDEX idx_timestamp_interface ON can_messages(timestamp, interface);

PostgreSQL (Remote Storage)

PostgreSQL uses the same table layout; SQLite-specific syntax such as AUTOINCREMENT and BLOB needs its PostgreSQL equivalent. The processed flag in SQLite tracks which messages have been synced.
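
The role of the processed flag in store-and-forward can be sketched as follows. The function sync_pending and the send_batch callback are hypothetical stand-ins for the real PostgreSQL client, and the table is trimmed to the columns the example needs.

```python
import sqlite3
from typing import Callable, List, Tuple

Row = Tuple[int, float, str, int, bytes]

# Trimmed version of the can_messages table, enough for the example.
SCHEMA = """
CREATE TABLE can_messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    timestamp REAL NOT NULL,
    interface TEXT NOT NULL,
    can_id INTEGER NOT NULL,
    data BLOB NOT NULL,
    processed INTEGER DEFAULT 0
)
"""


def pending_count(conn: sqlite3.Connection) -> int:
    """Number of rows not yet forwarded."""
    row = conn.execute(
        "SELECT COUNT(*) FROM can_messages WHERE processed = 0"
    ).fetchone()
    return row[0]


def sync_pending(conn: sqlite3.Connection,
                 send_batch: Callable[[List[Row]], None],
                 batch_size: int = 1000) -> int:
    """Forward unprocessed rows; mark them processed only after a successful send."""
    rows = conn.execute(
        "SELECT id, timestamp, interface, can_id, data FROM can_messages "
        "WHERE processed = 0 ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    if not rows:
        return 0
    # If send_batch raises, nothing is marked and the rows stay pending.
    send_batch(rows)
    with conn:
        conn.executemany(
            "UPDATE can_messages SET processed = 1 WHERE id = ?",
            [(row[0],) for row in rows],
        )
    return len(rows)
```

Because rows are flagged only after send_batch returns, a network failure leaves them in SQLite for the next sync interval. The cost is that a crash between the send and the update can deliver a batch twice, so the remote side should tolerate duplicates.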

Creating Custom Handlers

Implement the BaseHandler interface to create custom processors:

from handlers.base import BaseHandler
from can_frame import CANFrame
from typing import List, Dict, Any

class MyCustomHandler(BaseHandler):
    def __init__(self, enabled: bool = True):
        super().__init__(name="my_handler", enabled=enabled)
        self._connection = None
        self._sent_count = 0  # Reported by get_stats()

    def initialize(self) -> bool:
        """Initialize resources (connections, buffers, etc.)"""
        if not self.enabled:
            return False

        try:
            # Your initialization code; _connect() is a placeholder you implement
            self._connection = self._connect()
            self._initialized = True
            return True
        except Exception as e:
            self.logger.error(f"Init failed: {e}")
            return False

    def handle(self, frame: CANFrame) -> bool:
        """Process a single CAN frame"""
        # Your single-frame processing
        return True

    def handle_batch(self, frames: List[CANFrame]) -> int:
        """Process a batch of frames (optimized)"""
        if not self._initialized:
            return 0

        try:
            # Your batch processing code; _send() is a placeholder you implement
            for frame in frames:
                self._send(frame)
            self._sent_count += len(frames)
            return len(frames)
        except Exception as e:
            self.logger.error(f"Batch failed: {e}")
            return 0

    def flush(self) -> None:
        """Force send any buffered data"""
        if self._connection:
            self._connection.flush()

    def shutdown(self) -> None:
        """Clean up resources"""
        if self._connection:
            self._connection.close()
        self._initialized = False

    def get_stats(self) -> Dict[str, Any]:
        """Return handler statistics"""
        return {
            "handler": self.name,
            "enabled": self.enabled,
            "initialized": self._initialized,
            "sent_count": self._sent_count
        }

Registering Custom Handlers

Add your handler to the MessageProcessor in main.py:

from handlers.my_handler import MyCustomHandler

# In MessageProcessor initialization
handlers = [
    StorageHandler(enabled=True),
    PostgreSQLHandler(enabled=config.postgresql.enabled),
    MyCustomHandler(enabled=True),  # Your custom handler
]
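
To experiment with a handler outside the repository, a stand-in base class is enough. Everything in this sketch is an assumption: the real BaseHandler in handlers/base.py may define more methods or different signatures, and dispatch is a hypothetical helper showing how a batch could be offered to each handler in the list.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class BaseHandler(ABC):
    """Stand-in for handlers/base.py; the real interface may differ."""

    def __init__(self, name: str, enabled: bool = True) -> None:
        self.name = name
        self.enabled = enabled
        self._initialized = False

    @abstractmethod
    def initialize(self) -> bool: ...

    @abstractmethod
    def handle_batch(self, frames: List[Any]) -> int: ...


class CountingHandler(BaseHandler):
    """Toy handler that only counts the frames it receives."""

    def __init__(self, enabled: bool = True) -> None:
        super().__init__(name="counting", enabled=enabled)
        self.seen = 0

    def initialize(self) -> bool:
        self._initialized = self.enabled
        return self._initialized

    def handle_batch(self, frames: List[Any]) -> int:
        if not self._initialized:
            return 0
        self.seen += len(frames)
        return len(frames)


def dispatch(handlers: List[BaseHandler], frames: List[Any]) -> Dict[str, int]:
    """Offer one batch to every enabled handler; one failure does not stop the rest."""
    results: Dict[str, int] = {}
    for handler in handlers:
        if not handler.enabled:
            continue
        try:
            results[handler.name] = handler.handle_batch(frames)
        except Exception:
            results[handler.name] = 0
    return results
```

Isolating each handler in its own try block reflects the offline-first goal: a failing optional handler should not prevent the storage handler from seeing the batch.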

Flipper Zero Integration

The Flipper Zero handler provides real-time status display via UART.

Protocol

Flipper → RPi:  INIT:flipper\n         (Handshake request)
RPi → Flipper:  ACK:rpi5,ip=x.x.x.x\n  (Acknowledgment)
RPi → Flipper:  STATS:ip=...,total=...,pending=...,processed=...\n
Flipper → RPi:  STOP:flipper\n         (Stop request)
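
The line format above is simple enough to sketch with plain string handling. The helper names are hypothetical, and the STATS values are assumed to be integers since the protocol listing elides them; the field order follows the listing.

```python
from typing import Optional, Tuple


def parse_line(line: str) -> Tuple[str, str]:
    """Split 'COMMAND:payload' (with optional newline) into (command, payload)."""
    command, _, payload = line.strip().partition(":")
    return command, payload


def format_ack(ip: str, host: str = "rpi5") -> str:
    return f"ACK:{host},ip={ip}\n"


def format_stats(ip: str, total: int, pending: int, processed: int) -> str:
    return f"STATS:ip={ip},total={total},pending={pending},processed={processed}\n"


def respond(line: str, ip: str) -> Optional[str]:
    """Answer a handshake request; other lines need no direct reply."""
    command, payload = parse_line(line)
    if command == "INIT" and payload == "flipper":
        return format_ack(ip)
    return None
```

A handler built on this would read lines from the serial port, call respond() for each one, start sending format_stats() output at send_interval after a successful handshake, and stop when parse_line() returns the STOP command.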

Hardware Setup

  1. Connect Flipper Zero UART to Raspberry Pi GPIO:

    • Flipper TX → RPi RX (GPIO 15 / Pin 10)
    • Flipper RX → RPi TX (GPIO 14 / Pin 8)
    • GND → GND
  2. Disable serial console on RPi:

    sudo raspi-config
    # Interface Options → Serial Port → No (login shell) → Yes (hardware)
    
  3. Enable in config:

    {
      "flipper": {
        "enabled": true,
        "device": "/dev/ttyAMA0",
        "baudrate": 115200
      }
    }
    

Deployment

Raspberry Pi 5 with systemd

For production deployment on a Raspberry Pi 5 with a 2-CH CAN HAT:

cd deploy
sudo ./install.sh

This creates two systemd services:

  • can-setup.service - Configures CAN interfaces at boot
  • can-sniffer.service - Main application service

Service Management

# Check status
sudo systemctl status can-sniffer

# View logs
sudo journalctl -u can-sniffer -f

# Restart service
sudo systemctl restart can-sniffer

# Run diagnostics
sudo ./deploy/diagnose.sh

See deploy/README.md for detailed deployment instructions.

Statistics Output

The application logs statistics every 10 seconds:

{
  "sniffer": {
    "can0": {
      "messages": 15234,
      "errors": 0,
      "running": true
    },
    "can1": {
      "messages": 8721,
      "errors": 0,
      "running": true
    }
  },
  "processor": {
    "processed_count": 23955,
    "dropped_count": 0,
    "queue_size": 142,
    "queue_full_count": 0
  },
  "storage": {
    "total_messages": 23955,
    "database_size_mb": 12.4
  },
  "postgresql": {
    "enabled": true,
    "status": "CONNECTED",
    "sent_count": 23800,
    "pending_sync": 155,
    "failed_count": 0
  },
  "flipper": {
    "enabled": true,
    "connected": true,
    "stats_sent": 24
  }
}

Troubleshooting

CAN Interface Issues

# Check interface status
ip -details link show can0

# Monitor raw traffic
candump can0

# Check for errors
dmesg | grep -i can

# Reset interface
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up

PostgreSQL Connection Issues

# Test connection
psql -h <host> -p <port> -U <user> -d <database>

# Check pending sync count in SQLite
sqlite3 can_offline.db "SELECT COUNT(*) FROM can_messages WHERE processed=0"

# Force sync (by restarting service)
sudo systemctl restart can-sniffer

Debug Logging

# Enable debug via environment
CAN_SNIFFER_LOGGING__LEVEL=DEBUG python main.py

# Or edit config.json
{
  "logging": {
    "level": "DEBUG"
  }
}

Queue Overflow

If the statistics output shows dropped_count > 0 or queue_full_count > 0:

  1. Increase buffer size:

    { "general": { "buffer_size": 500000 } }
    
  2. Increase batch size for faster processing:

    { "general": { "batch_size": 50000 } }
    
  3. Check handler performance (PostgreSQL latency, disk I/O)
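
The checks above can be automated against a statistics snapshot. This sketch assumes the snapshot has the shape shown under Statistics Output; the function name queue_warnings and the 80% high-water threshold are hypothetical choices, not part of the application.

```python
from typing import Any, Dict, List


def queue_warnings(stats: Dict[str, Any],
                   buffer_size: int = 100_000,
                   high_water: float = 0.8) -> List[str]:
    """Return human-readable warnings for signs of queue overflow."""
    processor = stats.get("processor", {})
    warnings: List[str] = []
    dropped = processor.get("dropped_count", 0)
    if dropped > 0:
        warnings.append(f"{dropped} messages dropped")
    full = processor.get("queue_full_count", 0)
    if full > 0:
        warnings.append(f"queue was full {full} times")
    size = processor.get("queue_size", 0)
    if size >= high_water * buffer_size:
        warnings.append(f"queue at {size}/{buffer_size}")
    return warnings
```

An empty list means none of the overflow indicators fired; any entry is a cue to work through the three steps above.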

Development

Code Formatting

black src/
isort src/

Type Checking

mypy src/

Running Tests

pytest tests/

Design Constraints

These constraints are intentional and should be preserved:

  1. Read-Only CAN - No send() calls; this is a passive sniffer
  2. Offline-First - SQLite is always written; PostgreSQL is optional
  3. Non-Blocking - No blocking operations in the CAN read loop
  4. Single-Process - No multiprocessing; threads only
  5. Graceful Shutdown - All threads properly terminated
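
Constraints 3 to 5 fit together in a common threading pattern: the signal handler only sets a flag, and each reader thread checks it between short waits. The sketch below uses hypothetical function names and a placeholder loop body; it is not the code in main.py.

```python
import signal
import threading
from typing import List

stop_event = threading.Event()


def request_stop(signum: int, frame: object) -> None:
    """Signal handler: only set a flag; the threads do the actual cleanup."""
    stop_event.set()


def install_signal_handlers() -> None:
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)


def reader_loop(interface: str) -> None:
    while not stop_event.is_set():
        # A real reader would call bus.recv(timeout=...) here so the loop
        # re-checks stop_event regularly instead of blocking forever.
        stop_event.wait(0.05)


def start_readers(interfaces: List[str]) -> List[threading.Thread]:
    threads = [
        threading.Thread(target=reader_loop, args=(name,),
                         name=f"CANBusHandler-{name}")
        for name in interfaces
    ]
    for thread in threads:
        thread.start()
    return threads


def join_all(threads: List[threading.Thread], timeout: float = 2.0) -> bool:
    """Wait for every thread; returns True if all of them finished."""
    for thread in threads:
        thread.join(timeout)
    return not any(thread.is_alive() for thread in threads)
```

The main thread would call install_signal_handlers(), start the readers, wait on stop_event, and then call join_all() before flushing and closing the handlers.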

Dependencies

| Package | Version | Purpose |
| --- | --- | --- |
| pydantic | >=2.0.0 | Configuration validation |
| pydantic-settings | >=2.0.0 | Settings management |
| python-can | >=4.0.0 | SocketCAN interface |
| psycopg2-binary | >=2.9.0 | PostgreSQL client |
| pyserial | >=3.5 | Flipper Zero UART |

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! Please:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit changes (git commit -m 'Add amazing feature')
  4. Push to branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Acknowledgments