diff --git a/can_sniffer/src/flipper/__init__.py b/can_sniffer/src/flipper/__init__.py new file mode 100644 index 0000000..7330f82 --- /dev/null +++ b/can_sniffer/src/flipper/__init__.py @@ -0,0 +1,11 @@ +""" +Flipper Zero Dynamic UI System. + +This module provides a plugin-based page system for Flipper Zero display +with bidirectional communication via UART. +""" + +from flipper.protocol import PageType, PageContent, Protocol +from flipper.page_manager import PageManager + +__all__ = ["PageType", "PageContent", "Protocol", "PageManager"] diff --git a/can_sniffer/src/flipper/page_manager.py b/can_sniffer/src/flipper/page_manager.py new file mode 100644 index 0000000..1f4b812 --- /dev/null +++ b/can_sniffer/src/flipper/page_manager.py @@ -0,0 +1,255 @@ +""" +Page Manager for Flipper Zero UI. + +Manages page navigation, state, and content generation. +""" + +import threading +from typing import List, Optional, Dict, Any + +from flipper.protocol import Protocol, PageContent, Command, CommandType +from flipper.pages.base import BasePage + + +class PageManager: + """ + Manages Flipper Zero pages and navigation. + + Handles: + - Page registration and lifecycle + - Navigation between pages + - Command processing + - Content generation + """ + + def __init__(self): + """Initialize page manager.""" + self._pages: List[BasePage] = [] + self._current_index = 0 + self._lock = threading.Lock() + self._last_result: Optional[str] = None + + def register_page(self, page: BasePage) -> None: + """ + Register a page. + + Args: + page: Page instance to register + """ + with self._lock: + # Check for duplicate names + for existing in self._pages: + if existing.name == page.name: + raise ValueError(f"Page '{page.name}' already registered") + + self._pages.append(page) + + def unregister_page(self, name: str) -> bool: + """ + Unregister a page by name. + + Args: + name: Page name to remove + + Returns: + True if page was found and removed + """ + with self._lock: + for i, page in enumerate(self._pages): + if page.name == name: + self._pages.pop(i) + # Adjust current index if needed + if self._current_index >= len(self._pages): + self._current_index = max(0, len(self._pages) - 1) + return True + return False + + def get_enabled_pages(self) -> List[BasePage]: + """Get list of enabled pages.""" + with self._lock: + return [p for p in self._pages if p.is_enabled()] + + def get_page_count(self) -> int: + """Get number of enabled pages.""" + return len(self.get_enabled_pages()) + + def get_current_page(self) -> Optional[BasePage]: + """Get currently active page.""" + pages = self.get_enabled_pages() + with self._lock: + if 0 <= self._current_index < len(pages): + return pages[self._current_index] + return None + + def get_current_index(self) -> int: + """Get current page index.""" + with self._lock: + return self._current_index + + def navigate_next(self) -> bool: + """ + Navigate to next page. + + Returns: + True if navigation occurred + """ + pages = self.get_enabled_pages() + + with self._lock: + # Don't navigate if current page has pending action + if self._current_index < len(pages): + current = pages[self._current_index] + if current.has_pending_action(): + return False + + current.on_leave() + + if self._current_index < len(pages) - 1: + self._current_index += 1 + pages[self._current_index].on_enter() + return True + + return False + + def navigate_prev(self) -> bool: + """ + Navigate to previous page. + + Returns: + True if navigation occurred + """ + pages = self.get_enabled_pages() + + with self._lock: + # Don't navigate if current page has pending action + if self._current_index < len(pages): + current = pages[self._current_index] + if current.has_pending_action(): + return False + + current.on_leave() + + if self._current_index > 0: + self._current_index -= 1 + pages[self._current_index].on_enter() + return True + + return False + + def navigate_to(self, index: int) -> bool: + """ + Navigate to specific page index. + + Args: + index: Page index to navigate to + + Returns: + True if navigation occurred + """ + pages = self.get_enabled_pages() + + with self._lock: + if 0 <= index < len(pages): + if self._current_index < len(pages): + pages[self._current_index].on_leave() + + self._current_index = index + pages[self._current_index].on_enter() + return True + + return False + + def get_current_content(self) -> Optional[str]: + """ + Get encoded content for current page. + + Returns: + Protocol-encoded page content string or None + """ + page = self.get_current_page() + if page is None: + return None + + content = page.get_content() + total = self.get_page_count() + index = self.get_current_index() + + return Protocol.encode_page(index, total, content) + + def process_command(self, command: Command) -> Optional[str]: + """ + Process command from Flipper Zero. + + Args: + command: Parsed command + + Returns: + Result message or None + """ + self._last_result = None + + if command.cmd_type == CommandType.NAV_NEXT: + self.navigate_next() + return None + + if command.cmd_type == CommandType.NAV_PREV: + self.navigate_prev() + return None + + if command.cmd_type == CommandType.SELECT: + page = self.get_current_page() + if page: + result = page.handle_select(command.select_index) + self._last_result = result + return result + return None + + if command.cmd_type == CommandType.CONFIRM: + page = self.get_current_page() + if page: + result = page.handle_confirm() + self._last_result = result + return result + return None + + if command.cmd_type == CommandType.CANCEL: + page = self.get_current_page() + if page: + result = page.handle_cancel() + self._last_result = result + return result + return None + + if command.cmd_type == CommandType.REFRESH: + # Just return None, content will be sent anyway + return None + + return None + + def get_last_result(self) -> Optional[str]: + """Get last action result message.""" + return self._last_result + + def clear_last_result(self) -> None: + """Clear last result message.""" + self._last_result = None + + def get_stats(self) -> Dict[str, Any]: + """Get manager statistics.""" + pages = self.get_enabled_pages() + + return { + "total_pages": len(self._pages), + "enabled_pages": len(pages), + "current_index": self._current_index, + "current_page": pages[self._current_index].name if pages else None, + "pages": [p.name for p in pages] + } + + def shutdown(self) -> None: + """Shutdown all pages.""" + with self._lock: + for page in self._pages: + page.on_leave() + self._pages.clear() + self._current_index = 0 diff --git a/can_sniffer/src/flipper/pages/__init__.py b/can_sniffer/src/flipper/pages/__init__.py new file mode 100644 index 0000000..de58538 --- /dev/null +++ b/can_sniffer/src/flipper/pages/__init__.py @@ -0,0 +1,20 @@ +""" +Flipper Zero Pages. + +Each page is a plugin that provides content for display on Flipper Zero +and handles user actions. +""" + +from flipper.pages.base import BasePage +from flipper.pages.can_stats import CANStatsPage +from flipper.pages.ups_status import UPSStatusPage +from flipper.pages.system_info import SystemInfoPage +from flipper.pages.actions import ActionsPage + +__all__ = [ + "BasePage", + "CANStatsPage", + "UPSStatusPage", + "SystemInfoPage", + "ActionsPage", +] diff --git a/can_sniffer/src/flipper/pages/actions.py b/can_sniffer/src/flipper/pages/actions.py new file mode 100644 index 0000000..de04346 --- /dev/null +++ b/can_sniffer/src/flipper/pages/actions.py @@ -0,0 +1,163 @@ +""" +Actions Page. + +Menu page for system control actions (shutdown, reboot, etc.) +""" + +from typing import Optional, Callable + +from flipper.pages.base import BasePage +from flipper.protocol import PageContent, PageType +from flipper.providers.system_provider import SystemProvider + + +class ActionsPage(BasePage): + """ + Page with system control actions. + + Provides menu items for: + - Shutdown + - Reboot + - Cancel pending shutdown + + Dangerous actions require confirmation before execution. + """ + + # Action identifiers + ACTION_SHUTDOWN = "shutdown" + ACTION_REBOOT = "reboot" + ACTION_CANCEL = "cancel_shutdown" + + def __init__(self, on_result: Optional[Callable[[str], None]] = None): + """ + Initialize actions page. + + Args: + on_result: Callback when action result is available + """ + super().__init__( + name="actions", + title="Actions", + icon="settings" + ) + self._provider = SystemProvider() + self._on_result = on_result + self._confirm_mode = False + self._pending_action: Optional[str] = None + + # Menu items: (label, action_id, requires_confirm) + self._menu_items = [ + ("Shutdown", self.ACTION_SHUTDOWN, True), + ("Reboot", self.ACTION_REBOOT, True), + ("Cancel Shutdown", self.ACTION_CANCEL, False), + ] + + def get_content(self) -> PageContent: + """Get page content.""" + if self._confirm_mode and self._pending_action: + # Show confirmation dialog + action_name = self._get_action_label(self._pending_action) + return PageContent( + page_type=PageType.CONFIRM, + title=f"Confirm {action_name}?", + lines=[f"Are you sure you want to {action_name.lower()}?"], + actions=["Yes", "No"], + selected=1, # Default to "No" for safety + icon=self.icon + ) + + # Normal menu mode + labels = [item[0] for item in self._menu_items] + return PageContent( + page_type=PageType.MENU, + title=self.title, + lines=[], + actions=labels, + selected=self._selected_index, + icon=self.icon + ) + + def handle_select(self, index: int) -> Optional[str]: + """Handle menu item selection.""" + self._selected_index = index + + if 0 <= index < len(self._menu_items): + label, action_id, requires_confirm = self._menu_items[index] + + if requires_confirm: + # Enter confirmation mode + self._confirm_mode = True + self._pending_action = action_id + return None # Will show confirm dialog + else: + # Execute immediately + return self._execute_action(action_id) + + return None + + def handle_confirm(self) -> Optional[str]: + """Execute confirmed action.""" + if self._pending_action: + action_id = self._pending_action + self._confirm_mode = False + self._pending_action = None + return self._execute_action(action_id) + + return None + + def handle_cancel(self) -> Optional[str]: + """Cancel pending action.""" + self._confirm_mode = False + self._pending_action = None + return "Cancelled" + + def has_pending_action(self) -> bool: + """Check if in confirmation mode.""" + return self._confirm_mode + + def _execute_action(self, action_id: str) -> str: + """ + Execute the specified action. + + Args: + action_id: Action identifier + + Returns: + Result message + """ + result = "" + + if action_id == self.ACTION_SHUTDOWN: + if self._provider.shutdown(delay_minutes=1): + result = "Shutdown in 1 min" + else: + result = "Shutdown failed" + + elif action_id == self.ACTION_REBOOT: + if self._provider.reboot(): + result = "Rebooting..." + else: + result = "Reboot failed" + + elif action_id == self.ACTION_CANCEL: + if self._provider.cancel_shutdown(): + result = "Shutdown cancelled" + else: + result = "No pending shutdown" + + # Notify callback if available + if self._on_result: + self._on_result(result) + + return result + + def _get_action_label(self, action_id: str) -> str: + """Get human-readable label for action ID.""" + for label, aid, _ in self._menu_items: + if aid == action_id: + return label + return action_id + + def get_provider(self) -> SystemProvider: + """Get the System provider instance.""" + return self._provider diff --git a/can_sniffer/src/flipper/pages/base.py b/can_sniffer/src/flipper/pages/base.py new file mode 100644 index 0000000..ebf857b --- /dev/null +++ b/can_sniffer/src/flipper/pages/base.py @@ -0,0 +1,260 @@ +""" +Base Page Interface for Flipper Zero UI. + +All pages must inherit from BasePage and implement required methods. +""" + +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, Callable + +from flipper.protocol import PageContent, PageType + + +class BasePage(ABC): + """ + Abstract base class for Flipper Zero pages. + + Each page represents a screen on Flipper Zero display. + Pages can be informational (read-only), interactive menus, + or confirmation dialogs. + + Attributes: + name: Unique identifier for the page + title: Display title (shown in header) + icon: Icon identifier for visual representation + enabled: Whether the page is active + """ + + def __init__( + self, + name: str, + title: str, + icon: str = "", + enabled: bool = True + ): + """ + Initialize base page. + + Args: + name: Unique page identifier + title: Display title + icon: Icon identifier + enabled: Whether page is enabled + """ + self.name = name + self.title = title + self.icon = icon + self.enabled = enabled + self._selected_index = 0 + self._pending_action: Optional[str] = None + + @abstractmethod + def get_content(self) -> PageContent: + """ + Get current page content for display. + + Returns: + PageContent object with type, title, lines, and actions + """ + pass + + def handle_select(self, index: int) -> Optional[str]: + """ + Handle menu item selection. + + Called when user presses OK on a menu item. + + Args: + index: Selected item index + + Returns: + Optional result message to display, or None + """ + self._selected_index = index + return None + + def handle_confirm(self) -> Optional[str]: + """ + Handle confirmation action. + + Called when user confirms a pending action. + + Returns: + Optional result message, or None + """ + return None + + def handle_cancel(self) -> Optional[str]: + """ + Handle cancel action. + + Called when user cancels a pending action. + + Returns: + Optional result message, or None + """ + self._pending_action = None + return None + + def get_selected_index(self) -> int: + """Get currently selected index for menu pages.""" + return self._selected_index + + def set_selected_index(self, index: int) -> None: + """Set selected index for menu pages.""" + self._selected_index = index + + def has_pending_action(self) -> bool: + """Check if there's a pending action requiring confirmation.""" + return self._pending_action is not None + + def get_pending_action(self) -> Optional[str]: + """Get pending action name.""" + return self._pending_action + + def is_enabled(self) -> bool: + """Check if page is enabled.""" + return self.enabled + + def on_enter(self) -> None: + """Called when user navigates to this page.""" + pass + + def on_leave(self) -> None: + """Called when user navigates away from this page.""" + pass + + +class InfoPage(BasePage): + """ + Base class for information-only pages. + + Info pages display read-only data that updates periodically. + They have no interactive elements. + """ + + def __init__( + self, + name: str, + title: str, + icon: str = "", + enabled: bool = True + ): + super().__init__(name, title, icon, enabled) + + @abstractmethod + def get_lines(self) -> list[str]: + """ + Get content lines for display. + + Returns: + List of strings to display (max 4-5 lines) + """ + pass + + def get_content(self) -> PageContent: + """Get page content as INFO type.""" + return PageContent( + page_type=PageType.INFO, + title=self.title, + lines=self.get_lines(), + icon=self.icon + ) + + +class MenuPage(BasePage): + """ + Base class for menu pages. + + Menu pages display selectable items that can trigger actions. + """ + + def __init__( + self, + name: str, + title: str, + icon: str = "", + enabled: bool = True + ): + super().__init__(name, title, icon, enabled) + self._menu_items: list[tuple[str, Callable[[], Optional[str]]]] = [] + + def add_item(self, label: str, action: Callable[[], Optional[str]]) -> None: + """ + Add menu item. + + Args: + label: Display label for the item + action: Callback function when item is selected + """ + self._menu_items.append((label, action)) + + def clear_items(self) -> None: + """Clear all menu items.""" + self._menu_items.clear() + self._selected_index = 0 + + def get_content(self) -> PageContent: + """Get page content as MENU type.""" + labels = [item[0] for item in self._menu_items] + return PageContent( + page_type=PageType.MENU, + title=self.title, + lines=[], + actions=labels, + selected=self._selected_index, + icon=self.icon + ) + + def handle_select(self, index: int) -> Optional[str]: + """Execute action for selected menu item.""" + self._selected_index = index + + if 0 <= index < len(self._menu_items): + _, action = self._menu_items[index] + return action() + + return None + + +class ConfirmPage(BasePage): + """ + Confirmation dialog page. + + Used to confirm dangerous actions like shutdown or reboot. + """ + + def __init__( + self, + name: str, + title: str, + message: str, + on_confirm: Callable[[], Optional[str]], + on_cancel: Optional[Callable[[], Optional[str]]] = None, + icon: str = "", + ): + super().__init__(name, title, icon, enabled=True) + self.message = message + self._on_confirm = on_confirm + self._on_cancel = on_cancel + + def get_content(self) -> PageContent: + """Get page content as CONFIRM type.""" + return PageContent( + page_type=PageType.CONFIRM, + title=self.title, + lines=[self.message], + actions=["Yes", "No"], + selected=1, # Default to "No" for safety + icon=self.icon + ) + + def handle_confirm(self) -> Optional[str]: + """Execute confirm action.""" + return self._on_confirm() + + def handle_cancel(self) -> Optional[str]: + """Execute cancel action.""" + if self._on_cancel: + return self._on_cancel() + return None diff --git a/can_sniffer/src/flipper/pages/can_stats.py b/can_sniffer/src/flipper/pages/can_stats.py new file mode 100644 index 0000000..ccff3ea --- /dev/null +++ b/can_sniffer/src/flipper/pages/can_stats.py @@ -0,0 +1,66 @@ +""" +CAN Statistics Page. + +Displays CAN sniffer statistics on Flipper Zero. +""" + +from flipper.pages.base import InfoPage +from flipper.providers.can_provider import CANProvider + + +class CANStatsPage(InfoPage): + """ + Page displaying CAN bus statistics. + + Shows: + - Total frames received + - Pending/processed frames + - Queue status + - Dropped frames (if any) + """ + + def __init__(self): + super().__init__( + name="can_stats", + title="CAN Statistics", + icon="can" + ) + self._provider = CANProvider() + + def get_lines(self) -> list[str]: + """Get statistics lines for display.""" + data = self._provider.get_data() + + lines = [ + f"Total: {self._format_number(data.total_frames)}", + f"Processed: {self._format_number(data.processed_frames)}", + f"Queue: {data.queue_size}/{data.queue_capacity}", + ] + + # Add interface breakdown if available + if data.interfaces: + iface_str = ", ".join( + f"{k}: {self._format_number(v)}" + for k, v in data.interfaces.items() + ) + if len(iface_str) <= 25: + lines.append(iface_str) + + # Show dropped count if non-zero + if data.dropped_frames > 0: + lines.append(f"Dropped: {data.dropped_frames}") + + return lines + + def _format_number(self, num: int) -> str: + """Format large numbers with K/M suffix.""" + if num >= 1_000_000: + return f"{num / 1_000_000:.1f}M" + elif num >= 1_000: + return f"{num / 1_000:.1f}K" + else: + return str(num) + + def get_provider(self) -> CANProvider: + """Get the CAN provider instance.""" + return self._provider diff --git a/can_sniffer/src/flipper/pages/system_info.py b/can_sniffer/src/flipper/pages/system_info.py new file mode 100644 index 0000000..b554175 --- /dev/null +++ b/can_sniffer/src/flipper/pages/system_info.py @@ -0,0 +1,72 @@ +""" +System Information Page. + +Displays Raspberry Pi system metrics on Flipper Zero. +""" + +from flipper.pages.base import InfoPage +from flipper.providers.system_provider import SystemProvider + + +class SystemInfoPage(InfoPage): + """ + Page displaying Raspberry Pi system metrics. + + Shows: + - CPU temperature + - Power consumption + - Fan RPM + - Input voltage + """ + + def __init__(self): + super().__init__( + name="system_info", + title="System Info", + icon="cpu" + ) + self._provider = SystemProvider() + + def get_lines(self) -> list[str]: + """Get system info lines for display.""" + data = self._provider.get_data() + + lines = [ + f"CPU Temp: {data.cpu_temp:.1f}C {self._get_temp_indicator(data.cpu_temp)}", + f"Power: {data.power_watts:.2f}W", + ] + + # Fan RPM (may not be available on all systems) + if data.fan_rpm > 0: + lines.append(f"Fan: {data.fan_rpm} RPM") + else: + lines.append("Fan: N/A") + + # Input voltage + if data.input_voltage > 0: + lines.append(f"Input: {data.input_voltage:.2f}V") + + return lines + + def _get_temp_indicator(self, temp: float) -> str: + """ + Get temperature status indicator. + + Args: + temp: Temperature in Celsius + + Returns: + Status indicator string + """ + if temp >= 80: + return "(!)" # Critical + elif temp >= 70: + return "(W)" # Warning + elif temp >= 60: + return "(~)" # Warm + else: + return "" # OK + + def get_provider(self) -> SystemProvider: + """Get the System provider instance.""" + return self._provider diff --git a/can_sniffer/src/flipper/pages/ups_status.py b/can_sniffer/src/flipper/pages/ups_status.py new file mode 100644 index 0000000..196ef11 --- /dev/null +++ b/can_sniffer/src/flipper/pages/ups_status.py @@ -0,0 +1,85 @@ +""" +UPS Status Page. + +Displays X120x UPS battery and power status on Flipper Zero. +""" + +from flipper.pages.base import InfoPage +from flipper.providers.ups_provider import UPSProvider + + +class UPSStatusPage(InfoPage): + """ + Page displaying UPS (X120x) status. + + Shows: + - Battery percentage and voltage + - Charging status + - Power loss indicator + - Input voltage + """ + + def __init__(self): + super().__init__( + name="ups_status", + title="UPS Status", + icon="battery" + ) + self._provider = UPSProvider() + + def get_lines(self) -> list[str]: + """Get UPS status lines for display.""" + if not self._provider.is_available(): + return [ + "UPS not available", + self._provider.get_last_error() or "Check I2C connection" + ] + + data = self._provider.get_data() + + # Battery bar visualization + battery_bar = self._get_battery_bar(data.capacity) + + lines = [ + f"Bat: {data.capacity:.1f}% {battery_bar}", + f"Voltage: {data.voltage:.2f}V", + ] + + # Charging/power status + if data.power_loss: + lines.append("Power: BATTERY MODE") + if data.capacity < 15: + lines.append("WARNING: Critical!") + elif data.capacity < 25: + lines.append("WARNING: Low battery") + else: + status = "Charging" if data.is_charging else "Full" + lines.append(f"Status: {status}") + lines.append(f"Input: {data.input_voltage:.2f}V") + + return lines + + def _get_battery_bar(self, percent: float) -> str: + """ + Create ASCII battery bar. + + Args: + percent: Battery percentage (0-100) + + Returns: + Battery visualization string like "[==== ]" + """ + filled = int(percent / 20) # 5 segments + empty = 5 - filled + + bar = "[" + "=" * filled + " " * empty + "]" + + return bar + + def is_enabled(self) -> bool: + """Check if page should be shown.""" + return self.enabled and self._provider.is_available() + + def get_provider(self) -> UPSProvider: + """Get the UPS provider instance.""" + return self._provider diff --git a/can_sniffer/src/flipper/protocol.py b/can_sniffer/src/flipper/protocol.py new file mode 100644 index 0000000..37556ec --- /dev/null +++ b/can_sniffer/src/flipper/protocol.py @@ -0,0 +1,194 @@ +""" +Flipper Zero Communication Protocol. + +Defines message formats for RPi <-> Flipper Zero UART communication. + +Protocol Messages: + RPi -> Flipper: + PAGE:/|||<lines>|<actions>|<selected> + ACK:<device>,ip=<ip> + RESULT:<status>|<message> + + Flipper -> RPi: + INIT:<device> + STOP:<device> + CMD:NAV:<next|prev> + CMD:SELECT:<index> + CMD:CONFIRM + CMD:CANCEL + CMD:REFRESH +""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import List, Optional + + +class PageType(Enum): + """Types of pages for different UI layouts.""" + + INFO = "info" # Read-only information display + MENU = "menu" # Selectable menu items + CONFIRM = "confirm" # Confirmation dialog (Yes/No) + + +class CommandType(Enum): + """Types of commands from Flipper to RPi.""" + + INIT = "INIT" + STOP = "STOP" + NAV_NEXT = "NAV:next" + NAV_PREV = "NAV:prev" + SELECT = "SELECT" + CONFIRM = "CONFIRM" + CANCEL = "CANCEL" + REFRESH = "REFRESH" + + +@dataclass +class PageContent: + """ + Content structure for a page to be displayed on Flipper Zero. + + Attributes: + page_type: Type of page (info, menu, confirm) + title: Page title (max ~20 chars for display) + lines: Content lines to display (max 4-5 lines) + actions: Available actions/menu items (for menu type) + selected: Currently selected item index (for menu type) + icon: Optional icon identifier for the page + """ + + page_type: PageType + title: str + lines: List[str] = field(default_factory=list) + actions: List[str] = field(default_factory=list) + selected: int = 0 + icon: str = "" + + def __post_init__(self): + # Truncate title if too long + if len(self.title) > 20: + self.title = self.title[:17] + "..." + + +@dataclass +class Command: + """Parsed command from Flipper Zero.""" + + cmd_type: CommandType + payload: str = "" + + @property + def select_index(self) -> int: + """Get selected index for SELECT command.""" + if self.cmd_type == CommandType.SELECT: + try: + return int(self.payload) + except ValueError: + return 0 + return 0 + + +class Protocol: + """ + Protocol encoder/decoder for Flipper Zero communication. + + Message format uses pipe (|) as field separator and semicolon (;) + as item separator within fields. + """ + + FIELD_SEP = "|" + ITEM_SEP = ";" + + @staticmethod + def encode_page( + page_index: int, + total_pages: int, + content: PageContent + ) -> str: + """ + Encode page content to protocol message. + + Args: + page_index: Current page index (0-based) + total_pages: Total number of pages + content: Page content to encode + + Returns: + Encoded message string with newline + """ + # PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected> + lines_str = Protocol.ITEM_SEP.join(content.lines) + actions_str = Protocol.ITEM_SEP.join(content.actions) + + parts = [ + f"PAGE:{page_index}/{total_pages}", + content.page_type.value, + content.title, + lines_str, + actions_str, + str(content.selected) + ] + + return Protocol.FIELD_SEP.join(parts) + "\n" + + @staticmethod + def encode_ack(device: str, ip: str) -> str: + """Encode ACK message.""" + return f"ACK:{device},ip={ip}\n" + + @staticmethod + def encode_result(success: bool, message: str) -> str: + """Encode result message after action execution.""" + status = "OK" if success else "ERROR" + return f"RESULT:{status}|{message}\n" + + @staticmethod + def decode_command(line: str) -> Optional[Command]: + """ + Decode command from Flipper Zero. + + Args: + line: Raw command line (without newline) + + Returns: + Parsed Command object or None if invalid + """ + line = line.strip() + + if not line: + return None + + # INIT:<device> + if line.startswith("INIT:"): + return Command(CommandType.INIT, line[5:]) + + # STOP:<device> + if line.startswith("STOP:"): + return Command(CommandType.STOP, line[5:]) + + # CMD:NAV:next / CMD:NAV:prev + if line == "CMD:NAV:next": + return Command(CommandType.NAV_NEXT) + + if line == "CMD:NAV:prev": + return Command(CommandType.NAV_PREV) + + # CMD:SELECT:<index> + if line.startswith("CMD:SELECT:"): + return Command(CommandType.SELECT, line[11:]) + + # CMD:CONFIRM + if line == "CMD:CONFIRM": + return Command(CommandType.CONFIRM) + + # CMD:CANCEL + if line == "CMD:CANCEL": + return Command(CommandType.CANCEL) + + # CMD:REFRESH + if line == "CMD:REFRESH": + return Command(CommandType.REFRESH) + + return None diff --git a/can_sniffer/src/flipper/providers/__init__.py b/can_sniffer/src/flipper/providers/__init__.py new file mode 100644 index 0000000..c0d7d2b --- /dev/null +++ b/can_sniffer/src/flipper/providers/__init__.py @@ -0,0 +1,18 @@ +""" +Data Providers for Flipper Zero Pages. + +Providers abstract data sources (UPS, system metrics, CAN stats) +from the pages that display them. +""" + +from flipper.providers.base import BaseProvider +from flipper.providers.ups_provider import UPSProvider +from flipper.providers.system_provider import SystemProvider +from flipper.providers.can_provider import CANProvider + +__all__ = [ + "BaseProvider", + "UPSProvider", + "SystemProvider", + "CANProvider", +] diff --git a/can_sniffer/src/flipper/providers/base.py b/can_sniffer/src/flipper/providers/base.py new file mode 100644 index 0000000..71f3257 --- /dev/null +++ b/can_sniffer/src/flipper/providers/base.py @@ -0,0 +1,119 @@ +""" +Base Provider Interface. + +Providers are singleton data sources that pages can query. +They handle caching and thread-safe data access. +""" + +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional +import threading +import time + + +class BaseProvider(ABC): + """ + Abstract base class for data providers. + + Providers: + - Are singletons (one instance per type) + - Cache data with configurable TTL + - Are thread-safe + - Handle errors gracefully + + Attributes: + name: Provider identifier + cache_ttl: Cache time-to-live in seconds + """ + + _instances: Dict[str, "BaseProvider"] = {} + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + """Singleton pattern implementation.""" + with cls._lock: + if cls.__name__ not in cls._instances: + instance = super().__new__(cls) + cls._instances[cls.__name__] = instance + return cls._instances[cls.__name__] + + def __init__(self, name: str, cache_ttl: float = 1.0): + """ + Initialize provider. + + Args: + name: Provider identifier + cache_ttl: Cache TTL in seconds + """ + # Prevent re-initialization for singleton + if hasattr(self, "_initialized") and self._initialized: + return + + self.name = name + self.cache_ttl = cache_ttl + self._cache: Dict[str, Any] = {} + self._cache_timestamps: Dict[str, float] = {} + self._data_lock = threading.Lock() + self._initialized = True + self._available = True + self._last_error: Optional[str] = None + + def _get_cached(self, key: str) -> Optional[Any]: + """ + Get cached value if not expired. + + Args: + key: Cache key + + Returns: + Cached value or None if expired/missing + """ + with self._data_lock: + if key not in self._cache: + return None + + timestamp = self._cache_timestamps.get(key, 0) + if time.time() - timestamp > self.cache_ttl: + return None + + return self._cache[key] + + def _set_cached(self, key: str, value: Any) -> None: + """ + Set cached value. + + Args: + key: Cache key + value: Value to cache + """ + with self._data_lock: + self._cache[key] = value + self._cache_timestamps[key] = time.time() + + def _clear_cache(self) -> None: + """Clear all cached values.""" + with self._data_lock: + self._cache.clear() + self._cache_timestamps.clear() + + @abstractmethod + def refresh(self) -> bool: + """ + Refresh provider data from source. + + Returns: + True if refresh successful + """ + pass + + def is_available(self) -> bool: + """Check if provider is available.""" + return self._available + + def get_last_error(self) -> Optional[str]: + """Get last error message.""" + return self._last_error + + def shutdown(self) -> None: + """Cleanup provider resources.""" + self._clear_cache() diff --git a/can_sniffer/src/flipper/providers/can_provider.py b/can_sniffer/src/flipper/providers/can_provider.py new file mode 100644 index 0000000..bf3be98 --- /dev/null +++ b/can_sniffer/src/flipper/providers/can_provider.py @@ -0,0 +1,135 @@ +""" +CAN Statistics Provider. + +Provides CAN sniffer statistics from the message processor. +""" + +from typing import Dict, Any, Optional, Callable +from dataclasses import dataclass, field + +from flipper.providers.base import BaseProvider + + +@dataclass +class CANData: + """CAN statistics data.""" + + total_frames: int = 0 + pending_frames: int = 0 + processed_frames: int = 0 + dropped_frames: int = 0 + queue_size: int = 0 + queue_capacity: int = 100000 + interfaces: Dict[str, int] = field(default_factory=dict) + + +class CANProvider(BaseProvider): + """ + Provider for CAN sniffer statistics. + + This provider is updated by the FlipperHandler when + CAN frames are processed. + """ + + def __init__(self): + super().__init__(name="can", cache_ttl=0.5) + self._data = CANData() + self._stats_callback: Optional[Callable[[], Dict[str, Any]]] = None + + def set_stats_callback(self, callback: Callable[[], Dict[str, Any]]) -> None: + """ + Set callback to retrieve stats from message processor. + + Args: + callback: Function that returns stats dictionary + """ + self._stats_callback = callback + + def update_stats( + self, + total: int = 0, + pending: int = 0, + processed: int = 0, + dropped: int = 0, + queue_size: int = 0 + ) -> None: + """ + Update CAN statistics directly. + + Args: + total: Total frames received + pending: Pending frames in queue + processed: Processed frames + dropped: Dropped frames + queue_size: Current queue size + """ + self._data.total_frames = total + self._data.pending_frames = pending + self._data.processed_frames = processed + self._data.dropped_frames = dropped + self._data.queue_size = queue_size + self._set_cached("data", self._data) + + def update_interface_stats(self, interface: str, count: int) -> None: + """ + Update per-interface statistics. + + Args: + interface: Interface name (e.g., "can0") + count: Message count for this interface + """ + self._data.interfaces[interface] = count + + def refresh(self) -> bool: + """Refresh stats from callback if available.""" + if self._stats_callback: + try: + stats = self._stats_callback() + self._data.total_frames = stats.get("total_frames", 0) + self._data.pending_frames = stats.get("pending_frames", 0) + self._data.processed_frames = stats.get("processed_frames", 0) + self._data.dropped_frames = stats.get("dropped_count", 0) + self._data.queue_size = stats.get("queue_size", 0) + self._set_cached("data", self._data) + return True + except Exception as e: + self._last_error = str(e) + return False + + return True + + def get_data(self) -> CANData: + """Get current CAN statistics.""" + cached = self._get_cached("data") + if cached is not None: + return cached + + self.refresh() + return self._data + + def get_total_frames(self) -> int: + """Get total frames received.""" + return self.get_data().total_frames + + def get_pending_frames(self) -> int: + """Get pending frames in queue.""" + return self.get_data().pending_frames + + def get_processed_frames(self) -> int: + """Get processed frames.""" + return self.get_data().processed_frames + + def get_dropped_frames(self) -> int: + """Get dropped frames.""" + return self.get_data().dropped_frames + + def get_queue_fill_percent(self) -> float: + """Get queue fill percentage.""" + data = self.get_data() + if data.queue_capacity == 0: + return 0.0 + return (data.queue_size / data.queue_capacity) * 100 + + def get_interface_count(self, interface: str) -> int: + """Get message count for specific interface.""" + return self.get_data().interfaces.get(interface, 0) diff --git a/can_sniffer/src/flipper/providers/system_provider.py b/can_sniffer/src/flipper/providers/system_provider.py new file mode 100644 index 0000000..0b969c0 --- /dev/null +++ b/can_sniffer/src/flipper/providers/system_provider.py @@ -0,0 +1,240 @@ +""" +System Provider for Raspberry Pi metrics and control. + +Provides CPU temperature, power consumption, fan RPM, +and system control actions (shutdown, reboot). +""" + +import os +import subprocess +from typing import Optional +from dataclasses import dataclass +from pathlib import Path + +from flipper.providers.base import BaseProvider + + +@dataclass +class SystemData: + """System metrics data.""" + + cpu_temp: float = 0.0 + cpu_volts: float = 0.0 + cpu_amps: float = 0.0 + power_watts: float = 0.0 + fan_rpm: int = 0 + input_voltage: float = 0.0 + + +class SystemProvider(BaseProvider): + """ + Provider for Raspberry Pi system metrics. + + Uses vcgencmd for hardware metrics and sysfs for fan speed. + Also provides system control actions. + """ + + def __init__(self): + super().__init__(name="system", cache_ttl=2.0) + self._data = SystemData() + + def _read_metric(self, args: list, strip_chars: str) -> Optional[float]: + """ + Read a hardware metric using vcgencmd. + + Args: + args: Command arguments + strip_chars: Characters to strip from value + + Returns: + Float value or None on error + """ + try: + output = subprocess.check_output(args).decode("utf-8") + value_str = output.split("=")[1].strip().rstrip(strip_chars) + return float(value_str) + except Exception: + return None + + def _read_cpu_temp(self) -> float: + """Read CPU temperature in Celsius.""" + result = self._read_metric(["vcgencmd", "measure_temp"], "'C") + return result if result is not None else 0.0 + + def _read_cpu_volts(self) -> float: + """Read CPU core voltage.""" + result = self._read_metric(["vcgencmd", "pmic_read_adc", "VDD_CORE_V"], "V") + return result if result is not None else 0.0 + + def _read_cpu_amps(self) -> float: + """Read CPU core current.""" + result = self._read_metric(["vcgencmd", "pmic_read_adc", "VDD_CORE_A"], "A") + return result if result is not None else 0.0 + + def _read_input_voltage(self) -> float: + """Read external 5V input voltage.""" + result = self._read_metric(["vcgencmd", "pmic_read_adc", "EXT5V_V"], "V") + return result if result is not None else 0.0 + + def _read_power_watts(self) -> float: + """ + Calculate total system power consumption. + + Reads all PMIC ADC values and calculates wattage. + """ + try: + output = subprocess.check_output(["vcgencmd", "pmic_read_adc"]).decode("utf-8") + lines = output.strip().split("\n") + + amperages = {} + voltages = {} + + for line in lines: + line = line.strip() + if not line: + continue + + parts = line.split() + if len(parts) < 2: + continue + + label = parts[0] + value_str = parts[-1] + + if "=" not in value_str: + continue + + val = float(value_str.split("=")[1][:-1]) + short_label = label[:-2] + + if label.endswith("A"): + amperages[short_label] = val + else: + voltages[short_label] = val + + # Calculate total wattage + wattage = sum( + amperages[key] * voltages[key] + for key in amperages + if key in voltages + ) + + return wattage + + except Exception: + return 0.0 + + def _read_fan_rpm(self) -> int: + """ + Read fan RPM from sysfs. + + Returns: + Fan speed in RPM or 0 if not available + """ + try: + sys_path = Path("/sys/devices/platform/cooling_fan") + fan_files = list(sys_path.rglob("fan1_input")) + + if not fan_files: + return 0 + + with open(fan_files[0], "r") as f: + rpm = f.read().strip() + + return int(rpm) + + except Exception: + return 0 + + def refresh(self) -> bool: + """Refresh system metrics.""" + try: + self._data = SystemData( + cpu_temp=self._read_cpu_temp(), + cpu_volts=self._read_cpu_volts(), + cpu_amps=self._read_cpu_amps(), + power_watts=self._read_power_watts(), + fan_rpm=self._read_fan_rpm(), + input_voltage=self._read_input_voltage() + ) + + self._set_cached("data", self._data) + return True + + except Exception as e: + self._last_error = str(e) + return False + + def get_data(self) -> SystemData: + """Get current system data.""" + cached = self._get_cached("data") + if cached is not None: + return cached + + self.refresh() + return self._data + + def get_cpu_temp(self) -> float: + """Get CPU temperature in Celsius.""" + return self.get_data().cpu_temp + + def get_power_watts(self) -> float: + """Get total power consumption in Watts.""" + return self.get_data().power_watts + + def get_fan_rpm(self) -> int: + """Get fan speed in RPM.""" + return self.get_data().fan_rpm + + def shutdown(self, delay_minutes: int = 0) -> bool: + """ + Shutdown the system. + + Args: + delay_minutes: Delay before shutdown (0 = immediate) + + Returns: + True if command executed + """ + try: + if delay_minutes > 0: + cmd = f"sudo shutdown -P +{delay_minutes}" + else: + cmd = "sudo shutdown -P now" + + subprocess.Popen(cmd, shell=True) + return True + + except Exception as e: + self._last_error = str(e) + return False + + def reboot(self) -> bool: + """ + Reboot the system. + + Returns: + True if command executed + """ + try: + subprocess.Popen("sudo reboot", shell=True) + return True + + except Exception as e: + self._last_error = str(e) + return False + + def cancel_shutdown(self) -> bool: + """ + Cancel pending shutdown. + + Returns: + True if command executed + """ + try: + subprocess.call("sudo shutdown -c", shell=True) + return True + + except Exception as e: + self._last_error = str(e) + return False diff --git a/can_sniffer/src/flipper/providers/ups_provider.py b/can_sniffer/src/flipper/providers/ups_provider.py new file mode 100644 index 0000000..780da4d --- /dev/null +++ b/can_sniffer/src/flipper/providers/ups_provider.py @@ -0,0 +1,228 @@ +""" +UPS Provider for X120x UPS on Raspberry Pi. + +Reads battery status, voltage, and charging state via I2C (SMBus). +Based on https://github.com/suptronics/x120x +""" + +import struct +from typing import Optional, Tuple +from dataclasses import dataclass + +from flipper.providers.base import BaseProvider + + +@dataclass +class UPSData: + """UPS status data.""" + + voltage: float = 0.0 + capacity: float = 0.0 + is_charging: bool = False + power_loss: bool = False + input_voltage: float = 0.0 + + +class UPSProvider(BaseProvider): + """ + Provider for X120x UPS data. + + Reads from MAX17048 fuel gauge via I2C (address 0x36). + Also monitors power loss detection pin (GPIO 6). + + Hardware requirements: + - X120x UPS HAT connected via I2C + - smbus2 library installed + - gpiozero for power loss detection + """ + + # I2C address for MAX17048 fuel gauge + I2C_ADDRESS = 0x36 + I2C_BUS = 1 + + # GPIO pins + CHG_ONOFF_PIN = 16 + PLD_PIN = 6 # Power Loss Detection + + def __init__(self): + super().__init__(name="ups", cache_ttl=2.0) + + self._bus = None + self._pld_button = None + self._data = UPSData() + + self._init_hardware() + + def _init_hardware(self) -> None: + """Initialize I2C and GPIO.""" + try: + import smbus2 + self._bus = smbus2.SMBus(self.I2C_BUS) + self._available = True + except ImportError: + self._last_error = "smbus2 not installed" + self._available = False + except Exception as e: + self._last_error = f"I2C init failed: {e}" + self._available = False + + try: + from gpiozero import Button + self._pld_button = Button(self.PLD_PIN) + except ImportError: + pass # GPIO optional + except Exception: + pass # GPIO optional + + def _read_voltage_and_capacity(self) -> Tuple[float, float]: + """ + Read voltage and capacity from MAX17048. + + Returns: + Tuple of (voltage in V, capacity in %) + """ + if not self._bus: + return 0.0, 0.0 + + try: + # Read voltage register (0x02) + voltage_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x02) + # Read capacity register (0x04) + capacity_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x04) + + # Swap bytes (little-endian to big-endian) + voltage_swapped = struct.unpack("<H", struct.pack(">H", voltage_read))[0] + capacity_swapped = struct.unpack("<H", struct.pack(">H", capacity_read))[0] + + # Calculate actual values + voltage = voltage_swapped * 1.25 / 1000 / 16 + capacity = capacity_swapped / 256 + + return voltage, capacity + + except Exception as e: + self._last_error = f"Read error: {e}" + return 0.0, 0.0 + + def _get_power_loss_state(self) -> bool: + """ + Check power loss detection pin. + + Returns: + True if power loss detected (running on battery) + """ + if self._pld_button is None: + return False + + try: + # Button pressed = power OK, not pressed = power loss + return not self._pld_button.is_pressed + except Exception: + return False + + def _read_input_voltage(self) -> float: + """ + Read input voltage using vcgencmd. + + Returns: + Input voltage in V + """ + try: + from subprocess import check_output + output = check_output(["vcgencmd", "pmic_read_adc", "EXT5V_V"]).decode("utf-8") + value_str = output.split("=")[1].strip().rstrip("V") + return float(value_str) + except Exception: + return 0.0 + + def refresh(self) -> bool: + """Refresh UPS data from hardware.""" + if not self._available: + return False + + try: + voltage, capacity = self._read_voltage_and_capacity() + power_loss = self._get_power_loss_state() + input_voltage = self._read_input_voltage() + + # Determine charging state (charge disabled above 90%) + is_charging = capacity < 90 and not power_loss + + self._data = UPSData( + voltage=voltage, + capacity=capacity, + is_charging=is_charging, + power_loss=power_loss, + input_voltage=input_voltage + ) + + self._set_cached("data", self._data) + return True + + except Exception as e: + self._last_error = str(e) + return False + + def get_data(self) -> UPSData: + """ + Get current UPS data. + + Returns: + UPSData object with current status + """ + cached = self._get_cached("data") + if cached is not None: + return cached + + self.refresh() + return self._data + + def get_voltage(self) -> float: + """Get battery voltage in V.""" + return self.get_data().voltage + + def get_capacity(self) -> float: + """Get battery capacity in %.""" + return self.get_data().capacity + + def is_charging(self) -> bool: + """Check if battery is charging.""" + return self.get_data().is_charging + + def has_power_loss(self) -> bool: + """Check if external power is lost.""" + return self.get_data().power_loss + + def get_input_voltage(self) -> float: + """Get input voltage in V.""" + return self.get_data().input_voltage + + def get_status_string(self) -> str: + """ + Get human-readable status string. + + Returns: + Status string like "OK", "Battery", "Charging" + """ + data = self.get_data() + + if data.power_loss: + if data.capacity < 15: + return "CRITICAL" + elif data.capacity < 25: + return "LOW" + else: + return "Battery" + elif data.is_charging: + return "Charging" + else: + return "OK" + + def shutdown(self) -> None: + """Cleanup resources.""" + super().shutdown() + if self._bus: + try: + self._bus.close() + except Exception: + pass diff --git a/can_sniffer/src/handlers/flipper_handler.py b/can_sniffer/src/handlers/flipper_handler.py index b4096a0..7268268 100644 --- a/can_sniffer/src/handlers/flipper_handler.py +++ b/can_sniffer/src/handlers/flipper_handler.py @@ -1,15 +1,22 @@ """ -Flipper Zero UART Handler with Handshake Protocol. +Flipper Zero Dynamic UI Handler. -Waits for INIT command from Flipper Zero before sending statistics. -Supports handshake protocol for secure connection establishment. +Provides multi-page interface with bidirectional communication via UART. +Supports pluggable pages for CAN stats, UPS status, system info, and actions. Protocol: - 1. RPI5 waits in passive mode, listening for commands - 2. Flipper sends: INIT:flipper\n - 3. RPI5 responds: ACK:rpi5,ip=x.x.x.x\n - 4. RPI5 starts sending: STATS:ip=...,total=...,pending=...,processed=...\n - 5. Flipper sends: STOP:flipper\n to disconnect + RPi -> Flipper: + PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected> + ACK:<device>,ip=<ip> + RESULT:<status>|<message> + + Flipper -> RPi: + INIT:<device> + STOP:<device> + CMD:NAV:<next|prev> + CMD:SELECT:<index> + CMD:CONFIRM / CMD:CANCEL + CMD:REFRESH """ import socket @@ -22,6 +29,10 @@ from can_frame import CANFrame from config import config from logger import get_logger +from flipper.protocol import Protocol, Command, CommandType +from flipper.page_manager import PageManager +from flipper.pages import CANStatsPage, UPSStatusPage, SystemInfoPage, ActionsPage + logger = get_logger(__name__) @@ -57,16 +68,14 @@ class FlipperHandler(BaseHandler): """ Handler that communicates with Flipper Zero via UART. - Implements handshake protocol: - - Waits for INIT:flipper command - - Responds with ACK:rpi5,ip=x.x.x.x - - Sends STATS periodically while connected - - Stops on STOP:flipper command + Provides dynamic multi-page interface: + - CAN Statistics + - UPS Status (if available) + - System Information + - Actions Menu - UART Configuration: - - Device: /dev/ttyAMA0 (or configured device) - - Baud: 115200 - - Format: 8N1 + Implements handshake protocol for connection management + and bidirectional command processing. """ def __init__(self, enabled: Optional[bool] = None): @@ -83,6 +92,7 @@ class FlipperHandler(BaseHandler): super().__init__(name="flipper_handler", enabled=enabled) + # Serial configuration self.serial_port: Optional[Any] = None self.device = "/dev/ttyAMA0" self.baudrate = 115200 @@ -113,6 +123,39 @@ class FlipperHandler(BaseHandler): # IP address self._ip_address = "0.0.0.0" + # Page manager + self._page_manager = PageManager() + self._setup_pages() + + def _setup_pages(self) -> None: + """Setup default pages.""" + # CAN Statistics (always available) + can_page = CANStatsPage() + self._page_manager.register_page(can_page) + + # Keep reference to CAN provider for stats updates + self._can_provider = can_page.get_provider() + + # UPS Status (if available) + ups_page = UPSStatusPage() + self._page_manager.register_page(ups_page) + + # System Information + system_page = SystemInfoPage() + self._page_manager.register_page(system_page) + + # Actions Menu + actions_page = ActionsPage(on_result=self._on_action_result) + self._page_manager.register_page(actions_page) + + def _on_action_result(self, result: str) -> None: + """Handle action result from actions page.""" + self.logger.info(f"Action result: {result}") + # Send result to Flipper + if self._connected: + msg = Protocol.encode_result(True, result) + self._send_raw(msg) + def initialize(self) -> bool: """ Initialize UART connection. @@ -162,13 +205,15 @@ class FlipperHandler(BaseHandler): ) self._rx_thread.start() - # Start TX thread (sends stats when connected) + # Start TX thread (sends page content when connected) self._tx_thread = threading.Thread( target=self._tx_loop, name="FlipperTX", daemon=True ) self._tx_thread.start() - self.logger.info("Flipper handler started, waiting for connection...") + self.logger.info( + f"Flipper handler started with {self._page_manager.get_page_count()} pages" + ) def _rx_loop(self) -> None: """Receive loop - listens for commands from Flipper.""" @@ -198,41 +243,61 @@ class FlipperHandler(BaseHandler): self.logger.debug(f"RX error: {e}") time.sleep(0.1) - def _process_command(self, command: str) -> None: + def _process_command(self, raw_command: str) -> None: """ Process received command from Flipper. Args: - command: Received command string + raw_command: Raw command string """ - self.logger.info(f"Received command: {command}") + self.logger.debug(f"RX: {raw_command}") - if command.startswith("INIT:"): - # Handshake initiation - client_id = command[5:].strip() + # Handle handshake commands directly + if raw_command.startswith("INIT:"): + client_id = raw_command[5:].strip() self.logger.info(f"Handshake request from: {client_id}") - # Send ACK with IP address + # Update IP and send ACK self._ip_address = get_ip_address() - ack_msg = f"ACK:rpi5,ip={self._ip_address}\n" + ack_msg = Protocol.encode_ack("rpi5", self._ip_address) self._send_raw(ack_msg) self._connected = True self.logger.info(f"Connected to Flipper, IP: {self._ip_address}") - elif command.startswith("STOP:"): - # Disconnect request - client_id = command[5:].strip() + # Send initial page content + self._send_page_content() + return + + if raw_command.startswith("STOP:"): + client_id = raw_command[5:].strip() self.logger.info(f"Disconnect request from: {client_id}") self._connected = False - self.logger.info("Disconnected from Flipper") + return + + # Parse and process other commands + command = Protocol.decode_command(raw_command) + if command is None: + self.logger.debug(f"Unknown command: {raw_command}") + return + + # Process command via page manager + result = self._page_manager.process_command(command) + + # Send result if available + if result: + msg = Protocol.encode_result(True, result) + self._send_raw(msg) + + # Always send updated page content after command + self._send_page_content() def _tx_loop(self) -> None: - """Transmit loop - sends stats when connected.""" + """Transmit loop - sends page content periodically when connected.""" while self._running: try: if self._connected: - self._send_stats() + self._send_page_content() time.sleep(self.send_interval) @@ -263,21 +328,16 @@ class FlipperHandler(BaseHandler): self.logger.debug(f"Send error: {e}") return False - def _send_stats(self) -> None: - """Send current statistics to Flipper Zero.""" + def _send_page_content(self) -> None: + """Send current page content to Flipper Zero.""" if not self._connected: return - with self._stats_lock: - total = self._total_frames - pending = self._pending_frames - processed = self._processed_frames - - message = f"STATS:ip={self._ip_address},total={total},pending={pending},processed={processed}\n" - - if self._send_raw(message): - with self._stats_lock: - self._sent_count += 1 + content = self._page_manager.get_current_content() + if content: + if self._send_raw(content): + with self._stats_lock: + self._sent_count += 1 def handle(self, frame: CANFrame) -> bool: """ @@ -292,6 +352,14 @@ class FlipperHandler(BaseHandler): with self._stats_lock: self._total_frames += 1 self._pending_frames += 1 + + # Update CAN provider + self._can_provider.update_stats( + total=self._total_frames, + pending=self._pending_frames, + processed=self._processed_frames + ) + return True def handle_batch(self, frames: List[CANFrame]) -> int: @@ -305,10 +373,19 @@ class FlipperHandler(BaseHandler): Number of frames processed """ count = len(frames) + with self._stats_lock: self._total_frames += count self._processed_frames += count self._pending_frames = max(0, self._pending_frames - count) + + # Update CAN provider + self._can_provider.update_stats( + total=self._total_frames, + pending=self._pending_frames, + processed=self._processed_frames + ) + return count def update_pending(self, pending_count: int) -> None: @@ -321,11 +398,17 @@ class FlipperHandler(BaseHandler): with self._stats_lock: self._pending_frames = pending_count + self._can_provider.update_stats( + total=self._total_frames, + pending=self._pending_frames, + processed=self._processed_frames + ) + def flush(self) -> None: - """Flush - send immediate stats if connected.""" + """Flush - send immediate page content if connected.""" if self._connected: try: - self._send_stats() + self._send_page_content() except Exception as e: self.logger.debug(f"Flush error: {e}") @@ -350,6 +433,9 @@ class FlipperHandler(BaseHandler): except Exception as e: self.logger.debug(f"Error closing serial port: {e}") + # Shutdown page manager + self._page_manager.shutdown() + self._initialized = False self.logger.info("Flipper handler stopped") @@ -361,7 +447,7 @@ class FlipperHandler(BaseHandler): Dictionary with handler stats """ with self._stats_lock: - return { + stats = { "total_frames": self._total_frames, "pending_frames": self._pending_frames, "processed_frames": self._processed_frames, @@ -373,6 +459,15 @@ class FlipperHandler(BaseHandler): "ip_address": self._ip_address, } + # Add page manager stats + stats.update(self._page_manager.get_stats()) + + return stats + def is_connected(self) -> bool: """Check if Flipper is connected.""" return self._connected + + def get_page_manager(self) -> PageManager: + """Get page manager for external page registration.""" + return self._page_manager diff --git a/flip_monitor/README.md b/flip_monitor/README.md index c935517..81ea835 100644 --- a/flip_monitor/README.md +++ b/flip_monitor/README.md @@ -1,27 +1,104 @@ # CAN Monitor for Flipper Zero -Flipper Zero application for monitoring CAN sniffer statistics from Raspberry Pi 5 via UART. +Dynamic multi-page monitor application for Raspberry Pi 5 via UART. ## Features -- Real-time display of CAN sniffer statistics -- Shows: IP address, total frames, pending frames, processed frames -- Connection status indicator -- Compatible with Unleashed firmware (0.84e and later) +- **Dynamic Page System** - Pages are fully controlled by RPi5 +- **Multiple Page Types**: + - **Info** - Read-only status display + - **Menu** - Selectable action items + - **Confirm** - Yes/No confirmation dialogs +- **Real-time Updates** - Content refreshes every second +- **Bidirectional Communication** - Send commands back to RPi5 +- **Result Notifications** - Action results displayed as overlay + +## Default Pages + +When connected to the CAN Sniffer system: + +| Page | Type | Description | +|------|------|-------------| +| CAN Statistics | Info | Frame counts, queue status, per-interface stats | +| UPS Status | Info | Battery level, voltage, charging state (X120x) | +| System Info | Info | CPU temperature, power consumption, fan RPM | +| Actions | Menu | Shutdown, Reboot, Cancel shutdown | + +## Screen Layout + +``` +128x64 pixels (Flipper Zero display) + ++----------------------------------+ +| [1/4] Page Title (*) | <- Header: page indicator, title, connection dot ++----------------------------------+ +| | +| Content Line 1 | <- Content area: 4-5 lines +| Content Line 2 | +| Content Line 3 | +| Content Line 4 | +| | ++----------------------------------+ +| < Hint Text > | <- Footer: nav arrows, action hint ++----------------------------------+ +``` + +## Controls + +| Key | Info Page | Menu Page | Confirm Page | +|-----|-----------|-----------|--------------| +| **Left/Right** | Navigate pages | Navigate pages | Navigate pages | +| **Up/Down** | - | Select item | Switch Yes/No | +| **OK** | - | Execute action | Confirm selection | +| **Back** | Disconnect (pg 0) / Prev page | Same | Cancel dialog | + +## Protocol + +### RPi5 -> Flipper + +``` +PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected> +ACK:<device>,ip=<ip> +RESULT:<OK|ERROR>|<message> +``` + +**Examples:** +``` +PAGE:0/4|info|CAN Statistics|Total: 12.3K;Processed: 12.1K;Queue: 142||0 +PAGE:1/4|info|UPS Status|Bat: 85.2% [====];Voltage: 4.12V;Status: Charging||0 +PAGE:2/4|info|System Info|CPU Temp: 52.1C;Power: 4.2W;Fan: 3200 RPM||0 +PAGE:3/4|menu|Actions|Shutdown;Reboot;Cancel Shutdown||0 +PAGE:3/4|confirm|Confirm Shutdown?|Are you sure?|Yes;No|1 +ACK:rpi5,ip=192.168.1.100 +RESULT:OK|Shutdown in 1 min +``` + +### Flipper -> RPi5 + +``` +INIT:flipper - Start handshake +STOP:flipper - Disconnect +CMD:NAV:next - Next page +CMD:NAV:prev - Previous page +CMD:SELECT:<index> - Select menu item +CMD:CONFIRM - Confirm action +CMD:CANCEL - Cancel action +CMD:REFRESH - Request page update +``` ## Hardware Connection -### Wiring (RPI5 <-> Flipper Zero) +### Wiring Diagram ``` -RPI5 GPIO Flipper Zero GPIO ------------ ----------------- -TX (GPIO 14) --> RX (Pin 14) -RX (GPIO 15) <-- TX (Pin 13) -GND --> GND +Flipper Zero Raspberry Pi 5 +----------- --------------- +Pin 13 (TX) ----> GPIO 15 (RX) - Pin 10 +Pin 14 (RX) <---- GPIO 14 (TX) - Pin 8 +Pin 8/11/18 (GND) ---- GND - Pin 6 ``` -**Note:** Cross TX/RX connections (RPI TX -> Flipper RX, RPI RX -> Flipper TX) +**Note:** Cross TX/RX connections (Flipper TX -> RPi RX) ### Flipper Zero GPIO Pinout @@ -54,12 +131,6 @@ cd unleashed-firmware cp -r /path/to/carpibord/flip_monitor applications_user/can_monitor ``` -3. Create icon (10x10 PNG, 1-bit): -```bash -# Create icons/can_monitor.png (10x10 pixels, black & white) -# You can use any image editor or online tool -``` - ### Build ```bash @@ -83,23 +154,29 @@ SD Card/apps/GPIO/can_monitor.fap ### 1. Enable UART -Add to `/boot/config.txt`: +Add to `/boot/firmware/config.txt`: ``` enable_uart=1 dtoverlay=uart0 ``` +Disable serial console: +```bash +sudo raspi-config +# Interface Options -> Serial Port -> No (login shell) -> Yes (hardware) +``` + Reboot after changes. -### 2. Install pyserial +### 2. Install Dependencies ```bash -pip install pyserial +pip install pyserial smbus2 gpiozero ``` ### 3. Configure CAN Sniffer -Add to `can_sniffer/config.json`: +Add to `can_sniffer/src/config.json`: ```json { "flipper": { @@ -124,19 +201,38 @@ cd can_sniffer/src python main.py ``` -## Protocol +## Adding Custom Pages -The RPI5 sends text-based statistics over UART: +On RPi5 side, create a new page class: -``` -STATS:ip=192.168.1.100,total=12345,pending=100,processed=12245\n +```python +from flipper.pages.base import InfoPage + +class MyCustomPage(InfoPage): + def __init__(self): + super().__init__( + name="my_page", + title="My Page", + icon="custom" + ) + + def get_lines(self) -> list[str]: + return [ + "Custom line 1", + "Custom line 2", + f"Value: {self.get_value()}" + ] ``` -Fields: -- `ip` - RPI5 IP address -- `total` - Total CAN frames received -- `pending` - Frames in processing queue -- `processed` - Successfully processed frames +Register in FlipperHandler: + +```python +# In flipper_handler.py +from flipper.pages.my_custom import MyCustomPage + +# In _setup_pages(): +self._page_manager.register_page(MyCustomPage()) +``` ## Troubleshooting @@ -148,7 +244,7 @@ Fields: 4. Test UART manually: ```bash # On RPI5 -echo "STATS:ip=test,total=1,pending=0,processed=1" > /dev/ttyAMA0 +echo "ACK:rpi5,ip=test" > /dev/ttyAMA0 ``` ### Permission denied on /dev/ttyAMA0 @@ -159,11 +255,23 @@ sudo usermod -a -G dialout $USER # Then logout and login again ``` -### Flipper shows "Waiting..." +### Flipper shows "Waiting for data..." -- Stats are sent every 1 second (configurable) +- Data is sent every 1 second (configurable) - Connection timeout is 5 seconds - Check if CAN sniffer is running +- Check UART logs: `CAN_SNIFFER_LOGGING__LEVEL=DEBUG python main.py` + +### UPS page shows "not available" + +- X120x UPS must be connected via I2C +- Install smbus2: `pip install smbus2` +- Check I2C: `i2cdetect -y 1` (should show device at 0x36) + +## Version History + +- **v2.0** - Dynamic page system, bidirectional commands, UPS/System pages +- **v1.0** - Basic CAN statistics display ## License diff --git a/flip_monitor/application.fam b/flip_monitor/application.fam index 03f0e1a..3191a32 100644 --- a/flip_monitor/application.fam +++ b/flip_monitor/application.fam @@ -3,9 +3,10 @@ App( name="CAN Monitor", apptype=FlipperAppType.EXTERNAL, entry_point="can_monitor_app", - stack_size=2 * 1024, + stack_size=4 * 1024, fap_category="GPIO", fap_author="carpibord", - fap_version="1.0", - fap_description="CAN Sniffer monitor via UART from RPI5", + fap_version="2.0", + fap_description="Dynamic multi-page RPI5 monitor with CAN stats, UPS, system info and actions", + fap_icon="A:/icons/gpio_50.png", ) diff --git a/flip_monitor/can_monitor.c b/flip_monitor/can_monitor.c index d5dc3f6..752c273 100644 --- a/flip_monitor/can_monitor.c +++ b/flip_monitor/can_monitor.c @@ -1,14 +1,21 @@ /** - * CAN Monitor for Flipper Zero + * CAN Monitor for Flipper Zero - Dynamic UI Version * - * Multi-page application with handshake protocol. + * Multi-page application with dynamic content from RPI5. * - * Handshake: - * 1. User presses OK on welcome screen - * 2. Flipper sends: INIT:flipper\n - * 3. RPI5 responds: ACK:rpi5,ip=x.x.x.x\n - * 4. Flipper allows navigation to stats page - * 5. RPI5 starts sending: STATS:ip=...,total=...,pending=...,processed=...\n + * Protocol: + * RPi -> Flipper: + * PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected> + * ACK:<device>,ip=<ip> + * RESULT:<status>|<message> + * + * Flipper -> RPi: + * INIT:flipper + * STOP:flipper + * CMD:NAV:next / CMD:NAV:prev + * CMD:SELECT:<index> + * CMD:CONFIRM / CMD:CANCEL + * CMD:REFRESH * * Wiring: * RPI5 TX (GPIO14, Pin 8) -> Flipper RX (Pin 14) @@ -20,6 +27,7 @@ #include <furi_hal.h> #include <gui/gui.h> #include <gui/view_port.h> +#include <gui/elements.h> #include <expansion/expansion.h> #include <stdlib.h> @@ -27,8 +35,11 @@ #define TAG "CANMonitor" #define UART_BAUD 115200 -#define RX_BUFFER_SIZE 256 -#define IP_BUFFER_SIZE 32 +#define RX_BUFFER_SIZE 512 +#define MAX_LINES 5 +#define MAX_ACTIONS 4 +#define MAX_LINE_LENGTH 32 +#define MAX_TITLE_LENGTH 24 // Connection state typedef enum { @@ -37,21 +48,35 @@ typedef enum { StateConnected, } ConnectionState; -// Pages +// Page types typedef enum { - PageWelcome, - PageStats, -} AppPage; + PageTypeInfo, + PageTypeMenu, + PageTypeConfirm, +} PageType; -// Statistics +// Page content received from RPi typedef struct { - char ip_address[IP_BUFFER_SIZE]; - uint32_t total_frames; - uint32_t pending_frames; - uint32_t processed_frames; - bool data_received; + uint8_t page_index; + uint8_t total_pages; + PageType page_type; + char title[MAX_TITLE_LENGTH]; + char lines[MAX_LINES][MAX_LINE_LENGTH]; + uint8_t line_count; + char actions[MAX_ACTIONS][MAX_LINE_LENGTH]; + uint8_t action_count; + uint8_t selected_index; + bool data_valid; uint32_t last_update_tick; -} CanStats; +} PageContent; + +// Result message +typedef struct { + bool has_result; + bool success; + char message[MAX_LINE_LENGTH]; + uint32_t show_until_tick; +} ResultMessage; // App context typedef struct { @@ -63,13 +88,19 @@ typedef struct { FuriStreamBuffer* rx_stream; FuriThread* worker_thread; - CanStats stats; - AppPage current_page; + PageContent page; + ResultMessage result; ConnectionState conn_state; + char ip_address[32]; + volatile bool running; volatile bool send_init; } CanMonitorApp; +// Forward declarations +static void draw_callback(Canvas* canvas, void* ctx); +static void input_callback(InputEvent* event, void* ctx); + // Send data via UART static void uart_send(CanMonitorApp* app, const char* data) { if(app->serial) { @@ -78,77 +109,200 @@ static void uart_send(CanMonitorApp* app, const char* data) { } } -// Parse ACK response: ACK:rpi5,ip=x.x.x.x -static bool parse_ack(const char* line, CanStats* stats) { +// Clear page content +static void clear_page_content(PageContent* page) { + page->page_index = 0; + page->total_pages = 0; + page->page_type = PageTypeInfo; + page->title[0] = '\0'; + page->line_count = 0; + page->action_count = 0; + page->selected_index = 0; + page->data_valid = false; + + for(int i = 0; i < MAX_LINES; i++) { + page->lines[i][0] = '\0'; + } + for(int i = 0; i < MAX_ACTIONS; i++) { + page->actions[i][0] = '\0'; + } +} + +// Parse semicolon-separated items into array +static uint8_t parse_items(const char* str, char items[][MAX_LINE_LENGTH], uint8_t max_items) { + uint8_t count = 0; + + if(!str || str[0] == '\0') { + return 0; + } + + const char* start = str; + const char* end; + + while(count < max_items && start && *start) { + end = strchr(start, ';'); + + size_t len; + if(end) { + len = end - start; + } else { + len = strlen(start); + } + + if(len >= MAX_LINE_LENGTH) { + len = MAX_LINE_LENGTH - 1; + } + + if(len > 0) { + memcpy(items[count], start, len); + items[count][len] = '\0'; + count++; + } + + if(end) { + start = end + 1; + } else { + break; + } + } + + return count; +} + +// Parse PAGE message +// Format: PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected> +static bool parse_page(const char* line, PageContent* page) { + if(strncmp(line, "PAGE:", 5) != 0) { + return false; + } + + const char* p = line + 5; + char* end; + + // Parse page index and total + page->page_index = (uint8_t)strtoul(p, &end, 10); + if(*end != '/') return false; + p = end + 1; + + page->total_pages = (uint8_t)strtoul(p, &end, 10); + if(*end != '|') return false; + p = end + 1; + + // Parse page type + const char* type_end = strchr(p, '|'); + if(!type_end) return false; + + if(strncmp(p, "info", type_end - p) == 0) { + page->page_type = PageTypeInfo; + } else if(strncmp(p, "menu", type_end - p) == 0) { + page->page_type = PageTypeMenu; + } else if(strncmp(p, "confirm", type_end - p) == 0) { + page->page_type = PageTypeConfirm; + } else { + page->page_type = PageTypeInfo; + } + p = type_end + 1; + + // Parse title + const char* title_end = strchr(p, '|'); + if(!title_end) return false; + + size_t title_len = title_end - p; + if(title_len >= MAX_TITLE_LENGTH) { + title_len = MAX_TITLE_LENGTH - 1; + } + memcpy(page->title, p, title_len); + page->title[title_len] = '\0'; + p = title_end + 1; + + // Parse lines + const char* lines_end = strchr(p, '|'); + if(!lines_end) return false; + + char lines_buf[256]; + size_t lines_len = lines_end - p; + if(lines_len >= sizeof(lines_buf)) { + lines_len = sizeof(lines_buf) - 1; + } + memcpy(lines_buf, p, lines_len); + lines_buf[lines_len] = '\0'; + page->line_count = parse_items(lines_buf, page->lines, MAX_LINES); + p = lines_end + 1; + + // Parse actions + const char* actions_end = strchr(p, '|'); + if(!actions_end) return false; + + char actions_buf[128]; + size_t actions_len = actions_end - p; + if(actions_len >= sizeof(actions_buf)) { + actions_len = sizeof(actions_buf) - 1; + } + memcpy(actions_buf, p, actions_len); + actions_buf[actions_len] = '\0'; + page->action_count = parse_items(actions_buf, page->actions, MAX_ACTIONS); + p = actions_end + 1; + + // Parse selected index + page->selected_index = (uint8_t)strtoul(p, NULL, 10); + + page->data_valid = true; + page->last_update_tick = furi_get_tick(); + + return true; +} + +// Parse ACK message +static bool parse_ack(const char* line, char* ip_out, size_t ip_size) { if(strncmp(line, "ACK:", 4) != 0) { return false; } - const char* p = line + 4; - - // Check for rpi5 identifier - if(strncmp(p, "rpi5", 4) != 0) { - return false; - } - - // Parse ip= - const char* ip = strstr(p, "ip="); + const char* ip = strstr(line, "ip="); if(ip) { ip += 3; - // Find end (comma, newline, or end of string) size_t len = 0; while(ip[len] && ip[len] != ',' && ip[len] != '\n' && ip[len] != '\r') { len++; } - if(len > 0 && len < IP_BUFFER_SIZE) { - memcpy(stats->ip_address, ip, len); - stats->ip_address[len] = '\0'; + if(len > 0 && len < ip_size) { + memcpy(ip_out, ip, len); + ip_out[len] = '\0'; } } return true; } -// Parse STATS: STATS:ip=x.x.x.x,total=N,pending=N,processed=N -static bool parse_stats(const char* line, CanStats* stats) { - if(strncmp(line, "STATS:", 6) != 0) { +// Parse RESULT message +static bool parse_result(const char* line, ResultMessage* result) { + if(strncmp(line, "RESULT:", 7) != 0) { return false; } - const char* p = line + 6; + const char* p = line + 7; - // Parse ip= - const char* ip = strstr(p, "ip="); - if(ip) { - ip += 3; - const char* end = strchr(ip, ','); - if(end && (size_t)(end - ip) < IP_BUFFER_SIZE) { - size_t len = end - ip; - memcpy(stats->ip_address, ip, len); - stats->ip_address[len] = '\0'; - } + // Parse status + if(strncmp(p, "OK|", 3) == 0) { + result->success = true; + p += 3; + } else if(strncmp(p, "ERROR|", 6) == 0) { + result->success = false; + p += 6; + } else { + return false; } - // Parse total= - const char* total = strstr(p, "total="); - if(total) { - stats->total_frames = strtoul(total + 6, NULL, 10); + // Parse message + size_t len = strlen(p); + if(len >= MAX_LINE_LENGTH) { + len = MAX_LINE_LENGTH - 1; } + memcpy(result->message, p, len); + result->message[len] = '\0'; - // Parse pending= - const char* pending = strstr(p, "pending="); - if(pending) { - stats->pending_frames = strtoul(pending + 8, NULL, 10); - } - - // Parse processed= - const char* processed = strstr(p, "processed="); - if(processed) { - stats->processed_frames = strtoul(processed + 10, NULL, 10); - } - - stats->data_received = true; - stats->last_update_tick = furi_get_tick(); + result->has_result = true; + result->show_until_tick = furi_get_tick() + 3000; // Show for 3 seconds return true; } @@ -161,13 +315,17 @@ static void process_line(CanMonitorApp* app, const char* line) { if(app->conn_state == StateConnecting) { // Waiting for ACK - if(parse_ack(line, &app->stats)) { - FURI_LOG_I(TAG, "ACK received, IP: %s", app->stats.ip_address); + if(parse_ack(line, app->ip_address, sizeof(app->ip_address))) { + FURI_LOG_I(TAG, "ACK received, IP: %s", app->ip_address); app->conn_state = StateConnected; } } else if(app->conn_state == StateConnected) { - // Parse stats - parse_stats(line, &app->stats); + // Parse PAGE or RESULT + if(line[0] == 'P') { + parse_page(line, &app->page); + } else if(line[0] == 'R') { + parse_result(line, &app->result); + } } furi_mutex_release(app->mutex); @@ -201,6 +359,7 @@ static int32_t worker_thread(void* ctx) { furi_mutex_acquire(app->mutex, FuriWaitForever); app->conn_state = StateConnecting; + clear_page_content(&app->page); furi_mutex_release(app->mutex); view_port_update(app->view_port); } @@ -232,12 +391,18 @@ static int32_t worker_thread(void* ctx) { // Check data timeout (5 sec) - only when connected furi_mutex_acquire(app->mutex, FuriWaitForever); - if(app->conn_state == StateConnected && app->stats.data_received) { - if((furi_get_tick() - app->stats.last_update_tick) > 5000) { - app->stats.data_received = false; + if(app->conn_state == StateConnected && app->page.data_valid) { + if((furi_get_tick() - app->page.last_update_tick) > 5000) { + app->page.data_valid = false; view_port_update(app->view_port); } } + + // Clear result message after timeout + if(app->result.has_result && furi_get_tick() > app->result.show_until_tick) { + app->result.has_result = false; + view_port_update(app->view_port); + } furi_mutex_release(app->mutex); } @@ -245,24 +410,166 @@ static int32_t worker_thread(void* ctx) { return 0; } -// Draw welcome page +// Draw header with page indicator +static void draw_header(Canvas* canvas, CanMonitorApp* app) { + // Page indicator + char page_str[16]; + snprintf( + page_str, + sizeof(page_str), + "[%d/%d]", + app->page.page_index + 1, + app->page.total_pages); + canvas_draw_str(canvas, 0, 8, page_str); + + // Title (centered) + uint16_t title_width = canvas_string_width(canvas, app->page.title); + uint16_t title_x = (128 - title_width) / 2; + canvas_draw_str(canvas, title_x, 8, app->page.title); + + // Connection indicator (right side) + if(app->page.data_valid) { + canvas_draw_disc(canvas, 122, 4, 3); + } else { + canvas_draw_circle(canvas, 122, 4, 3); + } + + // Separator line + canvas_draw_line(canvas, 0, 11, 128, 11); +} + +// Draw footer with navigation hints +static void draw_footer(Canvas* canvas, PageType page_type, uint8_t total_pages, uint8_t current_page) { + canvas_draw_line(canvas, 0, 54, 128, 54); + + canvas_set_font(canvas, FontSecondary); + + // Left arrow (if not first page) + if(current_page > 0) { + canvas_draw_str(canvas, 2, 63, "<"); + } + + // Center action hint + const char* hint = ""; + switch(page_type) { + case PageTypeInfo: + hint = ""; + break; + case PageTypeMenu: + hint = "OK=Select"; + break; + case PageTypeConfirm: + hint = "OK=Yes Back=No"; + break; + } + uint16_t hint_width = canvas_string_width(canvas, hint); + canvas_draw_str(canvas, (128 - hint_width) / 2, 63, hint); + + // Right arrow (if not last page) + if(current_page < total_pages - 1) { + canvas_draw_str(canvas, 122, 63, ">"); + } +} + +// Draw info page +static void draw_info_page(Canvas* canvas, CanMonitorApp* app) { + canvas_set_font(canvas, FontSecondary); + + uint8_t y = 22; + for(uint8_t i = 0; i < app->page.line_count && i < MAX_LINES; i++) { + canvas_draw_str(canvas, 2, y, app->page.lines[i]); + y += 10; + } +} + +// Draw menu page +static void draw_menu_page(Canvas* canvas, CanMonitorApp* app) { + canvas_set_font(canvas, FontSecondary); + + uint8_t y = 22; + for(uint8_t i = 0; i < app->page.action_count && i < MAX_ACTIONS; i++) { + // Selection indicator + if(i == app->page.selected_index) { + canvas_draw_box(canvas, 0, y - 8, 128, 10); + canvas_set_color(canvas, ColorWhite); + } + + canvas_draw_str(canvas, 4, y, app->page.actions[i]); + + if(i == app->page.selected_index) { + canvas_set_color(canvas, ColorBlack); + } + + y += 10; + } +} + +// Draw confirm page +static void draw_confirm_page(Canvas* canvas, CanMonitorApp* app) { + canvas_set_font(canvas, FontSecondary); + + // Message (centered) + if(app->page.line_count > 0) { + uint16_t msg_width = canvas_string_width(canvas, app->page.lines[0]); + canvas_draw_str(canvas, (128 - msg_width) / 2, 30, app->page.lines[0]); + } + + // Yes/No options + uint8_t btn_y = 44; + uint8_t yes_x = 32; + uint8_t no_x = 80; + + // Draw Yes + if(app->page.selected_index == 0) { + canvas_draw_box(canvas, yes_x - 2, btn_y - 8, 24, 12); + canvas_set_color(canvas, ColorWhite); + } + canvas_draw_str(canvas, yes_x, btn_y, "Yes"); + if(app->page.selected_index == 0) { + canvas_set_color(canvas, ColorBlack); + } + + // Draw No + if(app->page.selected_index == 1) { + canvas_draw_box(canvas, no_x - 2, btn_y - 8, 20, 12); + canvas_set_color(canvas, ColorWhite); + } + canvas_draw_str(canvas, no_x, btn_y, "No"); + if(app->page.selected_index == 1) { + canvas_set_color(canvas, ColorBlack); + } +} + +// Draw result overlay +static void draw_result_overlay(Canvas* canvas, ResultMessage* result) { + // Semi-transparent background + canvas_set_color(canvas, ColorWhite); + canvas_draw_box(canvas, 10, 20, 108, 28); + canvas_set_color(canvas, ColorBlack); + canvas_draw_frame(canvas, 10, 20, 108, 28); + + canvas_set_font(canvas, FontPrimary); + const char* status = result->success ? "OK" : "Error"; + canvas_draw_str(canvas, 14, 32, status); + + canvas_set_font(canvas, FontSecondary); + canvas_draw_str(canvas, 14, 44, result->message); +} + +// Draw welcome/connecting screen static void draw_welcome(Canvas* canvas, CanMonitorApp* app) { canvas_clear(canvas); - // Title canvas_set_font(canvas, FontPrimary); canvas_draw_str_aligned(canvas, 64, 8, AlignCenter, AlignCenter, "CAN Monitor"); canvas_set_font(canvas, FontSecondary); + canvas_draw_str_aligned(canvas, 64, 22, AlignCenter, AlignCenter, "RPI5 Dynamic UI"); - // Subtitle - canvas_draw_str_aligned(canvas, 64, 22, AlignCenter, AlignCenter, "RPI5 CAN Sniffer"); - - // Status based on connection state furi_mutex_acquire(app->mutex, FuriWaitForever); ConnectionState state = app->conn_state; - char ip_buf[IP_BUFFER_SIZE]; - strncpy(ip_buf, app->stats.ip_address, IP_BUFFER_SIZE); + char ip_buf[32]; + strncpy(ip_buf, app->ip_address, sizeof(ip_buf)); furi_mutex_release(app->mutex); switch(state) { @@ -283,139 +590,167 @@ static void draw_welcome(Canvas* canvas, CanMonitorApp* app) { snprintf(buf, sizeof(buf), "RPI5: %s", ip_buf); canvas_draw_str_aligned(canvas, 64, 46, AlignCenter, AlignCenter, buf); } - canvas_draw_str_aligned(canvas, 64, 58, AlignCenter, AlignCenter, "Press RIGHT >"); + canvas_draw_str_aligned(canvas, 64, 58, AlignCenter, AlignCenter, "Waiting for data..."); break; } } -// Draw stats page -static void draw_stats(Canvas* canvas, CanMonitorApp* app) { - char buf[64]; - - canvas_clear(canvas); - - // Header - canvas_set_font(canvas, FontPrimary); - canvas_draw_str(canvas, 10, 10, "CAN Monitor"); - - // Navigation hint - canvas_draw_str(canvas, 0, 10, "<"); - - // Connection indicator - furi_mutex_acquire(app->mutex, FuriWaitForever); - bool data_ok = app->stats.data_received; - furi_mutex_release(app->mutex); - - if(data_ok) { - canvas_draw_disc(canvas, 120, 6, 4); - } else { - canvas_draw_circle(canvas, 120, 6, 4); - } - - // Separator - canvas_draw_line(canvas, 0, 14, 128, 14); - - canvas_set_font(canvas, FontSecondary); - - furi_mutex_acquire(app->mutex, FuriWaitForever); - - // IP Address - if(strlen(app->stats.ip_address) > 0) { - snprintf(buf, sizeof(buf), "IP: %s", app->stats.ip_address); - } else { - snprintf(buf, sizeof(buf), "IP: ---"); - } - canvas_draw_str(canvas, 0, 26, buf); - - // Total frames - snprintf(buf, sizeof(buf), "Total frames: %lu", (unsigned long)app->stats.total_frames); - canvas_draw_str(canvas, 0, 38, buf); - - // Pending frames - snprintf(buf, sizeof(buf), "Pending: %lu", (unsigned long)app->stats.pending_frames); - canvas_draw_str(canvas, 0, 50, buf); - - // Processed frames - snprintf(buf, sizeof(buf), "Processed: %lu", (unsigned long)app->stats.processed_frames); - canvas_draw_str(canvas, 4, 62, buf); - - furi_mutex_release(app->mutex); -} - // Draw callback static void draw_callback(Canvas* canvas, void* ctx) { CanMonitorApp* app = ctx; - switch(app->current_page) { - case PageWelcome: - draw_welcome(canvas, app); + furi_mutex_acquire(app->mutex, FuriWaitForever); + + // Check if we should show welcome screen + if(app->conn_state != StateConnected || !app->page.data_valid) { + furi_mutex_release(app->mutex); + draw_welcome(canvas, app); + return; + } + + canvas_clear(canvas); + canvas_set_font(canvas, FontPrimary); + + // Draw header + draw_header(canvas, app); + + // Draw content based on page type + switch(app->page.page_type) { + case PageTypeInfo: + draw_info_page(canvas, app); break; - case PageStats: - draw_stats(canvas, app); + case PageTypeMenu: + draw_menu_page(canvas, app); + break; + case PageTypeConfirm: + draw_confirm_page(canvas, app); break; } + + // Draw footer + draw_footer(canvas, app->page.page_type, app->page.total_pages, app->page.page_index); + + // Draw result overlay if present + if(app->result.has_result) { + draw_result_overlay(canvas, &app->result); + } + + furi_mutex_release(app->mutex); } // Input callback static void input_callback(InputEvent* event, void* ctx) { CanMonitorApp* app = ctx; - if(event->type == InputTypeShort) { - furi_mutex_acquire(app->mutex, FuriWaitForever); + if(event->type != InputTypeShort && event->type != InputTypeRepeat) { + return; + } + furi_mutex_acquire(app->mutex, FuriWaitForever); + + // Handle welcome screen input + if(app->conn_state == StateDisconnected) { + if(event->key == InputKeyOk) { + app->send_init = true; + } else if(event->key == InputKeyBack) { + app->running = false; + } + furi_mutex_release(app->mutex); + return; + } + + // Handle connected state input + if(app->conn_state == StateConnected && app->page.data_valid) { switch(event->key) { - case InputKeyOk: - if(app->current_page == PageWelcome && app->conn_state == StateDisconnected) { - // Start handshake - app->send_init = true; + case InputKeyUp: + if(app->page.page_type == PageTypeMenu && app->page.selected_index > 0) { + app->page.selected_index--; + view_port_update(app->view_port); + } else if(app->page.page_type == PageTypeConfirm) { + app->page.selected_index = 0; // Yes + view_port_update(app->view_port); } break; - case InputKeyRight: - // Only allow if connected - if(app->current_page == PageWelcome && app->conn_state == StateConnected) { - app->current_page = PageStats; + case InputKeyDown: + if(app->page.page_type == PageTypeMenu && + app->page.selected_index < app->page.action_count - 1) { + app->page.selected_index++; + view_port_update(app->view_port); + } else if(app->page.page_type == PageTypeConfirm) { + app->page.selected_index = 1; // No view_port_update(app->view_port); } break; case InputKeyLeft: - if(app->current_page == PageStats) { - app->current_page = PageWelcome; - view_port_update(app->view_port); + // Navigate to previous page + uart_send(app, "CMD:NAV:prev\n"); + break; + + case InputKeyRight: + // Navigate to next page + uart_send(app, "CMD:NAV:next\n"); + break; + + case InputKeyOk: + if(app->page.page_type == PageTypeMenu) { + // Send selection + char cmd[32]; + snprintf(cmd, sizeof(cmd), "CMD:SELECT:%d\n", app->page.selected_index); + uart_send(app, cmd); + } else if(app->page.page_type == PageTypeConfirm) { + if(app->page.selected_index == 0) { + uart_send(app, "CMD:CONFIRM\n"); + } else { + uart_send(app, "CMD:CANCEL\n"); + } } break; case InputKeyBack: - if(app->current_page == PageWelcome) { - app->running = false; - } else { - app->current_page = PageWelcome; + if(app->page.page_type == PageTypeConfirm) { + // Cancel confirmation + uart_send(app, "CMD:CANCEL\n"); + } else if(app->page.page_index == 0) { + // On first page, disconnect + uart_send(app, "STOP:flipper\n"); + app->conn_state = StateDisconnected; + clear_page_content(&app->page); view_port_update(app->view_port); + } else { + // Go to previous page + uart_send(app, "CMD:NAV:prev\n"); } break; default: break; } - - furi_mutex_release(app->mutex); + } else if(event->key == InputKeyBack) { + // Disconnect and exit + if(app->conn_state != StateDisconnected) { + uart_send(app, "STOP:flipper\n"); + } + app->running = false; } + + furi_mutex_release(app->mutex); } // Main entry point int32_t can_monitor_app(void* p) { UNUSED(p); - FURI_LOG_I(TAG, "Starting CAN Monitor"); + FURI_LOG_I(TAG, "Starting CAN Monitor (Dynamic UI)"); // Allocate app CanMonitorApp* app = malloc(sizeof(CanMonitorApp)); memset(app, 0, sizeof(CanMonitorApp)); app->running = true; - app->current_page = PageWelcome; app->conn_state = StateDisconnected; app->send_init = false; + clear_page_content(&app->page); // Mutex app->mutex = furi_mutex_alloc(FuriMutexTypeNormal); @@ -440,7 +775,7 @@ int32_t can_monitor_app(void* p) { } // Start worker - app->worker_thread = furi_thread_alloc_ex("CanMonitorWorker", 1024, worker_thread, app); + app->worker_thread = furi_thread_alloc_ex("CanMonitorWorker", 2048, worker_thread, app); furi_thread_start(app->worker_thread); // Init GUI @@ -450,7 +785,7 @@ int32_t can_monitor_app(void* p) { view_port_input_callback_set(app->view_port, input_callback, app); gui_add_view_port(app->gui, app->view_port, GuiLayerFullscreen); - FURI_LOG_I(TAG, "GUI initialized, waiting for user input"); + FURI_LOG_I(TAG, "GUI initialized"); // Main loop while(app->running) { @@ -460,7 +795,7 @@ int32_t can_monitor_app(void* p) { FURI_LOG_I(TAG, "Shutting down"); // Send disconnect signal - if(app->conn_state == StateConnected) { + if(app->conn_state != StateDisconnected) { uart_send(app, "STOP:flipper\n"); }