#!/usr/bin/env python3 """ Тестовый скрипт для проверки OBD2 эмулятора. Отправляет OBD2 запросы и выводит ответы. """ import argparse import sys import time try: import can except ImportError: print("Error: python-can not installed. Run: pip install python-can") sys.exit(1) # OBD2 PIDs для тестирования (согласно ECU_EMULATOR_SPEC.md) TEST_PIDS = [ # Supported PIDs (0x00, "Supported PIDs [01-20]"), (0x20, "Supported PIDs [21-40]"), (0x40, "Supported PIDs [41-60]"), # Engine (0x04, "Engine Load"), (0x05, "Coolant Temperature"), (0x0C, "Engine RPM"), (0x0D, "Vehicle Speed"), (0x0F, "Intake Air Temperature"), (0x10, "MAF Air Flow Rate"), (0x11, "Throttle Position"), # Fuel (0x2F, "Fuel Tank Level"), (0x5C, "Oil Temperature"), # Other (0x46, "Ambient Temperature"), ] def decode_pid(pid: int, data: bytes) -> str: """Декодировать значение PID согласно ECU_EMULATOR_SPEC.md.""" if len(data) < 1: return "No data" # Supported PIDs bitmap if pid in (0x00, 0x20, 0x40, 0x60): if len(data) >= 4: mask = int.from_bytes(data[:4], "big") supported = [] base = pid + 1 for i in range(32): if mask & (1 << (31 - i)): supported.append(f"{base + i:02X}") if supported: return f"PIDs: {', '.join(supported)}" return "None" return f"Raw: {data.hex()}" # Temperature PIDs: A - 40 elif pid in (0x05, 0x0F, 0x46, 0x5C): return f"{data[0] - 40} °C" # Engine Load / Throttle / Fuel Level: A * 100 / 255 elif pid in (0x04, 0x11, 0x2F): return f"{data[0] * 100 / 255:.1f} %" # RPM: (A*256 + B) / 4 elif pid == 0x0C: if len(data) >= 2: rpm = (data[0] * 256 + data[1]) / 4 return f"{rpm:.0f} rpm" # Speed: A elif pid == 0x0D: return f"{data[0]} km/h" # MAF: (A*256 + B) / 100 elif pid == 0x10: if len(data) >= 2: maf = (data[0] * 256 + data[1]) / 100 return f"{maf:.2f} g/s" return f"Raw: {data.hex()}" def send_obd2_request(bus: can.Bus, pid: int, timeout: float = 1.0) -> bytes | None: """Отправить OBD2 запрос и получить ответ.""" # Формируем запрос Mode 01 request = can.Message( arbitration_id=0x7DF, data=[0x02, 0x01, pid, 0x00, 0x00, 0x00, 0x00, 0x00], is_extended_id=False ) # Отправляем bus.send(request) # Ждём ответ start = time.time() while time.time() - start < timeout: msg = bus.recv(timeout=0.1) if msg is None: continue # Проверяем, что это ответ от ECU if msg.arbitration_id == 0x7E8: data = bytes(msg.data) # Проверяем формат ответа if len(data) >= 3 and data[1] == 0x41 and data[2] == pid: # Возвращаем данные (без заголовка) return data[3:3 + data[0] - 2] return None def main(): parser = argparse.ArgumentParser(description="Test OBD2 Emulator") parser.add_argument( "-i", "--interface", default="vcan0", help="CAN interface [default: vcan0]" ) parser.add_argument( "-c", "--continuous", action="store_true", help="Continuous mode (loop forever)" ) parser.add_argument( "--interval", type=float, default=1.0, help="Interval between requests in continuous mode [default: 1.0]" ) args = parser.parse_args() print(f"=== OBD2 Emulator Test ===") print(f"Interface: {args.interface}") print("") try: bus = can.Bus(interface="socketcan", channel=args.interface) except Exception as e: print(f"Error: Failed to connect to {args.interface}: {e}") print("Make sure the interface is up: sudo ip link set vcan0 up") return 1 try: while True: print(f"Timestamp: {time.strftime('%H:%M:%S')}") print("-" * 50) for pid, name in TEST_PIDS: data = send_obd2_request(bus, pid) if data: value = decode_pid(pid, data) print(f" PID {pid:02X} ({name}): {value}") else: print(f" PID {pid:02X} ({name}): No response") print("") if not args.continuous: break time.sleep(args.interval) except KeyboardInterrupt: print("\nStopped") finally: bus.shutdown() return 0 if __name__ == "__main__": sys.exit(main())