update code and add flipper zero integration

This commit is contained in:
2026-01-29 23:36:36 +03:00
parent d4ffce28d5
commit a96328d4e6
11 changed files with 2307 additions and 5 deletions

View File

@@ -0,0 +1,357 @@
"""Page definitions and dynamic content generators."""
from typing import List, Dict, Callable, Optional, Any
from dataclasses import dataclass
from enum import Enum
import socket
import os
from .protocol import Page, PageType
class ActionID(Enum):
"""Action identifiers for menu items."""
RECONNECT_OBD = "reconnect_obd"
RESTART_SERVICE = "restart_service"
REBOOT_SYSTEM = "reboot_system"
SHUTDOWN_SYSTEM = "shutdown_system"
CLEAR_CACHE = "clear_cache"
TOGGLE_DEBUG = "toggle_debug"
@dataclass
class PageDefinition:
"""Page definition with content generator."""
page_type: PageType
title: str
generator: Callable[["PageManager"], Page]
actions: Optional[List[ActionID]] = None
class PageManager:
"""Manages page content and navigation."""
def __init__(self):
self._pages: List[PageDefinition] = []
self._current_index: int = 0
self._pending_action: Optional[ActionID] = None
self._data_providers: Dict[str, Callable[[], Any]] = {}
self._action_handlers: Dict[ActionID, Callable[[], bool]] = {}
self._debug_enabled: bool = False
self._register_default_pages()
def _register_default_pages(self) -> None:
"""Register default page definitions."""
# Page 0: Live Vehicle Data
self._pages.append(PageDefinition(
page_type=PageType.INFO,
title="Live Data",
generator=self._generate_live_data_page,
))
# Page 1: Statistics
self._pages.append(PageDefinition(
page_type=PageType.INFO,
title="Statistics",
generator=self._generate_stats_page,
))
# Page 2: System Info
self._pages.append(PageDefinition(
page_type=PageType.INFO,
title="System Info",
generator=self._generate_system_page,
))
# Page 3: Actions Menu
self._pages.append(PageDefinition(
page_type=PageType.MENU,
title="Actions",
generator=self._generate_actions_page,
actions=[
ActionID.RECONNECT_OBD,
ActionID.CLEAR_CACHE,
ActionID.REBOOT_SYSTEM,
ActionID.SHUTDOWN_SYSTEM,
],
))
# Page 4: Confirm (dynamic)
self._pages.append(PageDefinition(
page_type=PageType.CONFIRM,
title="Confirm",
generator=self._generate_confirm_page,
))
@property
def total_pages(self) -> int:
"""Total number of pages (excluding hidden confirm page)."""
return len(self._pages) - 1
@property
def current_index(self) -> int:
"""Current page index."""
return self._current_index
def set_data_provider(self, name: str, provider: Callable[[], Any]) -> None:
"""Register a data provider function.
Args:
name: Provider name (e.g., 'vehicle_state', 'poller_stats')
provider: Callable that returns current data
"""
self._data_providers[name] = provider
def set_action_handler(self, action_id: ActionID, handler: Callable[[], bool]) -> None:
"""Register an action handler.
Args:
action_id: Action identifier
handler: Callable that executes action, returns True on success
"""
self._action_handlers[action_id] = handler
def get_data(self, name: str, default: Any = None) -> Any:
"""Get data from a provider.
Args:
name: Provider name
default: Default value if provider not found
Returns:
Data from provider or default
"""
provider = self._data_providers.get(name)
if provider:
try:
return provider()
except Exception:
pass
return default
def get_current_page(self) -> Page:
"""Get current page content."""
if 0 <= self._current_index < len(self._pages):
page_def = self._pages[self._current_index]
return page_def.generator(self)
return Page(PageType.INFO, "Error", ["Invalid page index"])
def navigate_next(self) -> bool:
"""Navigate to next page.
Returns:
True if navigation occurred
"""
if self._current_index < self.total_pages - 1:
self._current_index += 1
return True
return False
def navigate_prev(self) -> bool:
"""Navigate to previous page.
Returns:
True if navigation occurred
"""
if self._current_index > 0:
self._current_index -= 1
return True
return False
def select_action(self, index: int) -> Optional[ActionID]:
"""Select a menu action by index.
Args:
index: Action index (0-3)
Returns:
ActionID if requires confirmation, None otherwise
"""
page_def = self._pages[self._current_index]
if page_def.page_type != PageType.MENU or not page_def.actions:
return None
if 0 <= index < len(page_def.actions):
action_id = page_def.actions[index]
# Actions requiring confirmation
if action_id in (ActionID.REBOOT_SYSTEM, ActionID.SHUTDOWN_SYSTEM):
self._pending_action = action_id
self._current_index = len(self._pages) - 1 # Go to confirm page
return action_id
# Execute directly
return action_id
return None
def execute_action(self, action_id: ActionID) -> tuple[bool, str]:
"""Execute an action.
Args:
action_id: Action to execute
Returns:
Tuple of (success, message)
"""
handler = self._action_handlers.get(action_id)
if handler:
try:
success = handler()
if success:
return True, self._get_action_success_message(action_id)
else:
return False, "Action failed"
except Exception as e:
return False, str(e)[:32]
return False, "No handler"
def confirm_action(self) -> tuple[bool, str]:
"""Confirm pending action.
Returns:
Tuple of (success, message)
"""
if self._pending_action:
action = self._pending_action
self._pending_action = None
self._current_index = 3 # Back to actions menu
return self.execute_action(action)
return False, "No pending action"
def cancel_action(self) -> None:
"""Cancel pending action."""
self._pending_action = None
self._current_index = 3 # Back to actions menu
def _get_action_success_message(self, action_id: ActionID) -> str:
"""Get success message for action."""
messages = {
ActionID.RECONNECT_OBD: "OBD2 reconnected",
ActionID.RESTART_SERVICE: "Service restarted",
ActionID.REBOOT_SYSTEM: "Rebooting...",
ActionID.SHUTDOWN_SYSTEM: "Shutting down...",
ActionID.CLEAR_CACHE: "Cache cleared",
ActionID.TOGGLE_DEBUG: "Debug toggled",
}
return messages.get(action_id, "Done")
# Page generators
def _generate_live_data_page(self, mgr: "PageManager") -> Page:
"""Generate live vehicle data page."""
state = mgr.get_data("vehicle_state")
lines = []
if state:
rpm = state.rpm
speed = state.speed
coolant = state.coolant_temp
throttle = state.throttle
fuel = state.fuel_level
lines.append(f"RPM: {rpm:.0f}" if rpm is not None else "RPM: ---")
lines.append(f"Speed: {speed:.0f} km/h" if speed is not None else "Speed: --- km/h")
lines.append(f"Coolant: {coolant:.0f} C" if coolant is not None else "Coolant: --- C")
lines.append(f"Throttle: {throttle:.1f}%" if throttle is not None else "Throttle: ---%")
lines.append(f"Fuel: {fuel:.1f}%" if fuel is not None else "Fuel: ---%")
else:
lines = ["No connection", "to OBD2", "", "Check CAN bus", "connection"]
return Page(PageType.INFO, "Live Data", lines)
def _generate_stats_page(self, mgr: "PageManager") -> Page:
"""Generate statistics page."""
stats = mgr.get_data("poller_stats", {})
uptime = mgr.get_data("uptime", 0)
queries = stats.get("queries", 0)
successes = stats.get("successes", 0)
rate = (successes / queries * 100) if queries > 0 else 0
hours = int(uptime // 3600)
minutes = int((uptime % 3600) // 60)
lines = [
f"Queries: {queries}",
f"Success: {successes}",
f"Rate: {rate:.1f}%",
f"Failures: {stats.get('failures', 0)}",
f"Uptime: {hours}h {minutes}m",
]
return Page(PageType.INFO, "Statistics", lines)
def _generate_system_page(self, mgr: "PageManager") -> Page:
"""Generate system info page."""
ip = self._get_ip_address()
cpu_temp = self._get_cpu_temp()
mem = self._get_memory_usage()
lines = [
f"IP: {ip}",
f"CPU: {cpu_temp:.1f} C" if cpu_temp else "CPU: --- C",
f"Mem: {mem:.1f}%" if mem else "Mem: ---%",
f"CAN: {mgr.get_data('can_interface', 'can0')}",
f"Debug: {'ON' if mgr._debug_enabled else 'OFF'}",
]
return Page(PageType.INFO, "System Info", lines)
def _generate_actions_page(self, mgr: "PageManager") -> Page:
"""Generate actions menu page."""
actions = [
"Reconnect OBD2",
"Clear PID Cache",
"Reboot System",
"Shutdown System",
]
return Page(PageType.MENU, "Actions", actions=actions)
def _generate_confirm_page(self, mgr: "PageManager") -> Page:
"""Generate confirmation page."""
if mgr._pending_action == ActionID.REBOOT_SYSTEM:
lines = ["Reboot system?", "", "All data will", "be lost"]
title = "Confirm Reboot"
elif mgr._pending_action == ActionID.SHUTDOWN_SYSTEM:
lines = ["Shutdown system?", "", "Manual restart", "required"]
title = "Confirm Shutdown"
else:
lines = ["Confirm action?"]
title = "Confirm"
return Page(PageType.CONFIRM, title, lines)
@staticmethod
def _get_ip_address() -> str:
"""Get local IP address."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "Unknown"
@staticmethod
def _get_cpu_temp() -> Optional[float]:
"""Get CPU temperature (Linux only)."""
try:
with open("/sys/class/thermal/thermal_zone0/temp", "r") as f:
return int(f.read().strip()) / 1000.0
except Exception:
return None
@staticmethod
def _get_memory_usage() -> Optional[float]:
"""Get memory usage percentage."""
try:
with open("/proc/meminfo", "r") as f:
lines = f.readlines()
total = int(lines[0].split()[1])
available = int(lines[2].split()[1])
return (1 - available / total) * 100
except Exception:
return None