This commit is contained in:
2026-01-29 22:37:59 +03:00
parent 1659970896
commit d4ffce28d5
16 changed files with 1826 additions and 0 deletions

View File

@@ -0,0 +1,169 @@
"""OBD2 PID scanner for auto-detection."""
from typing import List, Set, Optional
import json
from pathlib import Path
from ..can.interface import CANInterface
from ..logger import get_logger
from .protocol import OBD2Protocol
from .pids import PIDS
class OBD2Scanner:
"""Scanner for detecting supported OBD2 PIDs.
Queries the vehicle ECU to determine which PIDs are supported,
then caches the results for future use.
"""
SUPPORTED_PID_QUERIES = [0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0]
def __init__(
self,
protocol: OBD2Protocol,
cache_file: Optional[str] = None,
):
"""Initialize scanner.
Args:
protocol: OBD2Protocol instance
cache_file: Optional path to cache file for storing results
"""
self.protocol = protocol
self.cache_file = cache_file
self._supported_pids: Set[int] = set()
self._scanned = False
self._logger = get_logger("obd2_client.scanner")
@property
def supported_pids(self) -> List[int]:
"""Get list of supported PIDs (sorted)."""
return sorted(self._supported_pids)
@property
def is_scanned(self) -> bool:
"""Check if scan has been performed."""
return self._scanned
def scan(self, use_cache: bool = True) -> List[int]:
"""Scan for supported PIDs.
Args:
use_cache: If True, try to load from cache first
Returns:
List of supported PID codes
"""
if use_cache and self._load_cache():
self._logger.info(f"Loaded {len(self._supported_pids)} PIDs from cache")
return self.supported_pids
self._logger.info("Starting PID scan...")
self._supported_pids.clear()
for base_pid in self.SUPPORTED_PID_QUERIES:
self._logger.debug(f"Querying supported PIDs from 0x{base_pid:02X}")
supported = self.protocol.query_supported_pids(base_pid)
if not supported:
self._logger.debug(f"No response for base PID 0x{base_pid:02X}")
break
self._supported_pids.update(supported)
self._logger.debug(f"Found {len(supported)} supported PIDs")
next_base = base_pid + 0x20
if next_base not in supported:
break
self._scanned = True
self._logger.info(f"Scan complete. Found {len(self._supported_pids)} supported PIDs")
if self.cache_file:
self._save_cache()
return self.supported_pids
def get_readable_pids(self) -> List[int]:
"""Get PIDs that can be read (have decoders in registry).
Returns:
List of PID codes that are both supported and have decoders
"""
readable = []
for pid_code in self._supported_pids:
pid = PIDS.get(pid_code)
if pid and pid_code not in self.SUPPORTED_PID_QUERIES:
readable.append(pid_code)
return sorted(readable)
def is_pid_supported(self, pid_code: int) -> bool:
"""Check if a specific PID is supported.
Args:
pid_code: PID code to check
Returns:
True if PID is supported
"""
return pid_code in self._supported_pids
def _load_cache(self) -> bool:
"""Load supported PIDs from cache file.
Returns:
True if cache was loaded successfully
"""
if not self.cache_file:
return False
cache_path = Path(self.cache_file)
if not cache_path.exists():
return False
try:
with open(cache_path, "r") as f:
data = json.load(f)
self._supported_pids = set(data.get("supported_pids", []))
self._scanned = True
return True
except Exception as e:
self._logger.warning(f"Failed to load cache: {e}")
return False
def _save_cache(self) -> bool:
"""Save supported PIDs to cache file.
Returns:
True if cache was saved successfully
"""
if not self.cache_file:
return False
try:
cache_path = Path(self.cache_file)
cache_path.parent.mkdir(parents=True, exist_ok=True)
with open(cache_path, "w") as f:
json.dump(
{"supported_pids": sorted(self._supported_pids)},
f,
indent=2,
)
self._logger.info(f"Saved PID cache to {self.cache_file}")
return True
except Exception as e:
self._logger.warning(f"Failed to save cache: {e}")
return False
def print_supported_pids(self) -> None:
"""Print supported PIDs in a readable format."""
print("\n=== Supported PIDs ===")
for pid_code in self.supported_pids:
pid = PIDS.get(pid_code)
if pid:
print(f" 0x{pid_code:02X} - {pid.name}: {pid.description}")
else:
print(f" 0x{pid_code:02X} - Unknown PID")
print(f"\nTotal: {len(self._supported_pids)} PIDs")