diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..02beea3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,10 @@ +# OBD2 Emulator Dependencies + +# CAN bus interface +python-can>=4.3.0 + +# Configuration validation +pydantic>=2.5.0 + +# Type hints (optional, for development) +typing-extensions>=4.8.0 diff --git a/scripts/monitor_can.sh b/scripts/monitor_can.sh new file mode 100644 index 0000000..4bc92f8 --- /dev/null +++ b/scripts/monitor_can.sh @@ -0,0 +1,32 @@ +#!/bin/bash +# Мониторинг CAN трафика с фильтрацией OBD2 + +INTERFACE=${1:-vcan0} + +echo "=== CAN Monitor (OBD2) ===" +echo "Interface: $INTERFACE" +echo "Filtering: 7DF (requests) and 7E8 (responses)" +echo "Press Ctrl+C to stop" +echo "" + +# Проверка candump +if ! command -v candump &> /dev/null; then + echo "Error: candump not found. Install can-utils:" + echo " sudo apt install can-utils" + exit 1 +fi + +# Мониторинг с фильтром +candump "$INTERFACE" | grep -E "(7DF|7E8)" | while read line; do + timestamp=$(date +"%H:%M:%S.%3N") + + # Парсинг и форматирование + if echo "$line" | grep -q "7DF"; then + # Запрос + pid=$(echo "$line" | awk '{print $3}' | cut -c5-6) + echo "[$timestamp] REQUEST -> PID: 0x$pid" + elif echo "$line" | grep -q "7E8"; then + # Ответ + echo "[$timestamp] RESPONSE <- $line" + fi +done diff --git a/scripts/setup_can.sh b/scripts/setup_can.sh new file mode 100644 index 0000000..453b6cb --- /dev/null +++ b/scripts/setup_can.sh @@ -0,0 +1,62 @@ +#!/bin/bash +# Скрипт настройки CAN интерфейсов на Raspberry Pi 5 с 2CH CAN HAT + +set -e + +BITRATE=${1:-500000} + +echo "=== CAN Interface Setup ===" +echo "Bitrate: $BITRATE bps" +echo "" + +# Проверка прав +if [ "$EUID" -ne 0 ]; then + echo "Please run as root (sudo)" + exit 1 +fi + +# Загрузка модулей +echo "[1/4] Loading kernel modules..." +modprobe can +modprobe can_raw +modprobe mcp251x 2>/dev/null || true # Для SPI CAN контроллеров + +# Настройка can0 +echo "[2/4] Configuring can0..." +if ip link show can0 &>/dev/null; then + ip link set can0 down 2>/dev/null || true + ip link set can0 type can bitrate $BITRATE + ip link set can0 up + echo " can0: UP at $BITRATE bps" +else + echo " can0: Not found (skipping)" +fi + +# Настройка can1 +echo "[3/4] Configuring can1..." +if ip link show can1 &>/dev/null; then + ip link set can1 down 2>/dev/null || true + ip link set can1 type can bitrate $BITRATE + ip link set can1 up + echo " can1: UP at $BITRATE bps" +else + echo " can1: Not found (skipping)" +fi + +# Создание vcan0 для тестирования +echo "[4/4] Creating virtual CAN (vcan0)..." +modprobe vcan +if ! ip link show vcan0 &>/dev/null; then + ip link add dev vcan0 type vcan +fi +ip link set vcan0 up +echo " vcan0: UP (virtual)" + +echo "" +echo "=== Status ===" +ip -details link show type can 2>/dev/null || echo "No physical CAN interfaces" +ip -details link show type vcan 2>/dev/null || echo "No virtual CAN interfaces" + +echo "" +echo "=== Done ===" +echo "You can now run: python src/main.py -i can1 -s city" diff --git a/scripts/test_obd2.py b/scripts/test_obd2.py new file mode 100644 index 0000000..9aa8ebf --- /dev/null +++ b/scripts/test_obd2.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +Тестовый скрипт для проверки OBD2 эмулятора. +Отправляет OBD2 запросы и выводит ответы. +""" +import argparse +import sys +import time + +try: + import can +except ImportError: + print("Error: python-can not installed. Run: pip install python-can") + sys.exit(1) + + +# OBD2 PIDs для тестирования +TEST_PIDS = [ + (0x00, "Supported PIDs [01-20]"), + (0x05, "Coolant Temperature"), + (0x0C, "Engine RPM"), + (0x0D, "Vehicle Speed"), + (0x0F, "Intake Air Temperature"), + (0x11, "Throttle Position"), + (0x2F, "Fuel Tank Level"), + (0x46, "Ambient Temperature"), +] + + +def decode_pid(pid: int, data: bytes) -> str: + """Декодировать значение PID.""" + if len(data) < 1: + return "No data" + + if pid == 0x00: + # Supported PIDs bitmap + if len(data) >= 4: + mask = int.from_bytes(data[:4], "big") + supported = [] + for i in range(32): + if mask & (1 << (31 - i)): + supported.append(f"{i+1:02X}") + return f"PIDs: {', '.join(supported[:10])}..." + return f"Raw: {data.hex()}" + + elif pid == 0x05 or pid == 0x0F or pid == 0x46: + # Temperature: A - 40 + return f"{data[0] - 40} °C" + + elif pid == 0x0C: + # RPM: (A*256 + B) / 4 + if len(data) >= 2: + rpm = (data[0] * 256 + data[1]) / 4 + return f"{rpm:.0f} rpm" + + elif pid == 0x0D: + # Speed: A + return f"{data[0]} km/h" + + elif pid == 0x11 or pid == 0x2F: + # Percentage: A * 100 / 255 + return f"{data[0] * 100 / 255:.1f} %" + + return f"Raw: {data.hex()}" + + +def send_obd2_request(bus: can.Bus, pid: int, timeout: float = 1.0) -> bytes | None: + """Отправить OBD2 запрос и получить ответ.""" + # Формируем запрос Mode 01 + request = can.Message( + arbitration_id=0x7DF, + data=[0x02, 0x01, pid, 0x00, 0x00, 0x00, 0x00, 0x00], + is_extended_id=False + ) + + # Отправляем + bus.send(request) + + # Ждём ответ + start = time.time() + while time.time() - start < timeout: + msg = bus.recv(timeout=0.1) + if msg is None: + continue + + # Проверяем, что это ответ от ECU + if msg.arbitration_id == 0x7E8: + data = bytes(msg.data) + # Проверяем формат ответа + if len(data) >= 3 and data[1] == 0x41 and data[2] == pid: + # Возвращаем данные (без заголовка) + return data[3:3 + data[0] - 2] + + return None + + +def main(): + parser = argparse.ArgumentParser(description="Test OBD2 Emulator") + parser.add_argument( + "-i", "--interface", + default="vcan0", + help="CAN interface [default: vcan0]" + ) + parser.add_argument( + "-c", "--continuous", + action="store_true", + help="Continuous mode (loop forever)" + ) + parser.add_argument( + "--interval", + type=float, + default=1.0, + help="Interval between requests in continuous mode [default: 1.0]" + ) + args = parser.parse_args() + + print(f"=== OBD2 Emulator Test ===") + print(f"Interface: {args.interface}") + print("") + + try: + bus = can.Bus(interface="socketcan", channel=args.interface) + except Exception as e: + print(f"Error: Failed to connect to {args.interface}: {e}") + print("Make sure the interface is up: sudo ip link set vcan0 up") + return 1 + + try: + while True: + print(f"Timestamp: {time.strftime('%H:%M:%S')}") + print("-" * 50) + + for pid, name in TEST_PIDS: + data = send_obd2_request(bus, pid) + if data: + value = decode_pid(pid, data) + print(f" PID {pid:02X} ({name}): {value}") + else: + print(f" PID {pid:02X} ({name}): No response") + + print("") + + if not args.continuous: + break + + time.sleep(args.interval) + + except KeyboardInterrupt: + print("\nStopped") + finally: + bus.shutdown() + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..955763e --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,2 @@ +"""OBD2 Emulator - ECU simulator for testing onboard computers.""" +__version__ = "1.0.0" diff --git a/src/can_interface.py b/src/can_interface.py new file mode 100644 index 0000000..770b70e --- /dev/null +++ b/src/can_interface.py @@ -0,0 +1,171 @@ +""" +Управление CAN интерфейсом с автоматической настройкой. +Поддержка физических (can0, can1) и виртуальных (vcan0) интерфейсов. +""" +import subprocess +import platform +from dataclasses import dataclass +from typing import Callable + +import can + +from config import CANConfig +from logger import get_logger + +logger = get_logger(__name__) + + +class CANInterfaceError(Exception): + """Ошибка настройки CAN интерфейса.""" + pass + + +@dataclass +class CANFrame: + """CAN сообщение.""" + arbitration_id: int + data: bytes + is_extended_id: bool = False + + @classmethod + def from_can_message(cls, msg: can.Message) -> "CANFrame": + return cls( + arbitration_id=msg.arbitration_id, + data=bytes(msg.data), + is_extended_id=msg.is_extended_id + ) + + def to_can_message(self) -> can.Message: + return can.Message( + arbitration_id=self.arbitration_id, + data=self.data, + is_extended_id=self.is_extended_id + ) + + +class CANInterface: + """ + Управление CAN интерфейсом. + Автоматически поднимает интерфейс при инициализации. + """ + + def __init__(self, config: CANConfig): + self.config = config + self._bus: can.Bus | None = None + self._is_setup = False + + def setup(self) -> None: + """Настроить и поднять CAN интерфейс.""" + if self._is_setup: + return + + if platform.system() != "Linux": + logger.warning("CAN interface setup is only supported on Linux. Skipping.") + self._is_setup = True + return + + try: + if self.config.is_virtual: + self._setup_virtual_can() + else: + self._setup_physical_can() + self._is_setup = True + logger.info(f"CAN interface {self.config.interface} is ready") + except Exception as e: + raise CANInterfaceError(f"Failed to setup {self.config.interface}: {e}") + + def _run_command(self, cmd: list[str], check: bool = True) -> subprocess.CompletedProcess: + """Выполнить shell команду.""" + logger.debug(f"Running: {' '.join(cmd)}") + return subprocess.run(cmd, capture_output=True, text=True, check=check) + + def _setup_virtual_can(self) -> None: + """Настройка виртуального CAN интерфейса.""" + interface = self.config.interface + + # Загружаем модуль vcan + self._run_command(["sudo", "modprobe", "vcan"], check=False) + + # Проверяем, существует ли интерфейс + result = self._run_command(["ip", "link", "show", interface], check=False) + + if result.returncode != 0: + # Создаём интерфейс + logger.info(f"Creating virtual CAN interface: {interface}") + self._run_command(["sudo", "ip", "link", "add", "dev", interface, "type", "vcan"]) + + # Поднимаем интерфейс + logger.info(f"Bringing up interface: {interface}") + self._run_command(["sudo", "ip", "link", "set", "up", interface]) + + def _setup_physical_can(self) -> None: + """Настройка физического CAN интерфейса.""" + interface = self.config.interface + bitrate = self.config.bitrate + + # Сначала опускаем интерфейс (если был поднят) + self._run_command(["sudo", "ip", "link", "set", "down", interface], check=False) + + # Настраиваем bitrate и поднимаем + logger.info(f"Setting up physical CAN interface: {interface} at {bitrate} bps") + self._run_command([ + "sudo", "ip", "link", "set", interface, + "type", "can", "bitrate", str(bitrate) + ]) + self._run_command(["sudo", "ip", "link", "set", "up", interface]) + + def connect(self) -> None: + """Подключиться к CAN шине.""" + if not self._is_setup: + self.setup() + + if self._bus is not None: + return + + logger.info(f"Connecting to CAN bus: {self.config.interface}") + self._bus = can.Bus( + interface="socketcan", + channel=self.config.interface, + receive_own_messages=False + ) + + def disconnect(self) -> None: + """Отключиться от CAN шины.""" + if self._bus: + self._bus.shutdown() + self._bus = None + logger.info(f"Disconnected from {self.config.interface}") + + def send(self, frame: CANFrame) -> None: + """Отправить CAN сообщение.""" + if not self._bus: + raise CANInterfaceError("Not connected to CAN bus") + + msg = frame.to_can_message() + self._bus.send(msg) + + if logger.isEnabledFor(10): # DEBUG level + logger.debug(f"TX: {frame.arbitration_id:03X}#{frame.data.hex().upper()}") + + def receive(self, timeout: float = 0.1) -> CANFrame | None: + """Получить CAN сообщение.""" + if not self._bus: + raise CANInterfaceError("Not connected to CAN bus") + + msg = self._bus.recv(timeout=timeout) + if msg is None: + return None + + frame = CANFrame.from_can_message(msg) + + if logger.isEnabledFor(10): # DEBUG level + logger.debug(f"RX: {frame.arbitration_id:03X}#{frame.data.hex().upper()}") + + return frame + + def __enter__(self) -> "CANInterface": + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.disconnect() diff --git a/src/config.json b/src/config.json new file mode 100644 index 0000000..6121895 --- /dev/null +++ b/src/config.json @@ -0,0 +1,25 @@ +{ + "can": { + "interface": "vcan0", + "bitrate": 500000 + }, + "vehicle": { + "idle_rpm": 850, + "max_rpm": 6500, + "redline_rpm": 6000, + "ambient_temp": 20.0, + "target_coolant_temp": 90.0, + "fuel_tank_capacity": 58.0, + "initial_fuel_level": 75.0, + "max_speed": 220, + "engine_displacement": 2.0 + }, + "simulator": { + "update_rate_hz": 10, + "scenario": "idle" + }, + "logging": { + "level": "INFO", + "log_can_frames": false + } +} diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..df60296 --- /dev/null +++ b/src/config.py @@ -0,0 +1,104 @@ +""" +Конфигурация OBD2 эмулятора. +Поддержка физических CAN интерфейсов (can0, can1) и виртуальных (vcan0). +""" +import json +from pathlib import Path +from typing import Literal +from pydantic import BaseModel, Field, field_validator + + +class CANConfig(BaseModel): + """Конфигурация CAN интерфейса.""" + interface: str = Field(default="vcan0", description="CAN interface name") + bitrate: int = Field(default=500000, description="CAN bitrate (ignored for vcan)") + + @field_validator("interface") + @classmethod + def validate_interface(cls, v: str) -> str: + valid_prefixes = ("can", "vcan") + if not any(v.startswith(prefix) for prefix in valid_prefixes): + raise ValueError(f"Interface must start with: {valid_prefixes}") + return v + + @property + def is_virtual(self) -> bool: + """Check if interface is virtual.""" + return self.interface.startswith("vcan") + + +class VehicleConfig(BaseModel): + """Конфигурация симулируемого автомобиля.""" + # Базовые параметры двигателя + idle_rpm: int = Field(default=850, ge=600, le=1200) + max_rpm: int = Field(default=6500, ge=4000, le=9000) + redline_rpm: int = Field(default=6000, ge=3500, le=8500) + + # Температуры + ambient_temp: float = Field(default=20.0, ge=-40, le=50) + target_coolant_temp: float = Field(default=90.0, ge=80, le=105) + + # Топливо + fuel_tank_capacity: float = Field(default=58.0, ge=30, le=100) + initial_fuel_level: float = Field(default=75.0, ge=0, le=100) + + # Характеристики авто + max_speed: int = Field(default=220, ge=100, le=350) + engine_displacement: float = Field(default=2.0, description="Engine displacement in liters") + + +class SimulatorConfig(BaseModel): + """Конфигурация симулятора.""" + update_rate_hz: int = Field(default=10, ge=1, le=100, description="Vehicle state update rate") + scenario: Literal["idle", "city", "highway", "warmup", "manual"] = Field(default="idle") + + +class LoggingConfig(BaseModel): + """Конфигурация логирования.""" + level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(default="INFO") + format: str = Field(default="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s") + log_can_frames: bool = Field(default=False, description="Log all CAN frames (verbose)") + + +class Config(BaseModel): + """Главная конфигурация приложения.""" + can: CANConfig = Field(default_factory=CANConfig) + vehicle: VehicleConfig = Field(default_factory=VehicleConfig) + simulator: SimulatorConfig = Field(default_factory=SimulatorConfig) + logging: LoggingConfig = Field(default_factory=LoggingConfig) + + +def load_config(config_path: Path | None = None) -> Config: + """Загрузка конфигурации из файла или defaults.""" + if config_path and config_path.exists(): + with open(config_path, "r", encoding="utf-8") as f: + data = json.load(f) + return Config.model_validate(data) + return Config() + + +# Глобальный экземпляр конфигурации +_config: Config | None = None + + +def get_config() -> Config: + """Получить глобальную конфигурацию.""" + global _config + if _config is None: + config_path = Path(__file__).parent / "config.json" + _config = load_config(config_path if config_path.exists() else None) + return _config + + +def set_config(config: Config) -> None: + """Установить глобальную конфигурацию.""" + global _config + _config = config + + +# Прокси для удобного доступа +class _ConfigProxy: + def __getattr__(self, name): + return getattr(get_config(), name) + +config = _ConfigProxy() diff --git a/src/logger.py b/src/logger.py new file mode 100644 index 0000000..a08ff6c --- /dev/null +++ b/src/logger.py @@ -0,0 +1,30 @@ +""" +Настройка логирования для OBD2 эмулятора. +""" +import logging +import sys +from config import config + + +def setup_logging() -> None: + """Настройка корневого логгера.""" + root_logger = logging.getLogger() + root_logger.setLevel(config.logging.level) + + # Очищаем существующие хендлеры + root_logger.handlers.clear() + + # Console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(config.logging.level) + console_handler.setFormatter(logging.Formatter(config.logging.format)) + root_logger.addHandler(console_handler) + + +def get_logger(name: str) -> logging.Logger: + """Получить логгер по имени.""" + return logging.getLogger(name) + + +# Инициализация при импорте +setup_logging() diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..5c084cd --- /dev/null +++ b/src/main.py @@ -0,0 +1,325 @@ +""" +OBD2 Emulator - Главный модуль. +Эмулятор ECU для тестирования бортовых компьютеров. +""" +import argparse +import signal +import sys +import time +import threading +from pathlib import Path + +from config import Config, CANConfig, VehicleConfig, SimulatorConfig, set_config, get_config +from logger import get_logger, setup_logging +from can_interface import CANInterface, CANFrame, CANInterfaceError +from obd2 import OBD2Protocol +from simulator import VehicleSimulator, ScenarioManager + +logger = get_logger(__name__) + + +class OBD2Emulator: + """ + Главный класс эмулятора OBD2. + Объединяет CAN интерфейс, OBD2 протокол и симулятор автомобиля. + """ + + def __init__(self, config: Config): + self.config = config + self.can_interface = CANInterface(config.can) + self.obd2_protocol = OBD2Protocol() + self.vehicle_simulator = VehicleSimulator(config.vehicle) + self.scenario_manager = ScenarioManager(self.vehicle_simulator) + + self._running = False + self._update_thread: threading.Thread | None = None + self._can_thread: threading.Thread | None = None + + def start(self, scenario: str = "idle") -> None: + """Запустить эмулятор.""" + logger.info("Starting OBD2 Emulator...") + + # Настраиваем CAN интерфейс + try: + self.can_interface.setup() + self.can_interface.connect() + except CANInterfaceError as e: + logger.error(f"Failed to setup CAN interface: {e}") + raise + + # Запускаем сценарий + self.scenario_manager.start(scenario) + + self._running = True + + # Поток обновления симулятора + self._update_thread = threading.Thread( + target=self._update_loop, + name="SimulatorUpdate", + daemon=True + ) + self._update_thread.start() + + # Поток обработки CAN + self._can_thread = threading.Thread( + target=self._can_loop, + name="CANHandler", + daemon=True + ) + self._can_thread.start() + + logger.info(f"OBD2 Emulator started on {self.config.can.interface}") + logger.info(f"Scenario: {scenario}") + + def stop(self) -> None: + """Остановить эмулятор.""" + logger.info("Stopping OBD2 Emulator...") + self._running = False + + self.scenario_manager.stop() + self.vehicle_simulator.stop_engine() + + # Ждём завершения потоков + if self._update_thread and self._update_thread.is_alive(): + self._update_thread.join(timeout=2.0) + + if self._can_thread and self._can_thread.is_alive(): + self._can_thread.join(timeout=2.0) + + self.can_interface.disconnect() + logger.info("OBD2 Emulator stopped") + + def _update_loop(self) -> None: + """Цикл обновления симулятора.""" + update_interval = 1.0 / self.config.simulator.update_rate_hz + + while self._running: + try: + # Обновляем сценарий + self.scenario_manager.update() + + # Обновляем симулятор + self.vehicle_simulator.update() + + time.sleep(update_interval) + + except Exception as e: + logger.error(f"Error in update loop: {e}", exc_info=True) + time.sleep(0.1) + + def _can_loop(self) -> None: + """Цикл обработки CAN сообщений.""" + while self._running: + try: + # Получаем CAN фрейм + frame = self.can_interface.receive(timeout=0.1) + if frame is None: + continue + + # Обрабатываем как OBD2 запрос + response_frame = self.obd2_protocol.process_frame( + frame, + self.vehicle_simulator.get_state() + ) + + # Отправляем ответ + if response_frame: + self.can_interface.send(response_frame) + + except CANInterfaceError as e: + logger.error(f"CAN error: {e}") + time.sleep(0.1) + except Exception as e: + logger.error(f"Error in CAN loop: {e}", exc_info=True) + time.sleep(0.1) + + def switch_scenario(self, scenario: str) -> None: + """Переключить сценарий.""" + self.scenario_manager.stop() + self.scenario_manager.start(scenario) + logger.info(f"Switched to scenario: {scenario}") + + def get_status(self) -> dict: + """Получить статус эмулятора.""" + state = self.vehicle_simulator.get_state() + return { + "running": self._running, + "interface": self.config.can.interface, + "scenario": self.scenario_manager.current_scenario.name if self.scenario_manager.current_scenario else None, + "vehicle": { + "engine_running": state.engine_running, + "rpm": round(state.rpm), + "speed": round(state.speed, 1), + "coolant_temp": round(state.coolant_temp, 1), + "throttle": round(state.throttle_pos, 1), + "fuel_level": round(state.fuel_level, 1), + }, + "obd2_stats": self.obd2_protocol.get_stats(), + } + + def print_status(self) -> None: + """Вывести статус в консоль.""" + state = self.vehicle_simulator.get_state() + stats = self.obd2_protocol.get_stats() + + print("\033[2J\033[H") # Clear screen + print("=" * 60) + print(" OBD2 EMULATOR STATUS") + print("=" * 60) + print(f" Interface: {self.config.can.interface}") + print(f" Scenario: {self.scenario_manager.current_scenario.name if self.scenario_manager.current_scenario else 'None'}") + print("-" * 60) + print(" ENGINE:") + print(f" Running: {'YES' if state.engine_running else 'NO'}") + print(f" RPM: {state.rpm:>6.0f} rpm") + print(f" Load: {state.engine_load:>6.1f} %") + print(f" Throttle: {state.throttle_pos:>6.1f} %") + print("-" * 60) + print(" VEHICLE:") + print(f" Speed: {state.speed:>6.1f} km/h") + print(f" Gear: {state.gear:>6}") + print(f" Distance: {state.total_distance:>6.1f} km") + print("-" * 60) + print(" TEMPERATURES:") + print(f" Coolant: {state.coolant_temp:>6.1f} °C") + print(f" Oil: {state.oil_temp:>6.1f} °C") + print(f" Intake Air: {state.intake_temp:>6.1f} °C") + print(f" Ambient: {state.ambient_temp:>6.1f} °C") + print("-" * 60) + print(" FUEL:") + print(f" Level: {state.fuel_level:>6.1f} %") + print(f" Rate: {state.fuel_rate:>6.2f} L/h") + print("-" * 60) + print(" OBD2 STATS:") + print(f" Requests: {stats['requests_received']:>6}") + print(f" Responses: {stats['responses_sent']:>6}") + print(f" Unsupported: {stats['unsupported_requests']:>6}") + print("=" * 60) + print(" Press Ctrl+C to stop") + print("=" * 60) + + +def parse_args() -> argparse.Namespace: + """Парсинг аргументов командной строки.""" + parser = argparse.ArgumentParser( + description="OBD2 Emulator - ECU simulator for testing onboard computers", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Available scenarios: + idle - Engine idling, vehicle stationary + warmup - Cold start engine warmup + city - City driving with stops + highway - Highway cruising at 110-120 km/h + acceleration - Full throttle acceleration test + dynamic_city - Advanced city driving patterns + manual - Manual control (no automation) + +Examples: + python main.py --interface can1 --scenario city + python main.py --interface vcan0 --scenario highway + python main.py -i vcan0 -s warmup --status-interval 5 + """ + ) + + parser.add_argument( + "-i", "--interface", + default="vcan0", + help="CAN interface (can0, can1, vcan0, etc.) [default: vcan0]" + ) + + parser.add_argument( + "-b", "--bitrate", + type=int, + default=500000, + help="CAN bitrate in bps (ignored for vcan) [default: 500000]" + ) + + parser.add_argument( + "-s", "--scenario", + default="idle", + choices=["idle", "warmup", "city", "highway", "acceleration", "dynamic_city", "manual"], + help="Initial scenario [default: idle]" + ) + + parser.add_argument( + "--status-interval", + type=float, + default=1.0, + help="Status display interval in seconds [default: 1.0]" + ) + + parser.add_argument( + "--no-status", + action="store_true", + help="Disable status display" + ) + + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging" + ) + + return parser.parse_args() + + +def main() -> int: + """Главная функция.""" + args = parse_args() + + # Создаём конфигурацию + config = Config( + can=CANConfig( + interface=args.interface, + bitrate=args.bitrate, + ), + vehicle=VehicleConfig(), + simulator=SimulatorConfig( + scenario=args.scenario, + ), + ) + + if args.debug: + config.logging.level = "DEBUG" + + set_config(config) + setup_logging() + + # Создаём эмулятор + emulator = OBD2Emulator(config) + + # Обработчик сигналов + def signal_handler(sig, frame): + logger.info("Received shutdown signal") + emulator.stop() + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + emulator.start(scenario=args.scenario) + + # Главный цикл с отображением статуса + while True: + if not args.no_status: + emulator.print_status() + + time.sleep(args.status_interval) + + except CANInterfaceError as e: + logger.error(f"CAN interface error: {e}") + return 1 + except KeyboardInterrupt: + pass + except Exception as e: + logger.error(f"Unexpected error: {e}", exc_info=True) + return 1 + finally: + emulator.stop() + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/obd2/__init__.py b/src/obd2/__init__.py new file mode 100644 index 0000000..422682f --- /dev/null +++ b/src/obd2/__init__.py @@ -0,0 +1,12 @@ +"""OBD2 протокол и обработка PIDs.""" +from .pids import PID, PIDRegistry, get_pid_registry +from .protocol import OBD2Protocol, OBD2Request, OBD2Response + +__all__ = [ + "PID", + "PIDRegistry", + "get_pid_registry", + "OBD2Protocol", + "OBD2Request", + "OBD2Response", +] diff --git a/src/obd2/pids.py b/src/obd2/pids.py new file mode 100644 index 0000000..d3be087 --- /dev/null +++ b/src/obd2/pids.py @@ -0,0 +1,476 @@ +""" +Определения OBD2 PIDs согласно SAE J1979. +Поддержка Mode 01 (текущие данные) и Mode 09 (информация об авто). +""" +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Callable, Any + + +class OBD2Mode(IntEnum): + """OBD2 режимы (сервисы).""" + CURRENT_DATA = 0x01 # Текущие данные + FREEZE_FRAME = 0x02 # Стоп-кадр + STORED_DTC = 0x03 # Сохранённые ошибки + CLEAR_DTC = 0x04 # Очистка ошибок + TEST_RESULTS_O2 = 0x05 # Тесты кислородных датчиков + TEST_RESULTS = 0x06 # Результаты тестов + PENDING_DTC = 0x07 # Ожидающие ошибки + CONTROL = 0x08 # Управление + VEHICLE_INFO = 0x09 # Информация об автомобиле + PERMANENT_DTC = 0x0A # Постоянные ошибки + + +@dataclass +class PID: + """Определение OBD2 PID.""" + pid: int + name: str + description: str + unit: str + min_value: float + max_value: float + bytes_count: int + # Функция кодирования: value -> bytes + encoder: Callable[[float], bytes] + # Функция декодирования: bytes -> value (для отладки) + decoder: Callable[[bytes], float] | None = None + + +class PIDRegistry: + """Реестр всех поддерживаемых PIDs.""" + + def __init__(self): + self._pids: dict[int, PID] = {} + self._register_mode01_pids() + + def register(self, pid: PID) -> None: + """Зарегистрировать PID.""" + self._pids[pid.pid] = pid + + def get(self, pid_code: int) -> PID | None: + """Получить PID по коду.""" + return self._pids.get(pid_code) + + def get_supported_pids_mask(self, start_pid: int) -> int: + """ + Получить битовую маску поддерживаемых PIDs. + Для PIDs 0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0 + """ + mask = 0 + for i in range(32): + pid_code = start_pid + i + 1 + if pid_code in self._pids: + mask |= (1 << (31 - i)) + return mask + + def _register_mode01_pids(self) -> None: + """Регистрация Mode 01 PIDs.""" + + # PID 00: Supported PIDs [01-20] + self.register(PID( + pid=0x00, + name="PIDS_SUPPORTED_01_20", + description="Supported PIDs [01-20]", + unit="", + min_value=0, max_value=0xFFFFFFFF, + bytes_count=4, + encoder=lambda v: int(v).to_bytes(4, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 01: Monitor status since DTCs cleared + self.register(PID( + pid=0x01, + name="MONITOR_STATUS", + description="Monitor status since DTCs cleared", + unit="", + min_value=0, max_value=0xFFFFFFFF, + bytes_count=4, + encoder=lambda v: int(v).to_bytes(4, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 03: Fuel system status + self.register(PID( + pid=0x03, + name="FUEL_SYSTEM_STATUS", + description="Fuel system status", + unit="", + min_value=0, max_value=0xFFFF, + bytes_count=2, + encoder=lambda v: int(v).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 04: Calculated engine load + self.register(PID( + pid=0x04, + name="ENGINE_LOAD", + description="Calculated engine load", + unit="%", + min_value=0, max_value=100, + bytes_count=1, + encoder=lambda v: bytes([int(v * 255 / 100)]), + decoder=lambda b: b[0] * 100 / 255 + )) + + # PID 05: Engine coolant temperature + self.register(PID( + pid=0x05, + name="COOLANT_TEMP", + description="Engine coolant temperature", + unit="°C", + min_value=-40, max_value=215, + bytes_count=1, + encoder=lambda v: bytes([int(v + 40)]), + decoder=lambda b: b[0] - 40 + )) + + # PID 06: Short term fuel trim - Bank 1 + self.register(PID( + pid=0x06, + name="SHORT_FUEL_TRIM_1", + description="Short term fuel trim - Bank 1", + unit="%", + min_value=-100, max_value=99.2, + bytes_count=1, + encoder=lambda v: bytes([int((v + 100) * 128 / 100)]), + decoder=lambda b: (b[0] - 128) * 100 / 128 + )) + + # PID 07: Long term fuel trim - Bank 1 + self.register(PID( + pid=0x07, + name="LONG_FUEL_TRIM_1", + description="Long term fuel trim - Bank 1", + unit="%", + min_value=-100, max_value=99.2, + bytes_count=1, + encoder=lambda v: bytes([int((v + 100) * 128 / 100)]), + decoder=lambda b: (b[0] - 128) * 100 / 128 + )) + + # PID 0B: Intake manifold absolute pressure + self.register(PID( + pid=0x0B, + name="INTAKE_PRESSURE", + description="Intake manifold absolute pressure", + unit="kPa", + min_value=0, max_value=255, + bytes_count=1, + encoder=lambda v: bytes([int(v)]), + decoder=lambda b: b[0] + )) + + # PID 0C: Engine RPM + self.register(PID( + pid=0x0C, + name="ENGINE_RPM", + description="Engine RPM", + unit="rpm", + min_value=0, max_value=16383.75, + bytes_count=2, + encoder=lambda v: int(v * 4).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") / 4 + )) + + # PID 0D: Vehicle speed + self.register(PID( + pid=0x0D, + name="VEHICLE_SPEED", + description="Vehicle speed", + unit="km/h", + min_value=0, max_value=255, + bytes_count=1, + encoder=lambda v: bytes([int(v)]), + decoder=lambda b: b[0] + )) + + # PID 0E: Timing advance + self.register(PID( + pid=0x0E, + name="TIMING_ADVANCE", + description="Timing advance", + unit="° before TDC", + min_value=-64, max_value=63.5, + bytes_count=1, + encoder=lambda v: bytes([int((v + 64) * 2)]), + decoder=lambda b: b[0] / 2 - 64 + )) + + # PID 0F: Intake air temperature + self.register(PID( + pid=0x0F, + name="INTAKE_TEMP", + description="Intake air temperature", + unit="°C", + min_value=-40, max_value=215, + bytes_count=1, + encoder=lambda v: bytes([int(v + 40)]), + decoder=lambda b: b[0] - 40 + )) + + # PID 10: MAF air flow rate + self.register(PID( + pid=0x10, + name="MAF_FLOW", + description="MAF air flow rate", + unit="g/s", + min_value=0, max_value=655.35, + bytes_count=2, + encoder=lambda v: int(v * 100).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") / 100 + )) + + # PID 11: Throttle position + self.register(PID( + pid=0x11, + name="THROTTLE_POS", + description="Throttle position", + unit="%", + min_value=0, max_value=100, + bytes_count=1, + encoder=lambda v: bytes([int(v * 255 / 100)]), + decoder=lambda b: b[0] * 100 / 255 + )) + + # PID 1C: OBD standards this vehicle conforms to + self.register(PID( + pid=0x1C, + name="OBD_STANDARD", + description="OBD standards", + unit="", + min_value=0, max_value=255, + bytes_count=1, + encoder=lambda v: bytes([int(v)]), + decoder=lambda b: b[0] + )) + + # PID 1F: Run time since engine start + self.register(PID( + pid=0x1F, + name="RUN_TIME", + description="Run time since engine start", + unit="s", + min_value=0, max_value=65535, + bytes_count=2, + encoder=lambda v: int(v).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 20: Supported PIDs [21-40] + self.register(PID( + pid=0x20, + name="PIDS_SUPPORTED_21_40", + description="Supported PIDs [21-40]", + unit="", + min_value=0, max_value=0xFFFFFFFF, + bytes_count=4, + encoder=lambda v: int(v).to_bytes(4, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 21: Distance traveled with MIL on + self.register(PID( + pid=0x21, + name="DISTANCE_MIL_ON", + description="Distance traveled with MIL on", + unit="km", + min_value=0, max_value=65535, + bytes_count=2, + encoder=lambda v: int(v).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 23: Fuel Rail Gauge Pressure + self.register(PID( + pid=0x23, + name="FUEL_RAIL_PRESSURE", + description="Fuel Rail Gauge Pressure", + unit="kPa", + min_value=0, max_value=655350, + bytes_count=2, + encoder=lambda v: int(v / 10).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") * 10 + )) + + # PID 2F: Fuel Tank Level Input + self.register(PID( + pid=0x2F, + name="FUEL_LEVEL", + description="Fuel Tank Level Input", + unit="%", + min_value=0, max_value=100, + bytes_count=1, + encoder=lambda v: bytes([int(v * 255 / 100)]), + decoder=lambda b: b[0] * 100 / 255 + )) + + # PID 31: Distance since codes cleared + self.register(PID( + pid=0x31, + name="DISTANCE_SINCE_CLR", + description="Distance since codes cleared", + unit="km", + min_value=0, max_value=65535, + bytes_count=2, + encoder=lambda v: int(v).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 33: Absolute Barometric Pressure + self.register(PID( + pid=0x33, + name="BAROMETRIC_PRESSURE", + description="Absolute Barometric Pressure", + unit="kPa", + min_value=0, max_value=255, + bytes_count=1, + encoder=lambda v: bytes([int(v)]), + decoder=lambda b: b[0] + )) + + # PID 40: Supported PIDs [41-60] + self.register(PID( + pid=0x40, + name="PIDS_SUPPORTED_41_60", + description="Supported PIDs [41-60]", + unit="", + min_value=0, max_value=0xFFFFFFFF, + bytes_count=4, + encoder=lambda v: int(v).to_bytes(4, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 42: Control module voltage + self.register(PID( + pid=0x42, + name="CONTROL_MODULE_VOLTAGE", + description="Control module voltage", + unit="V", + min_value=0, max_value=65.535, + bytes_count=2, + encoder=lambda v: int(v * 1000).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") / 1000 + )) + + # PID 45: Relative throttle position + self.register(PID( + pid=0x45, + name="RELATIVE_THROTTLE_POS", + description="Relative throttle position", + unit="%", + min_value=0, max_value=100, + bytes_count=1, + encoder=lambda v: bytes([int(v * 255 / 100)]), + decoder=lambda b: b[0] * 100 / 255 + )) + + # PID 46: Ambient air temperature + self.register(PID( + pid=0x46, + name="AMBIENT_TEMP", + description="Ambient air temperature", + unit="°C", + min_value=-40, max_value=215, + bytes_count=1, + encoder=lambda v: bytes([int(v + 40)]), + decoder=lambda b: b[0] - 40 + )) + + # PID 49: Accelerator pedal position D + self.register(PID( + pid=0x49, + name="ACCELERATOR_POS_D", + description="Accelerator pedal position D", + unit="%", + min_value=0, max_value=100, + bytes_count=1, + encoder=lambda v: bytes([int(v * 255 / 100)]), + decoder=lambda b: b[0] * 100 / 255 + )) + + # PID 4A: Accelerator pedal position E + self.register(PID( + pid=0x4A, + name="ACCELERATOR_POS_E", + description="Accelerator pedal position E", + unit="%", + min_value=0, max_value=100, + bytes_count=1, + encoder=lambda v: bytes([int(v * 255 / 100)]), + decoder=lambda b: b[0] * 100 / 255 + )) + + # PID 4C: Commanded throttle actuator + self.register(PID( + pid=0x4C, + name="COMMANDED_THROTTLE", + description="Commanded throttle actuator", + unit="%", + min_value=0, max_value=100, + bytes_count=1, + encoder=lambda v: bytes([int(v * 255 / 100)]), + decoder=lambda b: b[0] * 100 / 255 + )) + + # PID 5C: Engine oil temperature + self.register(PID( + pid=0x5C, + name="OIL_TEMP", + description="Engine oil temperature", + unit="°C", + min_value=-40, max_value=210, + bytes_count=1, + encoder=lambda v: bytes([int(v + 40)]), + decoder=lambda b: b[0] - 40 + )) + + # PID 5E: Engine fuel rate + self.register(PID( + pid=0x5E, + name="FUEL_RATE", + description="Engine fuel rate", + unit="L/h", + min_value=0, max_value=3276.75, + bytes_count=2, + encoder=lambda v: int(v * 20).to_bytes(2, "big"), + decoder=lambda b: int.from_bytes(b, "big") / 20 + )) + + # PID 60: Supported PIDs [61-80] + self.register(PID( + pid=0x60, + name="PIDS_SUPPORTED_61_80", + description="Supported PIDs [61-80]", + unit="", + min_value=0, max_value=0xFFFFFFFF, + bytes_count=4, + encoder=lambda v: int(v).to_bytes(4, "big"), + decoder=lambda b: int.from_bytes(b, "big") + )) + + # PID 62: Actual engine percent torque + self.register(PID( + pid=0x62, + name="ENGINE_TORQUE_ACTUAL", + description="Actual engine percent torque", + unit="%", + min_value=-125, max_value=130, + bytes_count=1, + encoder=lambda v: bytes([int(v + 125)]), + decoder=lambda b: b[0] - 125 + )) + + +# Синглтон реестра +_pid_registry: PIDRegistry | None = None + + +def get_pid_registry() -> PIDRegistry: + """Получить глобальный реестр PIDs.""" + global _pid_registry + if _pid_registry is None: + _pid_registry = PIDRegistry() + return _pid_registry diff --git a/src/obd2/protocol.py b/src/obd2/protocol.py new file mode 100644 index 0000000..f3cd7b3 --- /dev/null +++ b/src/obd2/protocol.py @@ -0,0 +1,245 @@ +""" +OBD2 протокол поверх CAN (ISO 15765-4). +Обработка запросов и формирование ответов. +""" +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from can_interface import CANFrame +from logger import get_logger +from .pids import OBD2Mode, get_pid_registry + +if TYPE_CHECKING: + from simulator.vehicle import VehicleState + +logger = get_logger(__name__) + + +# Стандартные CAN ID для OBD2 +OBD2_REQUEST_ID = 0x7DF # Broadcast запрос ко всем ECU +OBD2_RESPONSE_ID = 0x7E8 # Ответ от Engine ECU (первый ECU) +OBD2_FUNCTIONAL_RANGE = range(0x7E0, 0x7E8) # ECU физические адреса + + +@dataclass +class OBD2Request: + """Разобранный OBD2 запрос.""" + mode: int + pid: int + raw_data: bytes + + @classmethod + def from_can_frame(cls, frame: CANFrame) -> "OBD2Request | None": + """Распарсить CAN фрейм как OBD2 запрос.""" + # Проверяем ID + if frame.arbitration_id != OBD2_REQUEST_ID and frame.arbitration_id not in OBD2_FUNCTIONAL_RANGE: + return None + + data = frame.data + if len(data) < 2: + return None + + # Первый байт - длина данных (PCI byte для Single Frame) + length = data[0] + if length < 1 or length > 7: + return None + + # Второй байт - режим (mode/service) + mode = data[1] + + # Третий байт - PID (если есть) + pid = data[2] if length > 1 and len(data) > 2 else 0 + + return cls(mode=mode, pid=pid, raw_data=data) + + def __str__(self) -> str: + return f"OBD2Request(mode={self.mode:02X}, pid={self.pid:02X})" + + +@dataclass +class OBD2Response: + """OBD2 ответ для отправки.""" + mode: int # Mode + 0x40 + pid: int + data: bytes + + def to_can_frame(self) -> CANFrame: + """Преобразовать в CAN фрейм.""" + # Single Frame формат: [length, mode+0x40, pid, data...] + payload = bytes([ + len(self.data) + 2, # Length: mode + pid + data + self.mode + 0x40, # Response mode + self.pid, # PID + ]) + self.data + + # Дополняем до 8 байт + payload = payload.ljust(8, b'\x00') + + return CANFrame( + arbitration_id=OBD2_RESPONSE_ID, + data=payload + ) + + +class OBD2Protocol: + """ + Обработчик OBD2 протокола. + Получает запросы, формирует ответы на основе состояния автомобиля. + """ + + def __init__(self): + self.pid_registry = get_pid_registry() + self._stats = { + "requests_received": 0, + "responses_sent": 0, + "unsupported_requests": 0, + } + + def process_frame(self, frame: CANFrame, vehicle_state: "VehicleState") -> CANFrame | None: + """ + Обработать входящий CAN фрейм. + Возвращает ответный фрейм или None, если фрейм не является OBD2 запросом. + """ + request = OBD2Request.from_can_frame(frame) + if request is None: + return None + + self._stats["requests_received"] += 1 + logger.debug(f"Received: {request}") + + response = self._handle_request(request, vehicle_state) + if response: + self._stats["responses_sent"] += 1 + return response.to_can_frame() + + self._stats["unsupported_requests"] += 1 + return None + + def _handle_request(self, request: OBD2Request, state: "VehicleState") -> OBD2Response | None: + """Обработать OBD2 запрос и вернуть ответ.""" + + if request.mode == OBD2Mode.CURRENT_DATA: + return self._handle_mode01(request, state) + elif request.mode == OBD2Mode.VEHICLE_INFO: + return self._handle_mode09(request) + elif request.mode == OBD2Mode.STORED_DTC: + return self._handle_mode03(request) + elif request.mode == OBD2Mode.CLEAR_DTC: + return self._handle_mode04(request) + + logger.debug(f"Unsupported mode: {request.mode:02X}") + return None + + def _handle_mode01(self, request: OBD2Request, state: "VehicleState") -> OBD2Response | None: + """Обработка Mode 01 - текущие данные.""" + pid_code = request.pid + + # Специальные PIDs для маски поддерживаемых PIDs + if pid_code in (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0): + mask = self.pid_registry.get_supported_pids_mask(pid_code) + return OBD2Response( + mode=request.mode, + pid=pid_code, + data=mask.to_bytes(4, "big") + ) + + # Получаем значение из состояния автомобиля + value = self._get_pid_value(pid_code, state) + if value is None: + logger.debug(f"PID {pid_code:02X} not supported or no value") + return None + + pid_def = self.pid_registry.get(pid_code) + if pid_def is None: + return None + + # Кодируем значение + try: + encoded = pid_def.encoder(value) + return OBD2Response( + mode=request.mode, + pid=pid_code, + data=encoded + ) + except Exception as e: + logger.error(f"Failed to encode PID {pid_code:02X}: {e}") + return None + + def _handle_mode03(self, request: OBD2Request) -> OBD2Response: + """Mode 03 - чтение DTC. Возвращаем 0 ошибок.""" + return OBD2Response( + mode=request.mode, + pid=0x00, + data=bytes([0x00]) # 0 DTCs + ) + + def _handle_mode04(self, request: OBD2Request) -> OBD2Response: + """Mode 04 - очистка DTC.""" + return OBD2Response( + mode=request.mode, + pid=0x00, + data=bytes() + ) + + def _handle_mode09(self, request: OBD2Request) -> OBD2Response | None: + """Mode 09 - информация об автомобиле.""" + pid = request.pid + + if pid == 0x00: + # Supported PIDs + # Поддерживаем: 02 (VIN), 04 (Calibration ID), 0A (ECU name) + mask = 0x54400000 # PIDs 02, 04, 0A + return OBD2Response(mode=request.mode, pid=pid, data=mask.to_bytes(4, "big")) + + elif pid == 0x02: + # VIN - для простоты возвращаем тестовый + # Реальный VIN требует multi-frame ответа, упрощаем + vin = b"TESTVIN123456789" # 17 символов + return OBD2Response(mode=request.mode, pid=pid, data=bytes([1]) + vin[:7]) + + elif pid == 0x04: + # Calibration ID + return OBD2Response(mode=request.mode, pid=pid, data=b"OBD2EMU1") + + elif pid == 0x0A: + # ECU name + return OBD2Response(mode=request.mode, pid=pid, data=b"ECU-SIM") + + return None + + def _get_pid_value(self, pid_code: int, state: "VehicleState") -> float | None: + """Получить значение PID из состояния автомобиля.""" + mapping = { + 0x04: state.engine_load, + 0x05: state.coolant_temp, + 0x06: state.short_fuel_trim, + 0x07: state.long_fuel_trim, + 0x0B: state.intake_pressure, + 0x0C: state.rpm, + 0x0D: state.speed, + 0x0E: state.timing_advance, + 0x0F: state.intake_temp, + 0x10: state.maf_flow, + 0x11: state.throttle_pos, + 0x1C: 6, # OBD-II as defined by ISO 15765-4 CAN + 0x1F: state.run_time, + 0x21: 0, # Distance with MIL on + 0x23: state.fuel_rail_pressure, + 0x2F: state.fuel_level, + 0x31: state.distance_since_clear, + 0x33: state.barometric_pressure, + 0x42: state.control_module_voltage, + 0x45: state.throttle_pos, # Relative = absolute for simplicity + 0x46: state.ambient_temp, + 0x49: state.accelerator_pos, + 0x4A: state.accelerator_pos, + 0x4C: state.throttle_pos, + 0x5C: state.oil_temp, + 0x5E: state.fuel_rate, + 0x62: state.engine_torque, + } + return mapping.get(pid_code) + + def get_stats(self) -> dict: + """Получить статистику протокола.""" + return self._stats.copy() diff --git a/src/simulator/__init__.py b/src/simulator/__init__.py new file mode 100644 index 0000000..1993565 --- /dev/null +++ b/src/simulator/__init__.py @@ -0,0 +1,11 @@ +"""Симулятор автомобиля с динамическими данными.""" +from .vehicle import VehicleState, VehicleSimulator +from .scenarios import Scenario, ScenarioManager, get_scenario_manager + +__all__ = [ + "VehicleState", + "VehicleSimulator", + "Scenario", + "ScenarioManager", + "get_scenario_manager", +] diff --git a/src/simulator/scenarios.py b/src/simulator/scenarios.py new file mode 100644 index 0000000..88eb916 --- /dev/null +++ b/src/simulator/scenarios.py @@ -0,0 +1,324 @@ +""" +Сценарии движения для симулятора. +Каждый сценарий определяет поведение автомобиля во времени. +""" +import math +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Literal + +from logger import get_logger +from .vehicle import VehicleSimulator + +logger = get_logger(__name__) + + +class Scenario(ABC): + """Базовый класс сценария.""" + + name: str = "base" + description: str = "Base scenario" + + def __init__(self, simulator: VehicleSimulator): + self.simulator = simulator + self._start_time: float | None = None + self._is_running = False + + def start(self) -> None: + """Запустить сценарий.""" + self._start_time = time.time() + self._is_running = True + self.simulator.start_engine() + self.on_start() + logger.info(f"Scenario '{self.name}' started") + + def stop(self) -> None: + """Остановить сценарий.""" + self._is_running = False + self.on_stop() + logger.info(f"Scenario '{self.name}' stopped") + + @property + def elapsed_time(self) -> float: + """Время с начала сценария (секунды).""" + if self._start_time is None: + return 0 + return time.time() - self._start_time + + @abstractmethod + def update(self) -> None: + """Обновить состояние сценария.""" + pass + + def on_start(self) -> None: + """Вызывается при запуске сценария.""" + pass + + def on_stop(self) -> None: + """Вызывается при остановке сценария.""" + pass + + +class IdleScenario(Scenario): + """ + Сценарий: Холостой ход. + Двигатель работает на холостых, машина стоит. + """ + + name = "idle" + description = "Engine idling, vehicle stationary" + + def update(self) -> None: + self.simulator.set_target_speed(0) + self.simulator.set_throttle(0) + + +class WarmupScenario(Scenario): + """ + Сценарий: Прогрев двигателя. + Двигатель на холостых, температура постепенно растёт. + """ + + name = "warmup" + description = "Engine warming up from cold start" + + def on_start(self) -> None: + # Холодный старт + state = self.simulator.state + state.coolant_temp = self.simulator.config.ambient_temp + state.oil_temp = self.simulator.config.ambient_temp + state.intake_temp = self.simulator.config.ambient_temp + + def update(self) -> None: + self.simulator.set_target_speed(0) + + # Небольшие колебания холостого хода при холодном двигателе + temp = self.simulator.state.coolant_temp + if temp < 50: + # Повышенные обороты при холодном двигателе + self.simulator.set_throttle(5 + (50 - temp) * 0.1) + else: + self.simulator.set_throttle(0) + + +class CityScenario(Scenario): + """ + Сценарий: Городской цикл. + Разгон до 50, торможение, снова разгон. Имитация светофоров. + """ + + name = "city" + description = "City driving with stops and accelerations" + + CYCLE_DURATION = 60 # Длина одного цикла (секунды) + + def update(self) -> None: + t = self.elapsed_time % self.CYCLE_DURATION + + if t < 10: + # Стоим на светофоре + self.simulator.set_target_speed(0) + elif t < 25: + # Разгон до 50 км/ч + progress = (t - 10) / 15 + self.simulator.set_target_speed(50 * progress) + elif t < 40: + # Едем 50 км/ч + self.simulator.set_target_speed(50) + elif t < 50: + # Торможение + progress = (t - 40) / 10 + self.simulator.set_target_speed(50 * (1 - progress)) + else: + # Стоим + self.simulator.set_target_speed(0) + + +class HighwayScenario(Scenario): + """ + Сценарий: Трасса. + Набор скорости до 110-120 км/ч, круиз с небольшими колебаниями. + """ + + name = "highway" + description = "Highway cruising at 110-120 km/h" + + def update(self) -> None: + t = self.elapsed_time + + if t < 30: + # Разгон до крейсерской скорости + progress = t / 30 + self.simulator.set_target_speed(120 * progress) + else: + # Круиз с небольшими колебаниями (имитация обгонов, горок) + variation = math.sin(t * 0.1) * 10 + self.simulator.set_target_speed(115 + variation) + + +class AccelerationScenario(Scenario): + """ + Сценарий: Тест ускорения. + Резкий разгон от 0 до максимума. + """ + + name = "acceleration" + description = "Full throttle acceleration test" + + def update(self) -> None: + t = self.elapsed_time + + if t < 2: + # Готовимся + self.simulator.set_target_speed(0) + self.simulator.set_throttle(0) + elif t < 15: + # Полный газ! + self.simulator.set_throttle(100) + self.simulator.set_target_speed(0) # Отключаем автопилот + elif t < 20: + # Отпускаем газ + self.simulator.set_throttle(0) + else: + # Сброс - начинаем заново + self._start_time = time.time() + + +class ManualScenario(Scenario): + """ + Сценарий: Ручное управление. + Пользователь сам устанавливает скорость и газ через API. + """ + + name = "manual" + description = "Manual control via API" + + def update(self) -> None: + # Ничего не делаем - управление внешнее + pass + + +class DynamicCityScenario(Scenario): + """ + Сценарий: Продвинутый городской цикл. + Более реалистичное поведение с разной интенсивностью. + """ + + name = "dynamic_city" + description = "Dynamic city driving with variable patterns" + + def __init__(self, simulator: VehicleSimulator): + super().__init__(simulator) + self._phase = 0 + self._phase_start = 0 + self._phases = [ + ("idle", 5), # Стоим 5 сек + ("accelerate", 8), # Разгон 8 сек + ("cruise_30", 10), # Едем 30 км/ч + ("accelerate", 5), # Ещё разгон + ("cruise_50", 15), # Едем 50 км/ч + ("decelerate", 5), # Торможение + ("cruise_30", 8), # Едем 30 км/ч + ("stop", 8), # Полная остановка + ] + + def update(self) -> None: + # Определяем текущую фазу + t = self.elapsed_time + phase_elapsed = t - self._phase_start + phase_name, phase_duration = self._phases[self._phase] + + if phase_elapsed >= phase_duration: + # Переход к следующей фазе + self._phase = (self._phase + 1) % len(self._phases) + self._phase_start = t + phase_elapsed = 0 + phase_name, phase_duration = self._phases[self._phase] + + # Выполняем фазу + progress = phase_elapsed / phase_duration + + if phase_name == "idle": + self.simulator.set_target_speed(0) + elif phase_name == "accelerate": + current = self.simulator.state.speed + self.simulator.set_target_speed(current + 20) + elif phase_name == "cruise_30": + self.simulator.set_target_speed(30) + elif phase_name == "cruise_50": + self.simulator.set_target_speed(50) + elif phase_name == "decelerate": + current = self.simulator.state.speed + self.simulator.set_target_speed(max(0, current - 15)) + elif phase_name == "stop": + self.simulator.set_target_speed(0) + + +# Реестр сценариев +SCENARIOS: dict[str, type[Scenario]] = { + "idle": IdleScenario, + "warmup": WarmupScenario, + "city": CityScenario, + "highway": HighwayScenario, + "acceleration": AccelerationScenario, + "manual": ManualScenario, + "dynamic_city": DynamicCityScenario, +} + + +class ScenarioManager: + """Управление сценариями.""" + + def __init__(self, simulator: VehicleSimulator): + self.simulator = simulator + self._current_scenario: Scenario | None = None + + def load_scenario(self, name: str) -> Scenario: + """Загрузить сценарий по имени.""" + if name not in SCENARIOS: + raise ValueError(f"Unknown scenario: {name}. Available: {list(SCENARIOS.keys())}") + + scenario_class = SCENARIOS[name] + self._current_scenario = scenario_class(self.simulator) + return self._current_scenario + + def start(self, name: str | None = None) -> None: + """Запустить сценарий.""" + if name: + self.load_scenario(name) + + if self._current_scenario: + self._current_scenario.start() + + def stop(self) -> None: + """Остановить текущий сценарий.""" + if self._current_scenario: + self._current_scenario.stop() + + def update(self) -> None: + """Обновить текущий сценарий.""" + if self._current_scenario and self._current_scenario._is_running: + self._current_scenario.update() + + @property + def current_scenario(self) -> Scenario | None: + return self._current_scenario + + @property + def available_scenarios(self) -> list[str]: + return list(SCENARIOS.keys()) + + +# Синглтон +_scenario_manager: ScenarioManager | None = None + + +def get_scenario_manager(simulator: VehicleSimulator | None = None) -> ScenarioManager: + """Получить менеджер сценариев.""" + global _scenario_manager + if _scenario_manager is None: + if simulator is None: + raise ValueError("Simulator required for first initialization") + _scenario_manager = ScenarioManager(simulator) + return _scenario_manager diff --git a/src/simulator/vehicle.py b/src/simulator/vehicle.py new file mode 100644 index 0000000..3b3613d --- /dev/null +++ b/src/simulator/vehicle.py @@ -0,0 +1,432 @@ +""" +Модель автомобиля с физически правдоподобным поведением. +Симулирует двигатель, трансмиссию, температуры и расход топлива. +""" +import math +import time +from dataclasses import dataclass, field +from typing import Callable + +from config import VehicleConfig +from logger import get_logger + +logger = get_logger(__name__) + + +@dataclass +class VehicleState: + """Текущее состояние автомобиля.""" + # Двигатель + rpm: float = 0.0 + engine_load: float = 0.0 + throttle_pos: float = 0.0 + accelerator_pos: float = 0.0 + + # Движение + speed: float = 0.0 # km/h + + # Температуры + coolant_temp: float = 20.0 + intake_temp: float = 20.0 + oil_temp: float = 20.0 + ambient_temp: float = 20.0 + + # Топливо + fuel_level: float = 75.0 # % + fuel_rate: float = 0.0 # L/h + fuel_rail_pressure: float = 35000.0 # kPa + + # Воздух + maf_flow: float = 0.0 # g/s + intake_pressure: float = 101.0 # kPa + + # Другое + timing_advance: float = 10.0 # degrees + short_fuel_trim: float = 0.0 # % + long_fuel_trim: float = 0.0 # % + barometric_pressure: float = 101.0 # kPa + control_module_voltage: float = 14.2 # V + engine_torque: float = 0.0 # % + + # Статистика + run_time: float = 0.0 # seconds + distance_since_clear: float = 0.0 # km + total_distance: float = 0.0 # km + + # Внутреннее состояние + engine_running: bool = False + gear: int = 0 # 0 = N/P, 1-6 = gears + + def __post_init__(self): + self._start_time = time.time() + + +class EngineModel: + """Модель двигателя.""" + + # Передаточные числа (примерные для 6-ступенчатой АКПП) + GEAR_RATIOS = { + 0: 0, # Neutral + 1: 3.5, + 2: 2.1, + 3: 1.4, + 4: 1.0, + 5: 0.75, + 6: 0.6, + } + FINAL_DRIVE = 3.5 + WHEEL_RADIUS = 0.32 # meters + + def __init__(self, config: VehicleConfig): + self.config = config + self.idle_rpm = config.idle_rpm + self.max_rpm = config.max_rpm + self.redline = config.redline_rpm + self.displacement = config.engine_displacement + + def calculate_rpm_from_speed(self, speed_kmh: float, gear: int) -> float: + """Рассчитать RPM из скорости и передачи.""" + if gear == 0 or speed_kmh < 1: + return self.idle_rpm + + speed_ms = speed_kmh / 3.6 + wheel_rpm = (speed_ms / self.WHEEL_RADIUS) * 60 / (2 * math.pi) + engine_rpm = wheel_rpm * self.GEAR_RATIOS[gear] * self.FINAL_DRIVE + + return max(self.idle_rpm, min(engine_rpm, self.max_rpm)) + + def calculate_optimal_gear(self, speed_kmh: float, throttle: float) -> int: + """Определить оптимальную передачу.""" + if speed_kmh < 5: + return 1 if throttle > 0 else 0 + + # Точки переключения (примерные) + shift_points = [ + (0, 1), + (15, 2), + (30, 3), + (50, 4), + (70, 5), + (90, 6), + ] + + gear = 1 + for speed, g in shift_points: + if speed_kmh >= speed: + gear = g + + # При высоком газе держим передачу дольше + if throttle > 70 and gear > 1: + # Проверяем, не уйдут ли обороты в красную зону + rpm = self.calculate_rpm_from_speed(speed_kmh, gear - 1) + if rpm < self.redline: + gear -= 1 + + return gear + + def calculate_engine_load(self, rpm: float, throttle: float, speed: float) -> float: + """Рассчитать нагрузку на двигатель.""" + # Базовая нагрузка от оборотов + rpm_factor = rpm / self.max_rpm + + # Нагрузка от газа + throttle_factor = throttle / 100 + + # Нагрузка от скорости (сопротивление воздуха) + aero_factor = (speed / 200) ** 2 + + load = (throttle_factor * 0.6 + rpm_factor * 0.2 + aero_factor * 0.2) * 100 + return min(100, max(0, load)) + + def calculate_maf(self, rpm: float, load: float) -> float: + """Рассчитать расход воздуха (MAF).""" + # Упрощённая модель: MAF пропорционален RPM * нагрузке * объём двигателя + volumetric_efficiency = 0.85 + air_density = 1.2 # kg/m³ + + # Объём воздуха за минуту (л/мин) + air_volume = (rpm * self.displacement * volumetric_efficiency * (load / 100)) / 2 + + # Преобразуем в g/s + maf = (air_volume / 60) * air_density + return max(0, maf) + + def calculate_fuel_rate(self, rpm: float, load: float) -> float: + """Рассчитать расход топлива (L/h).""" + if rpm < 100: + return 0 + + # Базовый расход на холостых + idle_consumption = 0.8 # L/h + + # Расход пропорционален MAF (стехиометрия ~14.7:1) + maf = self.calculate_maf(rpm, load) + fuel_mass_rate = maf / 14.7 # g/s + fuel_volume_rate = fuel_mass_rate / 750 # L/s (плотность бензина ~750 g/L) + fuel_rate_lh = fuel_volume_rate * 3600 # L/h + + return max(idle_consumption, fuel_rate_lh) + + +class TemperatureModel: + """Модель температур двигателя.""" + + def __init__(self, config: VehicleConfig): + self.ambient = config.ambient_temp + self.target_coolant = config.target_coolant_temp + + def update_coolant_temp( + self, + current: float, + rpm: float, + speed: float, + dt: float + ) -> float: + """Обновить температуру охлаждающей жидкости.""" + # Нагрев от работы двигателя + heat_rate = 0.5 if rpm > 0 else 0 # °C/s при работающем двигателе + + # Охлаждение от радиатора (зависит от скорости) + cooling_rate = 0.1 + (speed / 100) * 0.3 # °C/s + + # Целевая температура + target = self.target_coolant if rpm > 0 else self.ambient + + # Плавное изменение к цели + diff = target - current + change = diff * 0.02 * dt # Медленное изменение + + # Добавляем нагрев/охлаждение + if rpm > 0: + change += heat_rate * dt * (1 - current / self.target_coolant) + + return current + change + + def update_oil_temp( + self, + current: float, + coolant_temp: float, + rpm: float, + dt: float + ) -> float: + """Обновить температуру масла (следует за coolant с задержкой).""" + # Масло нагревается медленнее и до более высокой температуры + target = coolant_temp + 10 if rpm > 0 else self.ambient + diff = target - current + change = diff * 0.01 * dt # Очень медленное изменение + return current + change + + def update_intake_temp( + self, + current: float, + coolant_temp: float, + speed: float, + dt: float + ) -> float: + """Обновить температуру впускного воздуха.""" + # Зависит от скорости (охлаждение потоком) и температуры двигателя + target = self.ambient + (coolant_temp - self.ambient) * 0.1 + if speed > 30: + target = self.ambient + 5 # Охлаждение потоком + diff = target - current + change = diff * 0.1 * dt + return current + change + + +class VehicleSimulator: + """ + Главный симулятор автомобиля. + Объединяет все модели и обновляет состояние. + """ + + def __init__(self, config: VehicleConfig): + self.config = config + self.engine = EngineModel(config) + self.temperature = TemperatureModel(config) + + # Начальное состояние + self.state = VehicleState( + ambient_temp=config.ambient_temp, + intake_temp=config.ambient_temp, + coolant_temp=config.ambient_temp, + oil_temp=config.ambient_temp, + fuel_level=config.initial_fuel_level, + ) + + self._last_update = time.time() + self._target_speed = 0.0 + self._target_throttle = 0.0 + + def start_engine(self) -> None: + """Запустить двигатель.""" + if not self.state.engine_running: + self.state.engine_running = True + self.state.rpm = self.config.idle_rpm + self.state.gear = 0 + logger.info("Engine started") + + def stop_engine(self) -> None: + """Заглушить двигатель.""" + if self.state.engine_running: + self.state.engine_running = False + self.state.rpm = 0 + self.state.gear = 0 + self.state.speed = 0 + self.state.throttle_pos = 0 + self.state.engine_load = 0 + logger.info("Engine stopped") + + def set_target_speed(self, speed: float) -> None: + """Установить целевую скорость (для сценариев).""" + self._target_speed = max(0, min(speed, self.config.max_speed)) + + def set_throttle(self, throttle: float) -> None: + """Установить положение газа (0-100%).""" + self._target_throttle = max(0, min(throttle, 100)) + + def update(self, dt: float | None = None) -> VehicleState: + """ + Обновить состояние автомобиля. + + Args: + dt: Время с последнего обновления (секунды). + Если None, вычисляется автоматически. + """ + now = time.time() + if dt is None: + dt = now - self._last_update + self._last_update = now + + if not self.state.engine_running: + self.state.run_time = 0 + return self.state + + # Обновляем время работы + self.state.run_time += dt + + # Плавное изменение газа + throttle_diff = self._target_throttle - self.state.throttle_pos + self.state.throttle_pos += throttle_diff * min(1.0, dt * 5) # Сглаживание + self.state.accelerator_pos = self.state.throttle_pos + + # Обновляем скорость на основе целевой скорости или газа + self._update_speed(dt) + + # Определяем передачу + self.state.gear = self.engine.calculate_optimal_gear( + self.state.speed, + self.state.throttle_pos + ) + + # Рассчитываем обороты + if self.state.speed < 1: + self.state.rpm = self.config.idle_rpm + self.state.throttle_pos * 10 + else: + self.state.rpm = self.engine.calculate_rpm_from_speed( + self.state.speed, + self.state.gear + ) + + # Нагрузка двигателя + self.state.engine_load = self.engine.calculate_engine_load( + self.state.rpm, + self.state.throttle_pos, + self.state.speed + ) + + # MAF + self.state.maf_flow = self.engine.calculate_maf( + self.state.rpm, + self.state.engine_load + ) + + # Расход топлива + self.state.fuel_rate = self.engine.calculate_fuel_rate( + self.state.rpm, + self.state.engine_load + ) + + # Уменьшаем уровень топлива + fuel_consumed = self.state.fuel_rate * dt / 3600 # L + fuel_percent = (fuel_consumed / self.config.fuel_tank_capacity) * 100 + self.state.fuel_level = max(0, self.state.fuel_level - fuel_percent) + + # Давление во впуске (упрощённо) + self.state.intake_pressure = 101 - (self.state.throttle_pos * 0.6) + + # Опережение зажигания + self.state.timing_advance = 10 + (self.state.rpm / 1000) * 3 + + # Крутящий момент (упрощённо) + self.state.engine_torque = self.state.engine_load * 0.8 + + # Температуры + self.state.coolant_temp = self.temperature.update_coolant_temp( + self.state.coolant_temp, + self.state.rpm, + self.state.speed, + dt + ) + self.state.oil_temp = self.temperature.update_oil_temp( + self.state.oil_temp, + self.state.coolant_temp, + self.state.rpm, + dt + ) + self.state.intake_temp = self.temperature.update_intake_temp( + self.state.intake_temp, + self.state.coolant_temp, + self.state.speed, + dt + ) + + # Пробег + distance_km = (self.state.speed * dt) / 3600 + self.state.total_distance += distance_km + self.state.distance_since_clear += distance_km + + return self.state + + def _update_speed(self, dt: float) -> None: + """Обновить скорость автомобиля.""" + current = self.state.speed + target = self._target_speed + + if target > 0: + # Режим целевой скорости (автопилот сценария) + diff = target - current + # Ускорение/замедление до 3 м/с² (примерно 10 км/ч за секунду) + max_change = 10 * dt + + if abs(diff) < max_change: + self.state.speed = target + else: + self.state.speed += max_change if diff > 0 else -max_change + + # Подстраиваем газ под скорость + if diff > 5: + self._target_throttle = min(100, 30 + diff) + elif diff < -5: + self._target_throttle = 0 + else: + self._target_throttle = 15 + current / 10 + + else: + # Режим ручного газа + throttle = self.state.throttle_pos + + if throttle > 5: + # Ускорение + acceleration = (throttle / 100) * 15 # Max ~15 km/h/s + self.state.speed = min( + self.config.max_speed, + current + acceleration * dt + ) + else: + # Торможение (двигателем + сопротивление) + deceleration = 5 + current * 0.02 # Зависит от скорости + self.state.speed = max(0, current - deceleration * dt) + + def get_state(self) -> VehicleState: + """Получить текущее состояние.""" + return self.state