increased time for request
This commit is contained in:
@@ -8,13 +8,13 @@
|
|||||||
"request_id": 2015,
|
"request_id": 2015,
|
||||||
"response_id_start": 2024,
|
"response_id_start": 2024,
|
||||||
"response_id_end": 2031,
|
"response_id_end": 2031,
|
||||||
"request_timeout_ms": 100,
|
"request_timeout_ms": 150,
|
||||||
"retry_count": 2,
|
"retry_count": 2,
|
||||||
"auto_discover": true,
|
"auto_discover": true,
|
||||||
"polling_groups": [
|
"polling_groups": [
|
||||||
{
|
{
|
||||||
"name": "fast",
|
"name": "fast",
|
||||||
"interval_ms": 100,
|
"interval_ms": 200,
|
||||||
"pids": ["0C", "0D", "11"],
|
"pids": ["0C", "0D", "11"],
|
||||||
"enabled": true
|
"enabled": true
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -121,6 +121,9 @@ class OBD2Poller:
|
|||||||
poller.start()
|
poller.start()
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Minimum delay between any two OBD2 requests (ms)
|
||||||
|
MIN_REQUEST_INTERVAL_MS = 50
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
transceiver: CANTransceiver,
|
transceiver: CANTransceiver,
|
||||||
@@ -151,6 +154,10 @@ class OBD2Poller:
|
|||||||
self._poll_thread: Optional[threading.Thread] = None
|
self._poll_thread: Optional[threading.Thread] = None
|
||||||
self._pid_registry = get_pid_registry()
|
self._pid_registry = get_pid_registry()
|
||||||
|
|
||||||
|
# Track last request time for rate limiting
|
||||||
|
self._last_request_time: float = 0.0
|
||||||
|
self._request_lock = threading.Lock()
|
||||||
|
|
||||||
# Connect matcher callbacks
|
# Connect matcher callbacks
|
||||||
self._matcher.set_reading_callback(self._on_reading)
|
self._matcher.set_reading_callback(self._on_reading)
|
||||||
self._matcher.set_retry_callback(self._on_retry)
|
self._matcher.set_retry_callback(self._on_retry)
|
||||||
@@ -262,6 +269,27 @@ class OBD2Poller:
|
|||||||
self._stats.polling_active = True
|
self._stats.polling_active = True
|
||||||
logger.info("OBD2 Poller resumed")
|
logger.info("OBD2 Poller resumed")
|
||||||
|
|
||||||
|
def _can_send_request(self) -> bool:
|
||||||
|
"""Check if enough time has passed since last request."""
|
||||||
|
with self._request_lock:
|
||||||
|
elapsed_ms = (time.time() - self._last_request_time) * 1000
|
||||||
|
return elapsed_ms >= self.MIN_REQUEST_INTERVAL_MS
|
||||||
|
|
||||||
|
def _send_request(self, request: OBD2Request) -> bool:
|
||||||
|
"""Send request with rate limiting."""
|
||||||
|
with self._request_lock:
|
||||||
|
# Wait if needed
|
||||||
|
elapsed_ms = (time.time() - self._last_request_time) * 1000
|
||||||
|
if elapsed_ms < self.MIN_REQUEST_INTERVAL_MS:
|
||||||
|
wait_time = (self.MIN_REQUEST_INTERVAL_MS - elapsed_ms) / 1000.0
|
||||||
|
time.sleep(wait_time)
|
||||||
|
|
||||||
|
# Send
|
||||||
|
if self._transceiver.send_request(request):
|
||||||
|
self._last_request_time = time.time()
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
def request_pid(
|
def request_pid(
|
||||||
self,
|
self,
|
||||||
pid: int,
|
pid: int,
|
||||||
@@ -279,7 +307,7 @@ class OBD2Poller:
|
|||||||
"""
|
"""
|
||||||
request = OBD2Request(mode=OBD2Mode.CURRENT_DATA, pid=pid)
|
request = OBD2Request(mode=OBD2Mode.CURRENT_DATA, pid=pid)
|
||||||
|
|
||||||
if self._transceiver.send_request(request):
|
if self._send_request(request):
|
||||||
self._matcher.register_request(request, callback)
|
self._matcher.register_request(request, callback)
|
||||||
with self._stats_lock:
|
with self._stats_lock:
|
||||||
self._stats.total_requests += 1
|
self._stats.total_requests += 1
|
||||||
@@ -299,7 +327,7 @@ class OBD2Poller:
|
|||||||
|
|
||||||
for pid in discovery_pids:
|
for pid in discovery_pids:
|
||||||
self.request_pid(pid, self._on_discovery_response)
|
self.request_pid(pid, self._on_discovery_response)
|
||||||
time.sleep(0.05) # Small delay between requests
|
# Rate limiting is handled by request_pid
|
||||||
|
|
||||||
def get_stats(self) -> Dict[str, Any]:
|
def get_stats(self) -> Dict[str, Any]:
|
||||||
"""Get poller statistics."""
|
"""Get poller statistics."""
|
||||||
@@ -338,7 +366,7 @@ class OBD2Poller:
|
|||||||
# Auto-discover if enabled
|
# Auto-discover if enabled
|
||||||
if self._auto_discover:
|
if self._auto_discover:
|
||||||
self.discover_supported_pids()
|
self.discover_supported_pids()
|
||||||
time.sleep(0.5) # Wait for discovery responses
|
time.sleep(1.0) # Wait for discovery responses
|
||||||
|
|
||||||
self._state = PollerState.RUNNING
|
self._state = PollerState.RUNNING
|
||||||
with self._stats_lock:
|
with self._stats_lock:
|
||||||
@@ -350,20 +378,29 @@ class OBD2Poller:
|
|||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Poll each group that's due
|
# Find the next group that's due and poll ONE PID
|
||||||
|
polled = False
|
||||||
for name, group_state in self._groups.items():
|
for name, group_state in self._groups.items():
|
||||||
if not group_state.group.enabled:
|
if not group_state.group.enabled:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if group_state.is_due:
|
if group_state.is_due and self._can_send_request():
|
||||||
self._poll_group(group_state)
|
self._poll_group(group_state)
|
||||||
|
polled = True
|
||||||
|
break # Only poll one PID per iteration
|
||||||
|
|
||||||
# Small sleep to prevent busy loop
|
# Sleep based on whether we polled
|
||||||
time.sleep(0.001)
|
if polled:
|
||||||
|
# Short sleep, rate limiting is in _send_request
|
||||||
|
time.sleep(0.01)
|
||||||
|
else:
|
||||||
|
# No polling due, sleep longer
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Poll loop error: {e}")
|
logger.error(f"Poll loop error: {e}")
|
||||||
self._state = PollerState.ERROR
|
self._state = PollerState.ERROR
|
||||||
|
time.sleep(1.0)
|
||||||
|
|
||||||
logger.debug("Poll loop stopped")
|
logger.debug("Poll loop stopped")
|
||||||
|
|
||||||
@@ -376,12 +413,13 @@ class OBD2Poller:
|
|||||||
# Skip if PID not in supported set (if we have discovery data)
|
# Skip if PID not in supported set (if we have discovery data)
|
||||||
with self._stats_lock:
|
with self._stats_lock:
|
||||||
if self._stats.supported_pids and pid not in self._stats.supported_pids:
|
if self._stats.supported_pids and pid not in self._stats.supported_pids:
|
||||||
# Skip unsupported PIDs silently
|
# Skip unsupported PIDs, but still update timing
|
||||||
|
group_state.last_poll_time = time.time()
|
||||||
return
|
return
|
||||||
|
|
||||||
request = OBD2Request(mode=OBD2Mode.CURRENT_DATA, pid=pid)
|
request = OBD2Request(mode=OBD2Mode.CURRENT_DATA, pid=pid)
|
||||||
|
|
||||||
if self._transceiver.send_request(request):
|
if self._send_request(request):
|
||||||
self._matcher.register_request(request)
|
self._matcher.register_request(request)
|
||||||
group_state.last_poll_time = time.time()
|
group_state.last_poll_time = time.time()
|
||||||
|
|
||||||
@@ -411,10 +449,11 @@ class OBD2Poller:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Reading callback error: {e}")
|
logger.error(f"Reading callback error: {e}")
|
||||||
|
|
||||||
def _on_retry(self, request: OBD2Request) -> None:
|
def _on_retry(self, request: OBD2Request, retry_count: int) -> None:
|
||||||
"""Handle retry request from matcher."""
|
"""Handle retry request from matcher."""
|
||||||
self._transceiver.send_request(request)
|
# Use rate-limited send
|
||||||
self._matcher.register_request(request)
|
if self._send_request(request):
|
||||||
|
self._matcher.register_request(request, retry_count=retry_count)
|
||||||
|
|
||||||
def _on_discovery_response(self, reading: OBD2Reading) -> None:
|
def _on_discovery_response(self, reading: OBD2Reading) -> None:
|
||||||
"""Handle PID discovery response."""
|
"""Handle PID discovery response."""
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ class ResponseMatcher:
|
|||||||
timeout_ms: int = 100,
|
timeout_ms: int = 100,
|
||||||
max_retries: int = 2,
|
max_retries: int = 2,
|
||||||
reading_callback: Optional[Callable[[OBD2Reading], None]] = None,
|
reading_callback: Optional[Callable[[OBD2Reading], None]] = None,
|
||||||
retry_callback: Optional[Callable[[OBD2Request], None]] = None,
|
retry_callback: Optional[Callable[[OBD2Request, int], None]] = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Initialize response matcher.
|
Initialize response matcher.
|
||||||
@@ -166,6 +166,7 @@ class ResponseMatcher:
|
|||||||
self,
|
self,
|
||||||
request: OBD2Request,
|
request: OBD2Request,
|
||||||
callback: Optional[Callable[[OBD2Reading], None]] = None,
|
callback: Optional[Callable[[OBD2Reading], None]] = None,
|
||||||
|
retry_count: int = 0,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
Register a sent request for response matching.
|
Register a sent request for response matching.
|
||||||
@@ -173,6 +174,7 @@ class ResponseMatcher:
|
|||||||
Args:
|
Args:
|
||||||
request: The OBD2 request that was sent
|
request: The OBD2 request that was sent
|
||||||
callback: Optional per-request callback
|
callback: Optional per-request callback
|
||||||
|
retry_count: Current retry count (for retry tracking)
|
||||||
"""
|
"""
|
||||||
pending = PendingRequest(
|
pending = PendingRequest(
|
||||||
request=request,
|
request=request,
|
||||||
@@ -180,14 +182,17 @@ class ResponseMatcher:
|
|||||||
timeout_ms=self.timeout_ms,
|
timeout_ms=self.timeout_ms,
|
||||||
callback=callback,
|
callback=callback,
|
||||||
max_retries=self.max_retries,
|
max_retries=self.max_retries,
|
||||||
|
retry_count=retry_count,
|
||||||
)
|
)
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
# Replace if already exists (retry case)
|
# Replace if already exists (retry case)
|
||||||
self._pending[request.request_id] = pending
|
self._pending[request.request_id] = pending
|
||||||
|
|
||||||
with self._stats_lock:
|
# Only count as new request if not a retry
|
||||||
self._stats.requests_sent += 1
|
if retry_count == 0:
|
||||||
|
with self._stats_lock:
|
||||||
|
self._stats.requests_sent += 1
|
||||||
|
|
||||||
def match_response(self, response: OBD2Response) -> Optional[OBD2Reading]:
|
def match_response(self, response: OBD2Response) -> Optional[OBD2Reading]:
|
||||||
"""
|
"""
|
||||||
@@ -316,7 +321,8 @@ class ResponseMatcher:
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._retry_callback(pending.request)
|
# Pass retry_count so it's preserved on re-registration
|
||||||
|
self._retry_callback(pending.request, pending.retry_count)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Retry callback error: {e}")
|
logger.error(f"Retry callback error: {e}")
|
||||||
with self._stats_lock:
|
with self._stats_lock:
|
||||||
@@ -336,7 +342,7 @@ class ResponseMatcher:
|
|||||||
|
|
||||||
def set_retry_callback(
|
def set_retry_callback(
|
||||||
self,
|
self,
|
||||||
callback: Callable[[OBD2Request], None]
|
callback: Callable[[OBD2Request, int], None]
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Set callback for retry requests."""
|
"""Set callback for retry requests (receives request and retry_count)."""
|
||||||
self._retry_callback = callback
|
self._retry_callback = callback
|
||||||
|
|||||||
@@ -320,8 +320,17 @@ class CANTransceiver:
|
|||||||
"""Transmit loop - runs in dedicated thread."""
|
"""Transmit loop - runs in dedicated thread."""
|
||||||
logger.debug("TX loop started")
|
logger.debug("TX loop started")
|
||||||
|
|
||||||
|
backoff_time = 0.0
|
||||||
|
max_backoff = 0.5 # Maximum 500ms backoff
|
||||||
|
consecutive_errors = 0
|
||||||
|
|
||||||
while self._running:
|
while self._running:
|
||||||
try:
|
try:
|
||||||
|
# Apply backoff if we had errors
|
||||||
|
if backoff_time > 0:
|
||||||
|
time.sleep(backoff_time)
|
||||||
|
backoff_time = 0.0
|
||||||
|
|
||||||
# Get frame from queue with timeout
|
# Get frame from queue with timeout
|
||||||
frame = self._tx_queue.get(timeout=0.1)
|
frame = self._tx_queue.get(timeout=0.1)
|
||||||
|
|
||||||
@@ -336,13 +345,16 @@ class CANTransceiver:
|
|||||||
dlc=frame.dlc
|
dlc=frame.dlc
|
||||||
)
|
)
|
||||||
|
|
||||||
# Send
|
# Send with timeout to avoid blocking forever
|
||||||
self._bus.send(msg)
|
self._bus.send(msg, timeout=0.1)
|
||||||
|
|
||||||
with self._stats_lock:
|
with self._stats_lock:
|
||||||
self._stats.tx_count += 1
|
self._stats.tx_count += 1
|
||||||
self._stats.last_tx_time = time.time()
|
self._stats.last_tx_time = time.time()
|
||||||
|
|
||||||
|
# Reset error count on success
|
||||||
|
consecutive_errors = 0
|
||||||
|
|
||||||
except Empty:
|
except Empty:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -350,12 +362,24 @@ class CANTransceiver:
|
|||||||
with self._stats_lock:
|
with self._stats_lock:
|
||||||
self._stats.tx_errors += 1
|
self._stats.tx_errors += 1
|
||||||
self._stats.bus_errors += 1
|
self._stats.bus_errors += 1
|
||||||
logger.warning(f"CAN TX error: {e}")
|
|
||||||
|
consecutive_errors += 1
|
||||||
|
error_str = str(e).lower()
|
||||||
|
|
||||||
|
# Check for buffer full error and apply backoff
|
||||||
|
if "buffer full" in error_str or "transmit" in error_str:
|
||||||
|
# Exponential backoff: 50ms, 100ms, 200ms, up to max_backoff
|
||||||
|
backoff_time = min(0.05 * (2 ** (consecutive_errors - 1)), max_backoff)
|
||||||
|
logger.debug(f"CAN TX buffer full, backing off {backoff_time*1000:.0f}ms")
|
||||||
|
else:
|
||||||
|
logger.warning(f"CAN TX error: {e}")
|
||||||
|
backoff_time = 0.02 # Small delay on other errors
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
with self._stats_lock:
|
with self._stats_lock:
|
||||||
self._stats.tx_errors += 1
|
self._stats.tx_errors += 1
|
||||||
logger.error(f"TX loop error: {e}")
|
logger.error(f"TX loop error: {e}")
|
||||||
|
backoff_time = 0.05 # Delay on unexpected errors
|
||||||
|
|
||||||
logger.debug("TX loop stopped")
|
logger.debug("TX loop stopped")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user