"""OBD2 PID scanner for auto-detection.""" from typing import List, Set, Optional import json from pathlib import Path from ..can.interface import CANInterface from ..logger import get_logger from .protocol import OBD2Protocol from .pids import PIDS class OBD2Scanner: """Scanner for detecting supported OBD2 PIDs. Queries the vehicle ECU to determine which PIDs are supported, then caches the results for future use. """ SUPPORTED_PID_QUERIES = [0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0] def __init__( self, protocol: OBD2Protocol, cache_file: Optional[str] = None, ): """Initialize scanner. Args: protocol: OBD2Protocol instance cache_file: Optional path to cache file for storing results """ self.protocol = protocol self.cache_file = cache_file self._supported_pids: Set[int] = set() self._scanned = False self._logger = get_logger("obd2_client.scanner") @property def supported_pids(self) -> List[int]: """Get list of supported PIDs (sorted).""" return sorted(self._supported_pids) @property def is_scanned(self) -> bool: """Check if scan has been performed.""" return self._scanned def scan( self, use_cache: bool = True, retries: int = 3, retry_delay: float = 1.0, initial_delay: float = 0.5, ) -> List[int]: """Scan for supported PIDs with retry logic. Args: use_cache: If True, try to load from cache first retries: Number of retry attempts if scan fails retry_delay: Delay between retries (seconds) initial_delay: Delay before first scan attempt (seconds) Returns: List of supported PID codes """ import time if use_cache: if self._load_cache() and len(self._supported_pids) > 0: self._logger.info(f"Loaded {len(self._supported_pids)} PIDs from cache") return self.supported_pids else: self._logger.debug("Cache empty or invalid, will scan") # Initial delay to let CAN interface and ECU stabilize if initial_delay > 0: self._logger.debug(f"Waiting {initial_delay}s for CAN/ECU to stabilize...") time.sleep(initial_delay) for attempt in range(retries): if attempt > 0: self._logger.info(f"Retry {attempt}/{retries} after {retry_delay}s...") time.sleep(retry_delay) self._logger.info(f"Starting PID scan (attempt {attempt + 1}/{retries})...") self._supported_pids.clear() for base_pid in self.SUPPORTED_PID_QUERIES: self._logger.debug(f"Querying supported PIDs from 0x{base_pid:02X}") supported = self.protocol.query_supported_pids(base_pid) if not supported: self._logger.debug(f"No response for base PID 0x{base_pid:02X}") break self._supported_pids.update(supported) self._logger.debug(f"Found {len(supported)} supported PIDs") next_base = base_pid + 0x20 if next_base not in supported: break # Success if we found any PIDs if self._supported_pids: self._scanned = True self._logger.info(f"Scan complete. Found {len(self._supported_pids)} supported PIDs") if self.cache_file: self._save_cache() return self.supported_pids self._logger.warning(f"Scan failed after {retries} attempts") self._scanned = True return self.supported_pids def get_readable_pids(self) -> List[int]: """Get PIDs that can be read (have decoders in registry). Returns: List of PID codes that are both supported and have decoders """ readable = [] for pid_code in self._supported_pids: pid = PIDS.get(pid_code) if pid and pid_code not in self.SUPPORTED_PID_QUERIES: readable.append(pid_code) return sorted(readable) def is_pid_supported(self, pid_code: int) -> bool: """Check if a specific PID is supported. Args: pid_code: PID code to check Returns: True if PID is supported """ return pid_code in self._supported_pids def _load_cache(self) -> bool: """Load supported PIDs from cache file. Returns: True if cache was loaded successfully """ if not self.cache_file: return False cache_path = Path(self.cache_file) if not cache_path.exists(): return False try: with open(cache_path, "r") as f: data = json.load(f) self._supported_pids = set(data.get("supported_pids", [])) self._scanned = True return True except Exception as e: self._logger.warning(f"Failed to load cache: {e}") return False def _save_cache(self) -> bool: """Save supported PIDs to cache file. Returns: True if cache was saved successfully """ if not self.cache_file: return False try: cache_path = Path(self.cache_file) cache_path.parent.mkdir(parents=True, exist_ok=True) with open(cache_path, "w") as f: json.dump( {"supported_pids": sorted(self._supported_pids)}, f, indent=2, ) self._logger.info(f"Saved PID cache to {self.cache_file}") return True except Exception as e: self._logger.warning(f"Failed to save cache: {e}") return False def print_supported_pids(self) -> None: """Print supported PIDs in a readable format.""" print("\n=== Supported PIDs ===") for pid_code in self.supported_pids: pid = PIDS.get(pid_code) if pid: print(f" 0x{pid_code:02X} - {pid.name}: {pid.description}") else: print(f" 0x{pid_code:02X} - Unknown PID") print(f"\nTotal: {len(self._supported_pids)} PIDs")