Files
carpibord/obd2_client/src/flipper/pages.py

684 lines
24 KiB
Python

"""Page definitions and dynamic content generators."""
from typing import List, Dict, Callable, Optional, Any, Tuple
from dataclasses import dataclass
from enum import Enum
import socket
import os
import logging
from .protocol import Page, PageType
_logger = logging.getLogger("obd2_client.pages")
class ActionID(Enum):
    """Identifiers of the actions that menu items can trigger.

    Each member's value is the plain-string form of the action.
    """

    # OBD2 connection / service control
    RECONNECT_OBD = "reconnect_obd"
    RESTART_SERVICE = "restart_service"

    # Host power control
    REBOOT_SYSTEM = "reboot_system"
    SHUTDOWN_SYSTEM = "shutdown_system"

    # Maintenance / diagnostics
    CLEAR_CACHE = "clear_cache"
    TOGGLE_DEBUG = "toggle_debug"
@dataclass
class PageDefinition:
    """One registered page: its static metadata plus the callable that renders it."""

    # Kind of page (a PageType member such as INFO, MENU or CONFIRM).
    page_type: PageType
    # Title stored for the page.
    title: str
    # Called with the owning PageManager; returns the Page to display.
    generator: Callable[["PageManager"], Page]
    # Actions offered by the page, by position; None when the page has none.
    actions: Optional[List[ActionID]] = None
class PageManager:
    """Manages page content and navigation for the Flipper display."""

    # Flipper display limit: a fifth content line would overlap the footer.
    MAX_VISIBLE_LINES = 4
def __init__(self):
    """Start with empty registries, then register the built-in pages."""
    # Navigation state
    self._current_index: int = 0
    self._scroll_offsets: Dict[int, int] = {}  # page index -> scroll offset
    self._pending_action: Optional[ActionID] = None

    # Callbacks registered through set_data_provider / set_action_handler
    self._data_providers: Dict[str, Callable[[], Any]] = {}
    self._action_handlers: Dict[ActionID, Callable[[], bool]] = {}

    self._debug_enabled: bool = False

    # Page table; filled by _register_default_pages() below
    self._pages: List[PageDefinition] = []
    self._register_default_pages()
def _register_default_pages(self) -> None:
    """Append the built-in pages to the page table in navigation order."""
    # Pages 0-5: information pages, listed in the order they are navigated.
    info_pages = (
        ("Live Data", self._generate_live_data_page),        # 0: live vehicle data
        ("Statistics", self._generate_stats_page),           # 1
        ("System Info", self._generate_system_page),         # 2
        ("App Status", self._generate_app_status_page),      # 3: handlers, storage, sync state
        ("UDS Data", self._generate_uds_page),               # 4: UDS extended data
        ("UPS Status", self._generate_ups_page),             # 5
    )
    for page_title, page_generator in info_pages:
        self._pages.append(
            PageDefinition(
                page_type=PageType.INFO,
                title=page_title,
                generator=page_generator,
            )
        )

    # Page 6: actions menu (last navigable page).
    actions_menu = PageDefinition(
        page_type=PageType.MENU,
        title="Actions",
        generator=self._generate_actions_page,
        actions=[
            ActionID.RECONNECT_OBD,
            ActionID.CLEAR_CACHE,
            ActionID.REBOOT_SYSTEM,
            ActionID.SHUTDOWN_SYSTEM,
        ],
    )
    self._pages.append(actions_menu)

    # Page 7: confirm page (dynamic, hidden from navigation; must stay last).
    confirm_page = PageDefinition(
        page_type=PageType.CONFIRM,
        title="Confirm",
        generator=self._generate_confirm_page,
    )
    self._pages.append(confirm_page)
@property
def total_pages(self) -> int:
    """Number of registered pages, not counting the hidden confirm page."""
    registered = len(self._pages)
    return registered - 1
@property
def current_index(self) -> int:
    """Index of the page that is currently selected."""
    index = self._current_index
    return index
def set_data_provider(self, name: str, provider: Callable[[], Any]) -> None:
    """Register (or replace) the callable that supplies data under ``name``.

    Args:
        name: Provider name (e.g., 'vehicle_state', 'poller_stats').
        provider: Zero-argument callable that returns the current data.
    """
    self._data_providers.update({name: provider})
def set_action_handler(self, action_id: ActionID | str, handler: Callable[[], bool]) -> None:
    """Register the handler that executes an action.

    The handler is always stored under the key exactly as given. When the
    key has an equivalent in the other representation (an ActionID member
    and its string value), the handler is stored under that key as well, so
    a lookup succeeds whether the caller uses the enum member or its string
    value.

    Args:
        action_id: Action identifier (ActionID member or string).
        handler: Callable that executes the action and returns True on
            success.
    """
    self._action_handlers[action_id] = handler

    if isinstance(action_id, ActionID):
        alias = action_id.value
    else:
        try:
            alias = ActionID(action_id)
        except ValueError:
            # Custom identifier with no ActionID counterpart: nothing to mirror.
            return
    self._action_handlers[alias] = handler
def add_page(self, page: Any) -> None:
    """Register a custom page directly in front of the hidden confirm page.

    Args:
        page: Page object with get_content(), is_enabled(), name and title
            attributes. Only get_content(), title and name are used here.
    """

    def render_custom(mgr: "PageManager") -> Page:
        # Re-wrap the custom page's current content in a Page on every render.
        snapshot = page.get_content()
        kind = PageType(snapshot.page_type.value)
        return Page(
            page_type=kind,
            title=snapshot.title,
            lines=snapshot.lines,
            actions=snapshot.actions,
            selected=snapshot.selected,
        )

    definition = PageDefinition(
        page_type=PageType.INFO,
        title=page.title,
        generator=render_custom,
    )
    # The confirm page is the last entry and has to stay last.
    self._pages.insert(-1, definition)
    _logger.info(f"Added custom page: {page.name}")
def get_data(self, name: str, default: Any = None) -> Any:
    """Return the current value from the data provider registered as ``name``.

    Args:
        name: Provider name.
        default: Value returned when no provider is registered under
            ``name`` or when the provider raises.

    Returns:
        Whatever the provider returns (this may be None), or ``default``.
    """
    provider = self._data_providers.get(name)
    if provider is None:
        return default
    try:
        return provider()
    except Exception:
        # A failing provider must not break page rendering, but the failure
        # should be discoverable in the log instead of vanishing silently.
        _logger.debug("Data provider %r failed", name, exc_info=True)
        return default
def get_current_page(self) -> Page:
    """Render the current page, cut down to the visible window if needed.

    INFO pages with more than MAX_VISIBLE_LINES lines are reduced to the
    window starting at the page's stored scroll offset. The offset is
    clamped to the valid range and written back. A "^ " prefix on the first
    visible line and a "v " prefix on the last one mark that more lines
    exist above / below.

    Returns:
        The page to display, or an "Error" page when the current index is
        outside the page table.
    """
    if not 0 <= self._current_index < len(self._pages):
        return Page(PageType.INFO, "Error", ["Invalid page index"])

    page = self._pages[self._current_index].generator(self)
    window = self.MAX_VISIBLE_LINES
    needs_window = page.page_type == PageType.INFO and len(page.lines) > window
    if not needs_window:
        return page

    index = self._current_index
    last_start = len(page.lines) - window
    # Clamp the stored offset and remember the clamped value.
    start = max(0, min(self._scroll_offsets.get(index, 0), last_start))
    self._scroll_offsets[index] = start

    shown = page.lines[start:start + window]
    if start > 0:
        shown[0] = "^ " + shown[0][:30]
    if start < last_start:
        shown[-1] = "v " + shown[-1][:30]
    page.lines = shown
    return page
def scroll_up(self) -> bool:
    """Move the view of the current INFO page up by one line.

    Returns:
        True if the scroll offset changed, False otherwise.
    """
    index = self._current_index
    if index not in range(len(self._pages)):
        _logger.debug(f"scroll_up: invalid page index {self._current_index}")
        return False

    kind = self._pages[index].page_type
    if kind != PageType.INFO:
        _logger.debug(f"scroll_up: page type is {kind}, not INFO")
        return False

    offset = self._scroll_offsets.get(index, 0)
    _logger.debug(f"scroll_up: current_offset={offset}")
    if offset <= 0:
        _logger.debug("scroll_up: already at top")
        return False

    new_offset = offset - 1
    self._scroll_offsets[index] = new_offset
    _logger.debug(f"scroll_up: new offset={new_offset}")
    return True
def scroll_down(self) -> bool:
    """Move the view of the current INFO page down by one line.

    The page is rendered once to learn how many lines it has, which
    determines the largest allowed offset.

    Returns:
        True if the scroll offset changed, False otherwise.
    """
    index = self._current_index
    if index not in range(len(self._pages)):
        _logger.debug(f"scroll_down: invalid page index {self._current_index}")
        return False

    definition = self._pages[index]
    if definition.page_type != PageType.INFO:
        _logger.debug(f"scroll_down: page type is {definition.page_type}, not INFO")
        return False

    # Render the full page to know the total number of lines.
    total_lines = len(definition.generator(self).lines)
    max_offset = max(0, total_lines - self.MAX_VISIBLE_LINES)
    current_offset = self._scroll_offsets.get(self._current_index, 0)
    _logger.debug(f"scroll_down: total_lines={total_lines}, max_offset={max_offset}, current_offset={current_offset}")
    if current_offset >= max_offset:
        _logger.debug("scroll_down: already at bottom")
        return False

    new_offset = current_offset + 1
    self._scroll_offsets[self._current_index] = new_offset
    _logger.debug(f"scroll_down: new offset={new_offset}")
    return True
def navigate_next(self) -> bool:
    """Advance to the following navigable page, if there is one.

    Returns:
        True if navigation occurred, False when already on (or past) the
        last navigable page.
    """
    last_navigable = self.total_pages - 1
    if self._current_index >= last_navigable:
        return False

    self._current_index += 1
    # The newly shown page always starts at its top.
    self._scroll_offsets[self._current_index] = 0
    return True
def navigate_prev(self) -> bool:
    """Step back to the preceding page, if there is one.

    Returns:
        True if navigation occurred, False when already on the first page.
    """
    if self._current_index <= 0:
        return False

    self._current_index -= 1
    # The newly shown page always starts at its top.
    self._scroll_offsets[self._current_index] = 0
    return True
def select_action(self, index: int) -> Optional[ActionID]:
    """Select an entry of the menu page that is currently shown.

    Reboot and shutdown need confirmation: for those the action is stored
    as the pending action and the current page becomes the confirm page
    (the last page in the table). Any other action is only returned.

    Args:
        index: Zero-based position within the current page's action list.

    Returns:
        The selected ActionID, both for actions that are now pending
        confirmation and for all others. None if the current index is
        invalid, the current page is not a menu with actions, or ``index``
        is out of range.
    """
    # Same guard as the scroll methods: never index the table blindly.
    if self._current_index not in range(len(self._pages)):
        return None

    page_def = self._pages[self._current_index]
    if page_def.page_type != PageType.MENU or not page_def.actions:
        return None
    if not 0 <= index < len(page_def.actions):
        return None

    action_id = page_def.actions[index]
    if action_id in (ActionID.REBOOT_SYSTEM, ActionID.SHUTDOWN_SYSTEM):
        self._pending_action = action_id
        self._current_index = len(self._pages) - 1  # Go to confirm page
    return action_id
def execute_action(self, action_id: ActionID) -> tuple[bool, str]:
    """Run the handler registered for ``action_id``.

    Args:
        action_id: Action to execute.

    Returns:
        Tuple of (success, message). The message is the action's success
        text, "Action failed" when the handler returns a falsy value,
        "No handler" when nothing is registered, or the first 32 characters
        of the exception text when the handler raises.
    """
    handler = self._action_handlers.get(action_id)
    if handler is None:
        return False, "No handler"

    try:
        succeeded = handler()
    except Exception as exc:
        _logger.exception("Action %s failed", action_id)
        # Some exceptions carry no text; fall back to the type name so the
        # message shown is never empty.
        return False, (str(exc) or type(exc).__name__)[:32]

    if not succeeded:
        return False, "Action failed"
    return True, self._get_action_success_message(action_id)
def confirm_action(self) -> tuple[bool, str]:
    """Execute the pending action and return to the actions menu.

    Returns:
        Tuple of (success, message) from execute_action(), or
        (False, "No pending action") when nothing is pending. In the latter
        case the current page is left unchanged.
    """
    action = self._pending_action
    if action is None:
        return False, "No pending action"

    self._pending_action = None
    # Go back to the page that offers actions. Looking it up instead of
    # hard-coding its index keeps this correct if the page order changes;
    # falls back to the first page if no such page is registered.
    self._current_index = next(
        (i for i, page_def in enumerate(self._pages) if page_def.actions), 0
    )
    return self.execute_action(action)
def cancel_action(self) -> None:
    """Discard any pending action and return to the actions menu."""
    self._pending_action = None
    # Go back to the page that offers actions. Looking it up instead of
    # hard-coding its index keeps this correct if the page order changes;
    # falls back to the first page if no such page is registered.
    self._current_index = next(
        (i for i, page_def in enumerate(self._pages) if page_def.actions), 0
    )
def _get_action_success_message(self, action_id: ActionID) -> str:
    """Return the text shown after ``action_id`` succeeded ("Done" if unknown)."""
    success_text = {
        ActionID.RECONNECT_OBD: "OBD2 reconnected",
        ActionID.RESTART_SERVICE: "Service restarted",
        ActionID.REBOOT_SYSTEM: "Rebooting...",
        ActionID.SHUTDOWN_SYSTEM: "Shutting down...",
        ActionID.CLEAR_CACHE: "Cache cleared",
        ActionID.TOGGLE_DEBUG: "Debug toggled",
    }
    if action_id in success_text:
        return success_text[action_id]
    return "Done"
# Page generators
def _generate_live_data_page(self, mgr: "PageManager") -> Page:
    """Build the live vehicle data page from the 'vehicle_state' provider.

    A fixed set of PIDs is listed first, in a fixed order and with per-PID
    precision. Every other PID returned by ``state.get_all()`` follows,
    sorted by PID code.
    """
    state = mgr.get_data("vehicle_state")
    if not state:
        lines = ["No OBD2 connection", "", "Use Actions menu", "to reconnect"]
    else:
        readings = state.get_all()
        _logger.debug(f"Live data: got {len(readings)} PIDs from state")
        lines = []
        if readings:
            # (PID code, label, format spec applied to the value)
            preferred = (
                (0x0C, "RPM", ".0f"),          # Engine RPM
                (0x0D, "Speed", ".0f"),        # Vehicle speed
                (0x05, "Coolant", ".0f"),      # Coolant temp
                (0x5C, "Oil temp", ".0f"),     # Oil temp
                (0x0F, "Intake temp", ".0f"),  # Intake temp
                (0x11, "Throttle", ".1f"),     # Throttle pos
                (0x04, "Engine load", ".1f"),  # Engine load
                (0x2F, "Fuel level", ".1f"),   # Fuel level
                (0x10, "MAF", ".1f"),          # MAF rate
            )
            for pid, label, spec in preferred:
                if pid in readings:
                    reading = readings[pid]
                    lines.append(f"{label}: {reading.value:{spec}} {reading.unit}")

            # Everything not covered above, in PID order.
            preferred_pids = {pid for pid, _, _ in preferred}
            lines += [
                f"{val.name}: {val.value:.1f} {val.unit}"
                for pid, val in sorted(readings.items())
                if pid not in preferred_pids
            ]
        if not lines:
            lines = ["Waiting for data...", "", "Polling active"]

    _logger.debug(f"Live data page: {len(lines)} lines")
    return Page(PageType.INFO, "Live Data", lines)
def _generate_stats_page(self, mgr: "PageManager") -> Page:
    """Build the statistics page from the 'poller_stats' and 'uptime' providers.

    Shows query / success / failure counts, the success rate in percent,
    queries per second and the uptime split into hours, minutes and seconds.
    """
    # get_data() passes through whatever the provider returns, which can be
    # None; use neutral values so the arithmetic below cannot fail.
    stats = mgr.get_data("poller_stats", {}) or {}
    uptime = mgr.get_data("uptime", 0) or 0

    queries = stats.get("queries", 0)
    successes = stats.get("successes", 0)
    failures = stats.get("failures", 0)

    rate = (successes / queries * 100) if queries > 0 else 0
    qps = queries / uptime if uptime > 0 else 0

    total_minutes, seconds = divmod(int(uptime), 60)
    hours, minutes = divmod(total_minutes, 60)

    lines = [
        f"Queries: {queries}",
        f"Success: {successes}",
        f"Failures: {failures}",
        f"Rate: {rate:.1f}%",
        f"Q/sec: {qps:.1f}",
        f"Uptime: {hours}h {minutes}m {seconds}s",
    ]
    return Page(PageType.INFO, "Statistics", lines)
def _generate_app_status_page(self, mgr: "PageManager") -> Page:
    """Build the application status page.

    Sections, in order: pipeline statistics (total readings, storage
    handler, PostgreSQL handler), configuration (CAN interface, PostgreSQL
    host or "Disabled") and the session id. The pipeline section always
    contributes at least one line, so the page is never empty.
    """
    lines = []

    # Pipeline stats
    pipeline_stats = mgr.get_data("pipeline_stats", {})
    if pipeline_stats:
        lines.append(f"Readings: {pipeline_stats.get('total_readings', 0)}")
        # "or {}" also covers keys that are present but set to None.
        handlers = pipeline_stats.get("handlers") or {}

        # Storage handler status
        storage = handlers.get("storage") or {}
        if storage:
            saved = storage.get("saved_count", 0)
            pending = storage.get("pending_in_batch", 0)
            db_size = storage.get("db_size_mb", 0)
            lines.append(f"SQLite: {saved} saved")
            lines.append(f"Pending: {pending}, DB: {db_size}MB")

        # PostgreSQL handler status
        pg = handlers.get("postgresql") or {}
        if pg:
            status = "OK" if pg.get("connected", False) else "Offline"
            lines.append(f"PG: {status}, synced: {pg.get('synced_count', 0)}")
    else:
        lines.append("Pipeline: Not active")

    # Config info
    config = mgr.get_data("config")
    if config:
        lines.append(f"CAN: {config.can.interface}")
        if config.postgresql.enabled:
            lines.append(f"PG: {config.postgresql.host}")
        else:
            lines.append("PG: Disabled")

    # Session info
    session = mgr.get_data("session_id")
    if session:
        lines.append(f"Session: {session}")

    return Page(PageType.INFO, "App Status", lines)
def _generate_system_page(self, mgr: "PageManager") -> Page:
    """Build the system info page (IP, CPU temperature and load, memory, CAN).

    Readings that could not be obtained are shown as "---"; the used/total
    memory line is left out entirely in that case.
    """
    ip = self._get_ip_address()
    cpu_temp = self._get_cpu_temp()
    mem_percent, mem_used, mem_total = self._get_memory_info()
    load = self._get_load_average()

    # The helpers signal "unavailable" with None, so test for None
    # explicitly: a genuine reading of 0 must not be displayed as missing.
    lines = [
        f"IP: {ip}",
        f"CPU Temp: {cpu_temp:.1f} C" if cpu_temp is not None else "CPU Temp: ---",
        f"CPU Load: {load}" if load else "CPU Load: ---",
        f"Mem: {mem_percent:.0f}%" if mem_percent is not None else "Mem: ---",
    ]
    if mem_used is not None:
        lines.append(f"Mem: {mem_used}/{mem_total}MB")
    lines.append(f"CAN: {mgr.get_data('can_interface', 'can0')}")
    lines.append("Flipper: Connected")

    return Page(PageType.INFO, "System", lines)
def _generate_uds_page(self, mgr: "PageManager") -> Page:
    """Build the UDS extended diagnostics page.

    Uses the 'uds_values' provider (looked up by data identifier) and the
    'uds_stats' provider. When neither yields anything, a hint on how to
    enable UDS is shown instead.
    """
    # get_data() passes through whatever the provider returns, which can be
    # None; normalise so the membership tests below cannot raise.
    uds_values = mgr.get_data("uds_values", {}) or {}
    uds_stats = mgr.get_data("uds_stats", {}) or {}

    if not uds_values and not uds_stats:
        lines = ["UDS not enabled", "", "Enable in config.json:", '"uds": {"enabled": true}']
        return Page(PageType.INFO, "UDS Data", lines)

    # (data identifier, line template) for values that are shown as-is.
    simple_fields = (
        (0x202A, "Boost: {:.0f} kPa"),   # Boost pressure
        (0x437C, "Torque: {:.0f} Nm"),   # Torque
        (0x10C0, "Lambda: {:.3f}"),      # Lambda
        (0x2004, "Timing: {:.1f} deg"),  # Ignition timing
        (0x39A2, "Wastegate: {:.0f}%"),  # Wastegate
    )
    lines = [
        template.format(uds_values[did].value)
        for did, template in simple_fields
        if did in uds_values
    ]

    # Gear (0x3816): -1 is shown as R, 0 as N, anything else as its number.
    if 0x3816 in uds_values:
        gear = int(uds_values[0x3816].value)
        gear_str = {-1: "R", 0: "N"}.get(gear, str(gear))
        lines.append(f"Gear: {gear_str}")

    # Stats
    if uds_stats:
        queries = uds_stats.get("queries", 0)
        successes = uds_stats.get("successes", 0)
        rate = (successes / queries * 100) if queries > 0 else 0
        lines.append(f"UDS: {rate:.0f}% success")

    if not lines:
        lines = ["Waiting for UDS data..."]
    return Page(PageType.INFO, "UDS Data", lines)
def _generate_actions_page(self, mgr: "PageManager") -> Page:
    """Build the actions menu page with its four entries."""
    menu_labels = ["Reconnect OBD2", "Clear PID Cache", "Reboot System", "Shutdown System"]
    return Page(PageType.MENU, "Actions", actions=menu_labels)
def _generate_ups_page(self, mgr: "PageManager") -> Page:
    """Build the UPS status page from the 'ups' provider.

    Shows the battery capacity with a six-segment bar and the voltage. On
    power loss it adds a battery-mode line plus a warning below 25% /
    15% capacity. Otherwise it shows the charging status and, when
    positive, the input voltage.
    """
    ups_data = mgr.get_data("ups")
    if ups_data is None:
        lines = ["UPS not available", "", "Check I2C connection"]
        return Page(PageType.INFO, "UPS Status", lines)

    capacity = getattr(ups_data, "capacity", 0) or 0

    # Battery bar visualization. Scale with exact arithmetic so 100% fills
    # all segments, and clamp so out-of-range readings keep the bar width.
    segments = 6
    filled = max(0, min(segments, int(capacity * segments / 100)))
    bar = "[" + "=" * filled + " " * (segments - filled) + "]"

    lines = [
        f"Bat: {capacity:.1f}% {bar}",
        f"Voltage: {ups_data.voltage:.2f}V",
    ]

    if getattr(ups_data, "power_loss", False):
        lines.append("Power: BATTERY MODE")
        if capacity < 15:
            lines.append("!! CRITICAL !!")
        elif capacity < 25:
            lines.append("! LOW BATTERY !")
    else:
        status = "Charging" if getattr(ups_data, "is_charging", False) else "Full"
        lines.append(f"Status: {status}")
        input_v = getattr(ups_data, "input_voltage", 0) or 0
        if input_v > 0:
            lines.append(f"Input: {input_v:.2f}V")

    return Page(PageType.INFO, "UPS Status", lines)
def _generate_confirm_page(self, mgr: "PageManager") -> Page:
    """Build the confirmation page for the manager's pending action."""
    # pending action -> (title, lines)
    prompts = {
        ActionID.REBOOT_SYSTEM: (
            "Confirm Reboot",
            ["Reboot system?", "", "All data will", "be lost"],
        ),
        ActionID.SHUTDOWN_SYSTEM: (
            "Confirm Shutdown",
            ["Shutdown system?", "", "Manual restart", "required"],
        ),
    }
    generic_prompt = ("Confirm", ["Confirm action?"])
    title, lines = prompts.get(mgr._pending_action, generic_prompt)
    return Page(PageType.CONFIRM, title, lines)
@staticmethod
def _get_ip_address() -> str:
    """Return the local IP address of the outbound interface, or "Unknown".

    A UDP socket is "connected" to a public address and the local end of
    that association is read back. The socket is managed by a ``with``
    block so it is closed even when connecting fails.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except OSError:
        # No route / no network: report the address as unknown.
        return "Unknown"
@staticmethod
def _get_cpu_temp() -> Optional[float]:
    """Read the CPU temperature from thermal zone 0 (Linux only).

    Returns:
        The file's integer value divided by 1000, or None if the file
        cannot be read or parsed.
    """
    try:
        with open("/sys/class/thermal/thermal_zone0/temp", "r") as sensor:
            raw_reading = int(sensor.read().strip())
    except Exception:
        return None
    return raw_reading / 1000.0
@staticmethod
def _get_memory_info() -> Tuple[Optional[float], Optional[int], Optional[int]]:
    """Get memory usage from /proc/meminfo.

    Fields are looked up by name rather than by line position, so the
    result does not depend on the order of lines in the file. When
    ``MemAvailable`` is missing, it is approximated as
    ``MemFree + Buffers + Cached``.

    Returns:
        Tuple of (percent used, used MB, total MB), or (None, None, None)
        if the information cannot be read or parsed.
    """
    try:
        fields = {}
        with open("/proc/meminfo", "r") as f:
            for line in f:
                key, _, rest = line.partition(":")
                parts = rest.split()
                if parts:
                    fields[key.strip()] = int(parts[0])

        total_kb = fields["MemTotal"]
        available_kb = fields.get("MemAvailable")
        if available_kb is None:
            available_kb = (
                fields["MemFree"] + fields.get("Buffers", 0) + fields.get("Cached", 0)
            )
        used_kb = total_kb - available_kb
        percent = (used_kb / total_kb) * 100
        return percent, used_kb // 1024, total_kb // 1024
    except (OSError, KeyError, ValueError, ZeroDivisionError):
        return None, None, None
@staticmethod
def _get_load_average() -> Optional[str]:
    """Return the first three fields of /proc/loadavg, space-separated.

    Returns:
        The three fields joined by single spaces, or None if the file
        cannot be read or has fewer than three fields.
    """
    try:
        with open("/proc/loadavg", "r") as loadavg:
            fields = loadavg.read().split()
        first, second, third = fields[0], fields[1], fields[2]
    except Exception:
        return None
    return f"{first} {second} {third}"