diff --git a/obd2_client/README.md b/obd2_client/README.md new file mode 100644 index 0000000..695dcc6 --- /dev/null +++ b/obd2_client/README.md @@ -0,0 +1,127 @@ +# OBD2 Client для Skoda Kodiaq 2021 на RPi5 + +Python приложение для чтения OBD2 данных через WaveShare 2-CH CAN HAT. + +## Установка + +```bash +pip install -r requirements.txt +``` + +## Конфигурация RPi5 + +### /boot/config.txt +``` +dtparam=spi=on +dtoverlay=mcp2515-can0,oscillator=16000000,interrupt=23 +``` + +### Инициализация CAN интерфейса +```bash +sudo ip link set can0 up type can bitrate 500000 +sudo ifconfig can0 txqueuelen 65536 +``` + +## Использование + +### Запуск с реальным авто +```bash +python -m src.main --interface can0 +``` + +### Только сканирование PID +```bash +python -m src.main --interface can0 --scan-only +``` + +### Тестирование с виртуальным CAN +```bash +# Создание виртуального интерфейса +sudo modprobe vcan +sudo ip link add dev vcan0 type vcan +sudo ip link set up vcan0 + +# Запуск клиента +python -m src.main --interface vcan0 --virtual +``` + +### Параметры командной строки + +| Параметр | Описание | +|----------|----------| +| `-i, --interface` | CAN интерфейс (can0, vcan0) | +| `-c, --config` | Путь к config.json | +| `-v, --virtual` | Использовать виртуальный CAN | +| `--scan-only` | Только сканировать PID | +| `--debug` | Включить отладочный вывод | + +## Поддерживаемые PID + +| PID | Параметр | Единицы | +|-----|----------|---------| +| 0x04 | Engine Load | % | +| 0x05 | Coolant Temp | °C | +| 0x0C | Engine RPM | RPM | +| 0x0D | Vehicle Speed | km/h | +| 0x0F | Intake Air Temp | °C | +| 0x10 | MAF Rate | g/s | +| 0x11 | Throttle Position | % | +| 0x2F | Fuel Level | % | +| 0x5C | Oil Temperature | °C | + +## Конфигурация + +Файл `config.json`: + +```json +{ + "can": { + "interface": "can0", + "bitrate": 500000, + "virtual": false + }, + "obd2": { + "request_id": "0x7DF", + "response_id": "0x7E8", + "timeout": 0.1 + }, + "polling": { + "interval_fast": 0.1, + "interval_slow": 1.0, + "fast_pids": ["0x0C", "0x0D", "0x11"], + "slow_pids": ["0x05", "0x2F", "0x5C"] + } +} +``` + +## Переменные окружения + +- `OBD2_CONFIG_PATH` - путь к config.json +- `OBD2_CAN_INTERFACE` - CAN интерфейс +- `OBD2_CAN_BITRATE` - битрейт +- `OBD2_CAN_VIRTUAL` - использовать виртуальный CAN (true/false) +- `OBD2_TIMEOUT` - таймаут ответа + +## Структура проекта + +``` +obd2_client/ +├── src/ +│ ├── __init__.py +│ ├── main.py # Точка входа, CLI +│ ├── config.py # Конфигурация +│ ├── logger.py # Логирование +│ ├── can/ +│ │ ├── frame.py # CAN фрейм dataclass +│ │ └── interface.py # Абстракция CAN шины +│ ├── obd2/ +│ │ ├── pids.py # Определения PID +│ │ ├── protocol.py # OBD2 запросы/ответы +│ │ └── scanner.py # Автодетект PID +│ └── vehicle/ +│ ├── state.py # Состояние авто +│ └── poller.py # Циклический опрос +├── config.json +├── requirements.txt +└── README.md +``` diff --git a/obd2_client/requirements.txt b/obd2_client/requirements.txt new file mode 100644 index 0000000..9fbe0e7 --- /dev/null +++ b/obd2_client/requirements.txt @@ -0,0 +1 @@ +python-can>=4.0.0 diff --git a/obd2_client/src/__init__.py b/obd2_client/src/__init__.py new file mode 100644 index 0000000..5ef9e6e --- /dev/null +++ b/obd2_client/src/__init__.py @@ -0,0 +1,3 @@ +"""OBD2 Client for Skoda Kodiaq 2021 on RPi5 with WaveShare 2-CH CAN HAT.""" + +__version__ = "1.0.0" diff --git a/obd2_client/src/can/__init__.py b/obd2_client/src/can/__init__.py new file mode 100644 index 0000000..c252888 --- /dev/null +++ b/obd2_client/src/can/__init__.py @@ -0,0 +1,6 @@ +"""CAN bus abstraction layer.""" + +from .frame import CANFrame +from .interface import CANInterface + +__all__ = ["CANFrame", "CANInterface"] diff --git a/obd2_client/src/can/frame.py b/obd2_client/src/can/frame.py new file mode 100644 index 0000000..22af114 --- /dev/null +++ b/obd2_client/src/can/frame.py @@ -0,0 +1,77 @@ +"""CAN frame data structures.""" + +from dataclasses import dataclass, field +from typing import List, Optional +import time + + +@dataclass +class CANFrame: + """Represents a CAN bus frame. + + Attributes: + arbitration_id: CAN message identifier (11-bit or 29-bit) + data: Payload data (up to 8 bytes for standard CAN) + timestamp: Time when frame was received/created + is_extended_id: True if using 29-bit extended identifier + is_remote_frame: True if this is a remote transmission request + is_error_frame: True if this is an error frame + """ + + arbitration_id: int + data: bytes = field(default_factory=bytes) + timestamp: float = field(default_factory=time.time) + is_extended_id: bool = False + is_remote_frame: bool = False + is_error_frame: bool = False + + def __post_init__(self): + """Validate and normalize data.""" + if isinstance(self.data, (list, tuple)): + self.data = bytes(self.data) + if len(self.data) > 8: + raise ValueError("CAN frame data cannot exceed 8 bytes") + + @property + def dlc(self) -> int: + """Data length code (number of data bytes).""" + return len(self.data) + + def pad_to_8_bytes(self) -> "CANFrame": + """Return a new frame with data padded to 8 bytes with zeros.""" + padded_data = self.data + bytes(8 - len(self.data)) + return CANFrame( + arbitration_id=self.arbitration_id, + data=padded_data, + timestamp=self.timestamp, + is_extended_id=self.is_extended_id, + is_remote_frame=self.is_remote_frame, + is_error_frame=self.is_error_frame, + ) + + def to_hex_string(self) -> str: + """Return hex representation of frame data.""" + return " ".join(f"{b:02X}" for b in self.data) + + def __str__(self) -> str: + """Human-readable string representation.""" + id_str = f"0x{self.arbitration_id:03X}" + if self.is_extended_id: + id_str = f"0x{self.arbitration_id:08X}" + data_str = self.to_hex_string() + return f"CAN[{id_str}] [{self.dlc}] {data_str}" + + @classmethod + def from_obd2_request(cls, mode: int, pid: int, request_id: int = 0x7DF) -> "CANFrame": + """Create a standard OBD2 request frame. + + Args: + mode: OBD2 mode (e.g., 0x01 for current data) + pid: Parameter ID to request + request_id: CAN arbitration ID for requests (default: 0x7DF broadcast) + + Returns: + CANFrame configured as OBD2 request + """ + data = bytes([0x02, mode, pid, 0x00, 0x00, 0x00, 0x00, 0x00]) + return cls(arbitration_id=request_id, data=data) diff --git a/obd2_client/src/can/interface.py b/obd2_client/src/can/interface.py new file mode 100644 index 0000000..33d9081 --- /dev/null +++ b/obd2_client/src/can/interface.py @@ -0,0 +1,197 @@ +"""CAN bus interface abstraction.""" + +import time +from typing import Optional, List +from contextlib import contextmanager + +try: + import can +except ImportError: + can = None + +from .frame import CANFrame +from ..logger import get_logger + + +class CANInterface: + """CAN bus interface using python-can library. + + Supports both physical CAN interfaces (can0) and virtual interfaces (vcan0). + """ + + def __init__( + self, + interface: str = "can0", + bitrate: int = 500000, + virtual: bool = False, + ): + """Initialize CAN interface. + + Args: + interface: CAN interface name (e.g., 'can0', 'vcan0') + bitrate: CAN bus bitrate (default: 500000 for OBD2) + virtual: If True, use virtual CAN (no bitrate needed) + """ + self.interface = interface + self.bitrate = bitrate + self.virtual = virtual + self._bus: Optional["can.Bus"] = None + self._logger = get_logger("obd2_client.can") + + @property + def is_connected(self) -> bool: + """Check if interface is connected.""" + return self._bus is not None + + def connect(self) -> None: + """Connect to CAN bus. + + Raises: + RuntimeError: If python-can is not installed + can.CanError: If connection fails + """ + if can is None: + raise RuntimeError( + "python-can is not installed. Run: pip install python-can" + ) + + if self._bus is not None: + self._logger.warning("Already connected to CAN bus") + return + + bus_type = "virtual" if self.virtual else "socketcan" + + self._logger.info( + f"Connecting to {self.interface} (type={bus_type}, bitrate={self.bitrate})" + ) + + if self.virtual: + self._bus = can.Bus(channel=self.interface, interface="virtual") + else: + self._bus = can.Bus( + channel=self.interface, + interface="socketcan", + bitrate=self.bitrate, + ) + + self._logger.info(f"Connected to {self.interface}") + + def disconnect(self) -> None: + """Disconnect from CAN bus.""" + if self._bus is not None: + self._logger.info(f"Disconnecting from {self.interface}") + self._bus.shutdown() + self._bus = None + + def send(self, frame: CANFrame) -> bool: + """Send a CAN frame. + + Args: + frame: CANFrame to send + + Returns: + True if sent successfully + + Raises: + RuntimeError: If not connected + """ + if self._bus is None: + raise RuntimeError("Not connected to CAN bus") + + msg = can.Message( + arbitration_id=frame.arbitration_id, + data=frame.data, + is_extended_id=frame.is_extended_id, + is_remote_frame=frame.is_remote_frame, + ) + + try: + self._bus.send(msg) + self._logger.debug(f"TX: {frame}") + return True + except can.CanError as e: + self._logger.error(f"Failed to send frame: {e}") + return False + + def receive(self, timeout: float = 0.1) -> Optional[CANFrame]: + """Receive a CAN frame. + + Args: + timeout: Maximum time to wait for a frame (seconds) + + Returns: + CANFrame if received, None on timeout + + Raises: + RuntimeError: If not connected + """ + if self._bus is None: + raise RuntimeError("Not connected to CAN bus") + + msg = self._bus.recv(timeout=timeout) + + if msg is None: + return None + + frame = CANFrame( + arbitration_id=msg.arbitration_id, + data=bytes(msg.data), + timestamp=msg.timestamp if msg.timestamp else time.time(), + is_extended_id=msg.is_extended_id, + is_remote_frame=msg.is_remote_frame, + is_error_frame=msg.is_error_frame, + ) + + self._logger.debug(f"RX: {frame}") + return frame + + def receive_filtered( + self, + arbitration_ids: List[int], + timeout: float = 0.1, + ) -> Optional[CANFrame]: + """Receive a CAN frame matching specific arbitration IDs. + + Args: + arbitration_ids: List of acceptable arbitration IDs + timeout: Maximum time to wait (seconds) + + Returns: + CANFrame if received and matching, None on timeout + """ + start_time = time.time() + + while (time.time() - start_time) < timeout: + remaining = timeout - (time.time() - start_time) + if remaining <= 0: + break + + frame = self.receive(timeout=min(remaining, 0.01)) + if frame and frame.arbitration_id in arbitration_ids: + return frame + + return None + + @contextmanager + def session(self): + """Context manager for CAN bus session. + + Usage: + with can_interface.session(): + can_interface.send(frame) + """ + try: + self.connect() + yield self + finally: + self.disconnect() + + def __enter__(self): + """Enter context manager.""" + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Exit context manager.""" + self.disconnect() + return False diff --git a/obd2_client/src/config.py b/obd2_client/src/config.py new file mode 100644 index 0000000..f4c8dfa --- /dev/null +++ b/obd2_client/src/config.py @@ -0,0 +1,127 @@ +"""Configuration management for OBD2 client.""" + +import json +import os +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Optional + + +@dataclass +class CANConfig: + """CAN bus configuration.""" + + interface: str = "can0" + bitrate: int = 500000 + virtual: bool = False + + +@dataclass +class OBD2Config: + """OBD2 protocol configuration.""" + + request_id: int = 0x7DF + response_id: int = 0x7E8 + timeout: float = 0.1 + + +@dataclass +class PollingConfig: + """Polling configuration.""" + + interval_fast: float = 0.1 + interval_slow: float = 1.0 + fast_pids: List[int] = field(default_factory=lambda: [0x0C, 0x0D, 0x11]) + slow_pids: List[int] = field(default_factory=lambda: [0x05, 0x2F, 0x5C]) + + +@dataclass +class Config: + """Main configuration container.""" + + can: CANConfig = field(default_factory=CANConfig) + obd2: OBD2Config = field(default_factory=OBD2Config) + polling: PollingConfig = field(default_factory=PollingConfig) + + @classmethod + def load(cls, config_path: Optional[str] = None) -> "Config": + """Load configuration from JSON file and environment variables. + + Args: + config_path: Path to config.json file + + Returns: + Config instance + """ + config = cls() + + if config_path is None: + config_path = os.environ.get( + "OBD2_CONFIG_PATH", + str(Path(__file__).parent.parent / "config.json"), + ) + + config_file = Path(config_path) + if config_file.exists(): + with open(config_file, "r") as f: + data = json.load(f) + config._load_from_dict(data) + + config._load_from_env() + + return config + + def _load_from_dict(self, data: dict) -> None: + """Load configuration from dictionary.""" + if "can" in data: + can_data = data["can"] + self.can.interface = can_data.get("interface", self.can.interface) + self.can.bitrate = can_data.get("bitrate", self.can.bitrate) + self.can.virtual = can_data.get("virtual", self.can.virtual) + + if "obd2" in data: + obd2_data = data["obd2"] + if "request_id" in obd2_data: + self.obd2.request_id = self._parse_hex(obd2_data["request_id"]) + if "response_id" in obd2_data: + self.obd2.response_id = self._parse_hex(obd2_data["response_id"]) + self.obd2.timeout = obd2_data.get("timeout", self.obd2.timeout) + + if "polling" in data: + poll_data = data["polling"] + self.polling.interval_fast = poll_data.get( + "interval_fast", self.polling.interval_fast + ) + self.polling.interval_slow = poll_data.get( + "interval_slow", self.polling.interval_slow + ) + if "fast_pids" in poll_data: + self.polling.fast_pids = [ + self._parse_hex(p) for p in poll_data["fast_pids"] + ] + if "slow_pids" in poll_data: + self.polling.slow_pids = [ + self._parse_hex(p) for p in poll_data["slow_pids"] + ] + + def _load_from_env(self) -> None: + """Load configuration from environment variables.""" + if "OBD2_CAN_INTERFACE" in os.environ: + self.can.interface = os.environ["OBD2_CAN_INTERFACE"] + if "OBD2_CAN_BITRATE" in os.environ: + self.can.bitrate = int(os.environ["OBD2_CAN_BITRATE"]) + if "OBD2_CAN_VIRTUAL" in os.environ: + self.can.virtual = os.environ["OBD2_CAN_VIRTUAL"].lower() == "true" + if "OBD2_TIMEOUT" in os.environ: + self.obd2.timeout = float(os.environ["OBD2_TIMEOUT"]) + + @staticmethod + def _parse_hex(value) -> int: + """Parse hex string or int to int.""" + if isinstance(value, int): + return value + if isinstance(value, str): + if value.startswith("0x") or value.startswith("0X"): + return int(value, 16) + return int(value) + return value diff --git a/obd2_client/src/logger.py b/obd2_client/src/logger.py new file mode 100644 index 0000000..d7e4696 --- /dev/null +++ b/obd2_client/src/logger.py @@ -0,0 +1,57 @@ +"""Logging configuration for OBD2 client.""" + +import logging +import sys +from typing import Optional + + +def setup_logger( + name: str = "obd2_client", + level: int = logging.INFO, + log_file: Optional[str] = None, +) -> logging.Logger: + """Configure and return a logger instance. + + Args: + name: Logger name + level: Logging level (default: INFO) + log_file: Optional file path for logging + + Returns: + Configured logger instance + """ + logger = logging.getLogger(name) + logger.setLevel(level) + + if logger.handlers: + return logger + + formatter = logging.Formatter( + "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(level) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + + if log_file: + file_handler = logging.FileHandler(log_file) + file_handler.setLevel(level) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + return logger + + +def get_logger(name: str = "obd2_client") -> logging.Logger: + """Get an existing logger or create a new one. + + Args: + name: Logger name + + Returns: + Logger instance + """ + return logging.getLogger(name) diff --git a/obd2_client/src/main.py b/obd2_client/src/main.py new file mode 100644 index 0000000..dcd6a35 --- /dev/null +++ b/obd2_client/src/main.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +"""OBD2 Client - Main entry point.""" + +import argparse +import signal +import sys +import time +from pathlib import Path + +from .config import Config +from .logger import setup_logger, get_logger +from .can.interface import CANInterface +from .obd2.protocol import OBD2Protocol +from .obd2.scanner import OBD2Scanner +from .vehicle.state import VehicleState +from .vehicle.poller import VehiclePoller + + +class OBD2Client: + """Main OBD2 client application.""" + + def __init__(self, config: Config): + self.config = config + self._logger = get_logger("obd2_client") + self._running = False + + self.can_interface = CANInterface( + interface=config.can.interface, + bitrate=config.can.bitrate, + virtual=config.can.virtual, + ) + + self.protocol = OBD2Protocol( + can_interface=self.can_interface, + request_id=config.obd2.request_id, + response_id=config.obd2.response_id, + timeout=config.obd2.timeout, + ) + + cache_file = str(Path(__file__).parent.parent / "pid_cache.json") + self.scanner = OBD2Scanner(self.protocol, cache_file=cache_file) + + self.state = VehicleState() + + self.poller = VehiclePoller( + protocol=self.protocol, + state=self.state, + fast_interval=config.polling.interval_fast, + slow_interval=config.polling.interval_slow, + fast_pids=config.polling.fast_pids, + slow_pids=config.polling.slow_pids, + ) + + def run(self, scan_only: bool = False, monitor: bool = True) -> None: + """Run the OBD2 client. + + Args: + scan_only: If True, only scan for PIDs and exit + monitor: If True, continuously monitor and display values + """ + self._running = True + + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + try: + self._logger.info(f"Connecting to {self.config.can.interface}...") + self.can_interface.connect() + + self._logger.info("Scanning for supported PIDs...") + supported = self.scanner.scan() + + if not supported: + self._logger.error("No PIDs supported or no response from ECU") + return + + self.scanner.print_supported_pids() + + if scan_only: + return + + readable_pids = self.scanner.get_readable_pids() + if not readable_pids: + self._logger.warning("No readable PIDs found") + return + + fast_pids = [p for p in self.config.polling.fast_pids if p in readable_pids] + slow_pids = [p for p in self.config.polling.slow_pids if p in readable_pids] + + if not fast_pids and not slow_pids: + fast_pids = readable_pids[:3] + slow_pids = readable_pids[3:] + + self.poller.set_pids(fast_pids, slow_pids) + + self._logger.info("Starting poller...") + self.poller.start() + + if monitor: + self._monitor_loop() + + except KeyboardInterrupt: + pass + finally: + self._shutdown() + + def _monitor_loop(self) -> None: + """Display live values in console.""" + print("\n" + "=" * 50) + print(" OBD2 Live Monitor (Ctrl+C to stop)") + print("=" * 50 + "\n") + + while self._running: + self._clear_screen() + self._print_header() + self._print_values() + self._print_stats() + + time.sleep(0.5) + + def _clear_screen(self) -> None: + """Clear console screen.""" + print("\033[H\033[J", end="") + + def _print_header(self) -> None: + """Print header.""" + print("=" * 50) + print(" OBD2 Live Monitor - Skoda Kodiaq 2021") + print("=" * 50) + print() + + def _print_values(self) -> None: + """Print current values.""" + values = self.state.get_all() + + if not values: + print(" Waiting for data...") + return + + print(" Current Values:") + print(" " + "-" * 40) + + rpm = self.state.rpm + speed = self.state.speed + coolant = self.state.coolant_temp + throttle = self.state.throttle + fuel = self.state.fuel_level + oil = self.state.oil_temp + + if rpm is not None: + print(f" Engine RPM: {rpm:>8.0f} RPM") + if speed is not None: + print(f" Vehicle Speed: {speed:>8.0f} km/h") + if throttle is not None: + print(f" Throttle: {throttle:>8.1f} %") + if coolant is not None: + print(f" Coolant Temp: {coolant:>8.0f} °C") + if oil is not None: + print(f" Oil Temp: {oil:>8.0f} °C") + if fuel is not None: + print(f" Fuel Level: {fuel:>8.1f} %") + + print(" " + "-" * 40) + print() + + def _print_stats(self) -> None: + """Print polling statistics.""" + stats = self.poller.get_stats() + total = stats["queries"] + success = stats["successes"] + rate = (success / total * 100) if total > 0 else 0 + + print(f" Stats: {total} queries, {success} success ({rate:.1f}%)") + print() + print(" Press Ctrl+C to stop") + + def _signal_handler(self, signum, frame) -> None: + """Handle shutdown signals.""" + self._logger.info("Shutdown signal received") + self._running = False + + def _shutdown(self) -> None: + """Clean shutdown.""" + self._logger.info("Shutting down...") + + if self.poller.is_running: + self.poller.stop() + + if self.can_interface.is_connected: + self.can_interface.disconnect() + + self._logger.info("Shutdown complete") + + +def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="OBD2 Client for Skoda Kodiaq 2021", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + + parser.add_argument( + "-i", "--interface", + default=None, + help="CAN interface (e.g., can0, vcan0)", + ) + + parser.add_argument( + "-c", "--config", + default=None, + help="Path to config.json", + ) + + parser.add_argument( + "-v", "--virtual", + action="store_true", + help="Use virtual CAN interface", + ) + + parser.add_argument( + "--scan-only", + action="store_true", + help="Only scan for supported PIDs and exit", + ) + + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + + args = parser.parse_args() + + import logging + log_level = logging.DEBUG if args.debug else logging.INFO + setup_logger(level=log_level) + + config = Config.load(args.config) + + if args.interface: + config.can.interface = args.interface + if args.virtual: + config.can.virtual = True + + client = OBD2Client(config) + client.run(scan_only=args.scan_only) + + +if __name__ == "__main__": + main() diff --git a/obd2_client/src/obd2/__init__.py b/obd2_client/src/obd2/__init__.py new file mode 100644 index 0000000..ef4f765 --- /dev/null +++ b/obd2_client/src/obd2/__init__.py @@ -0,0 +1,7 @@ +"""OBD2 protocol implementation.""" + +from .pids import PID, PIDRegistry +from .protocol import OBD2Protocol +from .scanner import OBD2Scanner + +__all__ = ["PID", "PIDRegistry", "OBD2Protocol", "OBD2Scanner"] diff --git a/obd2_client/src/obd2/pids.py b/obd2_client/src/obd2/pids.py new file mode 100644 index 0000000..7a707ce --- /dev/null +++ b/obd2_client/src/obd2/pids.py @@ -0,0 +1,259 @@ +"""OBD2 PID definitions and decoders.""" + +from dataclasses import dataclass +from typing import Callable, Dict, List, Optional, Any + + +@dataclass +class PID: + """OBD2 Parameter ID definition. + + Attributes: + code: PID code (e.g., 0x0C for RPM) + name: Human-readable name + description: Detailed description + unit: Unit of measurement + min_value: Minimum possible value + max_value: Maximum possible value + decoder: Function to decode raw bytes to value + num_bytes: Number of data bytes in response + """ + + code: int + name: str + description: str + unit: str + min_value: float + max_value: float + decoder: Callable[[bytes], float] + num_bytes: int = 1 + + def decode(self, data: bytes) -> float: + """Decode raw bytes to value using the decoder function.""" + return self.decoder(data) + + def __str__(self) -> str: + return f"PID[0x{self.code:02X}] {self.name} ({self.unit})" + + +class PIDRegistry: + """Registry of supported OBD2 PIDs.""" + + def __init__(self): + self._pids: Dict[int, PID] = {} + self._register_standard_pids() + + def _register_standard_pids(self) -> None: + """Register standard Mode 01 PIDs.""" + + # PID 0x00 - Supported PIDs [01-20] + self.register( + PID( + code=0x00, + name="PIDS_A", + description="Supported PIDs [01-20]", + unit="bitmask", + min_value=0, + max_value=0xFFFFFFFF, + decoder=lambda d: int.from_bytes(d[:4], "big"), + num_bytes=4, + ) + ) + + # PID 0x04 - Engine Load + self.register( + PID( + code=0x04, + name="ENGINE_LOAD", + description="Calculated engine load", + unit="%", + min_value=0, + max_value=100, + decoder=lambda d: d[0] * 100 / 255, + num_bytes=1, + ) + ) + + # PID 0x05 - Coolant Temperature + self.register( + PID( + code=0x05, + name="COOLANT_TEMP", + description="Engine coolant temperature", + unit="°C", + min_value=-40, + max_value=215, + decoder=lambda d: d[0] - 40, + num_bytes=1, + ) + ) + + # PID 0x0C - Engine RPM + self.register( + PID( + code=0x0C, + name="ENGINE_RPM", + description="Engine RPM", + unit="RPM", + min_value=0, + max_value=16383.75, + decoder=lambda d: (d[0] * 256 + d[1]) / 4, + num_bytes=2, + ) + ) + + # PID 0x0D - Vehicle Speed + self.register( + PID( + code=0x0D, + name="VEHICLE_SPEED", + description="Vehicle speed", + unit="km/h", + min_value=0, + max_value=255, + decoder=lambda d: d[0], + num_bytes=1, + ) + ) + + # PID 0x0F - Intake Air Temperature + self.register( + PID( + code=0x0F, + name="INTAKE_TEMP", + description="Intake air temperature", + unit="°C", + min_value=-40, + max_value=215, + decoder=lambda d: d[0] - 40, + num_bytes=1, + ) + ) + + # PID 0x10 - MAF Air Flow Rate + self.register( + PID( + code=0x10, + name="MAF_RATE", + description="Mass air flow rate", + unit="g/s", + min_value=0, + max_value=655.35, + decoder=lambda d: (d[0] * 256 + d[1]) / 100, + num_bytes=2, + ) + ) + + # PID 0x11 - Throttle Position + self.register( + PID( + code=0x11, + name="THROTTLE_POS", + description="Throttle position", + unit="%", + min_value=0, + max_value=100, + decoder=lambda d: d[0] * 100 / 255, + num_bytes=1, + ) + ) + + # PID 0x20 - Supported PIDs [21-40] + self.register( + PID( + code=0x20, + name="PIDS_B", + description="Supported PIDs [21-40]", + unit="bitmask", + min_value=0, + max_value=0xFFFFFFFF, + decoder=lambda d: int.from_bytes(d[:4], "big"), + num_bytes=4, + ) + ) + + # PID 0x2F - Fuel Tank Level + self.register( + PID( + code=0x2F, + name="FUEL_LEVEL", + description="Fuel tank level input", + unit="%", + min_value=0, + max_value=100, + decoder=lambda d: d[0] * 100 / 255, + num_bytes=1, + ) + ) + + # PID 0x40 - Supported PIDs [41-60] + self.register( + PID( + code=0x40, + name="PIDS_C", + description="Supported PIDs [41-60]", + unit="bitmask", + min_value=0, + max_value=0xFFFFFFFF, + decoder=lambda d: int.from_bytes(d[:4], "big"), + num_bytes=4, + ) + ) + + # PID 0x5C - Oil Temperature + self.register( + PID( + code=0x5C, + name="OIL_TEMP", + description="Engine oil temperature", + unit="°C", + min_value=-40, + max_value=210, + decoder=lambda d: d[0] - 40, + num_bytes=1, + ) + ) + + # PID 0x60 - Supported PIDs [61-80] + self.register( + PID( + code=0x60, + name="PIDS_D", + description="Supported PIDs [61-80]", + unit="bitmask", + min_value=0, + max_value=0xFFFFFFFF, + decoder=lambda d: int.from_bytes(d[:4], "big"), + num_bytes=4, + ) + ) + + def register(self, pid: PID) -> None: + """Register a PID in the registry.""" + self._pids[pid.code] = pid + + def get(self, code: int) -> Optional[PID]: + """Get a PID by its code.""" + return self._pids.get(code) + + def get_all(self) -> List[PID]: + """Get all registered PIDs.""" + return list(self._pids.values()) + + def get_data_pids(self) -> List[PID]: + """Get all data PIDs (excluding supported PID queries).""" + return [ + pid + for pid in self._pids.values() + if pid.code not in (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0) + ] + + def __contains__(self, code: int) -> bool: + return code in self._pids + + def __getitem__(self, code: int) -> PID: + return self._pids[code] + + +# Global registry instance +PIDS = PIDRegistry() diff --git a/obd2_client/src/obd2/protocol.py b/obd2_client/src/obd2/protocol.py new file mode 100644 index 0000000..f13a772 --- /dev/null +++ b/obd2_client/src/obd2/protocol.py @@ -0,0 +1,174 @@ +"""OBD2 protocol implementation.""" + +from dataclasses import dataclass +from typing import Optional, Tuple + +from ..can.frame import CANFrame +from ..can.interface import CANInterface +from ..logger import get_logger +from .pids import PID, PIDRegistry, PIDS + + +@dataclass +class OBD2Response: + """OBD2 response data. + + Attributes: + pid: PID that was queried + raw_data: Raw response data bytes + value: Decoded value + unit: Unit of measurement + """ + + pid: PID + raw_data: bytes + value: float + unit: str + + def __str__(self) -> str: + return f"{self.pid.name}: {self.value:.2f} {self.unit}" + + +class OBD2Protocol: + """OBD2 protocol handler for Mode 01 (current data). + + Handles request/response encoding and decoding for OBD2 PIDs. + """ + + MODE_CURRENT_DATA = 0x01 + MODE_RESPONSE_OFFSET = 0x40 + + def __init__( + self, + can_interface: CANInterface, + request_id: int = 0x7DF, + response_id: int = 0x7E8, + timeout: float = 0.1, + pid_registry: Optional[PIDRegistry] = None, + ): + """Initialize OBD2 protocol handler. + + Args: + can_interface: CAN interface to use + request_id: CAN ID for OBD2 requests (default: 0x7DF broadcast) + response_id: CAN ID for OBD2 responses (default: 0x7E8 ECU) + timeout: Response timeout in seconds + pid_registry: PID registry (default: global PIDS) + """ + self.can = can_interface + self.request_id = request_id + self.response_id = response_id + self.timeout = timeout + self.pids = pid_registry or PIDS + self._logger = get_logger("obd2_client.protocol") + + def build_request(self, mode: int, pid: int) -> CANFrame: + """Build an OBD2 request frame. + + Args: + mode: OBD2 mode (e.g., 0x01) + pid: Parameter ID to request + + Returns: + CANFrame ready to send + """ + data = bytes([0x02, mode, pid, 0x00, 0x00, 0x00, 0x00, 0x00]) + return CANFrame(arbitration_id=self.request_id, data=data) + + def parse_response(self, frame: CANFrame, expected_pid: int) -> Optional[bytes]: + """Parse OBD2 response frame. + + Args: + frame: Received CAN frame + expected_pid: PID we're expecting a response for + + Returns: + Data bytes if valid response, None otherwise + """ + if len(frame.data) < 3: + return None + + length = frame.data[0] + mode = frame.data[1] + pid = frame.data[2] + + expected_mode = self.MODE_CURRENT_DATA + self.MODE_RESPONSE_OFFSET + + if mode != expected_mode: + self._logger.debug(f"Unexpected mode: 0x{mode:02X}") + return None + + if pid != expected_pid: + self._logger.debug(f"Unexpected PID: 0x{pid:02X}") + return None + + data_length = length - 2 + if data_length <= 0: + return None + + return frame.data[3 : 3 + data_length] + + def query_pid(self, pid_code: int) -> Optional[OBD2Response]: + """Query a single OBD2 PID. + + Args: + pid_code: PID code to query + + Returns: + OBD2Response if successful, None otherwise + """ + pid = self.pids.get(pid_code) + if pid is None: + self._logger.warning(f"Unknown PID: 0x{pid_code:02X}") + return None + + request = self.build_request(self.MODE_CURRENT_DATA, pid_code) + + if not self.can.send(request): + self._logger.error(f"Failed to send request for PID 0x{pid_code:02X}") + return None + + response_ids = [self.response_id + i for i in range(8)] + frame = self.can.receive_filtered(response_ids, timeout=self.timeout) + + if frame is None: + self._logger.debug(f"No response for PID 0x{pid_code:02X}") + return None + + raw_data = self.parse_response(frame, pid_code) + if raw_data is None: + return None + + try: + value = pid.decode(raw_data) + return OBD2Response( + pid=pid, + raw_data=raw_data, + value=value, + unit=pid.unit, + ) + except Exception as e: + self._logger.error(f"Failed to decode PID 0x{pid_code:02X}: {e}") + return None + + def query_supported_pids(self, base_pid: int = 0x00) -> list[int]: + """Query supported PIDs starting from a base PID. + + Args: + base_pid: Base PID (0x00, 0x20, 0x40, etc.) + + Returns: + List of supported PID codes + """ + response = self.query_pid(base_pid) + if response is None: + return [] + + bitmask = int(response.value) + supported = [] + + for i in range(32): + if bitmask & (1 << (31 - i)): + supported.append(base_pid + i + 1) + + return supported diff --git a/obd2_client/src/obd2/scanner.py b/obd2_client/src/obd2/scanner.py new file mode 100644 index 0000000..654734f --- /dev/null +++ b/obd2_client/src/obd2/scanner.py @@ -0,0 +1,169 @@ +"""OBD2 PID scanner for auto-detection.""" + +from typing import List, Set, Optional +import json +from pathlib import Path + +from ..can.interface import CANInterface +from ..logger import get_logger +from .protocol import OBD2Protocol +from .pids import PIDS + + +class OBD2Scanner: + """Scanner for detecting supported OBD2 PIDs. + + Queries the vehicle ECU to determine which PIDs are supported, + then caches the results for future use. + """ + + SUPPORTED_PID_QUERIES = [0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0] + + def __init__( + self, + protocol: OBD2Protocol, + cache_file: Optional[str] = None, + ): + """Initialize scanner. + + Args: + protocol: OBD2Protocol instance + cache_file: Optional path to cache file for storing results + """ + self.protocol = protocol + self.cache_file = cache_file + self._supported_pids: Set[int] = set() + self._scanned = False + self._logger = get_logger("obd2_client.scanner") + + @property + def supported_pids(self) -> List[int]: + """Get list of supported PIDs (sorted).""" + return sorted(self._supported_pids) + + @property + def is_scanned(self) -> bool: + """Check if scan has been performed.""" + return self._scanned + + def scan(self, use_cache: bool = True) -> List[int]: + """Scan for supported PIDs. + + Args: + use_cache: If True, try to load from cache first + + Returns: + List of supported PID codes + """ + if use_cache and self._load_cache(): + self._logger.info(f"Loaded {len(self._supported_pids)} PIDs from cache") + return self.supported_pids + + self._logger.info("Starting PID scan...") + self._supported_pids.clear() + + for base_pid in self.SUPPORTED_PID_QUERIES: + self._logger.debug(f"Querying supported PIDs from 0x{base_pid:02X}") + + supported = self.protocol.query_supported_pids(base_pid) + if not supported: + self._logger.debug(f"No response for base PID 0x{base_pid:02X}") + break + + self._supported_pids.update(supported) + self._logger.debug(f"Found {len(supported)} supported PIDs") + + next_base = base_pid + 0x20 + if next_base not in supported: + break + + self._scanned = True + self._logger.info(f"Scan complete. Found {len(self._supported_pids)} supported PIDs") + + if self.cache_file: + self._save_cache() + + return self.supported_pids + + def get_readable_pids(self) -> List[int]: + """Get PIDs that can be read (have decoders in registry). + + Returns: + List of PID codes that are both supported and have decoders + """ + readable = [] + for pid_code in self._supported_pids: + pid = PIDS.get(pid_code) + if pid and pid_code not in self.SUPPORTED_PID_QUERIES: + readable.append(pid_code) + return sorted(readable) + + def is_pid_supported(self, pid_code: int) -> bool: + """Check if a specific PID is supported. + + Args: + pid_code: PID code to check + + Returns: + True if PID is supported + """ + return pid_code in self._supported_pids + + def _load_cache(self) -> bool: + """Load supported PIDs from cache file. + + Returns: + True if cache was loaded successfully + """ + if not self.cache_file: + return False + + cache_path = Path(self.cache_file) + if not cache_path.exists(): + return False + + try: + with open(cache_path, "r") as f: + data = json.load(f) + self._supported_pids = set(data.get("supported_pids", [])) + self._scanned = True + return True + except Exception as e: + self._logger.warning(f"Failed to load cache: {e}") + return False + + def _save_cache(self) -> bool: + """Save supported PIDs to cache file. + + Returns: + True if cache was saved successfully + """ + if not self.cache_file: + return False + + try: + cache_path = Path(self.cache_file) + cache_path.parent.mkdir(parents=True, exist_ok=True) + + with open(cache_path, "w") as f: + json.dump( + {"supported_pids": sorted(self._supported_pids)}, + f, + indent=2, + ) + self._logger.info(f"Saved PID cache to {self.cache_file}") + return True + except Exception as e: + self._logger.warning(f"Failed to save cache: {e}") + return False + + def print_supported_pids(self) -> None: + """Print supported PIDs in a readable format.""" + print("\n=== Supported PIDs ===") + for pid_code in self.supported_pids: + pid = PIDS.get(pid_code) + if pid: + print(f" 0x{pid_code:02X} - {pid.name}: {pid.description}") + else: + print(f" 0x{pid_code:02X} - Unknown PID") + print(f"\nTotal: {len(self._supported_pids)} PIDs") diff --git a/obd2_client/src/vehicle/__init__.py b/obd2_client/src/vehicle/__init__.py new file mode 100644 index 0000000..0c578b4 --- /dev/null +++ b/obd2_client/src/vehicle/__init__.py @@ -0,0 +1,6 @@ +"""Vehicle state and polling.""" + +from .state import VehicleState +from .poller import VehiclePoller + +__all__ = ["VehicleState", "VehiclePoller"] diff --git a/obd2_client/src/vehicle/poller.py b/obd2_client/src/vehicle/poller.py new file mode 100644 index 0000000..19abef0 --- /dev/null +++ b/obd2_client/src/vehicle/poller.py @@ -0,0 +1,193 @@ +"""Vehicle PID polling service.""" + +import threading +import time +from typing import List, Optional, Set +from enum import Enum + +from ..obd2.protocol import OBD2Protocol +from ..obd2.pids import PIDS +from ..logger import get_logger +from .state import VehicleState + + +class PollerState(Enum): + """Poller state enum.""" + + STOPPED = "stopped" + RUNNING = "running" + PAUSED = "paused" + + +class VehiclePoller: + """Background poller for vehicle PIDs. + + Supports priority-based polling with fast and slow intervals: + - Fast PIDs (RPM, speed, throttle): polled frequently + - Slow PIDs (temperatures, fuel level): polled less frequently + """ + + def __init__( + self, + protocol: OBD2Protocol, + state: VehicleState, + fast_interval: float = 0.1, + slow_interval: float = 1.0, + fast_pids: Optional[List[int]] = None, + slow_pids: Optional[List[int]] = None, + ): + """Initialize poller. + + Args: + protocol: OBD2Protocol instance + state: VehicleState to update + fast_interval: Polling interval for fast PIDs (seconds) + slow_interval: Polling interval for slow PIDs (seconds) + fast_pids: List of PIDs to poll frequently + slow_pids: List of PIDs to poll less frequently + """ + self.protocol = protocol + self.state = state + self.fast_interval = fast_interval + self.slow_interval = slow_interval + + self.fast_pids: Set[int] = set(fast_pids or [0x0C, 0x0D, 0x11]) + self.slow_pids: Set[int] = set(slow_pids or [0x05, 0x2F, 0x5C]) + + self._state = PollerState.STOPPED + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._pause_event = threading.Event() + self._pause_event.set() + + self._logger = get_logger("obd2_client.poller") + self._stats = {"queries": 0, "successes": 0, "failures": 0} + + @property + def poller_state(self) -> PollerState: + """Get current poller state.""" + return self._state + + @property + def is_running(self) -> bool: + """Check if poller is running.""" + return self._state == PollerState.RUNNING + + def set_pids( + self, + fast_pids: Optional[List[int]] = None, + slow_pids: Optional[List[int]] = None, + ) -> None: + """Update PIDs to poll. + + Args: + fast_pids: New list of fast PIDs (or None to keep current) + slow_pids: New list of slow PIDs (or None to keep current) + """ + if fast_pids is not None: + self.fast_pids = set(fast_pids) + if slow_pids is not None: + self.slow_pids = set(slow_pids) + + def start(self) -> None: + """Start the polling thread.""" + if self._thread is not None and self._thread.is_alive(): + self._logger.warning("Poller already running") + return + + self._stop_event.clear() + self._pause_event.set() + self._state = PollerState.RUNNING + + self._thread = threading.Thread(target=self._poll_loop, daemon=True) + self._thread.start() + self._logger.info("Poller started") + + def stop(self, timeout: float = 2.0) -> None: + """Stop the polling thread. + + Args: + timeout: Maximum time to wait for thread to stop + """ + if self._thread is None: + return + + self._stop_event.set() + self._pause_event.set() + + self._thread.join(timeout=timeout) + if self._thread.is_alive(): + self._logger.warning("Poller thread did not stop cleanly") + + self._thread = None + self._state = PollerState.STOPPED + self._logger.info("Poller stopped") + + def pause(self) -> None: + """Pause polling.""" + self._pause_event.clear() + self._state = PollerState.PAUSED + self._logger.info("Poller paused") + + def resume(self) -> None: + """Resume polling after pause.""" + self._pause_event.set() + self._state = PollerState.RUNNING + self._logger.info("Poller resumed") + + def get_stats(self) -> dict: + """Get polling statistics.""" + return dict(self._stats) + + def _poll_loop(self) -> None: + """Main polling loop.""" + last_slow_poll = 0.0 + + while not self._stop_event.is_set(): + self._pause_event.wait() + + if self._stop_event.is_set(): + break + + current_time = time.time() + + for pid_code in self.fast_pids: + if self._stop_event.is_set(): + break + self._poll_pid(pid_code) + + if (current_time - last_slow_poll) >= self.slow_interval: + for pid_code in self.slow_pids: + if self._stop_event.is_set(): + break + self._poll_pid(pid_code) + last_slow_poll = current_time + + sleep_time = self.fast_interval - (time.time() - current_time) + if sleep_time > 0: + self._stop_event.wait(sleep_time) + + def _poll_pid(self, pid_code: int) -> bool: + """Poll a single PID and update state. + + Args: + pid_code: PID to poll + + Returns: + True if successful + """ + self._stats["queries"] += 1 + + response = self.protocol.query_pid(pid_code) + if response is None: + self._stats["failures"] += 1 + return False + + self._stats["successes"] += 1 + self.state.update( + pid_code=pid_code, + name=response.pid.name, + value=response.value, + unit=response.unit, + ) + return True diff --git a/obd2_client/src/vehicle/state.py b/obd2_client/src/vehicle/state.py new file mode 100644 index 0000000..7ed208b --- /dev/null +++ b/obd2_client/src/vehicle/state.py @@ -0,0 +1,173 @@ +"""Vehicle state management.""" + +from dataclasses import dataclass, field +from typing import Dict, Optional, Any +from datetime import datetime +import threading + + +@dataclass +class PIDValue: + """Single PID value with metadata. + + Attributes: + pid_code: PID code + name: PID name + value: Current value + unit: Unit of measurement + timestamp: When value was last updated + """ + + pid_code: int + name: str + value: float + unit: str + timestamp: datetime = field(default_factory=datetime.now) + + def __str__(self) -> str: + return f"{self.name}: {self.value:.1f} {self.unit}" + + def age_seconds(self) -> float: + """Get age of value in seconds.""" + return (datetime.now() - self.timestamp).total_seconds() + + +class VehicleState: + """Thread-safe container for current vehicle state. + + Stores latest values for all monitored PIDs with timestamps. + """ + + def __init__(self): + self._values: Dict[int, PIDValue] = {} + self._lock = threading.RLock() + self._callbacks: list = [] + + def update( + self, + pid_code: int, + name: str, + value: float, + unit: str, + ) -> None: + """Update a PID value. + + Args: + pid_code: PID code + name: PID name + value: New value + unit: Unit of measurement + """ + with self._lock: + old_value = self._values.get(pid_code) + new_value = PIDValue( + pid_code=pid_code, + name=name, + value=value, + unit=unit, + ) + self._values[pid_code] = new_value + + for callback in self._callbacks: + try: + callback(new_value, old_value) + except Exception: + pass + + def get(self, pid_code: int) -> Optional[PIDValue]: + """Get current value for a PID. + + Args: + pid_code: PID code + + Returns: + PIDValue if available, None otherwise + """ + with self._lock: + return self._values.get(pid_code) + + def get_all(self) -> Dict[int, PIDValue]: + """Get all current values. + + Returns: + Dictionary of PID code to PIDValue + """ + with self._lock: + return dict(self._values) + + def clear(self) -> None: + """Clear all stored values.""" + with self._lock: + self._values.clear() + + def add_callback(self, callback) -> None: + """Add a callback for value updates. + + Callback signature: callback(new_value: PIDValue, old_value: Optional[PIDValue]) + """ + self._callbacks.append(callback) + + def remove_callback(self, callback) -> None: + """Remove a callback.""" + if callback in self._callbacks: + self._callbacks.remove(callback) + + @property + def rpm(self) -> Optional[float]: + """Get current RPM value.""" + val = self.get(0x0C) + return val.value if val else None + + @property + def speed(self) -> Optional[float]: + """Get current speed value (km/h).""" + val = self.get(0x0D) + return val.value if val else None + + @property + def coolant_temp(self) -> Optional[float]: + """Get current coolant temperature (°C).""" + val = self.get(0x05) + return val.value if val else None + + @property + def throttle(self) -> Optional[float]: + """Get current throttle position (%).""" + val = self.get(0x11) + return val.value if val else None + + @property + def fuel_level(self) -> Optional[float]: + """Get current fuel level (%).""" + val = self.get(0x2F) + return val.value if val else None + + @property + def oil_temp(self) -> Optional[float]: + """Get current oil temperature (°C).""" + val = self.get(0x5C) + return val.value if val else None + + def to_dict(self) -> Dict[str, Any]: + """Convert state to dictionary.""" + with self._lock: + return { + val.name: { + "value": val.value, + "unit": val.unit, + "timestamp": val.timestamp.isoformat(), + } + for val in self._values.values() + } + + def __str__(self) -> str: + """Format state as string.""" + with self._lock: + if not self._values: + return "No data" + + lines = [] + for pid_code in sorted(self._values.keys()): + val = self._values[pid_code] + lines.append(f" {val}") + return "\n".join(lines)