fix get PIDs and restart auto

This commit is contained in:
2026-01-30 00:05:27 +03:00
parent a96328d4e6
commit 9a3fe4457c
3 changed files with 106 additions and 29 deletions

View File

@@ -46,43 +46,70 @@ class OBD2Scanner:
"""Check if scan has been performed."""
return self._scanned
def scan(self, use_cache: bool = True) -> List[int]:
"""Scan for supported PIDs.
def scan(
self,
use_cache: bool = True,
retries: int = 3,
retry_delay: float = 1.0,
initial_delay: float = 0.5,
) -> List[int]:
"""Scan for supported PIDs with retry logic.
Args:
use_cache: If True, try to load from cache first
retries: Number of retry attempts if scan fails
retry_delay: Delay between retries (seconds)
initial_delay: Delay before first scan attempt (seconds)
Returns:
List of supported PID codes
"""
import time
if use_cache and self._load_cache():
self._logger.info(f"Loaded {len(self._supported_pids)} PIDs from cache")
return self.supported_pids
self._logger.info("Starting PID scan...")
self._supported_pids.clear()
# Initial delay to let CAN interface and ECU stabilize
if initial_delay > 0:
self._logger.debug(f"Waiting {initial_delay}s for CAN/ECU to stabilize...")
time.sleep(initial_delay)
for base_pid in self.SUPPORTED_PID_QUERIES:
self._logger.debug(f"Querying supported PIDs from 0x{base_pid:02X}")
for attempt in range(retries):
if attempt > 0:
self._logger.info(f"Retry {attempt}/{retries} after {retry_delay}s...")
time.sleep(retry_delay)
supported = self.protocol.query_supported_pids(base_pid)
if not supported:
self._logger.debug(f"No response for base PID 0x{base_pid:02X}")
break
self._logger.info(f"Starting PID scan (attempt {attempt + 1}/{retries})...")
self._supported_pids.clear()
self._supported_pids.update(supported)
self._logger.debug(f"Found {len(supported)} supported PIDs")
for base_pid in self.SUPPORTED_PID_QUERIES:
self._logger.debug(f"Querying supported PIDs from 0x{base_pid:02X}")
next_base = base_pid + 0x20
if next_base not in supported:
break
supported = self.protocol.query_supported_pids(base_pid)
if not supported:
self._logger.debug(f"No response for base PID 0x{base_pid:02X}")
break
self._supported_pids.update(supported)
self._logger.debug(f"Found {len(supported)} supported PIDs")
next_base = base_pid + 0x20
if next_base not in supported:
break
# Success if we found any PIDs
if self._supported_pids:
self._scanned = True
self._logger.info(f"Scan complete. Found {len(self._supported_pids)} supported PIDs")
if self.cache_file:
self._save_cache()
return self.supported_pids
self._logger.warning(f"Scan failed after {retries} attempts")
self._scanned = True
self._logger.info(f"Scan complete. Found {len(self._supported_pids)} supported PIDs")
if self.cache_file:
self._save_cache()
return self.supported_pids
def get_readable_pids(self) -> List[int]: