diff --git a/can_sniffer/config.json.example b/can_sniffer/config.json.example index 4af9442..626b3a3 100644 --- a/can_sniffer/config.json.example +++ b/can_sniffer/config.json.example @@ -8,13 +8,13 @@ "request_id": 2015, "response_id_start": 2024, "response_id_end": 2031, - "request_timeout_ms": 100, + "request_timeout_ms": 150, "retry_count": 2, "auto_discover": true, "polling_groups": [ { "name": "fast", - "interval_ms": 100, + "interval_ms": 200, "pids": ["0C", "0D", "11"], "enabled": true }, diff --git a/can_sniffer/src/obd2/poller.py b/can_sniffer/src/obd2/poller.py index d85ca0d..2b2d74f 100644 --- a/can_sniffer/src/obd2/poller.py +++ b/can_sniffer/src/obd2/poller.py @@ -121,6 +121,9 @@ class OBD2Poller: poller.start() """ + # Minimum delay between any two OBD2 requests (ms) + MIN_REQUEST_INTERVAL_MS = 50 + def __init__( self, transceiver: CANTransceiver, @@ -151,6 +154,10 @@ class OBD2Poller: self._poll_thread: Optional[threading.Thread] = None self._pid_registry = get_pid_registry() + # Track last request time for rate limiting + self._last_request_time: float = 0.0 + self._request_lock = threading.Lock() + # Connect matcher callbacks self._matcher.set_reading_callback(self._on_reading) self._matcher.set_retry_callback(self._on_retry) @@ -262,6 +269,27 @@ class OBD2Poller: self._stats.polling_active = True logger.info("OBD2 Poller resumed") + def _can_send_request(self) -> bool: + """Check if enough time has passed since last request.""" + with self._request_lock: + elapsed_ms = (time.time() - self._last_request_time) * 1000 + return elapsed_ms >= self.MIN_REQUEST_INTERVAL_MS + + def _send_request(self, request: OBD2Request) -> bool: + """Send request with rate limiting.""" + with self._request_lock: + # Wait if needed + elapsed_ms = (time.time() - self._last_request_time) * 1000 + if elapsed_ms < self.MIN_REQUEST_INTERVAL_MS: + wait_time = (self.MIN_REQUEST_INTERVAL_MS - elapsed_ms) / 1000.0 + time.sleep(wait_time) + + # Send + if self._transceiver.send_request(request): + self._last_request_time = time.time() + return True + return False + def request_pid( self, pid: int, @@ -279,7 +307,7 @@ class OBD2Poller: """ request = OBD2Request(mode=OBD2Mode.CURRENT_DATA, pid=pid) - if self._transceiver.send_request(request): + if self._send_request(request): self._matcher.register_request(request, callback) with self._stats_lock: self._stats.total_requests += 1 @@ -299,7 +327,7 @@ class OBD2Poller: for pid in discovery_pids: self.request_pid(pid, self._on_discovery_response) - time.sleep(0.05) # Small delay between requests + # Rate limiting is handled by request_pid def get_stats(self) -> Dict[str, Any]: """Get poller statistics.""" @@ -338,7 +366,7 @@ class OBD2Poller: # Auto-discover if enabled if self._auto_discover: self.discover_supported_pids() - time.sleep(0.5) # Wait for discovery responses + time.sleep(1.0) # Wait for discovery responses self._state = PollerState.RUNNING with self._stats_lock: @@ -350,20 +378,29 @@ class OBD2Poller: time.sleep(0.1) continue - # Poll each group that's due + # Find the next group that's due and poll ONE PID + polled = False for name, group_state in self._groups.items(): if not group_state.group.enabled: continue - if group_state.is_due: + if group_state.is_due and self._can_send_request(): self._poll_group(group_state) + polled = True + break # Only poll one PID per iteration - # Small sleep to prevent busy loop - time.sleep(0.001) + # Sleep based on whether we polled + if polled: + # Short sleep, rate limiting is in _send_request + time.sleep(0.01) + else: + # No polling due, sleep longer + time.sleep(0.02) except Exception as e: logger.error(f"Poll loop error: {e}") self._state = PollerState.ERROR + time.sleep(1.0) logger.debug("Poll loop stopped") @@ -376,12 +413,13 @@ class OBD2Poller: # Skip if PID not in supported set (if we have discovery data) with self._stats_lock: if self._stats.supported_pids and pid not in self._stats.supported_pids: - # Skip unsupported PIDs silently + # Skip unsupported PIDs, but still update timing + group_state.last_poll_time = time.time() return request = OBD2Request(mode=OBD2Mode.CURRENT_DATA, pid=pid) - if self._transceiver.send_request(request): + if self._send_request(request): self._matcher.register_request(request) group_state.last_poll_time = time.time() @@ -411,10 +449,11 @@ class OBD2Poller: except Exception as e: logger.error(f"Reading callback error: {e}") - def _on_retry(self, request: OBD2Request) -> None: + def _on_retry(self, request: OBD2Request, retry_count: int) -> None: """Handle retry request from matcher.""" - self._transceiver.send_request(request) - self._matcher.register_request(request) + # Use rate-limited send + if self._send_request(request): + self._matcher.register_request(request, retry_count=retry_count) def _on_discovery_response(self, reading: OBD2Reading) -> None: """Handle PID discovery response.""" diff --git a/can_sniffer/src/obd2/response_matcher.py b/can_sniffer/src/obd2/response_matcher.py index 4775a34..4d95506 100644 --- a/can_sniffer/src/obd2/response_matcher.py +++ b/can_sniffer/src/obd2/response_matcher.py @@ -112,7 +112,7 @@ class ResponseMatcher: timeout_ms: int = 100, max_retries: int = 2, reading_callback: Optional[Callable[[OBD2Reading], None]] = None, - retry_callback: Optional[Callable[[OBD2Request], None]] = None, + retry_callback: Optional[Callable[[OBD2Request, int], None]] = None, ): """ Initialize response matcher. @@ -166,6 +166,7 @@ class ResponseMatcher: self, request: OBD2Request, callback: Optional[Callable[[OBD2Reading], None]] = None, + retry_count: int = 0, ) -> None: """ Register a sent request for response matching. @@ -173,6 +174,7 @@ class ResponseMatcher: Args: request: The OBD2 request that was sent callback: Optional per-request callback + retry_count: Current retry count (for retry tracking) """ pending = PendingRequest( request=request, @@ -180,14 +182,17 @@ class ResponseMatcher: timeout_ms=self.timeout_ms, callback=callback, max_retries=self.max_retries, + retry_count=retry_count, ) with self._lock: # Replace if already exists (retry case) self._pending[request.request_id] = pending - with self._stats_lock: - self._stats.requests_sent += 1 + # Only count as new request if not a retry + if retry_count == 0: + with self._stats_lock: + self._stats.requests_sent += 1 def match_response(self, response: OBD2Response) -> Optional[OBD2Reading]: """ @@ -316,7 +321,8 @@ class ResponseMatcher: ) try: - self._retry_callback(pending.request) + # Pass retry_count so it's preserved on re-registration + self._retry_callback(pending.request, pending.retry_count) except Exception as e: logger.error(f"Retry callback error: {e}") with self._stats_lock: @@ -336,7 +342,7 @@ class ResponseMatcher: def set_retry_callback( self, - callback: Callable[[OBD2Request], None] + callback: Callable[[OBD2Request, int], None] ) -> None: - """Set callback for retry requests.""" + """Set callback for retry requests (receives request and retry_count).""" self._retry_callback = callback diff --git a/can_sniffer/src/obd2/transceiver.py b/can_sniffer/src/obd2/transceiver.py index 36c2c38..776cfdb 100644 --- a/can_sniffer/src/obd2/transceiver.py +++ b/can_sniffer/src/obd2/transceiver.py @@ -320,8 +320,17 @@ class CANTransceiver: """Transmit loop - runs in dedicated thread.""" logger.debug("TX loop started") + backoff_time = 0.0 + max_backoff = 0.5 # Maximum 500ms backoff + consecutive_errors = 0 + while self._running: try: + # Apply backoff if we had errors + if backoff_time > 0: + time.sleep(backoff_time) + backoff_time = 0.0 + # Get frame from queue with timeout frame = self._tx_queue.get(timeout=0.1) @@ -336,13 +345,16 @@ class CANTransceiver: dlc=frame.dlc ) - # Send - self._bus.send(msg) + # Send with timeout to avoid blocking forever + self._bus.send(msg, timeout=0.1) with self._stats_lock: self._stats.tx_count += 1 self._stats.last_tx_time = time.time() + # Reset error count on success + consecutive_errors = 0 + except Empty: continue @@ -350,12 +362,24 @@ class CANTransceiver: with self._stats_lock: self._stats.tx_errors += 1 self._stats.bus_errors += 1 - logger.warning(f"CAN TX error: {e}") + + consecutive_errors += 1 + error_str = str(e).lower() + + # Check for buffer full error and apply backoff + if "buffer full" in error_str or "transmit" in error_str: + # Exponential backoff: 50ms, 100ms, 200ms, up to max_backoff + backoff_time = min(0.05 * (2 ** (consecutive_errors - 1)), max_backoff) + logger.debug(f"CAN TX buffer full, backing off {backoff_time*1000:.0f}ms") + else: + logger.warning(f"CAN TX error: {e}") + backoff_time = 0.02 # Small delay on other errors except Exception as e: with self._stats_lock: self._stats.tx_errors += 1 logger.error(f"TX loop error: {e}") + backoff_time = 0.05 # Delay on unexpected errors logger.debug("TX loop stopped")