#!/usr/bin/env python3 """ Тестовый скрипт для проверки UDS протокола эмулятора. Отправляет запросы UDS Service 0x22 и проверяет ответы. Использование: python test_uds.py --interface vcan0 Требования: - Запущенный эмулятор на том же интерфейсе - pip install python-can """ import argparse import sys import time from dataclasses import dataclass import can @dataclass class UDSTestCase: """Тестовый случай для UDS.""" name: str request_id: int response_id: int did: int description: str # Тестовые случаи согласно спецификации ENGINE_ECU_TESTS = [ UDSTestCase("Boost Pressure Actual", 0x7E0, 0x7E8, 0x202A, "Boost in kPa"), UDSTestCase("Boost Pressure Target", 0x7E0, 0x7E8, 0x2029, "Target boost in kPa"), UDSTestCase("Fuel Rail Pressure", 0x7E0, 0x7E8, 0xF423, "FRP in kPa"), UDSTestCase("Lambda Actual", 0x7E0, 0x7E8, 0x10C0, "Lambda value"), UDSTestCase("Lambda Target", 0x7E0, 0x7E8, 0x1456, "Target lambda"), UDSTestCase("Torque Actual", 0x7E0, 0x7E8, 0x437C, "Torque in Nm"), UDSTestCase("Wastegate Actual", 0x7E0, 0x7E8, 0x39A2, "Wastegate %"), UDSTestCase("Wastegate Target", 0x7E0, 0x7E8, 0x39A3, "Target wastegate %"), UDSTestCase("Ignition Timing", 0x7E0, 0x7E8, 0x2004, "Timing in degrees"), UDSTestCase("Timing Correction Cyl1", 0x7E0, 0x7E8, 0x200A, "Knock retard"), UDSTestCase("Timing Correction Cyl2", 0x7E0, 0x7E8, 0x200B, "Knock retard"), UDSTestCase("Timing Correction Cyl3", 0x7E0, 0x7E8, 0x200C, "Knock retard"), UDSTestCase("Timing Correction Cyl4", 0x7E0, 0x7E8, 0x200D, "Knock retard"), ] DSG_TESTS = [ UDSTestCase("Current Gear", 0x7E1, 0x7E9, 0x3816, "DSG gear"), UDSTestCase("Gear Selector", 0x7E1, 0x7E9, 0x3815, "PRNDM position"), UDSTestCase("Trans Oil Temp", 0x7E1, 0x7E9, 0x1940, "Temperature"), ] CLUSTER_TESTS = [ UDSTestCase("Cluster RPM", 0x714, 0x77E, 0x22D1, "RPM from cluster"), UDSTestCase("Ambient Light", 0x714, 0x77E, 0x224D, "Light sensor"), ] def build_uds_request(did: int) -> bytes: """Построить UDS запрос Service 0x22.""" did_h = (did >> 8) & 0xFF did_l = did & 0xFF # [length, service, did_h, did_l, padding...] return bytes([0x03, 0x22, did_h, did_l, 0x55, 0x55, 0x55, 0x55]) def parse_uds_response(data: bytes) -> tuple[bool, int | None, bytes | None]: """ Распарсить UDS ответ. Returns: (is_positive, did, data) или (False, None, None) для negative response """ if len(data) < 4: return False, None, None length = data[0] service = data[1] if service == 0x62: # Positive response did = (data[2] << 8) | data[3] response_data = data[4:4 + length - 3] return True, did, response_data elif service == 0x7F: # Negative response error_code = data[3] if len(data) > 3 else 0 return False, None, bytes([error_code]) return False, None, None def decode_value(did: int, data: bytes) -> str: """Декодировать значение UDS ответа.""" if len(data) == 0: return "No data" if len(data) == 1: raw = data[0] # Temperature PIDs (offset -40) if did in (0x1940,): return f"{raw - 40}°C" # Gear if did == 0x3816: gear_map = {0x00: "N", 0x0C: "R", 0x02: "1", 0x04: "2", 0x06: "3", 0x08: "4", 0x0A: "5", 0x0E: "6", 0x10: "7"} return gear_map.get(raw, f"Unknown ({raw:02X})") # Selector if did == 0x3815: pos_map = {0: "P", 1: "R", 2: "N", 3: "D", 4: "S", 5: "M"} return pos_map.get(raw, f"Unknown ({raw})") # Ambient light if did == 0x224D: return f"{raw} (0=dark, 255=bright)" return f"{raw}" if len(data) >= 2: raw = (data[0] << 8) | data[1] # Boost pressure (scale 0.1) if did in (0x202A, 0x2029): return f"{raw * 0.1:.1f} kPa" # Fuel rail pressure (scale 10) if did == 0xF423: return f"{raw * 10} kPa ({raw * 10 / 100:.0f} bar)" # Lambda (scale 0.000977) if did in (0x10C0, 0x1456): return f"{raw * 0.000977:.3f} λ" # Torque (scale 0.1) if did == 0x437C: return f"{raw * 0.1:.1f} Nm" # Wastegate (scale 0.01) if did in (0x39A2, 0x39A3): return f"{raw * 0.01:.1f}%" # Signed values (timing) if did in (0x2004, 0x200A, 0x200B, 0x200C, 0x200D): if raw >= 0x8000: raw -= 0x10000 return f"{raw * 0.01:.2f}°" # RPM (scale /4) if did == 0x22D1: return f"{raw / 4:.0f} RPM" return f"0x{raw:04X} ({raw})" return data.hex().upper() def run_test(bus: can.Bus, test: UDSTestCase, timeout: float = 0.5) -> bool: """Выполнить один тест UDS.""" request_data = build_uds_request(test.did) # Отправляем запрос msg = can.Message( arbitration_id=test.request_id, data=request_data, is_extended_id=False ) bus.send(msg) # Ждём ответ start_time = time.time() while time.time() - start_time < timeout: response = bus.recv(timeout=0.1) if response is None: continue if response.arbitration_id == test.response_id: is_positive, did, data = parse_uds_response(bytes(response.data)) if is_positive and did == test.did: decoded = decode_value(did, data) print(f" ✓ {test.name}: {decoded}") return True elif not is_positive: error_code = data[0] if data else 0 error_names = { 0x31: "Request Out of Range", 0x22: "Conditions Not Correct", 0x33: "Security Access Denied", 0x10: "General Reject", } error_name = error_names.get(error_code, f"Unknown (0x{error_code:02X})") print(f" ✗ {test.name}: Negative response - {error_name}") return False print(f" ✗ {test.name}: No response (timeout)") return False def main(): parser = argparse.ArgumentParser(description="Test UDS protocol of OBD2 emulator") parser.add_argument("-i", "--interface", default="vcan0", help="CAN interface") parser.add_argument("--timeout", type=float, default=0.5, help="Response timeout") parser.add_argument("--engine", action="store_true", help="Test Engine ECU only") parser.add_argument("--dsg", action="store_true", help="Test DSG only") parser.add_argument("--cluster", action="store_true", help="Test Cluster only") args = parser.parse_args() print(f"Connecting to {args.interface}...") try: bus = can.Bus(interface="socketcan", channel=args.interface) except Exception as e: print(f"Error: Cannot connect to {args.interface}: {e}") return 1 print("=" * 60) print(" UDS PROTOCOL TEST") print("=" * 60) # Определяем какие тесты запускать run_all = not (args.engine or args.dsg or args.cluster) total_tests = 0 passed_tests = 0 if run_all or args.engine: print("\n[Engine ECU - 0x7E0]") for test in ENGINE_ECU_TESTS: total_tests += 1 if run_test(bus, test, args.timeout): passed_tests += 1 if run_all or args.dsg: print("\n[DSG Transmission - 0x7E1]") for test in DSG_TESTS: total_tests += 1 if run_test(bus, test, args.timeout): passed_tests += 1 if run_all or args.cluster: print("\n[Instrument Cluster - 0x714]") for test in CLUSTER_TESTS: total_tests += 1 if run_test(bus, test, args.timeout): passed_tests += 1 print("\n" + "=" * 60) print(f"Results: {passed_tests}/{total_tests} tests passed") print("=" * 60) bus.shutdown() return 0 if passed_tests == total_tests else 1 if __name__ == "__main__": sys.exit(main())