This commit is contained in:
2026-01-29 17:17:14 +03:00
parent f54778528e
commit 69acad18ca
16 changed files with 2418 additions and 0 deletions

10
requirements.txt Normal file
View File

@@ -0,0 +1,10 @@
# OBD2 Emulator Dependencies
# CAN bus interface
python-can>=4.3.0
# Configuration validation
pydantic>=2.5.0
# Type hints (optional, for development)
typing-extensions>=4.8.0

32
scripts/monitor_can.sh Normal file
View File

@@ -0,0 +1,32 @@
#!/bin/bash
# Мониторинг CAN трафика с фильтрацией OBD2
INTERFACE=${1:-vcan0}
echo "=== CAN Monitor (OBD2) ==="
echo "Interface: $INTERFACE"
echo "Filtering: 7DF (requests) and 7E8 (responses)"
echo "Press Ctrl+C to stop"
echo ""
# Проверка candump
if ! command -v candump &> /dev/null; then
echo "Error: candump not found. Install can-utils:"
echo " sudo apt install can-utils"
exit 1
fi
# Мониторинг с фильтром
candump "$INTERFACE" | grep -E "(7DF|7E8)" | while read line; do
timestamp=$(date +"%H:%M:%S.%3N")
# Парсинг и форматирование
if echo "$line" | grep -q "7DF"; then
# Запрос
pid=$(echo "$line" | awk '{print $3}' | cut -c5-6)
echo "[$timestamp] REQUEST -> PID: 0x$pid"
elif echo "$line" | grep -q "7E8"; then
# Ответ
echo "[$timestamp] RESPONSE <- $line"
fi
done

62
scripts/setup_can.sh Normal file
View File

@@ -0,0 +1,62 @@
#!/bin/bash
# Скрипт настройки CAN интерфейсов на Raspberry Pi 5 с 2CH CAN HAT
set -e
BITRATE=${1:-500000}
echo "=== CAN Interface Setup ==="
echo "Bitrate: $BITRATE bps"
echo ""
# Проверка прав
if [ "$EUID" -ne 0 ]; then
echo "Please run as root (sudo)"
exit 1
fi
# Загрузка модулей
echo "[1/4] Loading kernel modules..."
modprobe can
modprobe can_raw
modprobe mcp251x 2>/dev/null || true # Для SPI CAN контроллеров
# Настройка can0
echo "[2/4] Configuring can0..."
if ip link show can0 &>/dev/null; then
ip link set can0 down 2>/dev/null || true
ip link set can0 type can bitrate $BITRATE
ip link set can0 up
echo " can0: UP at $BITRATE bps"
else
echo " can0: Not found (skipping)"
fi
# Настройка can1
echo "[3/4] Configuring can1..."
if ip link show can1 &>/dev/null; then
ip link set can1 down 2>/dev/null || true
ip link set can1 type can bitrate $BITRATE
ip link set can1 up
echo " can1: UP at $BITRATE bps"
else
echo " can1: Not found (skipping)"
fi
# Создание vcan0 для тестирования
echo "[4/4] Creating virtual CAN (vcan0)..."
modprobe vcan
if ! ip link show vcan0 &>/dev/null; then
ip link add dev vcan0 type vcan
fi
ip link set vcan0 up
echo " vcan0: UP (virtual)"
echo ""
echo "=== Status ==="
ip -details link show type can 2>/dev/null || echo "No physical CAN interfaces"
ip -details link show type vcan 2>/dev/null || echo "No virtual CAN interfaces"
echo ""
echo "=== Done ==="
echo "You can now run: python src/main.py -i can1 -s city"

157
scripts/test_obd2.py Normal file
View File

@@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
Тестовый скрипт для проверки OBD2 эмулятора.
Отправляет OBD2 запросы и выводит ответы.
"""
import argparse
import sys
import time
try:
import can
except ImportError:
print("Error: python-can not installed. Run: pip install python-can")
sys.exit(1)
# OBD2 PIDs для тестирования
TEST_PIDS = [
(0x00, "Supported PIDs [01-20]"),
(0x05, "Coolant Temperature"),
(0x0C, "Engine RPM"),
(0x0D, "Vehicle Speed"),
(0x0F, "Intake Air Temperature"),
(0x11, "Throttle Position"),
(0x2F, "Fuel Tank Level"),
(0x46, "Ambient Temperature"),
]
def decode_pid(pid: int, data: bytes) -> str:
"""Декодировать значение PID."""
if len(data) < 1:
return "No data"
if pid == 0x00:
# Supported PIDs bitmap
if len(data) >= 4:
mask = int.from_bytes(data[:4], "big")
supported = []
for i in range(32):
if mask & (1 << (31 - i)):
supported.append(f"{i+1:02X}")
return f"PIDs: {', '.join(supported[:10])}..."
return f"Raw: {data.hex()}"
elif pid == 0x05 or pid == 0x0F or pid == 0x46:
# Temperature: A - 40
return f"{data[0] - 40} °C"
elif pid == 0x0C:
# RPM: (A*256 + B) / 4
if len(data) >= 2:
rpm = (data[0] * 256 + data[1]) / 4
return f"{rpm:.0f} rpm"
elif pid == 0x0D:
# Speed: A
return f"{data[0]} km/h"
elif pid == 0x11 or pid == 0x2F:
# Percentage: A * 100 / 255
return f"{data[0] * 100 / 255:.1f} %"
return f"Raw: {data.hex()}"
def send_obd2_request(bus: can.Bus, pid: int, timeout: float = 1.0) -> bytes | None:
"""Отправить OBD2 запрос и получить ответ."""
# Формируем запрос Mode 01
request = can.Message(
arbitration_id=0x7DF,
data=[0x02, 0x01, pid, 0x00, 0x00, 0x00, 0x00, 0x00],
is_extended_id=False
)
# Отправляем
bus.send(request)
# Ждём ответ
start = time.time()
while time.time() - start < timeout:
msg = bus.recv(timeout=0.1)
if msg is None:
continue
# Проверяем, что это ответ от ECU
if msg.arbitration_id == 0x7E8:
data = bytes(msg.data)
# Проверяем формат ответа
if len(data) >= 3 and data[1] == 0x41 and data[2] == pid:
# Возвращаем данные (без заголовка)
return data[3:3 + data[0] - 2]
return None
def main():
parser = argparse.ArgumentParser(description="Test OBD2 Emulator")
parser.add_argument(
"-i", "--interface",
default="vcan0",
help="CAN interface [default: vcan0]"
)
parser.add_argument(
"-c", "--continuous",
action="store_true",
help="Continuous mode (loop forever)"
)
parser.add_argument(
"--interval",
type=float,
default=1.0,
help="Interval between requests in continuous mode [default: 1.0]"
)
args = parser.parse_args()
print(f"=== OBD2 Emulator Test ===")
print(f"Interface: {args.interface}")
print("")
try:
bus = can.Bus(interface="socketcan", channel=args.interface)
except Exception as e:
print(f"Error: Failed to connect to {args.interface}: {e}")
print("Make sure the interface is up: sudo ip link set vcan0 up")
return 1
try:
while True:
print(f"Timestamp: {time.strftime('%H:%M:%S')}")
print("-" * 50)
for pid, name in TEST_PIDS:
data = send_obd2_request(bus, pid)
if data:
value = decode_pid(pid, data)
print(f" PID {pid:02X} ({name}): {value}")
else:
print(f" PID {pid:02X} ({name}): No response")
print("")
if not args.continuous:
break
time.sleep(args.interval)
except KeyboardInterrupt:
print("\nStopped")
finally:
bus.shutdown()
return 0
if __name__ == "__main__":
sys.exit(main())

2
src/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
"""OBD2 Emulator - ECU simulator for testing onboard computers."""
__version__ = "1.0.0"

171
src/can_interface.py Normal file
View File

@@ -0,0 +1,171 @@
"""
Управление CAN интерфейсом с автоматической настройкой.
Поддержка физических (can0, can1) и виртуальных (vcan0) интерфейсов.
"""
import subprocess
import platform
from dataclasses import dataclass
from typing import Callable
import can
from config import CANConfig
from logger import get_logger
logger = get_logger(__name__)
class CANInterfaceError(Exception):
"""Ошибка настройки CAN интерфейса."""
pass
@dataclass
class CANFrame:
"""CAN сообщение."""
arbitration_id: int
data: bytes
is_extended_id: bool = False
@classmethod
def from_can_message(cls, msg: can.Message) -> "CANFrame":
return cls(
arbitration_id=msg.arbitration_id,
data=bytes(msg.data),
is_extended_id=msg.is_extended_id
)
def to_can_message(self) -> can.Message:
return can.Message(
arbitration_id=self.arbitration_id,
data=self.data,
is_extended_id=self.is_extended_id
)
class CANInterface:
"""
Управление CAN интерфейсом.
Автоматически поднимает интерфейс при инициализации.
"""
def __init__(self, config: CANConfig):
self.config = config
self._bus: can.Bus | None = None
self._is_setup = False
def setup(self) -> None:
"""Настроить и поднять CAN интерфейс."""
if self._is_setup:
return
if platform.system() != "Linux":
logger.warning("CAN interface setup is only supported on Linux. Skipping.")
self._is_setup = True
return
try:
if self.config.is_virtual:
self._setup_virtual_can()
else:
self._setup_physical_can()
self._is_setup = True
logger.info(f"CAN interface {self.config.interface} is ready")
except Exception as e:
raise CANInterfaceError(f"Failed to setup {self.config.interface}: {e}")
def _run_command(self, cmd: list[str], check: bool = True) -> subprocess.CompletedProcess:
"""Выполнить shell команду."""
logger.debug(f"Running: {' '.join(cmd)}")
return subprocess.run(cmd, capture_output=True, text=True, check=check)
def _setup_virtual_can(self) -> None:
"""Настройка виртуального CAN интерфейса."""
interface = self.config.interface
# Загружаем модуль vcan
self._run_command(["sudo", "modprobe", "vcan"], check=False)
# Проверяем, существует ли интерфейс
result = self._run_command(["ip", "link", "show", interface], check=False)
if result.returncode != 0:
# Создаём интерфейс
logger.info(f"Creating virtual CAN interface: {interface}")
self._run_command(["sudo", "ip", "link", "add", "dev", interface, "type", "vcan"])
# Поднимаем интерфейс
logger.info(f"Bringing up interface: {interface}")
self._run_command(["sudo", "ip", "link", "set", "up", interface])
def _setup_physical_can(self) -> None:
"""Настройка физического CAN интерфейса."""
interface = self.config.interface
bitrate = self.config.bitrate
# Сначала опускаем интерфейс (если был поднят)
self._run_command(["sudo", "ip", "link", "set", "down", interface], check=False)
# Настраиваем bitrate и поднимаем
logger.info(f"Setting up physical CAN interface: {interface} at {bitrate} bps")
self._run_command([
"sudo", "ip", "link", "set", interface,
"type", "can", "bitrate", str(bitrate)
])
self._run_command(["sudo", "ip", "link", "set", "up", interface])
def connect(self) -> None:
"""Подключиться к CAN шине."""
if not self._is_setup:
self.setup()
if self._bus is not None:
return
logger.info(f"Connecting to CAN bus: {self.config.interface}")
self._bus = can.Bus(
interface="socketcan",
channel=self.config.interface,
receive_own_messages=False
)
def disconnect(self) -> None:
"""Отключиться от CAN шины."""
if self._bus:
self._bus.shutdown()
self._bus = None
logger.info(f"Disconnected from {self.config.interface}")
def send(self, frame: CANFrame) -> None:
"""Отправить CAN сообщение."""
if not self._bus:
raise CANInterfaceError("Not connected to CAN bus")
msg = frame.to_can_message()
self._bus.send(msg)
if logger.isEnabledFor(10): # DEBUG level
logger.debug(f"TX: {frame.arbitration_id:03X}#{frame.data.hex().upper()}")
def receive(self, timeout: float = 0.1) -> CANFrame | None:
"""Получить CAN сообщение."""
if not self._bus:
raise CANInterfaceError("Not connected to CAN bus")
msg = self._bus.recv(timeout=timeout)
if msg is None:
return None
frame = CANFrame.from_can_message(msg)
if logger.isEnabledFor(10): # DEBUG level
logger.debug(f"RX: {frame.arbitration_id:03X}#{frame.data.hex().upper()}")
return frame
def __enter__(self) -> "CANInterface":
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.disconnect()

25
src/config.json Normal file
View File

@@ -0,0 +1,25 @@
{
"can": {
"interface": "vcan0",
"bitrate": 500000
},
"vehicle": {
"idle_rpm": 850,
"max_rpm": 6500,
"redline_rpm": 6000,
"ambient_temp": 20.0,
"target_coolant_temp": 90.0,
"fuel_tank_capacity": 58.0,
"initial_fuel_level": 75.0,
"max_speed": 220,
"engine_displacement": 2.0
},
"simulator": {
"update_rate_hz": 10,
"scenario": "idle"
},
"logging": {
"level": "INFO",
"log_can_frames": false
}
}

104
src/config.py Normal file
View File

@@ -0,0 +1,104 @@
"""
Конфигурация OBD2 эмулятора.
Поддержка физических CAN интерфейсов (can0, can1) и виртуальных (vcan0).
"""
import json
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, Field, field_validator
class CANConfig(BaseModel):
"""Конфигурация CAN интерфейса."""
interface: str = Field(default="vcan0", description="CAN interface name")
bitrate: int = Field(default=500000, description="CAN bitrate (ignored for vcan)")
@field_validator("interface")
@classmethod
def validate_interface(cls, v: str) -> str:
valid_prefixes = ("can", "vcan")
if not any(v.startswith(prefix) for prefix in valid_prefixes):
raise ValueError(f"Interface must start with: {valid_prefixes}")
return v
@property
def is_virtual(self) -> bool:
"""Check if interface is virtual."""
return self.interface.startswith("vcan")
class VehicleConfig(BaseModel):
"""Конфигурация симулируемого автомобиля."""
# Базовые параметры двигателя
idle_rpm: int = Field(default=850, ge=600, le=1200)
max_rpm: int = Field(default=6500, ge=4000, le=9000)
redline_rpm: int = Field(default=6000, ge=3500, le=8500)
# Температуры
ambient_temp: float = Field(default=20.0, ge=-40, le=50)
target_coolant_temp: float = Field(default=90.0, ge=80, le=105)
# Топливо
fuel_tank_capacity: float = Field(default=58.0, ge=30, le=100)
initial_fuel_level: float = Field(default=75.0, ge=0, le=100)
# Характеристики авто
max_speed: int = Field(default=220, ge=100, le=350)
engine_displacement: float = Field(default=2.0, description="Engine displacement in liters")
class SimulatorConfig(BaseModel):
"""Конфигурация симулятора."""
update_rate_hz: int = Field(default=10, ge=1, le=100, description="Vehicle state update rate")
scenario: Literal["idle", "city", "highway", "warmup", "manual"] = Field(default="idle")
class LoggingConfig(BaseModel):
"""Конфигурация логирования."""
level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = Field(default="INFO")
format: str = Field(default="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s")
log_can_frames: bool = Field(default=False, description="Log all CAN frames (verbose)")
class Config(BaseModel):
"""Главная конфигурация приложения."""
can: CANConfig = Field(default_factory=CANConfig)
vehicle: VehicleConfig = Field(default_factory=VehicleConfig)
simulator: SimulatorConfig = Field(default_factory=SimulatorConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)
def load_config(config_path: Path | None = None) -> Config:
"""Загрузка конфигурации из файла или defaults."""
if config_path and config_path.exists():
with open(config_path, "r", encoding="utf-8") as f:
data = json.load(f)
return Config.model_validate(data)
return Config()
# Глобальный экземпляр конфигурации
_config: Config | None = None
def get_config() -> Config:
"""Получить глобальную конфигурацию."""
global _config
if _config is None:
config_path = Path(__file__).parent / "config.json"
_config = load_config(config_path if config_path.exists() else None)
return _config
def set_config(config: Config) -> None:
"""Установить глобальную конфигурацию."""
global _config
_config = config
# Прокси для удобного доступа
class _ConfigProxy:
def __getattr__(self, name):
return getattr(get_config(), name)
config = _ConfigProxy()

30
src/logger.py Normal file
View File

@@ -0,0 +1,30 @@
"""
Настройка логирования для OBD2 эмулятора.
"""
import logging
import sys
from config import config
def setup_logging() -> None:
"""Настройка корневого логгера."""
root_logger = logging.getLogger()
root_logger.setLevel(config.logging.level)
# Очищаем существующие хендлеры
root_logger.handlers.clear()
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(config.logging.level)
console_handler.setFormatter(logging.Formatter(config.logging.format))
root_logger.addHandler(console_handler)
def get_logger(name: str) -> logging.Logger:
"""Получить логгер по имени."""
return logging.getLogger(name)
# Инициализация при импорте
setup_logging()

325
src/main.py Normal file
View File

@@ -0,0 +1,325 @@
"""
OBD2 Emulator - Главный модуль.
Эмулятор ECU для тестирования бортовых компьютеров.
"""
import argparse
import signal
import sys
import time
import threading
from pathlib import Path
from config import Config, CANConfig, VehicleConfig, SimulatorConfig, set_config, get_config
from logger import get_logger, setup_logging
from can_interface import CANInterface, CANFrame, CANInterfaceError
from obd2 import OBD2Protocol
from simulator import VehicleSimulator, ScenarioManager
logger = get_logger(__name__)
class OBD2Emulator:
"""
Главный класс эмулятора OBD2.
Объединяет CAN интерфейс, OBD2 протокол и симулятор автомобиля.
"""
def __init__(self, config: Config):
self.config = config
self.can_interface = CANInterface(config.can)
self.obd2_protocol = OBD2Protocol()
self.vehicle_simulator = VehicleSimulator(config.vehicle)
self.scenario_manager = ScenarioManager(self.vehicle_simulator)
self._running = False
self._update_thread: threading.Thread | None = None
self._can_thread: threading.Thread | None = None
def start(self, scenario: str = "idle") -> None:
"""Запустить эмулятор."""
logger.info("Starting OBD2 Emulator...")
# Настраиваем CAN интерфейс
try:
self.can_interface.setup()
self.can_interface.connect()
except CANInterfaceError as e:
logger.error(f"Failed to setup CAN interface: {e}")
raise
# Запускаем сценарий
self.scenario_manager.start(scenario)
self._running = True
# Поток обновления симулятора
self._update_thread = threading.Thread(
target=self._update_loop,
name="SimulatorUpdate",
daemon=True
)
self._update_thread.start()
# Поток обработки CAN
self._can_thread = threading.Thread(
target=self._can_loop,
name="CANHandler",
daemon=True
)
self._can_thread.start()
logger.info(f"OBD2 Emulator started on {self.config.can.interface}")
logger.info(f"Scenario: {scenario}")
def stop(self) -> None:
"""Остановить эмулятор."""
logger.info("Stopping OBD2 Emulator...")
self._running = False
self.scenario_manager.stop()
self.vehicle_simulator.stop_engine()
# Ждём завершения потоков
if self._update_thread and self._update_thread.is_alive():
self._update_thread.join(timeout=2.0)
if self._can_thread and self._can_thread.is_alive():
self._can_thread.join(timeout=2.0)
self.can_interface.disconnect()
logger.info("OBD2 Emulator stopped")
def _update_loop(self) -> None:
"""Цикл обновления симулятора."""
update_interval = 1.0 / self.config.simulator.update_rate_hz
while self._running:
try:
# Обновляем сценарий
self.scenario_manager.update()
# Обновляем симулятор
self.vehicle_simulator.update()
time.sleep(update_interval)
except Exception as e:
logger.error(f"Error in update loop: {e}", exc_info=True)
time.sleep(0.1)
def _can_loop(self) -> None:
"""Цикл обработки CAN сообщений."""
while self._running:
try:
# Получаем CAN фрейм
frame = self.can_interface.receive(timeout=0.1)
if frame is None:
continue
# Обрабатываем как OBD2 запрос
response_frame = self.obd2_protocol.process_frame(
frame,
self.vehicle_simulator.get_state()
)
# Отправляем ответ
if response_frame:
self.can_interface.send(response_frame)
except CANInterfaceError as e:
logger.error(f"CAN error: {e}")
time.sleep(0.1)
except Exception as e:
logger.error(f"Error in CAN loop: {e}", exc_info=True)
time.sleep(0.1)
def switch_scenario(self, scenario: str) -> None:
"""Переключить сценарий."""
self.scenario_manager.stop()
self.scenario_manager.start(scenario)
logger.info(f"Switched to scenario: {scenario}")
def get_status(self) -> dict:
"""Получить статус эмулятора."""
state = self.vehicle_simulator.get_state()
return {
"running": self._running,
"interface": self.config.can.interface,
"scenario": self.scenario_manager.current_scenario.name if self.scenario_manager.current_scenario else None,
"vehicle": {
"engine_running": state.engine_running,
"rpm": round(state.rpm),
"speed": round(state.speed, 1),
"coolant_temp": round(state.coolant_temp, 1),
"throttle": round(state.throttle_pos, 1),
"fuel_level": round(state.fuel_level, 1),
},
"obd2_stats": self.obd2_protocol.get_stats(),
}
def print_status(self) -> None:
"""Вывести статус в консоль."""
state = self.vehicle_simulator.get_state()
stats = self.obd2_protocol.get_stats()
print("\033[2J\033[H") # Clear screen
print("=" * 60)
print(" OBD2 EMULATOR STATUS")
print("=" * 60)
print(f" Interface: {self.config.can.interface}")
print(f" Scenario: {self.scenario_manager.current_scenario.name if self.scenario_manager.current_scenario else 'None'}")
print("-" * 60)
print(" ENGINE:")
print(f" Running: {'YES' if state.engine_running else 'NO'}")
print(f" RPM: {state.rpm:>6.0f} rpm")
print(f" Load: {state.engine_load:>6.1f} %")
print(f" Throttle: {state.throttle_pos:>6.1f} %")
print("-" * 60)
print(" VEHICLE:")
print(f" Speed: {state.speed:>6.1f} km/h")
print(f" Gear: {state.gear:>6}")
print(f" Distance: {state.total_distance:>6.1f} km")
print("-" * 60)
print(" TEMPERATURES:")
print(f" Coolant: {state.coolant_temp:>6.1f} °C")
print(f" Oil: {state.oil_temp:>6.1f} °C")
print(f" Intake Air: {state.intake_temp:>6.1f} °C")
print(f" Ambient: {state.ambient_temp:>6.1f} °C")
print("-" * 60)
print(" FUEL:")
print(f" Level: {state.fuel_level:>6.1f} %")
print(f" Rate: {state.fuel_rate:>6.2f} L/h")
print("-" * 60)
print(" OBD2 STATS:")
print(f" Requests: {stats['requests_received']:>6}")
print(f" Responses: {stats['responses_sent']:>6}")
print(f" Unsupported: {stats['unsupported_requests']:>6}")
print("=" * 60)
print(" Press Ctrl+C to stop")
print("=" * 60)
def parse_args() -> argparse.Namespace:
"""Парсинг аргументов командной строки."""
parser = argparse.ArgumentParser(
description="OBD2 Emulator - ECU simulator for testing onboard computers",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Available scenarios:
idle - Engine idling, vehicle stationary
warmup - Cold start engine warmup
city - City driving with stops
highway - Highway cruising at 110-120 km/h
acceleration - Full throttle acceleration test
dynamic_city - Advanced city driving patterns
manual - Manual control (no automation)
Examples:
python main.py --interface can1 --scenario city
python main.py --interface vcan0 --scenario highway
python main.py -i vcan0 -s warmup --status-interval 5
"""
)
parser.add_argument(
"-i", "--interface",
default="vcan0",
help="CAN interface (can0, can1, vcan0, etc.) [default: vcan0]"
)
parser.add_argument(
"-b", "--bitrate",
type=int,
default=500000,
help="CAN bitrate in bps (ignored for vcan) [default: 500000]"
)
parser.add_argument(
"-s", "--scenario",
default="idle",
choices=["idle", "warmup", "city", "highway", "acceleration", "dynamic_city", "manual"],
help="Initial scenario [default: idle]"
)
parser.add_argument(
"--status-interval",
type=float,
default=1.0,
help="Status display interval in seconds [default: 1.0]"
)
parser.add_argument(
"--no-status",
action="store_true",
help="Disable status display"
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug logging"
)
return parser.parse_args()
def main() -> int:
"""Главная функция."""
args = parse_args()
# Создаём конфигурацию
config = Config(
can=CANConfig(
interface=args.interface,
bitrate=args.bitrate,
),
vehicle=VehicleConfig(),
simulator=SimulatorConfig(
scenario=args.scenario,
),
)
if args.debug:
config.logging.level = "DEBUG"
set_config(config)
setup_logging()
# Создаём эмулятор
emulator = OBD2Emulator(config)
# Обработчик сигналов
def signal_handler(sig, frame):
logger.info("Received shutdown signal")
emulator.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
emulator.start(scenario=args.scenario)
# Главный цикл с отображением статуса
while True:
if not args.no_status:
emulator.print_status()
time.sleep(args.status_interval)
except CANInterfaceError as e:
logger.error(f"CAN interface error: {e}")
return 1
except KeyboardInterrupt:
pass
except Exception as e:
logger.error(f"Unexpected error: {e}", exc_info=True)
return 1
finally:
emulator.stop()
return 0
if __name__ == "__main__":
sys.exit(main())

12
src/obd2/__init__.py Normal file
View File

@@ -0,0 +1,12 @@
"""OBD2 протокол и обработка PIDs."""
from .pids import PID, PIDRegistry, get_pid_registry
from .protocol import OBD2Protocol, OBD2Request, OBD2Response
__all__ = [
"PID",
"PIDRegistry",
"get_pid_registry",
"OBD2Protocol",
"OBD2Request",
"OBD2Response",
]

476
src/obd2/pids.py Normal file
View File

@@ -0,0 +1,476 @@
"""
Определения OBD2 PIDs согласно SAE J1979.
Поддержка Mode 01 (текущие данные) и Mode 09 (информация об авто).
"""
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Callable, Any
class OBD2Mode(IntEnum):
"""OBD2 режимы (сервисы)."""
CURRENT_DATA = 0x01 # Текущие данные
FREEZE_FRAME = 0x02 # Стоп-кадр
STORED_DTC = 0x03 # Сохранённые ошибки
CLEAR_DTC = 0x04 # Очистка ошибок
TEST_RESULTS_O2 = 0x05 # Тесты кислородных датчиков
TEST_RESULTS = 0x06 # Результаты тестов
PENDING_DTC = 0x07 # Ожидающие ошибки
CONTROL = 0x08 # Управление
VEHICLE_INFO = 0x09 # Информация об автомобиле
PERMANENT_DTC = 0x0A # Постоянные ошибки
@dataclass
class PID:
"""Определение OBD2 PID."""
pid: int
name: str
description: str
unit: str
min_value: float
max_value: float
bytes_count: int
# Функция кодирования: value -> bytes
encoder: Callable[[float], bytes]
# Функция декодирования: bytes -> value (для отладки)
decoder: Callable[[bytes], float] | None = None
class PIDRegistry:
"""Реестр всех поддерживаемых PIDs."""
def __init__(self):
self._pids: dict[int, PID] = {}
self._register_mode01_pids()
def register(self, pid: PID) -> None:
"""Зарегистрировать PID."""
self._pids[pid.pid] = pid
def get(self, pid_code: int) -> PID | None:
"""Получить PID по коду."""
return self._pids.get(pid_code)
def get_supported_pids_mask(self, start_pid: int) -> int:
"""
Получить битовую маску поддерживаемых PIDs.
Для PIDs 0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0
"""
mask = 0
for i in range(32):
pid_code = start_pid + i + 1
if pid_code in self._pids:
mask |= (1 << (31 - i))
return mask
def _register_mode01_pids(self) -> None:
"""Регистрация Mode 01 PIDs."""
# PID 00: Supported PIDs [01-20]
self.register(PID(
pid=0x00,
name="PIDS_SUPPORTED_01_20",
description="Supported PIDs [01-20]",
unit="",
min_value=0, max_value=0xFFFFFFFF,
bytes_count=4,
encoder=lambda v: int(v).to_bytes(4, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 01: Monitor status since DTCs cleared
self.register(PID(
pid=0x01,
name="MONITOR_STATUS",
description="Monitor status since DTCs cleared",
unit="",
min_value=0, max_value=0xFFFFFFFF,
bytes_count=4,
encoder=lambda v: int(v).to_bytes(4, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 03: Fuel system status
self.register(PID(
pid=0x03,
name="FUEL_SYSTEM_STATUS",
description="Fuel system status",
unit="",
min_value=0, max_value=0xFFFF,
bytes_count=2,
encoder=lambda v: int(v).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 04: Calculated engine load
self.register(PID(
pid=0x04,
name="ENGINE_LOAD",
description="Calculated engine load",
unit="%",
min_value=0, max_value=100,
bytes_count=1,
encoder=lambda v: bytes([int(v * 255 / 100)]),
decoder=lambda b: b[0] * 100 / 255
))
# PID 05: Engine coolant temperature
self.register(PID(
pid=0x05,
name="COOLANT_TEMP",
description="Engine coolant temperature",
unit="°C",
min_value=-40, max_value=215,
bytes_count=1,
encoder=lambda v: bytes([int(v + 40)]),
decoder=lambda b: b[0] - 40
))
# PID 06: Short term fuel trim - Bank 1
self.register(PID(
pid=0x06,
name="SHORT_FUEL_TRIM_1",
description="Short term fuel trim - Bank 1",
unit="%",
min_value=-100, max_value=99.2,
bytes_count=1,
encoder=lambda v: bytes([int((v + 100) * 128 / 100)]),
decoder=lambda b: (b[0] - 128) * 100 / 128
))
# PID 07: Long term fuel trim - Bank 1
self.register(PID(
pid=0x07,
name="LONG_FUEL_TRIM_1",
description="Long term fuel trim - Bank 1",
unit="%",
min_value=-100, max_value=99.2,
bytes_count=1,
encoder=lambda v: bytes([int((v + 100) * 128 / 100)]),
decoder=lambda b: (b[0] - 128) * 100 / 128
))
# PID 0B: Intake manifold absolute pressure
self.register(PID(
pid=0x0B,
name="INTAKE_PRESSURE",
description="Intake manifold absolute pressure",
unit="kPa",
min_value=0, max_value=255,
bytes_count=1,
encoder=lambda v: bytes([int(v)]),
decoder=lambda b: b[0]
))
# PID 0C: Engine RPM
self.register(PID(
pid=0x0C,
name="ENGINE_RPM",
description="Engine RPM",
unit="rpm",
min_value=0, max_value=16383.75,
bytes_count=2,
encoder=lambda v: int(v * 4).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big") / 4
))
# PID 0D: Vehicle speed
self.register(PID(
pid=0x0D,
name="VEHICLE_SPEED",
description="Vehicle speed",
unit="km/h",
min_value=0, max_value=255,
bytes_count=1,
encoder=lambda v: bytes([int(v)]),
decoder=lambda b: b[0]
))
# PID 0E: Timing advance
self.register(PID(
pid=0x0E,
name="TIMING_ADVANCE",
description="Timing advance",
unit="° before TDC",
min_value=-64, max_value=63.5,
bytes_count=1,
encoder=lambda v: bytes([int((v + 64) * 2)]),
decoder=lambda b: b[0] / 2 - 64
))
# PID 0F: Intake air temperature
self.register(PID(
pid=0x0F,
name="INTAKE_TEMP",
description="Intake air temperature",
unit="°C",
min_value=-40, max_value=215,
bytes_count=1,
encoder=lambda v: bytes([int(v + 40)]),
decoder=lambda b: b[0] - 40
))
# PID 10: MAF air flow rate
self.register(PID(
pid=0x10,
name="MAF_FLOW",
description="MAF air flow rate",
unit="g/s",
min_value=0, max_value=655.35,
bytes_count=2,
encoder=lambda v: int(v * 100).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big") / 100
))
# PID 11: Throttle position
self.register(PID(
pid=0x11,
name="THROTTLE_POS",
description="Throttle position",
unit="%",
min_value=0, max_value=100,
bytes_count=1,
encoder=lambda v: bytes([int(v * 255 / 100)]),
decoder=lambda b: b[0] * 100 / 255
))
# PID 1C: OBD standards this vehicle conforms to
self.register(PID(
pid=0x1C,
name="OBD_STANDARD",
description="OBD standards",
unit="",
min_value=0, max_value=255,
bytes_count=1,
encoder=lambda v: bytes([int(v)]),
decoder=lambda b: b[0]
))
# PID 1F: Run time since engine start
self.register(PID(
pid=0x1F,
name="RUN_TIME",
description="Run time since engine start",
unit="s",
min_value=0, max_value=65535,
bytes_count=2,
encoder=lambda v: int(v).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 20: Supported PIDs [21-40]
self.register(PID(
pid=0x20,
name="PIDS_SUPPORTED_21_40",
description="Supported PIDs [21-40]",
unit="",
min_value=0, max_value=0xFFFFFFFF,
bytes_count=4,
encoder=lambda v: int(v).to_bytes(4, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 21: Distance traveled with MIL on
self.register(PID(
pid=0x21,
name="DISTANCE_MIL_ON",
description="Distance traveled with MIL on",
unit="km",
min_value=0, max_value=65535,
bytes_count=2,
encoder=lambda v: int(v).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 23: Fuel Rail Gauge Pressure
self.register(PID(
pid=0x23,
name="FUEL_RAIL_PRESSURE",
description="Fuel Rail Gauge Pressure",
unit="kPa",
min_value=0, max_value=655350,
bytes_count=2,
encoder=lambda v: int(v / 10).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big") * 10
))
# PID 2F: Fuel Tank Level Input
self.register(PID(
pid=0x2F,
name="FUEL_LEVEL",
description="Fuel Tank Level Input",
unit="%",
min_value=0, max_value=100,
bytes_count=1,
encoder=lambda v: bytes([int(v * 255 / 100)]),
decoder=lambda b: b[0] * 100 / 255
))
# PID 31: Distance since codes cleared
self.register(PID(
pid=0x31,
name="DISTANCE_SINCE_CLR",
description="Distance since codes cleared",
unit="km",
min_value=0, max_value=65535,
bytes_count=2,
encoder=lambda v: int(v).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 33: Absolute Barometric Pressure
self.register(PID(
pid=0x33,
name="BAROMETRIC_PRESSURE",
description="Absolute Barometric Pressure",
unit="kPa",
min_value=0, max_value=255,
bytes_count=1,
encoder=lambda v: bytes([int(v)]),
decoder=lambda b: b[0]
))
# PID 40: Supported PIDs [41-60]
self.register(PID(
pid=0x40,
name="PIDS_SUPPORTED_41_60",
description="Supported PIDs [41-60]",
unit="",
min_value=0, max_value=0xFFFFFFFF,
bytes_count=4,
encoder=lambda v: int(v).to_bytes(4, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 42: Control module voltage
self.register(PID(
pid=0x42,
name="CONTROL_MODULE_VOLTAGE",
description="Control module voltage",
unit="V",
min_value=0, max_value=65.535,
bytes_count=2,
encoder=lambda v: int(v * 1000).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big") / 1000
))
# PID 45: Relative throttle position
self.register(PID(
pid=0x45,
name="RELATIVE_THROTTLE_POS",
description="Relative throttle position",
unit="%",
min_value=0, max_value=100,
bytes_count=1,
encoder=lambda v: bytes([int(v * 255 / 100)]),
decoder=lambda b: b[0] * 100 / 255
))
# PID 46: Ambient air temperature
self.register(PID(
pid=0x46,
name="AMBIENT_TEMP",
description="Ambient air temperature",
unit="°C",
min_value=-40, max_value=215,
bytes_count=1,
encoder=lambda v: bytes([int(v + 40)]),
decoder=lambda b: b[0] - 40
))
# PID 49: Accelerator pedal position D
self.register(PID(
pid=0x49,
name="ACCELERATOR_POS_D",
description="Accelerator pedal position D",
unit="%",
min_value=0, max_value=100,
bytes_count=1,
encoder=lambda v: bytes([int(v * 255 / 100)]),
decoder=lambda b: b[0] * 100 / 255
))
# PID 4A: Accelerator pedal position E
self.register(PID(
pid=0x4A,
name="ACCELERATOR_POS_E",
description="Accelerator pedal position E",
unit="%",
min_value=0, max_value=100,
bytes_count=1,
encoder=lambda v: bytes([int(v * 255 / 100)]),
decoder=lambda b: b[0] * 100 / 255
))
# PID 4C: Commanded throttle actuator
self.register(PID(
pid=0x4C,
name="COMMANDED_THROTTLE",
description="Commanded throttle actuator",
unit="%",
min_value=0, max_value=100,
bytes_count=1,
encoder=lambda v: bytes([int(v * 255 / 100)]),
decoder=lambda b: b[0] * 100 / 255
))
# PID 5C: Engine oil temperature
self.register(PID(
pid=0x5C,
name="OIL_TEMP",
description="Engine oil temperature",
unit="°C",
min_value=-40, max_value=210,
bytes_count=1,
encoder=lambda v: bytes([int(v + 40)]),
decoder=lambda b: b[0] - 40
))
# PID 5E: Engine fuel rate
self.register(PID(
pid=0x5E,
name="FUEL_RATE",
description="Engine fuel rate",
unit="L/h",
min_value=0, max_value=3276.75,
bytes_count=2,
encoder=lambda v: int(v * 20).to_bytes(2, "big"),
decoder=lambda b: int.from_bytes(b, "big") / 20
))
# PID 60: Supported PIDs [61-80]
self.register(PID(
pid=0x60,
name="PIDS_SUPPORTED_61_80",
description="Supported PIDs [61-80]",
unit="",
min_value=0, max_value=0xFFFFFFFF,
bytes_count=4,
encoder=lambda v: int(v).to_bytes(4, "big"),
decoder=lambda b: int.from_bytes(b, "big")
))
# PID 62: Actual engine percent torque
self.register(PID(
pid=0x62,
name="ENGINE_TORQUE_ACTUAL",
description="Actual engine percent torque",
unit="%",
min_value=-125, max_value=130,
bytes_count=1,
encoder=lambda v: bytes([int(v + 125)]),
decoder=lambda b: b[0] - 125
))
# Синглтон реестра
_pid_registry: PIDRegistry | None = None
def get_pid_registry() -> PIDRegistry:
"""Получить глобальный реестр PIDs."""
global _pid_registry
if _pid_registry is None:
_pid_registry = PIDRegistry()
return _pid_registry

245
src/obd2/protocol.py Normal file
View File

@@ -0,0 +1,245 @@
"""
OBD2 протокол поверх CAN (ISO 15765-4).
Обработка запросов и формирование ответов.
"""
from dataclasses import dataclass
from typing import TYPE_CHECKING
from can_interface import CANFrame
from logger import get_logger
from .pids import OBD2Mode, get_pid_registry
if TYPE_CHECKING:
from simulator.vehicle import VehicleState
logger = get_logger(__name__)
# Стандартные CAN ID для OBD2
OBD2_REQUEST_ID = 0x7DF # Broadcast запрос ко всем ECU
OBD2_RESPONSE_ID = 0x7E8 # Ответ от Engine ECU (первый ECU)
OBD2_FUNCTIONAL_RANGE = range(0x7E0, 0x7E8) # ECU физические адреса
@dataclass
class OBD2Request:
"""Разобранный OBD2 запрос."""
mode: int
pid: int
raw_data: bytes
@classmethod
def from_can_frame(cls, frame: CANFrame) -> "OBD2Request | None":
"""Распарсить CAN фрейм как OBD2 запрос."""
# Проверяем ID
if frame.arbitration_id != OBD2_REQUEST_ID and frame.arbitration_id not in OBD2_FUNCTIONAL_RANGE:
return None
data = frame.data
if len(data) < 2:
return None
# Первый байт - длина данных (PCI byte для Single Frame)
length = data[0]
if length < 1 or length > 7:
return None
# Второй байт - режим (mode/service)
mode = data[1]
# Третий байт - PID (если есть)
pid = data[2] if length > 1 and len(data) > 2 else 0
return cls(mode=mode, pid=pid, raw_data=data)
def __str__(self) -> str:
return f"OBD2Request(mode={self.mode:02X}, pid={self.pid:02X})"
@dataclass
class OBD2Response:
"""OBD2 ответ для отправки."""
mode: int # Mode + 0x40
pid: int
data: bytes
def to_can_frame(self) -> CANFrame:
"""Преобразовать в CAN фрейм."""
# Single Frame формат: [length, mode+0x40, pid, data...]
payload = bytes([
len(self.data) + 2, # Length: mode + pid + data
self.mode + 0x40, # Response mode
self.pid, # PID
]) + self.data
# Дополняем до 8 байт
payload = payload.ljust(8, b'\x00')
return CANFrame(
arbitration_id=OBD2_RESPONSE_ID,
data=payload
)
class OBD2Protocol:
"""
Обработчик OBD2 протокола.
Получает запросы, формирует ответы на основе состояния автомобиля.
"""
def __init__(self):
self.pid_registry = get_pid_registry()
self._stats = {
"requests_received": 0,
"responses_sent": 0,
"unsupported_requests": 0,
}
def process_frame(self, frame: CANFrame, vehicle_state: "VehicleState") -> CANFrame | None:
"""
Обработать входящий CAN фрейм.
Возвращает ответный фрейм или None, если фрейм не является OBD2 запросом.
"""
request = OBD2Request.from_can_frame(frame)
if request is None:
return None
self._stats["requests_received"] += 1
logger.debug(f"Received: {request}")
response = self._handle_request(request, vehicle_state)
if response:
self._stats["responses_sent"] += 1
return response.to_can_frame()
self._stats["unsupported_requests"] += 1
return None
def _handle_request(self, request: OBD2Request, state: "VehicleState") -> OBD2Response | None:
"""Обработать OBD2 запрос и вернуть ответ."""
if request.mode == OBD2Mode.CURRENT_DATA:
return self._handle_mode01(request, state)
elif request.mode == OBD2Mode.VEHICLE_INFO:
return self._handle_mode09(request)
elif request.mode == OBD2Mode.STORED_DTC:
return self._handle_mode03(request)
elif request.mode == OBD2Mode.CLEAR_DTC:
return self._handle_mode04(request)
logger.debug(f"Unsupported mode: {request.mode:02X}")
return None
def _handle_mode01(self, request: OBD2Request, state: "VehicleState") -> OBD2Response | None:
"""Обработка Mode 01 - текущие данные."""
pid_code = request.pid
# Специальные PIDs для маски поддерживаемых PIDs
if pid_code in (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0):
mask = self.pid_registry.get_supported_pids_mask(pid_code)
return OBD2Response(
mode=request.mode,
pid=pid_code,
data=mask.to_bytes(4, "big")
)
# Получаем значение из состояния автомобиля
value = self._get_pid_value(pid_code, state)
if value is None:
logger.debug(f"PID {pid_code:02X} not supported or no value")
return None
pid_def = self.pid_registry.get(pid_code)
if pid_def is None:
return None
# Кодируем значение
try:
encoded = pid_def.encoder(value)
return OBD2Response(
mode=request.mode,
pid=pid_code,
data=encoded
)
except Exception as e:
logger.error(f"Failed to encode PID {pid_code:02X}: {e}")
return None
def _handle_mode03(self, request: OBD2Request) -> OBD2Response:
"""Mode 03 - чтение DTC. Возвращаем 0 ошибок."""
return OBD2Response(
mode=request.mode,
pid=0x00,
data=bytes([0x00]) # 0 DTCs
)
def _handle_mode04(self, request: OBD2Request) -> OBD2Response:
"""Mode 04 - очистка DTC."""
return OBD2Response(
mode=request.mode,
pid=0x00,
data=bytes()
)
def _handle_mode09(self, request: OBD2Request) -> OBD2Response | None:
"""Mode 09 - информация об автомобиле."""
pid = request.pid
if pid == 0x00:
# Supported PIDs
# Поддерживаем: 02 (VIN), 04 (Calibration ID), 0A (ECU name)
mask = 0x54400000 # PIDs 02, 04, 0A
return OBD2Response(mode=request.mode, pid=pid, data=mask.to_bytes(4, "big"))
elif pid == 0x02:
# VIN - для простоты возвращаем тестовый
# Реальный VIN требует multi-frame ответа, упрощаем
vin = b"TESTVIN123456789" # 17 символов
return OBD2Response(mode=request.mode, pid=pid, data=bytes([1]) + vin[:7])
elif pid == 0x04:
# Calibration ID
return OBD2Response(mode=request.mode, pid=pid, data=b"OBD2EMU1")
elif pid == 0x0A:
# ECU name
return OBD2Response(mode=request.mode, pid=pid, data=b"ECU-SIM")
return None
def _get_pid_value(self, pid_code: int, state: "VehicleState") -> float | None:
"""Получить значение PID из состояния автомобиля."""
mapping = {
0x04: state.engine_load,
0x05: state.coolant_temp,
0x06: state.short_fuel_trim,
0x07: state.long_fuel_trim,
0x0B: state.intake_pressure,
0x0C: state.rpm,
0x0D: state.speed,
0x0E: state.timing_advance,
0x0F: state.intake_temp,
0x10: state.maf_flow,
0x11: state.throttle_pos,
0x1C: 6, # OBD-II as defined by ISO 15765-4 CAN
0x1F: state.run_time,
0x21: 0, # Distance with MIL on
0x23: state.fuel_rail_pressure,
0x2F: state.fuel_level,
0x31: state.distance_since_clear,
0x33: state.barometric_pressure,
0x42: state.control_module_voltage,
0x45: state.throttle_pos, # Relative = absolute for simplicity
0x46: state.ambient_temp,
0x49: state.accelerator_pos,
0x4A: state.accelerator_pos,
0x4C: state.throttle_pos,
0x5C: state.oil_temp,
0x5E: state.fuel_rate,
0x62: state.engine_torque,
}
return mapping.get(pid_code)
def get_stats(self) -> dict:
"""Получить статистику протокола."""
return self._stats.copy()

11
src/simulator/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""Симулятор автомобиля с динамическими данными."""
from .vehicle import VehicleState, VehicleSimulator
from .scenarios import Scenario, ScenarioManager, get_scenario_manager
__all__ = [
"VehicleState",
"VehicleSimulator",
"Scenario",
"ScenarioManager",
"get_scenario_manager",
]

324
src/simulator/scenarios.py Normal file
View File

@@ -0,0 +1,324 @@
"""
Сценарии движения для симулятора.
Каждый сценарий определяет поведение автомобиля во времени.
"""
import math
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Literal
from logger import get_logger
from .vehicle import VehicleSimulator
logger = get_logger(__name__)
class Scenario(ABC):
"""Базовый класс сценария."""
name: str = "base"
description: str = "Base scenario"
def __init__(self, simulator: VehicleSimulator):
self.simulator = simulator
self._start_time: float | None = None
self._is_running = False
def start(self) -> None:
"""Запустить сценарий."""
self._start_time = time.time()
self._is_running = True
self.simulator.start_engine()
self.on_start()
logger.info(f"Scenario '{self.name}' started")
def stop(self) -> None:
"""Остановить сценарий."""
self._is_running = False
self.on_stop()
logger.info(f"Scenario '{self.name}' stopped")
@property
def elapsed_time(self) -> float:
"""Время с начала сценария (секунды)."""
if self._start_time is None:
return 0
return time.time() - self._start_time
@abstractmethod
def update(self) -> None:
"""Обновить состояние сценария."""
pass
def on_start(self) -> None:
"""Вызывается при запуске сценария."""
pass
def on_stop(self) -> None:
"""Вызывается при остановке сценария."""
pass
class IdleScenario(Scenario):
"""
Сценарий: Холостой ход.
Двигатель работает на холостых, машина стоит.
"""
name = "idle"
description = "Engine idling, vehicle stationary"
def update(self) -> None:
self.simulator.set_target_speed(0)
self.simulator.set_throttle(0)
class WarmupScenario(Scenario):
"""
Сценарий: Прогрев двигателя.
Двигатель на холостых, температура постепенно растёт.
"""
name = "warmup"
description = "Engine warming up from cold start"
def on_start(self) -> None:
# Холодный старт
state = self.simulator.state
state.coolant_temp = self.simulator.config.ambient_temp
state.oil_temp = self.simulator.config.ambient_temp
state.intake_temp = self.simulator.config.ambient_temp
def update(self) -> None:
self.simulator.set_target_speed(0)
# Небольшие колебания холостого хода при холодном двигателе
temp = self.simulator.state.coolant_temp
if temp < 50:
# Повышенные обороты при холодном двигателе
self.simulator.set_throttle(5 + (50 - temp) * 0.1)
else:
self.simulator.set_throttle(0)
class CityScenario(Scenario):
"""
Сценарий: Городской цикл.
Разгон до 50, торможение, снова разгон. Имитация светофоров.
"""
name = "city"
description = "City driving with stops and accelerations"
CYCLE_DURATION = 60 # Длина одного цикла (секунды)
def update(self) -> None:
t = self.elapsed_time % self.CYCLE_DURATION
if t < 10:
# Стоим на светофоре
self.simulator.set_target_speed(0)
elif t < 25:
# Разгон до 50 км/ч
progress = (t - 10) / 15
self.simulator.set_target_speed(50 * progress)
elif t < 40:
# Едем 50 км/ч
self.simulator.set_target_speed(50)
elif t < 50:
# Торможение
progress = (t - 40) / 10
self.simulator.set_target_speed(50 * (1 - progress))
else:
# Стоим
self.simulator.set_target_speed(0)
class HighwayScenario(Scenario):
"""
Сценарий: Трасса.
Набор скорости до 110-120 км/ч, круиз с небольшими колебаниями.
"""
name = "highway"
description = "Highway cruising at 110-120 km/h"
def update(self) -> None:
t = self.elapsed_time
if t < 30:
# Разгон до крейсерской скорости
progress = t / 30
self.simulator.set_target_speed(120 * progress)
else:
# Круиз с небольшими колебаниями (имитация обгонов, горок)
variation = math.sin(t * 0.1) * 10
self.simulator.set_target_speed(115 + variation)
class AccelerationScenario(Scenario):
"""
Сценарий: Тест ускорения.
Резкий разгон от 0 до максимума.
"""
name = "acceleration"
description = "Full throttle acceleration test"
def update(self) -> None:
t = self.elapsed_time
if t < 2:
# Готовимся
self.simulator.set_target_speed(0)
self.simulator.set_throttle(0)
elif t < 15:
# Полный газ!
self.simulator.set_throttle(100)
self.simulator.set_target_speed(0) # Отключаем автопилот
elif t < 20:
# Отпускаем газ
self.simulator.set_throttle(0)
else:
# Сброс - начинаем заново
self._start_time = time.time()
class ManualScenario(Scenario):
"""
Сценарий: Ручное управление.
Пользователь сам устанавливает скорость и газ через API.
"""
name = "manual"
description = "Manual control via API"
def update(self) -> None:
# Ничего не делаем - управление внешнее
pass
class DynamicCityScenario(Scenario):
"""
Сценарий: Продвинутый городской цикл.
Более реалистичное поведение с разной интенсивностью.
"""
name = "dynamic_city"
description = "Dynamic city driving with variable patterns"
def __init__(self, simulator: VehicleSimulator):
super().__init__(simulator)
self._phase = 0
self._phase_start = 0
self._phases = [
("idle", 5), # Стоим 5 сек
("accelerate", 8), # Разгон 8 сек
("cruise_30", 10), # Едем 30 км/ч
("accelerate", 5), # Ещё разгон
("cruise_50", 15), # Едем 50 км/ч
("decelerate", 5), # Торможение
("cruise_30", 8), # Едем 30 км/ч
("stop", 8), # Полная остановка
]
def update(self) -> None:
# Определяем текущую фазу
t = self.elapsed_time
phase_elapsed = t - self._phase_start
phase_name, phase_duration = self._phases[self._phase]
if phase_elapsed >= phase_duration:
# Переход к следующей фазе
self._phase = (self._phase + 1) % len(self._phases)
self._phase_start = t
phase_elapsed = 0
phase_name, phase_duration = self._phases[self._phase]
# Выполняем фазу
progress = phase_elapsed / phase_duration
if phase_name == "idle":
self.simulator.set_target_speed(0)
elif phase_name == "accelerate":
current = self.simulator.state.speed
self.simulator.set_target_speed(current + 20)
elif phase_name == "cruise_30":
self.simulator.set_target_speed(30)
elif phase_name == "cruise_50":
self.simulator.set_target_speed(50)
elif phase_name == "decelerate":
current = self.simulator.state.speed
self.simulator.set_target_speed(max(0, current - 15))
elif phase_name == "stop":
self.simulator.set_target_speed(0)
# Реестр сценариев
SCENARIOS: dict[str, type[Scenario]] = {
"idle": IdleScenario,
"warmup": WarmupScenario,
"city": CityScenario,
"highway": HighwayScenario,
"acceleration": AccelerationScenario,
"manual": ManualScenario,
"dynamic_city": DynamicCityScenario,
}
class ScenarioManager:
"""Управление сценариями."""
def __init__(self, simulator: VehicleSimulator):
self.simulator = simulator
self._current_scenario: Scenario | None = None
def load_scenario(self, name: str) -> Scenario:
"""Загрузить сценарий по имени."""
if name not in SCENARIOS:
raise ValueError(f"Unknown scenario: {name}. Available: {list(SCENARIOS.keys())}")
scenario_class = SCENARIOS[name]
self._current_scenario = scenario_class(self.simulator)
return self._current_scenario
def start(self, name: str | None = None) -> None:
"""Запустить сценарий."""
if name:
self.load_scenario(name)
if self._current_scenario:
self._current_scenario.start()
def stop(self) -> None:
"""Остановить текущий сценарий."""
if self._current_scenario:
self._current_scenario.stop()
def update(self) -> None:
"""Обновить текущий сценарий."""
if self._current_scenario and self._current_scenario._is_running:
self._current_scenario.update()
@property
def current_scenario(self) -> Scenario | None:
return self._current_scenario
@property
def available_scenarios(self) -> list[str]:
return list(SCENARIOS.keys())
# Синглтон
_scenario_manager: ScenarioManager | None = None
def get_scenario_manager(simulator: VehicleSimulator | None = None) -> ScenarioManager:
"""Получить менеджер сценариев."""
global _scenario_manager
if _scenario_manager is None:
if simulator is None:
raise ValueError("Simulator required for first initialization")
_scenario_manager = ScenarioManager(simulator)
return _scenario_manager

432
src/simulator/vehicle.py Normal file
View File

@@ -0,0 +1,432 @@
"""
Модель автомобиля с физически правдоподобным поведением.
Симулирует двигатель, трансмиссию, температуры и расход топлива.
"""
import math
import time
from dataclasses import dataclass, field
from typing import Callable
from config import VehicleConfig
from logger import get_logger
logger = get_logger(__name__)
@dataclass
class VehicleState:
"""Текущее состояние автомобиля."""
# Двигатель
rpm: float = 0.0
engine_load: float = 0.0
throttle_pos: float = 0.0
accelerator_pos: float = 0.0
# Движение
speed: float = 0.0 # km/h
# Температуры
coolant_temp: float = 20.0
intake_temp: float = 20.0
oil_temp: float = 20.0
ambient_temp: float = 20.0
# Топливо
fuel_level: float = 75.0 # %
fuel_rate: float = 0.0 # L/h
fuel_rail_pressure: float = 35000.0 # kPa
# Воздух
maf_flow: float = 0.0 # g/s
intake_pressure: float = 101.0 # kPa
# Другое
timing_advance: float = 10.0 # degrees
short_fuel_trim: float = 0.0 # %
long_fuel_trim: float = 0.0 # %
barometric_pressure: float = 101.0 # kPa
control_module_voltage: float = 14.2 # V
engine_torque: float = 0.0 # %
# Статистика
run_time: float = 0.0 # seconds
distance_since_clear: float = 0.0 # km
total_distance: float = 0.0 # km
# Внутреннее состояние
engine_running: bool = False
gear: int = 0 # 0 = N/P, 1-6 = gears
def __post_init__(self):
self._start_time = time.time()
class EngineModel:
"""Модель двигателя."""
# Передаточные числа (примерные для 6-ступенчатой АКПП)
GEAR_RATIOS = {
0: 0, # Neutral
1: 3.5,
2: 2.1,
3: 1.4,
4: 1.0,
5: 0.75,
6: 0.6,
}
FINAL_DRIVE = 3.5
WHEEL_RADIUS = 0.32 # meters
def __init__(self, config: VehicleConfig):
self.config = config
self.idle_rpm = config.idle_rpm
self.max_rpm = config.max_rpm
self.redline = config.redline_rpm
self.displacement = config.engine_displacement
def calculate_rpm_from_speed(self, speed_kmh: float, gear: int) -> float:
"""Рассчитать RPM из скорости и передачи."""
if gear == 0 or speed_kmh < 1:
return self.idle_rpm
speed_ms = speed_kmh / 3.6
wheel_rpm = (speed_ms / self.WHEEL_RADIUS) * 60 / (2 * math.pi)
engine_rpm = wheel_rpm * self.GEAR_RATIOS[gear] * self.FINAL_DRIVE
return max(self.idle_rpm, min(engine_rpm, self.max_rpm))
def calculate_optimal_gear(self, speed_kmh: float, throttle: float) -> int:
"""Определить оптимальную передачу."""
if speed_kmh < 5:
return 1 if throttle > 0 else 0
# Точки переключения (примерные)
shift_points = [
(0, 1),
(15, 2),
(30, 3),
(50, 4),
(70, 5),
(90, 6),
]
gear = 1
for speed, g in shift_points:
if speed_kmh >= speed:
gear = g
# При высоком газе держим передачу дольше
if throttle > 70 and gear > 1:
# Проверяем, не уйдут ли обороты в красную зону
rpm = self.calculate_rpm_from_speed(speed_kmh, gear - 1)
if rpm < self.redline:
gear -= 1
return gear
def calculate_engine_load(self, rpm: float, throttle: float, speed: float) -> float:
"""Рассчитать нагрузку на двигатель."""
# Базовая нагрузка от оборотов
rpm_factor = rpm / self.max_rpm
# Нагрузка от газа
throttle_factor = throttle / 100
# Нагрузка от скорости (сопротивление воздуха)
aero_factor = (speed / 200) ** 2
load = (throttle_factor * 0.6 + rpm_factor * 0.2 + aero_factor * 0.2) * 100
return min(100, max(0, load))
def calculate_maf(self, rpm: float, load: float) -> float:
"""Рассчитать расход воздуха (MAF)."""
# Упрощённая модель: MAF пропорционален RPM * нагрузке * объём двигателя
volumetric_efficiency = 0.85
air_density = 1.2 # kg/m³
# Объём воздуха за минуту (л/мин)
air_volume = (rpm * self.displacement * volumetric_efficiency * (load / 100)) / 2
# Преобразуем в g/s
maf = (air_volume / 60) * air_density
return max(0, maf)
def calculate_fuel_rate(self, rpm: float, load: float) -> float:
"""Рассчитать расход топлива (L/h)."""
if rpm < 100:
return 0
# Базовый расход на холостых
idle_consumption = 0.8 # L/h
# Расход пропорционален MAF (стехиометрия ~14.7:1)
maf = self.calculate_maf(rpm, load)
fuel_mass_rate = maf / 14.7 # g/s
fuel_volume_rate = fuel_mass_rate / 750 # L/s (плотность бензина ~750 g/L)
fuel_rate_lh = fuel_volume_rate * 3600 # L/h
return max(idle_consumption, fuel_rate_lh)
class TemperatureModel:
"""Модель температур двигателя."""
def __init__(self, config: VehicleConfig):
self.ambient = config.ambient_temp
self.target_coolant = config.target_coolant_temp
def update_coolant_temp(
self,
current: float,
rpm: float,
speed: float,
dt: float
) -> float:
"""Обновить температуру охлаждающей жидкости."""
# Нагрев от работы двигателя
heat_rate = 0.5 if rpm > 0 else 0 # °C/s при работающем двигателе
# Охлаждение от радиатора (зависит от скорости)
cooling_rate = 0.1 + (speed / 100) * 0.3 # °C/s
# Целевая температура
target = self.target_coolant if rpm > 0 else self.ambient
# Плавное изменение к цели
diff = target - current
change = diff * 0.02 * dt # Медленное изменение
# Добавляем нагрев/охлаждение
if rpm > 0:
change += heat_rate * dt * (1 - current / self.target_coolant)
return current + change
def update_oil_temp(
self,
current: float,
coolant_temp: float,
rpm: float,
dt: float
) -> float:
"""Обновить температуру масла (следует за coolant с задержкой)."""
# Масло нагревается медленнее и до более высокой температуры
target = coolant_temp + 10 if rpm > 0 else self.ambient
diff = target - current
change = diff * 0.01 * dt # Очень медленное изменение
return current + change
def update_intake_temp(
self,
current: float,
coolant_temp: float,
speed: float,
dt: float
) -> float:
"""Обновить температуру впускного воздуха."""
# Зависит от скорости (охлаждение потоком) и температуры двигателя
target = self.ambient + (coolant_temp - self.ambient) * 0.1
if speed > 30:
target = self.ambient + 5 # Охлаждение потоком
diff = target - current
change = diff * 0.1 * dt
return current + change
class VehicleSimulator:
"""
Главный симулятор автомобиля.
Объединяет все модели и обновляет состояние.
"""
def __init__(self, config: VehicleConfig):
self.config = config
self.engine = EngineModel(config)
self.temperature = TemperatureModel(config)
# Начальное состояние
self.state = VehicleState(
ambient_temp=config.ambient_temp,
intake_temp=config.ambient_temp,
coolant_temp=config.ambient_temp,
oil_temp=config.ambient_temp,
fuel_level=config.initial_fuel_level,
)
self._last_update = time.time()
self._target_speed = 0.0
self._target_throttle = 0.0
def start_engine(self) -> None:
"""Запустить двигатель."""
if not self.state.engine_running:
self.state.engine_running = True
self.state.rpm = self.config.idle_rpm
self.state.gear = 0
logger.info("Engine started")
def stop_engine(self) -> None:
"""Заглушить двигатель."""
if self.state.engine_running:
self.state.engine_running = False
self.state.rpm = 0
self.state.gear = 0
self.state.speed = 0
self.state.throttle_pos = 0
self.state.engine_load = 0
logger.info("Engine stopped")
def set_target_speed(self, speed: float) -> None:
"""Установить целевую скорость (для сценариев)."""
self._target_speed = max(0, min(speed, self.config.max_speed))
def set_throttle(self, throttle: float) -> None:
"""Установить положение газа (0-100%)."""
self._target_throttle = max(0, min(throttle, 100))
def update(self, dt: float | None = None) -> VehicleState:
"""
Обновить состояние автомобиля.
Args:
dt: Время с последнего обновления (секунды).
Если None, вычисляется автоматически.
"""
now = time.time()
if dt is None:
dt = now - self._last_update
self._last_update = now
if not self.state.engine_running:
self.state.run_time = 0
return self.state
# Обновляем время работы
self.state.run_time += dt
# Плавное изменение газа
throttle_diff = self._target_throttle - self.state.throttle_pos
self.state.throttle_pos += throttle_diff * min(1.0, dt * 5) # Сглаживание
self.state.accelerator_pos = self.state.throttle_pos
# Обновляем скорость на основе целевой скорости или газа
self._update_speed(dt)
# Определяем передачу
self.state.gear = self.engine.calculate_optimal_gear(
self.state.speed,
self.state.throttle_pos
)
# Рассчитываем обороты
if self.state.speed < 1:
self.state.rpm = self.config.idle_rpm + self.state.throttle_pos * 10
else:
self.state.rpm = self.engine.calculate_rpm_from_speed(
self.state.speed,
self.state.gear
)
# Нагрузка двигателя
self.state.engine_load = self.engine.calculate_engine_load(
self.state.rpm,
self.state.throttle_pos,
self.state.speed
)
# MAF
self.state.maf_flow = self.engine.calculate_maf(
self.state.rpm,
self.state.engine_load
)
# Расход топлива
self.state.fuel_rate = self.engine.calculate_fuel_rate(
self.state.rpm,
self.state.engine_load
)
# Уменьшаем уровень топлива
fuel_consumed = self.state.fuel_rate * dt / 3600 # L
fuel_percent = (fuel_consumed / self.config.fuel_tank_capacity) * 100
self.state.fuel_level = max(0, self.state.fuel_level - fuel_percent)
# Давление во впуске (упрощённо)
self.state.intake_pressure = 101 - (self.state.throttle_pos * 0.6)
# Опережение зажигания
self.state.timing_advance = 10 + (self.state.rpm / 1000) * 3
# Крутящий момент (упрощённо)
self.state.engine_torque = self.state.engine_load * 0.8
# Температуры
self.state.coolant_temp = self.temperature.update_coolant_temp(
self.state.coolant_temp,
self.state.rpm,
self.state.speed,
dt
)
self.state.oil_temp = self.temperature.update_oil_temp(
self.state.oil_temp,
self.state.coolant_temp,
self.state.rpm,
dt
)
self.state.intake_temp = self.temperature.update_intake_temp(
self.state.intake_temp,
self.state.coolant_temp,
self.state.speed,
dt
)
# Пробег
distance_km = (self.state.speed * dt) / 3600
self.state.total_distance += distance_km
self.state.distance_since_clear += distance_km
return self.state
def _update_speed(self, dt: float) -> None:
"""Обновить скорость автомобиля."""
current = self.state.speed
target = self._target_speed
if target > 0:
# Режим целевой скорости (автопилот сценария)
diff = target - current
# Ускорение/замедление до 3 м/с² (примерно 10 км/ч за секунду)
max_change = 10 * dt
if abs(diff) < max_change:
self.state.speed = target
else:
self.state.speed += max_change if diff > 0 else -max_change
# Подстраиваем газ под скорость
if diff > 5:
self._target_throttle = min(100, 30 + diff)
elif diff < -5:
self._target_throttle = 0
else:
self._target_throttle = 15 + current / 10
else:
# Режим ручного газа
throttle = self.state.throttle_pos
if throttle > 5:
# Ускорение
acceleration = (throttle / 100) * 15 # Max ~15 km/h/s
self.state.speed = min(
self.config.max_speed,
current + acceleration * dt
)
else:
# Торможение (двигателем + сопротивление)
deceleration = 5 + current * 0.02 # Зависит от скорости
self.state.speed = max(0, current - deceleration * dt)
def get_state(self) -> VehicleState:
"""Получить текущее состояние."""
return self.state