Files
obd_emulator/scripts/test_uds.py
Alexander Poletaev 2ec05d2e9d Add UDS protocol support (ISO 14229) for ECU emulator
Implement UDS Service 0x22 (Read Data By Identifier) with support for:
- Engine ECU (0x7E0): boost pressure, fuel rail pressure, lambda, torque,
  wastegate position, ignition timing, knock correction per cylinder
- DSG Transmission (0x7E1): current gear, selector position, oil temp
- Instrument Cluster (0x714): RPM, ambient light sensor

Also adds ECU_EMULATOR_SPEC.md with full protocol documentation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 16:28:55 +03:00

251 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
Test script for exercising the emulator's UDS protocol support.

Sends UDS Service 0x22 requests and checks the responses.

Usage:
    python test_uds.py --interface vcan0

Requirements:
    - The emulator running on the same interface
    - pip install python-can
"""
import argparse
import sys
import time
from dataclasses import dataclass
import can
@dataclass
class UDSTestCase:
    """One UDS Service 0x22 (Read Data By Identifier) probe."""

    name: str  # label printed in front of the test result
    request_id: int  # CAN arbitration ID the request is sent on
    response_id: int  # CAN arbitration ID the answer is expected on
    did: int  # data identifier (DID) to read
    description: str  # free-text note about the value being read
# Test cases according to the specification.
# Each table row is (name, DID, description); the request/response CAN IDs
# are shared by every case addressed to the same ECU.
ENGINE_ECU_TESTS = [
    UDSTestCase(name, 0x7E0, 0x7E8, did, description)
    for name, did, description in (
        ("Boost Pressure Actual", 0x202A, "Boost in kPa"),
        ("Boost Pressure Target", 0x2029, "Target boost in kPa"),
        ("Fuel Rail Pressure", 0xF423, "FRP in kPa"),
        ("Lambda Actual", 0x10C0, "Lambda value"),
        ("Lambda Target", 0x1456, "Target lambda"),
        ("Torque Actual", 0x437C, "Torque in Nm"),
        ("Wastegate Actual", 0x39A2, "Wastegate %"),
        ("Wastegate Target", 0x39A3, "Target wastegate %"),
        ("Ignition Timing", 0x2004, "Timing in degrees"),
        ("Timing Correction Cyl1", 0x200A, "Knock retard"),
        ("Timing Correction Cyl2", 0x200B, "Knock retard"),
        ("Timing Correction Cyl3", 0x200C, "Knock retard"),
        ("Timing Correction Cyl4", 0x200D, "Knock retard"),
    )
]

DSG_TESTS = [
    UDSTestCase(name, 0x7E1, 0x7E9, did, description)
    for name, did, description in (
        ("Current Gear", 0x3816, "DSG gear"),
        ("Gear Selector", 0x3815, "PRNDM position"),
        ("Trans Oil Temp", 0x1940, "Temperature"),
    )
]

CLUSTER_TESTS = [
    UDSTestCase(name, 0x714, 0x77E, did, description)
    for name, did, description in (
        ("Cluster RPM", 0x22D1, "RPM from cluster"),
        ("Ambient Light", 0x224D, "Light sensor"),
    )
]
def build_uds_request(did: int) -> bytes:
    """Build the 8-byte payload of a UDS Service 0x22 request.

    Args:
        did: Data identifier to read; must fit in 16 bits.

    Returns:
        Eight bytes laid out as
        ``[0x03, 0x22, did_high, did_low, 0x55, 0x55, 0x55, 0x55]``:
        length byte, service ID, the DID in big-endian order, then padding.

    Raises:
        ValueError: If ``did`` is outside ``0x0000..0xFFFF``. Masking it
            instead would silently request a different identifier.
    """
    if not 0 <= did <= 0xFFFF:
        raise ValueError(f"DID out of range: {did!r} (expected 0x0000..0xFFFF)")
    # [length, service, did_h, did_l, padding...]
    return bytes([0x03, 0x22]) + did.to_bytes(2, "big") + b"\x55" * 4
def parse_uds_response(data: bytes) -> tuple[bool, int | None, bytes | None]:
"""
Распарсить UDS ответ.
Returns: (is_positive, did, data) или (False, None, None) для negative response
"""
if len(data) < 4:
return False, None, None
length = data[0]
service = data[1]
if service == 0x62: # Positive response
did = (data[2] << 8) | data[3]
response_data = data[4:4 + length - 3]
return True, did, response_data
elif service == 0x7F: # Negative response
error_code = data[3] if len(data) > 3 else 0
return False, None, bytes([error_code])
return False, None, None
def decode_value(did: int, data: bytes) -> str:
    """Render the payload of a positive UDS response as readable text.

    Args:
        did: Data identifier the payload belongs to; selects the scaling.
        data: Payload bytes that follow the DID in the response.

    Returns:
        ``"No data"`` for an empty payload. A one-byte payload is decoded
        with the single-byte rules (temperature, gear, selector, ambient
        light), falling back to the plain decimal value. For two or more
        bytes only the first two are used, as a big-endian 16-bit value,
        with the per-DID scaling below; an unrecognised DID falls back to
        ``"0xHHHH (decimal)"``.
    """
    if not data:
        return "No data"

    if len(data) == 1:
        raw = data[0]
        # Temperature PIDs (offset -40)
        if did in (0x1940,):
            return f"{raw - 40}°C"
        # Gear
        if did == 0x3816:
            gear_map = {0x00: "N", 0x0C: "R", 0x02: "1", 0x04: "2", 0x06: "3",
                        0x08: "4", 0x0A: "5", 0x0E: "6", 0x10: "7"}
            return gear_map.get(raw, f"Unknown ({raw:02X})")
        # Selector
        if did == 0x3815:
            pos_map = {0: "P", 1: "R", 2: "N", 3: "D", 4: "S", 5: "M"}
            return pos_map.get(raw, f"Unknown ({raw})")
        # Ambient light
        if did == 0x224D:
            return f"{raw} (0=dark, 255=bright)"
        return f"{raw}"

    # Two or more bytes: every remaining case decodes the leading 16-bit word.
    raw = (data[0] << 8) | data[1]
    # Boost pressure (scale 0.1)
    if did in (0x202A, 0x2029):
        return f"{raw * 0.1:.1f} kPa"
    # Fuel rail pressure (scale 10)
    if did == 0xF423:
        return f"{raw * 10} kPa ({raw * 10 / 100:.0f} bar)"
    # Lambda (scale 0.000977)
    if did in (0x10C0, 0x1456):
        return f"{raw * 0.000977:.3f} λ"
    # Torque (scale 0.1)
    if did == 0x437C:
        return f"{raw * 0.1:.1f} Nm"
    # Wastegate (scale 0.01)
    if did in (0x39A2, 0x39A3):
        return f"{raw * 0.01:.1f}%"
    # Signed values (timing): reinterpret the word as 16-bit two's complement
    if did in (0x2004, 0x200A, 0x200B, 0x200C, 0x200D):
        if raw >= 0x8000:
            raw -= 0x10000
        return f"{raw * 0.01:.2f}°"
    # RPM (scale /4)
    if did == 0x22D1:
        return f"{raw / 4:.0f} RPM"
    return f"0x{raw:04X} ({raw})"
def run_test(bus: can.Bus, test: UDSTestCase, timeout: float = 0.5) -> bool:
    """Run a single UDS test case and print its outcome.

    Sends a Service 0x22 request for ``test.did`` on ``test.request_id``,
    then reads frames from ``bus`` until a conclusive answer arrives on
    ``test.response_id`` or ``timeout`` seconds have passed.

    Args:
        bus: Open CAN bus used for both sending and receiving.
        test: Test case describing the CAN IDs and the DID to read.
        timeout: Maximum time in seconds to wait for an answer.

    Returns:
        True if a positive response for ``test.did`` was received. False on
        a negative response or when the timeout expires.
    """
    # Send the request
    request = can.Message(
        arbitration_id=test.request_id,
        data=build_uds_request(test.did),
        is_extended_id=False,
    )
    bus.send(request)

    error_names = {
        0x31: "Request Out of Range",
        0x22: "Conditions Not Correct",
        0x33: "Security Access Denied",
        0x10: "General Reject",
    }

    # Wait for the answer. A monotonic clock keeps the deadline immune to
    # wall-clock adjustments, and each recv() is capped by the time left so
    # the overall wait never overshoots ``timeout``.
    deadline = time.monotonic() + timeout
    while (remaining := deadline - time.monotonic()) > 0:
        response = bus.recv(timeout=min(0.1, remaining))
        if response is None or response.arbitration_id != test.response_id:
            continue

        is_positive, did, data = parse_uds_response(bytes(response.data))
        if is_positive:
            if did == test.did:
                print(f"{test.name}: {decode_value(did, data)}")
                return True
            # Positive answer for a different DID: keep waiting for ours.
            continue

        if data is None:
            # Neither a positive nor a negative UDS response (too short or
            # another service byte). Treat it as unrelated traffic on the
            # same CAN ID instead of failing the test on it.
            continue

        error_code = data[0] if data else 0
        error_name = error_names.get(error_code, f"Unknown (0x{error_code:02X})")
        print(f"{test.name}: Negative response - {error_name}")
        return False

    print(f"{test.name}: No response (timeout)")
    return False
def main() -> int:
    """Parse the command line, run the selected UDS test suites, print a summary.

    With none of ``--engine``, ``--dsg`` or ``--cluster`` given, all three
    suites run; otherwise only the selected ones do.

    Returns:
        Process exit code: 0 when every executed test passed, 1 when any
        test failed or the CAN interface could not be opened.
    """
    parser = argparse.ArgumentParser(description="Test UDS protocol of OBD2 emulator")
    parser.add_argument("-i", "--interface", default="vcan0", help="CAN interface")
    parser.add_argument("--timeout", type=float, default=0.5, help="Response timeout")
    parser.add_argument("--engine", action="store_true", help="Test Engine ECU only")
    parser.add_argument("--dsg", action="store_true", help="Test DSG only")
    parser.add_argument("--cluster", action="store_true", help="Test Cluster only")
    args = parser.parse_args()

    print(f"Connecting to {args.interface}...")
    try:
        bus = can.Bus(interface="socketcan", channel=args.interface)
    except Exception as e:  # top-level boundary: report and exit non-zero
        print(f"Error: Cannot connect to {args.interface}: {e}")
        return 1

    print("=" * 60)
    print(" UDS PROTOCOL TEST")
    print("=" * 60)

    # Decide which suites to run: all of them unless at least one was selected.
    run_all = not (args.engine or args.dsg or args.cluster)
    suites = [
        (args.engine, "\n[Engine ECU - 0x7E0]", ENGINE_ECU_TESTS),
        (args.dsg, "\n[DSG Transmission - 0x7E1]", DSG_TESTS),
        (args.cluster, "\n[Instrument Cluster - 0x714]", CLUSTER_TESTS),
    ]

    total_tests = 0
    passed_tests = 0
    try:
        for selected, header, tests in suites:
            if not (run_all or selected):
                continue
            print(header)
            for test in tests:
                total_tests += 1
                if run_test(bus, test, args.timeout):
                    passed_tests += 1
    finally:
        # Release the CAN interface even if a test raises.
        bus.shutdown()

    print("\n" + "=" * 60)
    print(f"Results: {passed_tests}/{total_tests} tests passed")
    print("=" * 60)
    return 0 if passed_tests == total_tests else 1
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    sys.exit(main())