"""Page definitions and dynamic content generators.""" from typing import List, Dict, Callable, Optional, Any, Tuple from dataclasses import dataclass from enum import Enum import socket import os import logging from .protocol import Page, PageType _logger = logging.getLogger("obd2_client.pages") class ActionID(Enum): """Action identifiers for menu items.""" RECONNECT_OBD = "reconnect_obd" RESTART_SERVICE = "restart_service" REBOOT_SYSTEM = "reboot_system" SHUTDOWN_SYSTEM = "shutdown_system" CLEAR_CACHE = "clear_cache" TOGGLE_DEBUG = "toggle_debug" @dataclass class PageDefinition: """Page definition with content generator.""" page_type: PageType title: str generator: Callable[["PageManager"], Page] actions: Optional[List[ActionID]] = None class PageManager: """Manages page content and navigation.""" MAX_VISIBLE_LINES = 4 # Flipper display limit (5 lines overlap with footer) def __init__(self): self._pages: List[PageDefinition] = [] self._current_index: int = 0 self._scroll_offsets: Dict[int, int] = {} # page_index -> scroll offset self._pending_action: Optional[ActionID] = None self._data_providers: Dict[str, Callable[[], Any]] = {} self._action_handlers: Dict[ActionID, Callable[[], bool]] = {} self._debug_enabled: bool = False self._register_default_pages() def _register_default_pages(self) -> None: """Register default page definitions.""" # Page 0: Live Vehicle Data self._pages.append(PageDefinition( page_type=PageType.INFO, title="Live Data", generator=self._generate_live_data_page, )) # Page 1: Statistics self._pages.append(PageDefinition( page_type=PageType.INFO, title="Statistics", generator=self._generate_stats_page, )) # Page 2: System Info self._pages.append(PageDefinition( page_type=PageType.INFO, title="System Info", generator=self._generate_system_page, )) # Page 3: Actions Menu self._pages.append(PageDefinition( page_type=PageType.MENU, title="Actions", generator=self._generate_actions_page, actions=[ ActionID.RECONNECT_OBD, ActionID.CLEAR_CACHE, ActionID.REBOOT_SYSTEM, ActionID.SHUTDOWN_SYSTEM, ], )) # Page 4: UPS Status (if available) self._pages.append(PageDefinition( page_type=PageType.INFO, title="UPS Status", generator=self._generate_ups_page, )) # Page 5: Confirm (dynamic) self._pages.append(PageDefinition( page_type=PageType.CONFIRM, title="Confirm", generator=self._generate_confirm_page, )) @property def total_pages(self) -> int: """Total number of pages (excluding hidden confirm page).""" return len(self._pages) - 1 @property def current_index(self) -> int: """Current page index.""" return self._current_index def set_data_provider(self, name: str, provider: Callable[[], Any]) -> None: """Register a data provider function. Args: name: Provider name (e.g., 'vehicle_state', 'poller_stats') provider: Callable that returns current data """ self._data_providers[name] = provider def set_action_handler(self, action_id: ActionID | str, handler: Callable[[], bool]) -> None: """Register an action handler. Args: action_id: Action identifier (ActionID enum or string) handler: Callable that executes action, returns True on success """ self._action_handlers[action_id] = handler def add_page(self, page: Any) -> None: """Add a custom page to the page manager. Args: page: Page object with get_content(), is_enabled(), name, title attributes """ # Create a PageDefinition wrapper for custom page def generator(mgr: "PageManager") -> Page: content = page.get_content() return Page( page_type=PageType(content.page_type.value), title=content.title, lines=content.lines, actions=content.actions, selected=content.selected, ) # Insert before the confirm page (last page) page_def = PageDefinition( page_type=PageType.INFO, title=page.title, generator=generator, ) # Insert at position -1 (before confirm page) self._pages.insert(len(self._pages) - 1, page_def) _logger.info(f"Added custom page: {page.name}") def get_data(self, name: str, default: Any = None) -> Any: """Get data from a provider. Args: name: Provider name default: Default value if provider not found Returns: Data from provider or default """ provider = self._data_providers.get(name) if provider: try: return provider() except Exception: pass return default def get_current_page(self) -> Page: """Get current page content with scroll offset applied.""" if 0 <= self._current_index < len(self._pages): page_def = self._pages[self._current_index] page = page_def.generator(self) # Apply scrolling for info pages with more than MAX_VISIBLE_LINES if page.page_type == PageType.INFO and len(page.lines) > self.MAX_VISIBLE_LINES: offset = self._scroll_offsets.get(self._current_index, 0) max_offset = len(page.lines) - self.MAX_VISIBLE_LINES # Clamp offset offset = max(0, min(offset, max_offset)) self._scroll_offsets[self._current_index] = offset # Apply scroll - show lines[offset:offset+5] visible_lines = page.lines[offset:offset + self.MAX_VISIBLE_LINES] # Add scroll indicators if offset > 0: visible_lines[0] = "^ " + visible_lines[0][:30] if offset < max_offset: visible_lines[-1] = "v " + visible_lines[-1][:30] page.lines = visible_lines return page return Page(PageType.INFO, "Error", ["Invalid page index"]) def scroll_up(self) -> bool: """Scroll current page up. Returns: True if scroll occurred """ if self._current_index not in range(len(self._pages)): _logger.debug(f"scroll_up: invalid page index {self._current_index}") return False page_def = self._pages[self._current_index] if page_def.page_type != PageType.INFO: _logger.debug(f"scroll_up: page type is {page_def.page_type}, not INFO") return False current_offset = self._scroll_offsets.get(self._current_index, 0) _logger.debug(f"scroll_up: current_offset={current_offset}") if current_offset > 0: self._scroll_offsets[self._current_index] = current_offset - 1 _logger.debug(f"scroll_up: new offset={current_offset - 1}") return True _logger.debug("scroll_up: already at top") return False def scroll_down(self) -> bool: """Scroll current page down. Returns: True if scroll occurred """ if self._current_index not in range(len(self._pages)): _logger.debug(f"scroll_down: invalid page index {self._current_index}") return False page_def = self._pages[self._current_index] if page_def.page_type != PageType.INFO: _logger.debug(f"scroll_down: page type is {page_def.page_type}, not INFO") return False # Get full page content to know total lines page = page_def.generator(self) total_lines = len(page.lines) max_offset = max(0, total_lines - self.MAX_VISIBLE_LINES) current_offset = self._scroll_offsets.get(self._current_index, 0) _logger.debug(f"scroll_down: total_lines={total_lines}, max_offset={max_offset}, current_offset={current_offset}") if current_offset < max_offset: self._scroll_offsets[self._current_index] = current_offset + 1 _logger.debug(f"scroll_down: new offset={current_offset + 1}") return True _logger.debug("scroll_down: already at bottom") return False def navigate_next(self) -> bool: """Navigate to next page. Returns: True if navigation occurred """ if self._current_index < self.total_pages - 1: self._current_index += 1 # Reset scroll for new page self._scroll_offsets[self._current_index] = 0 return True return False def navigate_prev(self) -> bool: """Navigate to previous page. Returns: True if navigation occurred """ if self._current_index > 0: self._current_index -= 1 # Reset scroll for new page self._scroll_offsets[self._current_index] = 0 return True return False def select_action(self, index: int) -> Optional[ActionID]: """Select a menu action by index. Args: index: Action index (0-3) Returns: ActionID if requires confirmation, None otherwise """ page_def = self._pages[self._current_index] if page_def.page_type != PageType.MENU or not page_def.actions: return None if 0 <= index < len(page_def.actions): action_id = page_def.actions[index] # Actions requiring confirmation if action_id in (ActionID.REBOOT_SYSTEM, ActionID.SHUTDOWN_SYSTEM): self._pending_action = action_id self._current_index = len(self._pages) - 1 # Go to confirm page return action_id # Execute directly return action_id return None def execute_action(self, action_id: ActionID) -> tuple[bool, str]: """Execute an action. Args: action_id: Action to execute Returns: Tuple of (success, message) """ handler = self._action_handlers.get(action_id) if handler: try: success = handler() if success: return True, self._get_action_success_message(action_id) else: return False, "Action failed" except Exception as e: return False, str(e)[:32] return False, "No handler" def confirm_action(self) -> tuple[bool, str]: """Confirm pending action. Returns: Tuple of (success, message) """ if self._pending_action: action = self._pending_action self._pending_action = None self._current_index = 3 # Back to actions menu return self.execute_action(action) return False, "No pending action" def cancel_action(self) -> None: """Cancel pending action.""" self._pending_action = None self._current_index = 3 # Back to actions menu def _get_action_success_message(self, action_id: ActionID) -> str: """Get success message for action.""" messages = { ActionID.RECONNECT_OBD: "OBD2 reconnected", ActionID.RESTART_SERVICE: "Service restarted", ActionID.REBOOT_SYSTEM: "Rebooting...", ActionID.SHUTDOWN_SYSTEM: "Shutting down...", ActionID.CLEAR_CACHE: "Cache cleared", ActionID.TOGGLE_DEBUG: "Debug toggled", } return messages.get(action_id, "Done") # Page generators def _generate_live_data_page(self, mgr: "PageManager") -> Page: """Generate live vehicle data page with all available PIDs.""" state = mgr.get_data("vehicle_state") lines = [] if state: all_values = state.get_all() _logger.debug(f"Live data: got {len(all_values)} PIDs from state") if all_values: # Format functions for different value types def fmt_int(val): return f"{val.value:.0f} {val.unit}" def fmt_float(val): return f"{val.value:.1f} {val.unit}" # Display order with custom formatting display_order = [ (0x0C, "RPM", fmt_int), # Engine RPM (0x0D, "Speed", fmt_int), # Vehicle speed (0x05, "Coolant", fmt_int), # Coolant temp (0x5C, "Oil temp", fmt_int), # Oil temp (0x0F, "Intake temp", fmt_int), # Intake temp (0x11, "Throttle", fmt_float), # Throttle pos (0x04, "Engine load", fmt_float), # Engine load (0x2F, "Fuel level", fmt_float), # Fuel level (0x10, "MAF", fmt_float), # MAF rate ] shown_pids = set() for pid_code, label, formatter in display_order: if pid_code in all_values: val = all_values[pid_code] lines.append(f"{label}: {formatter(val)}") shown_pids.add(pid_code) # Add any other PIDs not in the display order for pid_code, val in sorted(all_values.items()): if pid_code not in shown_pids: lines.append(f"{val.name}: {val.value:.1f} {val.unit}") if not lines: lines = ["Waiting for data...", "", "Polling active"] else: lines = ["No OBD2 connection", "", "Use Actions menu", "to reconnect"] _logger.debug(f"Live data page: {len(lines)} lines") return Page(PageType.INFO, "Live Data", lines) def _generate_stats_page(self, mgr: "PageManager") -> Page: """Generate statistics page.""" stats = mgr.get_data("poller_stats", {}) uptime = mgr.get_data("uptime", 0) queries = stats.get("queries", 0) successes = stats.get("successes", 0) failures = stats.get("failures", 0) rate = (successes / queries * 100) if queries > 0 else 0 hours = int(uptime // 3600) minutes = int((uptime % 3600) // 60) seconds = int(uptime % 60) # Calculate queries per second qps = queries / uptime if uptime > 0 else 0 lines = [ f"Queries: {queries}", f"Success: {successes}", f"Failures: {failures}", f"Rate: {rate:.1f}%", f"Q/sec: {qps:.1f}", f"Uptime: {hours}h {minutes}m {seconds}s", ] return Page(PageType.INFO, "Statistics", lines) def _generate_system_page(self, mgr: "PageManager") -> Page: """Generate system info page.""" ip = self._get_ip_address() cpu_temp = self._get_cpu_temp() mem_percent, mem_used, mem_total = self._get_memory_info() load = self._get_load_average() lines = [ f"IP: {ip}", f"CPU Temp: {cpu_temp:.1f} C" if cpu_temp else "CPU Temp: ---", f"CPU Load: {load}" if load else "CPU Load: ---", f"Mem: {mem_percent:.0f}%" if mem_percent else "Mem: ---", f"Mem: {mem_used}/{mem_total}MB" if mem_used else "", f"CAN: {mgr.get_data('can_interface', 'can0')}", f"Flipper: Connected", ] # Remove empty lines lines = [l for l in lines if l] return Page(PageType.INFO, "System", lines) def _generate_actions_page(self, mgr: "PageManager") -> Page: """Generate actions menu page.""" actions = [ "Reconnect OBD2", "Clear PID Cache", "Reboot System", "Shutdown System", ] return Page(PageType.MENU, "Actions", actions=actions) def _generate_ups_page(self, mgr: "PageManager") -> Page: """Generate UPS status page.""" ups_data = mgr.get_data("ups") if ups_data is None: lines = ["UPS not available", "", "Check I2C connection"] return Page(PageType.INFO, "UPS Status", lines) # Battery bar visualization capacity = ups_data.capacity if hasattr(ups_data, 'capacity') else 0 filled = int(capacity / 16.67) # 6 segments empty = 6 - filled bar = "[" + "=" * filled + " " * empty + "]" lines = [ f"Bat: {capacity:.1f}% {bar}", f"Voltage: {ups_data.voltage:.2f}V", ] if hasattr(ups_data, 'power_loss') and ups_data.power_loss: lines.append("Power: BATTERY MODE") if capacity < 15: lines.append("!! CRITICAL !!") elif capacity < 25: lines.append("! LOW BATTERY !") else: status = "Charging" if getattr(ups_data, 'is_charging', False) else "Full" lines.append(f"Status: {status}") input_v = getattr(ups_data, 'input_voltage', 0) if input_v > 0: lines.append(f"Input: {input_v:.2f}V") return Page(PageType.INFO, "UPS Status", lines) def _generate_confirm_page(self, mgr: "PageManager") -> Page: """Generate confirmation page.""" if mgr._pending_action == ActionID.REBOOT_SYSTEM: lines = ["Reboot system?", "", "All data will", "be lost"] title = "Confirm Reboot" elif mgr._pending_action == ActionID.SHUTDOWN_SYSTEM: lines = ["Shutdown system?", "", "Manual restart", "required"] title = "Confirm Shutdown" else: lines = ["Confirm action?"] title = "Confirm" return Page(PageType.CONFIRM, title, lines) @staticmethod def _get_ip_address() -> str: """Get local IP address.""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "Unknown" @staticmethod def _get_cpu_temp() -> Optional[float]: """Get CPU temperature (Linux only).""" try: with open("/sys/class/thermal/thermal_zone0/temp", "r") as f: return int(f.read().strip()) / 1000.0 except Exception: return None @staticmethod def _get_memory_info() -> Tuple[Optional[float], Optional[int], Optional[int]]: """Get memory info (percent, used_mb, total_mb).""" try: with open("/proc/meminfo", "r") as f: lines = f.readlines() total_kb = int(lines[0].split()[1]) available_kb = int(lines[2].split()[1]) used_kb = total_kb - available_kb percent = (used_kb / total_kb) * 100 return percent, used_kb // 1024, total_kb // 1024 except Exception: return None, None, None @staticmethod def _get_load_average() -> Optional[str]: """Get system load average.""" try: with open("/proc/loadavg", "r") as f: parts = f.read().split() return f"{parts[0]} {parts[1]} {parts[2]}" except Exception: return None