Create new module for flipper and update UI flipper zero

This commit is contained in:
2026-01-27 14:52:06 +03:00
parent 434feb2a1d
commit dab72ac1b1
18 changed files with 2659 additions and 254 deletions

View File

@@ -0,0 +1,11 @@
"""
Flipper Zero Dynamic UI System.
This module provides a plugin-based page system for Flipper Zero display
with bidirectional communication via UART.
"""
from flipper.protocol import PageType, PageContent, Protocol
from flipper.page_manager import PageManager
__all__ = ["PageType", "PageContent", "Protocol", "PageManager"]

View File

@@ -0,0 +1,255 @@
"""
Page Manager for Flipper Zero UI.
Manages page navigation, state, and content generation.
"""
import threading
from typing import List, Optional, Dict, Any
from flipper.protocol import Protocol, PageContent, Command, CommandType
from flipper.pages.base import BasePage
class PageManager:
"""
Manages Flipper Zero pages and navigation.
Handles:
- Page registration and lifecycle
- Navigation between pages
- Command processing
- Content generation
"""
def __init__(self):
"""Initialize page manager."""
self._pages: List[BasePage] = []
self._current_index = 0
self._lock = threading.Lock()
self._last_result: Optional[str] = None
def register_page(self, page: BasePage) -> None:
"""
Register a page.
Args:
page: Page instance to register
"""
with self._lock:
# Check for duplicate names
for existing in self._pages:
if existing.name == page.name:
raise ValueError(f"Page '{page.name}' already registered")
self._pages.append(page)
def unregister_page(self, name: str) -> bool:
"""
Unregister a page by name.
Args:
name: Page name to remove
Returns:
True if page was found and removed
"""
with self._lock:
for i, page in enumerate(self._pages):
if page.name == name:
self._pages.pop(i)
# Adjust current index if needed
if self._current_index >= len(self._pages):
self._current_index = max(0, len(self._pages) - 1)
return True
return False
def get_enabled_pages(self) -> List[BasePage]:
"""Get list of enabled pages."""
with self._lock:
return [p for p in self._pages if p.is_enabled()]
def get_page_count(self) -> int:
"""Get number of enabled pages."""
return len(self.get_enabled_pages())
def get_current_page(self) -> Optional[BasePage]:
"""Get currently active page."""
pages = self.get_enabled_pages()
with self._lock:
if 0 <= self._current_index < len(pages):
return pages[self._current_index]
return None
def get_current_index(self) -> int:
"""Get current page index."""
with self._lock:
return self._current_index
def navigate_next(self) -> bool:
"""
Navigate to next page.
Returns:
True if navigation occurred
"""
pages = self.get_enabled_pages()
with self._lock:
# Don't navigate if current page has pending action
if self._current_index < len(pages):
current = pages[self._current_index]
if current.has_pending_action():
return False
current.on_leave()
if self._current_index < len(pages) - 1:
self._current_index += 1
pages[self._current_index].on_enter()
return True
return False
def navigate_prev(self) -> bool:
"""
Navigate to previous page.
Returns:
True if navigation occurred
"""
pages = self.get_enabled_pages()
with self._lock:
# Don't navigate if current page has pending action
if self._current_index < len(pages):
current = pages[self._current_index]
if current.has_pending_action():
return False
current.on_leave()
if self._current_index > 0:
self._current_index -= 1
pages[self._current_index].on_enter()
return True
return False
def navigate_to(self, index: int) -> bool:
"""
Navigate to specific page index.
Args:
index: Page index to navigate to
Returns:
True if navigation occurred
"""
pages = self.get_enabled_pages()
with self._lock:
if 0 <= index < len(pages):
if self._current_index < len(pages):
pages[self._current_index].on_leave()
self._current_index = index
pages[self._current_index].on_enter()
return True
return False
def get_current_content(self) -> Optional[str]:
"""
Get encoded content for current page.
Returns:
Protocol-encoded page content string or None
"""
page = self.get_current_page()
if page is None:
return None
content = page.get_content()
total = self.get_page_count()
index = self.get_current_index()
return Protocol.encode_page(index, total, content)
def process_command(self, command: Command) -> Optional[str]:
"""
Process command from Flipper Zero.
Args:
command: Parsed command
Returns:
Result message or None
"""
self._last_result = None
if command.cmd_type == CommandType.NAV_NEXT:
self.navigate_next()
return None
if command.cmd_type == CommandType.NAV_PREV:
self.navigate_prev()
return None
if command.cmd_type == CommandType.SELECT:
page = self.get_current_page()
if page:
result = page.handle_select(command.select_index)
self._last_result = result
return result
return None
if command.cmd_type == CommandType.CONFIRM:
page = self.get_current_page()
if page:
result = page.handle_confirm()
self._last_result = result
return result
return None
if command.cmd_type == CommandType.CANCEL:
page = self.get_current_page()
if page:
result = page.handle_cancel()
self._last_result = result
return result
return None
if command.cmd_type == CommandType.REFRESH:
# Just return None, content will be sent anyway
return None
return None
def get_last_result(self) -> Optional[str]:
"""Get last action result message."""
return self._last_result
def clear_last_result(self) -> None:
"""Clear last result message."""
self._last_result = None
def get_stats(self) -> Dict[str, Any]:
"""Get manager statistics."""
pages = self.get_enabled_pages()
return {
"total_pages": len(self._pages),
"enabled_pages": len(pages),
"current_index": self._current_index,
"current_page": pages[self._current_index].name if pages else None,
"pages": [p.name for p in pages]
}
def shutdown(self) -> None:
"""Shutdown all pages."""
with self._lock:
for page in self._pages:
page.on_leave()
self._pages.clear()
self._current_index = 0

View File

@@ -0,0 +1,20 @@
"""
Flipper Zero Pages.
Each page is a plugin that provides content for display on Flipper Zero
and handles user actions.
"""
from flipper.pages.base import BasePage
from flipper.pages.can_stats import CANStatsPage
from flipper.pages.ups_status import UPSStatusPage
from flipper.pages.system_info import SystemInfoPage
from flipper.pages.actions import ActionsPage
__all__ = [
"BasePage",
"CANStatsPage",
"UPSStatusPage",
"SystemInfoPage",
"ActionsPage",
]

View File

@@ -0,0 +1,163 @@
"""
Actions Page.
Menu page for system control actions (shutdown, reboot, etc.)
"""
from typing import Optional, Callable
from flipper.pages.base import BasePage
from flipper.protocol import PageContent, PageType
from flipper.providers.system_provider import SystemProvider
class ActionsPage(BasePage):
"""
Page with system control actions.
Provides menu items for:
- Shutdown
- Reboot
- Cancel pending shutdown
Dangerous actions require confirmation before execution.
"""
# Action identifiers
ACTION_SHUTDOWN = "shutdown"
ACTION_REBOOT = "reboot"
ACTION_CANCEL = "cancel_shutdown"
def __init__(self, on_result: Optional[Callable[[str], None]] = None):
"""
Initialize actions page.
Args:
on_result: Callback when action result is available
"""
super().__init__(
name="actions",
title="Actions",
icon="settings"
)
self._provider = SystemProvider()
self._on_result = on_result
self._confirm_mode = False
self._pending_action: Optional[str] = None
# Menu items: (label, action_id, requires_confirm)
self._menu_items = [
("Shutdown", self.ACTION_SHUTDOWN, True),
("Reboot", self.ACTION_REBOOT, True),
("Cancel Shutdown", self.ACTION_CANCEL, False),
]
def get_content(self) -> PageContent:
"""Get page content."""
if self._confirm_mode and self._pending_action:
# Show confirmation dialog
action_name = self._get_action_label(self._pending_action)
return PageContent(
page_type=PageType.CONFIRM,
title=f"Confirm {action_name}?",
lines=[f"Are you sure you want to {action_name.lower()}?"],
actions=["Yes", "No"],
selected=1, # Default to "No" for safety
icon=self.icon
)
# Normal menu mode
labels = [item[0] for item in self._menu_items]
return PageContent(
page_type=PageType.MENU,
title=self.title,
lines=[],
actions=labels,
selected=self._selected_index,
icon=self.icon
)
def handle_select(self, index: int) -> Optional[str]:
"""Handle menu item selection."""
self._selected_index = index
if 0 <= index < len(self._menu_items):
label, action_id, requires_confirm = self._menu_items[index]
if requires_confirm:
# Enter confirmation mode
self._confirm_mode = True
self._pending_action = action_id
return None # Will show confirm dialog
else:
# Execute immediately
return self._execute_action(action_id)
return None
def handle_confirm(self) -> Optional[str]:
"""Execute confirmed action."""
if self._pending_action:
action_id = self._pending_action
self._confirm_mode = False
self._pending_action = None
return self._execute_action(action_id)
return None
def handle_cancel(self) -> Optional[str]:
"""Cancel pending action."""
self._confirm_mode = False
self._pending_action = None
return "Cancelled"
def has_pending_action(self) -> bool:
"""Check if in confirmation mode."""
return self._confirm_mode
def _execute_action(self, action_id: str) -> str:
"""
Execute the specified action.
Args:
action_id: Action identifier
Returns:
Result message
"""
result = ""
if action_id == self.ACTION_SHUTDOWN:
if self._provider.shutdown(delay_minutes=1):
result = "Shutdown in 1 min"
else:
result = "Shutdown failed"
elif action_id == self.ACTION_REBOOT:
if self._provider.reboot():
result = "Rebooting..."
else:
result = "Reboot failed"
elif action_id == self.ACTION_CANCEL:
if self._provider.cancel_shutdown():
result = "Shutdown cancelled"
else:
result = "No pending shutdown"
# Notify callback if available
if self._on_result:
self._on_result(result)
return result
def _get_action_label(self, action_id: str) -> str:
"""Get human-readable label for action ID."""
for label, aid, _ in self._menu_items:
if aid == action_id:
return label
return action_id
def get_provider(self) -> SystemProvider:
"""Get the System provider instance."""
return self._provider

View File

@@ -0,0 +1,260 @@
"""
Base Page Interface for Flipper Zero UI.
All pages must inherit from BasePage and implement required methods.
"""
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, Callable
from flipper.protocol import PageContent, PageType
class BasePage(ABC):
"""
Abstract base class for Flipper Zero pages.
Each page represents a screen on Flipper Zero display.
Pages can be informational (read-only), interactive menus,
or confirmation dialogs.
Attributes:
name: Unique identifier for the page
title: Display title (shown in header)
icon: Icon identifier for visual representation
enabled: Whether the page is active
"""
def __init__(
self,
name: str,
title: str,
icon: str = "",
enabled: bool = True
):
"""
Initialize base page.
Args:
name: Unique page identifier
title: Display title
icon: Icon identifier
enabled: Whether page is enabled
"""
self.name = name
self.title = title
self.icon = icon
self.enabled = enabled
self._selected_index = 0
self._pending_action: Optional[str] = None
@abstractmethod
def get_content(self) -> PageContent:
"""
Get current page content for display.
Returns:
PageContent object with type, title, lines, and actions
"""
pass
def handle_select(self, index: int) -> Optional[str]:
"""
Handle menu item selection.
Called when user presses OK on a menu item.
Args:
index: Selected item index
Returns:
Optional result message to display, or None
"""
self._selected_index = index
return None
def handle_confirm(self) -> Optional[str]:
"""
Handle confirmation action.
Called when user confirms a pending action.
Returns:
Optional result message, or None
"""
return None
def handle_cancel(self) -> Optional[str]:
"""
Handle cancel action.
Called when user cancels a pending action.
Returns:
Optional result message, or None
"""
self._pending_action = None
return None
def get_selected_index(self) -> int:
"""Get currently selected index for menu pages."""
return self._selected_index
def set_selected_index(self, index: int) -> None:
"""Set selected index for menu pages."""
self._selected_index = index
def has_pending_action(self) -> bool:
"""Check if there's a pending action requiring confirmation."""
return self._pending_action is not None
def get_pending_action(self) -> Optional[str]:
"""Get pending action name."""
return self._pending_action
def is_enabled(self) -> bool:
"""Check if page is enabled."""
return self.enabled
def on_enter(self) -> None:
"""Called when user navigates to this page."""
pass
def on_leave(self) -> None:
"""Called when user navigates away from this page."""
pass
class InfoPage(BasePage):
"""
Base class for information-only pages.
Info pages display read-only data that updates periodically.
They have no interactive elements.
"""
def __init__(
self,
name: str,
title: str,
icon: str = "",
enabled: bool = True
):
super().__init__(name, title, icon, enabled)
@abstractmethod
def get_lines(self) -> list[str]:
"""
Get content lines for display.
Returns:
List of strings to display (max 4-5 lines)
"""
pass
def get_content(self) -> PageContent:
"""Get page content as INFO type."""
return PageContent(
page_type=PageType.INFO,
title=self.title,
lines=self.get_lines(),
icon=self.icon
)
class MenuPage(BasePage):
"""
Base class for menu pages.
Menu pages display selectable items that can trigger actions.
"""
def __init__(
self,
name: str,
title: str,
icon: str = "",
enabled: bool = True
):
super().__init__(name, title, icon, enabled)
self._menu_items: list[tuple[str, Callable[[], Optional[str]]]] = []
def add_item(self, label: str, action: Callable[[], Optional[str]]) -> None:
"""
Add menu item.
Args:
label: Display label for the item
action: Callback function when item is selected
"""
self._menu_items.append((label, action))
def clear_items(self) -> None:
"""Clear all menu items."""
self._menu_items.clear()
self._selected_index = 0
def get_content(self) -> PageContent:
"""Get page content as MENU type."""
labels = [item[0] for item in self._menu_items]
return PageContent(
page_type=PageType.MENU,
title=self.title,
lines=[],
actions=labels,
selected=self._selected_index,
icon=self.icon
)
def handle_select(self, index: int) -> Optional[str]:
"""Execute action for selected menu item."""
self._selected_index = index
if 0 <= index < len(self._menu_items):
_, action = self._menu_items[index]
return action()
return None
class ConfirmPage(BasePage):
"""
Confirmation dialog page.
Used to confirm dangerous actions like shutdown or reboot.
"""
def __init__(
self,
name: str,
title: str,
message: str,
on_confirm: Callable[[], Optional[str]],
on_cancel: Optional[Callable[[], Optional[str]]] = None,
icon: str = "",
):
super().__init__(name, title, icon, enabled=True)
self.message = message
self._on_confirm = on_confirm
self._on_cancel = on_cancel
def get_content(self) -> PageContent:
"""Get page content as CONFIRM type."""
return PageContent(
page_type=PageType.CONFIRM,
title=self.title,
lines=[self.message],
actions=["Yes", "No"],
selected=1, # Default to "No" for safety
icon=self.icon
)
def handle_confirm(self) -> Optional[str]:
"""Execute confirm action."""
return self._on_confirm()
def handle_cancel(self) -> Optional[str]:
"""Execute cancel action."""
if self._on_cancel:
return self._on_cancel()
return None

View File

@@ -0,0 +1,66 @@
"""
CAN Statistics Page.
Displays CAN sniffer statistics on Flipper Zero.
"""
from flipper.pages.base import InfoPage
from flipper.providers.can_provider import CANProvider
class CANStatsPage(InfoPage):
"""
Page displaying CAN bus statistics.
Shows:
- Total frames received
- Pending/processed frames
- Queue status
- Dropped frames (if any)
"""
def __init__(self):
super().__init__(
name="can_stats",
title="CAN Statistics",
icon="can"
)
self._provider = CANProvider()
def get_lines(self) -> list[str]:
"""Get statistics lines for display."""
data = self._provider.get_data()
lines = [
f"Total: {self._format_number(data.total_frames)}",
f"Processed: {self._format_number(data.processed_frames)}",
f"Queue: {data.queue_size}/{data.queue_capacity}",
]
# Add interface breakdown if available
if data.interfaces:
iface_str = ", ".join(
f"{k}: {self._format_number(v)}"
for k, v in data.interfaces.items()
)
if len(iface_str) <= 25:
lines.append(iface_str)
# Show dropped count if non-zero
if data.dropped_frames > 0:
lines.append(f"Dropped: {data.dropped_frames}")
return lines
def _format_number(self, num: int) -> str:
"""Format large numbers with K/M suffix."""
if num >= 1_000_000:
return f"{num / 1_000_000:.1f}M"
elif num >= 1_000:
return f"{num / 1_000:.1f}K"
else:
return str(num)
def get_provider(self) -> CANProvider:
"""Get the CAN provider instance."""
return self._provider

View File

@@ -0,0 +1,72 @@
"""
System Information Page.
Displays Raspberry Pi system metrics on Flipper Zero.
"""
from flipper.pages.base import InfoPage
from flipper.providers.system_provider import SystemProvider
class SystemInfoPage(InfoPage):
"""
Page displaying Raspberry Pi system metrics.
Shows:
- CPU temperature
- Power consumption
- Fan RPM
- Input voltage
"""
def __init__(self):
super().__init__(
name="system_info",
title="System Info",
icon="cpu"
)
self._provider = SystemProvider()
def get_lines(self) -> list[str]:
"""Get system info lines for display."""
data = self._provider.get_data()
lines = [
f"CPU Temp: {data.cpu_temp:.1f}C {self._get_temp_indicator(data.cpu_temp)}",
f"Power: {data.power_watts:.2f}W",
]
# Fan RPM (may not be available on all systems)
if data.fan_rpm > 0:
lines.append(f"Fan: {data.fan_rpm} RPM")
else:
lines.append("Fan: N/A")
# Input voltage
if data.input_voltage > 0:
lines.append(f"Input: {data.input_voltage:.2f}V")
return lines
def _get_temp_indicator(self, temp: float) -> str:
"""
Get temperature status indicator.
Args:
temp: Temperature in Celsius
Returns:
Status indicator string
"""
if temp >= 80:
return "(!)" # Critical
elif temp >= 70:
return "(W)" # Warning
elif temp >= 60:
return "(~)" # Warm
else:
return "" # OK
def get_provider(self) -> SystemProvider:
"""Get the System provider instance."""
return self._provider

View File

@@ -0,0 +1,85 @@
"""
UPS Status Page.
Displays X120x UPS battery and power status on Flipper Zero.
"""
from flipper.pages.base import InfoPage
from flipper.providers.ups_provider import UPSProvider
class UPSStatusPage(InfoPage):
"""
Page displaying UPS (X120x) status.
Shows:
- Battery percentage and voltage
- Charging status
- Power loss indicator
- Input voltage
"""
def __init__(self):
super().__init__(
name="ups_status",
title="UPS Status",
icon="battery"
)
self._provider = UPSProvider()
def get_lines(self) -> list[str]:
"""Get UPS status lines for display."""
if not self._provider.is_available():
return [
"UPS not available",
self._provider.get_last_error() or "Check I2C connection"
]
data = self._provider.get_data()
# Battery bar visualization
battery_bar = self._get_battery_bar(data.capacity)
lines = [
f"Bat: {data.capacity:.1f}% {battery_bar}",
f"Voltage: {data.voltage:.2f}V",
]
# Charging/power status
if data.power_loss:
lines.append("Power: BATTERY MODE")
if data.capacity < 15:
lines.append("WARNING: Critical!")
elif data.capacity < 25:
lines.append("WARNING: Low battery")
else:
status = "Charging" if data.is_charging else "Full"
lines.append(f"Status: {status}")
lines.append(f"Input: {data.input_voltage:.2f}V")
return lines
def _get_battery_bar(self, percent: float) -> str:
"""
Create ASCII battery bar.
Args:
percent: Battery percentage (0-100)
Returns:
Battery visualization string like "[==== ]"
"""
filled = int(percent / 20) # 5 segments
empty = 5 - filled
bar = "[" + "=" * filled + " " * empty + "]"
return bar
def is_enabled(self) -> bool:
"""Check if page should be shown."""
return self.enabled and self._provider.is_available()
def get_provider(self) -> UPSProvider:
"""Get the UPS provider instance."""
return self._provider

View File

@@ -0,0 +1,194 @@
"""
Flipper Zero Communication Protocol.
Defines message formats for RPi <-> Flipper Zero UART communication.
Protocol Messages:
RPi -> Flipper:
PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected>
ACK:<device>,ip=<ip>
RESULT:<status>|<message>
Flipper -> RPi:
INIT:<device>
STOP:<device>
CMD:NAV:<next|prev>
CMD:SELECT:<index>
CMD:CONFIRM
CMD:CANCEL
CMD:REFRESH
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
class PageType(Enum):
"""Types of pages for different UI layouts."""
INFO = "info" # Read-only information display
MENU = "menu" # Selectable menu items
CONFIRM = "confirm" # Confirmation dialog (Yes/No)
class CommandType(Enum):
"""Types of commands from Flipper to RPi."""
INIT = "INIT"
STOP = "STOP"
NAV_NEXT = "NAV:next"
NAV_PREV = "NAV:prev"
SELECT = "SELECT"
CONFIRM = "CONFIRM"
CANCEL = "CANCEL"
REFRESH = "REFRESH"
@dataclass
class PageContent:
"""
Content structure for a page to be displayed on Flipper Zero.
Attributes:
page_type: Type of page (info, menu, confirm)
title: Page title (max ~20 chars for display)
lines: Content lines to display (max 4-5 lines)
actions: Available actions/menu items (for menu type)
selected: Currently selected item index (for menu type)
icon: Optional icon identifier for the page
"""
page_type: PageType
title: str
lines: List[str] = field(default_factory=list)
actions: List[str] = field(default_factory=list)
selected: int = 0
icon: str = ""
def __post_init__(self):
# Truncate title if too long
if len(self.title) > 20:
self.title = self.title[:17] + "..."
@dataclass
class Command:
"""Parsed command from Flipper Zero."""
cmd_type: CommandType
payload: str = ""
@property
def select_index(self) -> int:
"""Get selected index for SELECT command."""
if self.cmd_type == CommandType.SELECT:
try:
return int(self.payload)
except ValueError:
return 0
return 0
class Protocol:
"""
Protocol encoder/decoder for Flipper Zero communication.
Message format uses pipe (|) as field separator and semicolon (;)
as item separator within fields.
"""
FIELD_SEP = "|"
ITEM_SEP = ";"
@staticmethod
def encode_page(
page_index: int,
total_pages: int,
content: PageContent
) -> str:
"""
Encode page content to protocol message.
Args:
page_index: Current page index (0-based)
total_pages: Total number of pages
content: Page content to encode
Returns:
Encoded message string with newline
"""
# PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected>
lines_str = Protocol.ITEM_SEP.join(content.lines)
actions_str = Protocol.ITEM_SEP.join(content.actions)
parts = [
f"PAGE:{page_index}/{total_pages}",
content.page_type.value,
content.title,
lines_str,
actions_str,
str(content.selected)
]
return Protocol.FIELD_SEP.join(parts) + "\n"
@staticmethod
def encode_ack(device: str, ip: str) -> str:
"""Encode ACK message."""
return f"ACK:{device},ip={ip}\n"
@staticmethod
def encode_result(success: bool, message: str) -> str:
"""Encode result message after action execution."""
status = "OK" if success else "ERROR"
return f"RESULT:{status}|{message}\n"
@staticmethod
def decode_command(line: str) -> Optional[Command]:
"""
Decode command from Flipper Zero.
Args:
line: Raw command line (without newline)
Returns:
Parsed Command object or None if invalid
"""
line = line.strip()
if not line:
return None
# INIT:<device>
if line.startswith("INIT:"):
return Command(CommandType.INIT, line[5:])
# STOP:<device>
if line.startswith("STOP:"):
return Command(CommandType.STOP, line[5:])
# CMD:NAV:next / CMD:NAV:prev
if line == "CMD:NAV:next":
return Command(CommandType.NAV_NEXT)
if line == "CMD:NAV:prev":
return Command(CommandType.NAV_PREV)
# CMD:SELECT:<index>
if line.startswith("CMD:SELECT:"):
return Command(CommandType.SELECT, line[11:])
# CMD:CONFIRM
if line == "CMD:CONFIRM":
return Command(CommandType.CONFIRM)
# CMD:CANCEL
if line == "CMD:CANCEL":
return Command(CommandType.CANCEL)
# CMD:REFRESH
if line == "CMD:REFRESH":
return Command(CommandType.REFRESH)
return None

View File

@@ -0,0 +1,18 @@
"""
Data Providers for Flipper Zero Pages.
Providers abstract data sources (UPS, system metrics, CAN stats)
from the pages that display them.
"""
from flipper.providers.base import BaseProvider
from flipper.providers.ups_provider import UPSProvider
from flipper.providers.system_provider import SystemProvider
from flipper.providers.can_provider import CANProvider
__all__ = [
"BaseProvider",
"UPSProvider",
"SystemProvider",
"CANProvider",
]

View File

@@ -0,0 +1,119 @@
"""
Base Provider Interface.
Providers are singleton data sources that pages can query.
They handle caching and thread-safe data access.
"""
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import threading
import time
class BaseProvider(ABC):
"""
Abstract base class for data providers.
Providers:
- Are singletons (one instance per type)
- Cache data with configurable TTL
- Are thread-safe
- Handle errors gracefully
Attributes:
name: Provider identifier
cache_ttl: Cache time-to-live in seconds
"""
_instances: Dict[str, "BaseProvider"] = {}
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
"""Singleton pattern implementation."""
with cls._lock:
if cls.__name__ not in cls._instances:
instance = super().__new__(cls)
cls._instances[cls.__name__] = instance
return cls._instances[cls.__name__]
def __init__(self, name: str, cache_ttl: float = 1.0):
"""
Initialize provider.
Args:
name: Provider identifier
cache_ttl: Cache TTL in seconds
"""
# Prevent re-initialization for singleton
if hasattr(self, "_initialized") and self._initialized:
return
self.name = name
self.cache_ttl = cache_ttl
self._cache: Dict[str, Any] = {}
self._cache_timestamps: Dict[str, float] = {}
self._data_lock = threading.Lock()
self._initialized = True
self._available = True
self._last_error: Optional[str] = None
def _get_cached(self, key: str) -> Optional[Any]:
"""
Get cached value if not expired.
Args:
key: Cache key
Returns:
Cached value or None if expired/missing
"""
with self._data_lock:
if key not in self._cache:
return None
timestamp = self._cache_timestamps.get(key, 0)
if time.time() - timestamp > self.cache_ttl:
return None
return self._cache[key]
def _set_cached(self, key: str, value: Any) -> None:
"""
Set cached value.
Args:
key: Cache key
value: Value to cache
"""
with self._data_lock:
self._cache[key] = value
self._cache_timestamps[key] = time.time()
def _clear_cache(self) -> None:
"""Clear all cached values."""
with self._data_lock:
self._cache.clear()
self._cache_timestamps.clear()
@abstractmethod
def refresh(self) -> bool:
"""
Refresh provider data from source.
Returns:
True if refresh successful
"""
pass
def is_available(self) -> bool:
"""Check if provider is available."""
return self._available
def get_last_error(self) -> Optional[str]:
"""Get last error message."""
return self._last_error
def shutdown(self) -> None:
"""Cleanup provider resources."""
self._clear_cache()

View File

@@ -0,0 +1,135 @@
"""
CAN Statistics Provider.
Provides CAN sniffer statistics from the message processor.
"""
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from flipper.providers.base import BaseProvider
@dataclass
class CANData:
"""CAN statistics data."""
total_frames: int = 0
pending_frames: int = 0
processed_frames: int = 0
dropped_frames: int = 0
queue_size: int = 0
queue_capacity: int = 100000
interfaces: Dict[str, int] = field(default_factory=dict)
class CANProvider(BaseProvider):
"""
Provider for CAN sniffer statistics.
This provider is updated by the FlipperHandler when
CAN frames are processed.
"""
def __init__(self):
super().__init__(name="can", cache_ttl=0.5)
self._data = CANData()
self._stats_callback: Optional[Callable[[], Dict[str, Any]]] = None
def set_stats_callback(self, callback: Callable[[], Dict[str, Any]]) -> None:
"""
Set callback to retrieve stats from message processor.
Args:
callback: Function that returns stats dictionary
"""
self._stats_callback = callback
def update_stats(
self,
total: int = 0,
pending: int = 0,
processed: int = 0,
dropped: int = 0,
queue_size: int = 0
) -> None:
"""
Update CAN statistics directly.
Args:
total: Total frames received
pending: Pending frames in queue
processed: Processed frames
dropped: Dropped frames
queue_size: Current queue size
"""
self._data.total_frames = total
self._data.pending_frames = pending
self._data.processed_frames = processed
self._data.dropped_frames = dropped
self._data.queue_size = queue_size
self._set_cached("data", self._data)
def update_interface_stats(self, interface: str, count: int) -> None:
"""
Update per-interface statistics.
Args:
interface: Interface name (e.g., "can0")
count: Message count for this interface
"""
self._data.interfaces[interface] = count
def refresh(self) -> bool:
"""Refresh stats from callback if available."""
if self._stats_callback:
try:
stats = self._stats_callback()
self._data.total_frames = stats.get("total_frames", 0)
self._data.pending_frames = stats.get("pending_frames", 0)
self._data.processed_frames = stats.get("processed_frames", 0)
self._data.dropped_frames = stats.get("dropped_count", 0)
self._data.queue_size = stats.get("queue_size", 0)
self._set_cached("data", self._data)
return True
except Exception as e:
self._last_error = str(e)
return False
return True
def get_data(self) -> CANData:
"""Get current CAN statistics."""
cached = self._get_cached("data")
if cached is not None:
return cached
self.refresh()
return self._data
def get_total_frames(self) -> int:
"""Get total frames received."""
return self.get_data().total_frames
def get_pending_frames(self) -> int:
"""Get pending frames in queue."""
return self.get_data().pending_frames
def get_processed_frames(self) -> int:
"""Get processed frames."""
return self.get_data().processed_frames
def get_dropped_frames(self) -> int:
"""Get dropped frames."""
return self.get_data().dropped_frames
def get_queue_fill_percent(self) -> float:
"""Get queue fill percentage."""
data = self.get_data()
if data.queue_capacity == 0:
return 0.0
return (data.queue_size / data.queue_capacity) * 100
def get_interface_count(self, interface: str) -> int:
"""Get message count for specific interface."""
return self.get_data().interfaces.get(interface, 0)

View File

@@ -0,0 +1,240 @@
"""
System Provider for Raspberry Pi metrics and control.
Provides CPU temperature, power consumption, fan RPM,
and system control actions (shutdown, reboot).
"""
import os
import subprocess
from typing import Optional
from dataclasses import dataclass
from pathlib import Path
from flipper.providers.base import BaseProvider
@dataclass
class SystemData:
"""System metrics data."""
cpu_temp: float = 0.0
cpu_volts: float = 0.0
cpu_amps: float = 0.0
power_watts: float = 0.0
fan_rpm: int = 0
input_voltage: float = 0.0
class SystemProvider(BaseProvider):
"""
Provider for Raspberry Pi system metrics.
Uses vcgencmd for hardware metrics and sysfs for fan speed.
Also provides system control actions.
"""
def __init__(self):
super().__init__(name="system", cache_ttl=2.0)
self._data = SystemData()
def _read_metric(self, args: list, strip_chars: str) -> Optional[float]:
"""
Read a hardware metric using vcgencmd.
Args:
args: Command arguments
strip_chars: Characters to strip from value
Returns:
Float value or None on error
"""
try:
output = subprocess.check_output(args).decode("utf-8")
value_str = output.split("=")[1].strip().rstrip(strip_chars)
return float(value_str)
except Exception:
return None
def _read_cpu_temp(self) -> float:
"""Read CPU temperature in Celsius."""
result = self._read_metric(["vcgencmd", "measure_temp"], "'C")
return result if result is not None else 0.0
def _read_cpu_volts(self) -> float:
"""Read CPU core voltage."""
result = self._read_metric(["vcgencmd", "pmic_read_adc", "VDD_CORE_V"], "V")
return result if result is not None else 0.0
def _read_cpu_amps(self) -> float:
"""Read CPU core current."""
result = self._read_metric(["vcgencmd", "pmic_read_adc", "VDD_CORE_A"], "A")
return result if result is not None else 0.0
def _read_input_voltage(self) -> float:
"""Read external 5V input voltage."""
result = self._read_metric(["vcgencmd", "pmic_read_adc", "EXT5V_V"], "V")
return result if result is not None else 0.0
def _read_power_watts(self) -> float:
"""
Calculate total system power consumption.
Reads all PMIC ADC values and calculates wattage.
"""
try:
output = subprocess.check_output(["vcgencmd", "pmic_read_adc"]).decode("utf-8")
lines = output.strip().split("\n")
amperages = {}
voltages = {}
for line in lines:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 2:
continue
label = parts[0]
value_str = parts[-1]
if "=" not in value_str:
continue
val = float(value_str.split("=")[1][:-1])
short_label = label[:-2]
if label.endswith("A"):
amperages[short_label] = val
else:
voltages[short_label] = val
# Calculate total wattage
wattage = sum(
amperages[key] * voltages[key]
for key in amperages
if key in voltages
)
return wattage
except Exception:
return 0.0
def _read_fan_rpm(self) -> int:
"""
Read fan RPM from sysfs.
Returns:
Fan speed in RPM or 0 if not available
"""
try:
sys_path = Path("/sys/devices/platform/cooling_fan")
fan_files = list(sys_path.rglob("fan1_input"))
if not fan_files:
return 0
with open(fan_files[0], "r") as f:
rpm = f.read().strip()
return int(rpm)
except Exception:
return 0
def refresh(self) -> bool:
"""Refresh system metrics."""
try:
self._data = SystemData(
cpu_temp=self._read_cpu_temp(),
cpu_volts=self._read_cpu_volts(),
cpu_amps=self._read_cpu_amps(),
power_watts=self._read_power_watts(),
fan_rpm=self._read_fan_rpm(),
input_voltage=self._read_input_voltage()
)
self._set_cached("data", self._data)
return True
except Exception as e:
self._last_error = str(e)
return False
def get_data(self) -> SystemData:
"""Get current system data."""
cached = self._get_cached("data")
if cached is not None:
return cached
self.refresh()
return self._data
def get_cpu_temp(self) -> float:
"""Get CPU temperature in Celsius."""
return self.get_data().cpu_temp
def get_power_watts(self) -> float:
"""Get total power consumption in Watts."""
return self.get_data().power_watts
def get_fan_rpm(self) -> int:
"""Get fan speed in RPM."""
return self.get_data().fan_rpm
def shutdown(self, delay_minutes: int = 0) -> bool:
"""
Shutdown the system.
Args:
delay_minutes: Delay before shutdown (0 = immediate)
Returns:
True if command executed
"""
try:
if delay_minutes > 0:
cmd = f"sudo shutdown -P +{delay_minutes}"
else:
cmd = "sudo shutdown -P now"
subprocess.Popen(cmd, shell=True)
return True
except Exception as e:
self._last_error = str(e)
return False
def reboot(self) -> bool:
"""
Reboot the system.
Returns:
True if command executed
"""
try:
subprocess.Popen("sudo reboot", shell=True)
return True
except Exception as e:
self._last_error = str(e)
return False
def cancel_shutdown(self) -> bool:
"""
Cancel pending shutdown.
Returns:
True if command executed
"""
try:
subprocess.call("sudo shutdown -c", shell=True)
return True
except Exception as e:
self._last_error = str(e)
return False

View File

@@ -0,0 +1,228 @@
"""
UPS Provider for X120x UPS on Raspberry Pi.
Reads battery status, voltage, and charging state via I2C (SMBus).
Based on https://github.com/suptronics/x120x
"""
import struct
from typing import Optional, Tuple
from dataclasses import dataclass
from flipper.providers.base import BaseProvider
@dataclass
class UPSData:
"""UPS status data."""
voltage: float = 0.0
capacity: float = 0.0
is_charging: bool = False
power_loss: bool = False
input_voltage: float = 0.0
class UPSProvider(BaseProvider):
"""
Provider for X120x UPS data.
Reads from MAX17048 fuel gauge via I2C (address 0x36).
Also monitors power loss detection pin (GPIO 6).
Hardware requirements:
- X120x UPS HAT connected via I2C
- smbus2 library installed
- gpiozero for power loss detection
"""
# I2C address for MAX17048 fuel gauge
I2C_ADDRESS = 0x36
I2C_BUS = 1
# GPIO pins
CHG_ONOFF_PIN = 16
PLD_PIN = 6 # Power Loss Detection
def __init__(self):
super().__init__(name="ups", cache_ttl=2.0)
self._bus = None
self._pld_button = None
self._data = UPSData()
self._init_hardware()
def _init_hardware(self) -> None:
"""Initialize I2C and GPIO."""
try:
import smbus2
self._bus = smbus2.SMBus(self.I2C_BUS)
self._available = True
except ImportError:
self._last_error = "smbus2 not installed"
self._available = False
except Exception as e:
self._last_error = f"I2C init failed: {e}"
self._available = False
try:
from gpiozero import Button
self._pld_button = Button(self.PLD_PIN)
except ImportError:
pass # GPIO optional
except Exception:
pass # GPIO optional
def _read_voltage_and_capacity(self) -> Tuple[float, float]:
"""
Read voltage and capacity from MAX17048.
Returns:
Tuple of (voltage in V, capacity in %)
"""
if not self._bus:
return 0.0, 0.0
try:
# Read voltage register (0x02)
voltage_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x02)
# Read capacity register (0x04)
capacity_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x04)
# Swap bytes (little-endian to big-endian)
voltage_swapped = struct.unpack("<H", struct.pack(">H", voltage_read))[0]
capacity_swapped = struct.unpack("<H", struct.pack(">H", capacity_read))[0]
# Calculate actual values
voltage = voltage_swapped * 1.25 / 1000 / 16
capacity = capacity_swapped / 256
return voltage, capacity
except Exception as e:
self._last_error = f"Read error: {e}"
return 0.0, 0.0
def _get_power_loss_state(self) -> bool:
"""
Check power loss detection pin.
Returns:
True if power loss detected (running on battery)
"""
if self._pld_button is None:
return False
try:
# Button pressed = power OK, not pressed = power loss
return not self._pld_button.is_pressed
except Exception:
return False
def _read_input_voltage(self) -> float:
"""
Read input voltage using vcgencmd.
Returns:
Input voltage in V
"""
try:
from subprocess import check_output
output = check_output(["vcgencmd", "pmic_read_adc", "EXT5V_V"]).decode("utf-8")
value_str = output.split("=")[1].strip().rstrip("V")
return float(value_str)
except Exception:
return 0.0
def refresh(self) -> bool:
"""Refresh UPS data from hardware."""
if not self._available:
return False
try:
voltage, capacity = self._read_voltage_and_capacity()
power_loss = self._get_power_loss_state()
input_voltage = self._read_input_voltage()
# Determine charging state (charge disabled above 90%)
is_charging = capacity < 90 and not power_loss
self._data = UPSData(
voltage=voltage,
capacity=capacity,
is_charging=is_charging,
power_loss=power_loss,
input_voltage=input_voltage
)
self._set_cached("data", self._data)
return True
except Exception as e:
self._last_error = str(e)
return False
def get_data(self) -> UPSData:
"""
Get current UPS data.
Returns:
UPSData object with current status
"""
cached = self._get_cached("data")
if cached is not None:
return cached
self.refresh()
return self._data
def get_voltage(self) -> float:
"""Get battery voltage in V."""
return self.get_data().voltage
def get_capacity(self) -> float:
"""Get battery capacity in %."""
return self.get_data().capacity
def is_charging(self) -> bool:
"""Check if battery is charging."""
return self.get_data().is_charging
def has_power_loss(self) -> bool:
"""Check if external power is lost."""
return self.get_data().power_loss
def get_input_voltage(self) -> float:
"""Get input voltage in V."""
return self.get_data().input_voltage
def get_status_string(self) -> str:
"""
Get human-readable status string.
Returns:
Status string like "OK", "Battery", "Charging"
"""
data = self.get_data()
if data.power_loss:
if data.capacity < 15:
return "CRITICAL"
elif data.capacity < 25:
return "LOW"
else:
return "Battery"
elif data.is_charging:
return "Charging"
else:
return "OK"
def shutdown(self) -> None:
"""Cleanup resources."""
super().shutdown()
if self._bus:
try:
self._bus.close()
except Exception:
pass

View File

@@ -1,15 +1,22 @@
"""
Flipper Zero UART Handler with Handshake Protocol.
Flipper Zero Dynamic UI Handler.
Waits for INIT command from Flipper Zero before sending statistics.
Supports handshake protocol for secure connection establishment.
Provides multi-page interface with bidirectional communication via UART.
Supports pluggable pages for CAN stats, UPS status, system info, and actions.
Protocol:
1. RPI5 waits in passive mode, listening for commands
2. Flipper sends: INIT:flipper\n
3. RPI5 responds: ACK:rpi5,ip=x.x.x.x\n
4. RPI5 starts sending: STATS:ip=...,total=...,pending=...,processed=...\n
5. Flipper sends: STOP:flipper\n to disconnect
RPi -> Flipper:
PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected>
ACK:<device>,ip=<ip>
RESULT:<status>|<message>
Flipper -> RPi:
INIT:<device>
STOP:<device>
CMD:NAV:<next|prev>
CMD:SELECT:<index>
CMD:CONFIRM / CMD:CANCEL
CMD:REFRESH
"""
import socket
@@ -22,6 +29,10 @@ from can_frame import CANFrame
from config import config
from logger import get_logger
from flipper.protocol import Protocol, Command, CommandType
from flipper.page_manager import PageManager
from flipper.pages import CANStatsPage, UPSStatusPage, SystemInfoPage, ActionsPage
logger = get_logger(__name__)
@@ -57,16 +68,14 @@ class FlipperHandler(BaseHandler):
"""
Handler that communicates with Flipper Zero via UART.
Implements handshake protocol:
- Waits for INIT:flipper command
- Responds with ACK:rpi5,ip=x.x.x.x
- Sends STATS periodically while connected
- Stops on STOP:flipper command
Provides dynamic multi-page interface:
- CAN Statistics
- UPS Status (if available)
- System Information
- Actions Menu
UART Configuration:
- Device: /dev/ttyAMA0 (or configured device)
- Baud: 115200
- Format: 8N1
Implements handshake protocol for connection management
and bidirectional command processing.
"""
def __init__(self, enabled: Optional[bool] = None):
@@ -83,6 +92,7 @@ class FlipperHandler(BaseHandler):
super().__init__(name="flipper_handler", enabled=enabled)
# Serial configuration
self.serial_port: Optional[Any] = None
self.device = "/dev/ttyAMA0"
self.baudrate = 115200
@@ -113,6 +123,39 @@ class FlipperHandler(BaseHandler):
# IP address
self._ip_address = "0.0.0.0"
# Page manager
self._page_manager = PageManager()
self._setup_pages()
def _setup_pages(self) -> None:
"""Setup default pages."""
# CAN Statistics (always available)
can_page = CANStatsPage()
self._page_manager.register_page(can_page)
# Keep reference to CAN provider for stats updates
self._can_provider = can_page.get_provider()
# UPS Status (if available)
ups_page = UPSStatusPage()
self._page_manager.register_page(ups_page)
# System Information
system_page = SystemInfoPage()
self._page_manager.register_page(system_page)
# Actions Menu
actions_page = ActionsPage(on_result=self._on_action_result)
self._page_manager.register_page(actions_page)
def _on_action_result(self, result: str) -> None:
"""Handle action result from actions page."""
self.logger.info(f"Action result: {result}")
# Send result to Flipper
if self._connected:
msg = Protocol.encode_result(True, result)
self._send_raw(msg)
def initialize(self) -> bool:
"""
Initialize UART connection.
@@ -162,13 +205,15 @@ class FlipperHandler(BaseHandler):
)
self._rx_thread.start()
# Start TX thread (sends stats when connected)
# Start TX thread (sends page content when connected)
self._tx_thread = threading.Thread(
target=self._tx_loop, name="FlipperTX", daemon=True
)
self._tx_thread.start()
self.logger.info("Flipper handler started, waiting for connection...")
self.logger.info(
f"Flipper handler started with {self._page_manager.get_page_count()} pages"
)
def _rx_loop(self) -> None:
"""Receive loop - listens for commands from Flipper."""
@@ -198,41 +243,61 @@ class FlipperHandler(BaseHandler):
self.logger.debug(f"RX error: {e}")
time.sleep(0.1)
def _process_command(self, command: str) -> None:
def _process_command(self, raw_command: str) -> None:
"""
Process received command from Flipper.
Args:
command: Received command string
raw_command: Raw command string
"""
self.logger.info(f"Received command: {command}")
self.logger.debug(f"RX: {raw_command}")
if command.startswith("INIT:"):
# Handshake initiation
client_id = command[5:].strip()
# Handle handshake commands directly
if raw_command.startswith("INIT:"):
client_id = raw_command[5:].strip()
self.logger.info(f"Handshake request from: {client_id}")
# Send ACK with IP address
# Update IP and send ACK
self._ip_address = get_ip_address()
ack_msg = f"ACK:rpi5,ip={self._ip_address}\n"
ack_msg = Protocol.encode_ack("rpi5", self._ip_address)
self._send_raw(ack_msg)
self._connected = True
self.logger.info(f"Connected to Flipper, IP: {self._ip_address}")
elif command.startswith("STOP:"):
# Disconnect request
client_id = command[5:].strip()
# Send initial page content
self._send_page_content()
return
if raw_command.startswith("STOP:"):
client_id = raw_command[5:].strip()
self.logger.info(f"Disconnect request from: {client_id}")
self._connected = False
self.logger.info("Disconnected from Flipper")
return
# Parse and process other commands
command = Protocol.decode_command(raw_command)
if command is None:
self.logger.debug(f"Unknown command: {raw_command}")
return
# Process command via page manager
result = self._page_manager.process_command(command)
# Send result if available
if result:
msg = Protocol.encode_result(True, result)
self._send_raw(msg)
# Always send updated page content after command
self._send_page_content()
def _tx_loop(self) -> None:
"""Transmit loop - sends stats when connected."""
"""Transmit loop - sends page content periodically when connected."""
while self._running:
try:
if self._connected:
self._send_stats()
self._send_page_content()
time.sleep(self.send_interval)
@@ -263,21 +328,16 @@ class FlipperHandler(BaseHandler):
self.logger.debug(f"Send error: {e}")
return False
def _send_stats(self) -> None:
"""Send current statistics to Flipper Zero."""
def _send_page_content(self) -> None:
"""Send current page content to Flipper Zero."""
if not self._connected:
return
with self._stats_lock:
total = self._total_frames
pending = self._pending_frames
processed = self._processed_frames
message = f"STATS:ip={self._ip_address},total={total},pending={pending},processed={processed}\n"
if self._send_raw(message):
with self._stats_lock:
self._sent_count += 1
content = self._page_manager.get_current_content()
if content:
if self._send_raw(content):
with self._stats_lock:
self._sent_count += 1
def handle(self, frame: CANFrame) -> bool:
"""
@@ -292,6 +352,14 @@ class FlipperHandler(BaseHandler):
with self._stats_lock:
self._total_frames += 1
self._pending_frames += 1
# Update CAN provider
self._can_provider.update_stats(
total=self._total_frames,
pending=self._pending_frames,
processed=self._processed_frames
)
return True
def handle_batch(self, frames: List[CANFrame]) -> int:
@@ -305,10 +373,19 @@ class FlipperHandler(BaseHandler):
Number of frames processed
"""
count = len(frames)
with self._stats_lock:
self._total_frames += count
self._processed_frames += count
self._pending_frames = max(0, self._pending_frames - count)
# Update CAN provider
self._can_provider.update_stats(
total=self._total_frames,
pending=self._pending_frames,
processed=self._processed_frames
)
return count
def update_pending(self, pending_count: int) -> None:
@@ -321,11 +398,17 @@ class FlipperHandler(BaseHandler):
with self._stats_lock:
self._pending_frames = pending_count
self._can_provider.update_stats(
total=self._total_frames,
pending=self._pending_frames,
processed=self._processed_frames
)
def flush(self) -> None:
"""Flush - send immediate stats if connected."""
"""Flush - send immediate page content if connected."""
if self._connected:
try:
self._send_stats()
self._send_page_content()
except Exception as e:
self.logger.debug(f"Flush error: {e}")
@@ -350,6 +433,9 @@ class FlipperHandler(BaseHandler):
except Exception as e:
self.logger.debug(f"Error closing serial port: {e}")
# Shutdown page manager
self._page_manager.shutdown()
self._initialized = False
self.logger.info("Flipper handler stopped")
@@ -361,7 +447,7 @@ class FlipperHandler(BaseHandler):
Dictionary with handler stats
"""
with self._stats_lock:
return {
stats = {
"total_frames": self._total_frames,
"pending_frames": self._pending_frames,
"processed_frames": self._processed_frames,
@@ -373,6 +459,15 @@ class FlipperHandler(BaseHandler):
"ip_address": self._ip_address,
}
# Add page manager stats
stats.update(self._page_manager.get_stats())
return stats
def is_connected(self) -> bool:
"""Check if Flipper is connected."""
return self._connected
def get_page_manager(self) -> PageManager:
"""Get page manager for external page registration."""
return self._page_manager

View File

@@ -1,27 +1,104 @@
# CAN Monitor for Flipper Zero
Flipper Zero application for monitoring CAN sniffer statistics from Raspberry Pi 5 via UART.
Dynamic multi-page monitor application for Raspberry Pi 5 via UART.
## Features
- Real-time display of CAN sniffer statistics
- Shows: IP address, total frames, pending frames, processed frames
- Connection status indicator
- Compatible with Unleashed firmware (0.84e and later)
- **Dynamic Page System** - Pages are fully controlled by RPi5
- **Multiple Page Types**:
- **Info** - Read-only status display
- **Menu** - Selectable action items
- **Confirm** - Yes/No confirmation dialogs
- **Real-time Updates** - Content refreshes every second
- **Bidirectional Communication** - Send commands back to RPi5
- **Result Notifications** - Action results displayed as overlay
## Default Pages
When connected to the CAN Sniffer system:
| Page | Type | Description |
|------|------|-------------|
| CAN Statistics | Info | Frame counts, queue status, per-interface stats |
| UPS Status | Info | Battery level, voltage, charging state (X120x) |
| System Info | Info | CPU temperature, power consumption, fan RPM |
| Actions | Menu | Shutdown, Reboot, Cancel shutdown |
## Screen Layout
```
128x64 pixels (Flipper Zero display)
+----------------------------------+
| [1/4] Page Title (*) | <- Header: page indicator, title, connection dot
+----------------------------------+
| |
| Content Line 1 | <- Content area: 4-5 lines
| Content Line 2 |
| Content Line 3 |
| Content Line 4 |
| |
+----------------------------------+
| < Hint Text > | <- Footer: nav arrows, action hint
+----------------------------------+
```
## Controls
| Key | Info Page | Menu Page | Confirm Page |
|-----|-----------|-----------|--------------|
| **Left/Right** | Navigate pages | Navigate pages | Navigate pages |
| **Up/Down** | - | Select item | Switch Yes/No |
| **OK** | - | Execute action | Confirm selection |
| **Back** | Disconnect (pg 0) / Prev page | Same | Cancel dialog |
## Protocol
### RPi5 -> Flipper
```
PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected>
ACK:<device>,ip=<ip>
RESULT:<OK|ERROR>|<message>
```
**Examples:**
```
PAGE:0/4|info|CAN Statistics|Total: 12.3K;Processed: 12.1K;Queue: 142||0
PAGE:1/4|info|UPS Status|Bat: 85.2% [====];Voltage: 4.12V;Status: Charging||0
PAGE:2/4|info|System Info|CPU Temp: 52.1C;Power: 4.2W;Fan: 3200 RPM||0
PAGE:3/4|menu|Actions|Shutdown;Reboot;Cancel Shutdown||0
PAGE:3/4|confirm|Confirm Shutdown?|Are you sure?|Yes;No|1
ACK:rpi5,ip=192.168.1.100
RESULT:OK|Shutdown in 1 min
```
### Flipper -> RPi5
```
INIT:flipper - Start handshake
STOP:flipper - Disconnect
CMD:NAV:next - Next page
CMD:NAV:prev - Previous page
CMD:SELECT:<index> - Select menu item
CMD:CONFIRM - Confirm action
CMD:CANCEL - Cancel action
CMD:REFRESH - Request page update
```
## Hardware Connection
### Wiring (RPI5 <-> Flipper Zero)
### Wiring Diagram
```
RPI5 GPIO Flipper Zero GPIO
----------- -----------------
TX (GPIO 14) --> RX (Pin 14)
RX (GPIO 15) <-- TX (Pin 13)
GND --> GND
Flipper Zero Raspberry Pi 5
----------- ---------------
Pin 13 (TX) ----> GPIO 15 (RX) - Pin 10
Pin 14 (RX) <---- GPIO 14 (TX) - Pin 8
Pin 8/11/18 (GND) ---- GND - Pin 6
```
**Note:** Cross TX/RX connections (RPI TX -> Flipper RX, RPI RX -> Flipper TX)
**Note:** Cross TX/RX connections (Flipper TX -> RPi RX)
### Flipper Zero GPIO Pinout
@@ -54,12 +131,6 @@ cd unleashed-firmware
cp -r /path/to/carpibord/flip_monitor applications_user/can_monitor
```
3. Create icon (10x10 PNG, 1-bit):
```bash
# Create icons/can_monitor.png (10x10 pixels, black & white)
# You can use any image editor or online tool
```
### Build
```bash
@@ -83,23 +154,29 @@ SD Card/apps/GPIO/can_monitor.fap
### 1. Enable UART
Add to `/boot/config.txt`:
Add to `/boot/firmware/config.txt`:
```
enable_uart=1
dtoverlay=uart0
```
Disable serial console:
```bash
sudo raspi-config
# Interface Options -> Serial Port -> No (login shell) -> Yes (hardware)
```
Reboot after changes.
### 2. Install pyserial
### 2. Install Dependencies
```bash
pip install pyserial
pip install pyserial smbus2 gpiozero
```
### 3. Configure CAN Sniffer
Add to `can_sniffer/config.json`:
Add to `can_sniffer/src/config.json`:
```json
{
"flipper": {
@@ -124,19 +201,38 @@ cd can_sniffer/src
python main.py
```
## Protocol
## Adding Custom Pages
The RPI5 sends text-based statistics over UART:
On RPi5 side, create a new page class:
```
STATS:ip=192.168.1.100,total=12345,pending=100,processed=12245\n
```python
from flipper.pages.base import InfoPage
class MyCustomPage(InfoPage):
def __init__(self):
super().__init__(
name="my_page",
title="My Page",
icon="custom"
)
def get_lines(self) -> list[str]:
return [
"Custom line 1",
"Custom line 2",
f"Value: {self.get_value()}"
]
```
Fields:
- `ip` - RPI5 IP address
- `total` - Total CAN frames received
- `pending` - Frames in processing queue
- `processed` - Successfully processed frames
Register in FlipperHandler:
```python
# In flipper_handler.py
from flipper.pages.my_custom import MyCustomPage
# In _setup_pages():
self._page_manager.register_page(MyCustomPage())
```
## Troubleshooting
@@ -148,7 +244,7 @@ Fields:
4. Test UART manually:
```bash
# On RPI5
echo "STATS:ip=test,total=1,pending=0,processed=1" > /dev/ttyAMA0
echo "ACK:rpi5,ip=test" > /dev/ttyAMA0
```
### Permission denied on /dev/ttyAMA0
@@ -159,11 +255,23 @@ sudo usermod -a -G dialout $USER
# Then logout and login again
```
### Flipper shows "Waiting..."
### Flipper shows "Waiting for data..."
- Stats are sent every 1 second (configurable)
- Data is sent every 1 second (configurable)
- Connection timeout is 5 seconds
- Check if CAN sniffer is running
- Check UART logs: `CAN_SNIFFER_LOGGING__LEVEL=DEBUG python main.py`
### UPS page shows "not available"
- X120x UPS must be connected via I2C
- Install smbus2: `pip install smbus2`
- Check I2C: `i2cdetect -y 1` (should show device at 0x36)
## Version History
- **v2.0** - Dynamic page system, bidirectional commands, UPS/System pages
- **v1.0** - Basic CAN statistics display
## License

View File

@@ -3,9 +3,10 @@ App(
name="CAN Monitor",
apptype=FlipperAppType.EXTERNAL,
entry_point="can_monitor_app",
stack_size=2 * 1024,
stack_size=4 * 1024,
fap_category="GPIO",
fap_author="carpibord",
fap_version="1.0",
fap_description="CAN Sniffer monitor via UART from RPI5",
fap_version="2.0",
fap_description="Dynamic multi-page RPI5 monitor with CAN stats, UPS, system info and actions",
fap_icon="A:/icons/gpio_50.png",
)

View File

@@ -1,14 +1,21 @@
/**
* CAN Monitor for Flipper Zero
* CAN Monitor for Flipper Zero - Dynamic UI Version
*
* Multi-page application with handshake protocol.
* Multi-page application with dynamic content from RPI5.
*
* Handshake:
* 1. User presses OK on welcome screen
* 2. Flipper sends: INIT:flipper\n
* 3. RPI5 responds: ACK:rpi5,ip=x.x.x.x\n
* 4. Flipper allows navigation to stats page
* 5. RPI5 starts sending: STATS:ip=...,total=...,pending=...,processed=...\n
* Protocol:
* RPi -> Flipper:
* PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected>
* ACK:<device>,ip=<ip>
* RESULT:<status>|<message>
*
* Flipper -> RPi:
* INIT:flipper
* STOP:flipper
* CMD:NAV:next / CMD:NAV:prev
* CMD:SELECT:<index>
* CMD:CONFIRM / CMD:CANCEL
* CMD:REFRESH
*
* Wiring:
* RPI5 TX (GPIO14, Pin 8) -> Flipper RX (Pin 14)
@@ -20,6 +27,7 @@
#include <furi_hal.h>
#include <gui/gui.h>
#include <gui/view_port.h>
#include <gui/elements.h>
#include <expansion/expansion.h>
#include <stdlib.h>
@@ -27,8 +35,11 @@
#define TAG "CANMonitor"
#define UART_BAUD 115200
#define RX_BUFFER_SIZE 256
#define IP_BUFFER_SIZE 32
#define RX_BUFFER_SIZE 512
#define MAX_LINES 5
#define MAX_ACTIONS 4
#define MAX_LINE_LENGTH 32
#define MAX_TITLE_LENGTH 24
// Connection state
typedef enum {
@@ -37,21 +48,35 @@ typedef enum {
StateConnected,
} ConnectionState;
// Pages
// Page types
typedef enum {
PageWelcome,
PageStats,
} AppPage;
PageTypeInfo,
PageTypeMenu,
PageTypeConfirm,
} PageType;
// Statistics
// Page content received from RPi
typedef struct {
char ip_address[IP_BUFFER_SIZE];
uint32_t total_frames;
uint32_t pending_frames;
uint32_t processed_frames;
bool data_received;
uint8_t page_index;
uint8_t total_pages;
PageType page_type;
char title[MAX_TITLE_LENGTH];
char lines[MAX_LINES][MAX_LINE_LENGTH];
uint8_t line_count;
char actions[MAX_ACTIONS][MAX_LINE_LENGTH];
uint8_t action_count;
uint8_t selected_index;
bool data_valid;
uint32_t last_update_tick;
} CanStats;
} PageContent;
// Result message
typedef struct {
bool has_result;
bool success;
char message[MAX_LINE_LENGTH];
uint32_t show_until_tick;
} ResultMessage;
// App context
typedef struct {
@@ -63,13 +88,19 @@ typedef struct {
FuriStreamBuffer* rx_stream;
FuriThread* worker_thread;
CanStats stats;
AppPage current_page;
PageContent page;
ResultMessage result;
ConnectionState conn_state;
char ip_address[32];
volatile bool running;
volatile bool send_init;
} CanMonitorApp;
// Forward declarations
static void draw_callback(Canvas* canvas, void* ctx);
static void input_callback(InputEvent* event, void* ctx);
// Send data via UART
static void uart_send(CanMonitorApp* app, const char* data) {
if(app->serial) {
@@ -78,77 +109,200 @@ static void uart_send(CanMonitorApp* app, const char* data) {
}
}
// Parse ACK response: ACK:rpi5,ip=x.x.x.x
static bool parse_ack(const char* line, CanStats* stats) {
// Clear page content
static void clear_page_content(PageContent* page) {
page->page_index = 0;
page->total_pages = 0;
page->page_type = PageTypeInfo;
page->title[0] = '\0';
page->line_count = 0;
page->action_count = 0;
page->selected_index = 0;
page->data_valid = false;
for(int i = 0; i < MAX_LINES; i++) {
page->lines[i][0] = '\0';
}
for(int i = 0; i < MAX_ACTIONS; i++) {
page->actions[i][0] = '\0';
}
}
// Parse semicolon-separated items into array
static uint8_t parse_items(const char* str, char items[][MAX_LINE_LENGTH], uint8_t max_items) {
uint8_t count = 0;
if(!str || str[0] == '\0') {
return 0;
}
const char* start = str;
const char* end;
while(count < max_items && start && *start) {
end = strchr(start, ';');
size_t len;
if(end) {
len = end - start;
} else {
len = strlen(start);
}
if(len >= MAX_LINE_LENGTH) {
len = MAX_LINE_LENGTH - 1;
}
if(len > 0) {
memcpy(items[count], start, len);
items[count][len] = '\0';
count++;
}
if(end) {
start = end + 1;
} else {
break;
}
}
return count;
}
// Parse PAGE message
// Format: PAGE:<idx>/<total>|<type>|<title>|<lines>|<actions>|<selected>
static bool parse_page(const char* line, PageContent* page) {
if(strncmp(line, "PAGE:", 5) != 0) {
return false;
}
const char* p = line + 5;
char* end;
// Parse page index and total
page->page_index = (uint8_t)strtoul(p, &end, 10);
if(*end != '/') return false;
p = end + 1;
page->total_pages = (uint8_t)strtoul(p, &end, 10);
if(*end != '|') return false;
p = end + 1;
// Parse page type
const char* type_end = strchr(p, '|');
if(!type_end) return false;
if(strncmp(p, "info", type_end - p) == 0) {
page->page_type = PageTypeInfo;
} else if(strncmp(p, "menu", type_end - p) == 0) {
page->page_type = PageTypeMenu;
} else if(strncmp(p, "confirm", type_end - p) == 0) {
page->page_type = PageTypeConfirm;
} else {
page->page_type = PageTypeInfo;
}
p = type_end + 1;
// Parse title
const char* title_end = strchr(p, '|');
if(!title_end) return false;
size_t title_len = title_end - p;
if(title_len >= MAX_TITLE_LENGTH) {
title_len = MAX_TITLE_LENGTH - 1;
}
memcpy(page->title, p, title_len);
page->title[title_len] = '\0';
p = title_end + 1;
// Parse lines
const char* lines_end = strchr(p, '|');
if(!lines_end) return false;
char lines_buf[256];
size_t lines_len = lines_end - p;
if(lines_len >= sizeof(lines_buf)) {
lines_len = sizeof(lines_buf) - 1;
}
memcpy(lines_buf, p, lines_len);
lines_buf[lines_len] = '\0';
page->line_count = parse_items(lines_buf, page->lines, MAX_LINES);
p = lines_end + 1;
// Parse actions
const char* actions_end = strchr(p, '|');
if(!actions_end) return false;
char actions_buf[128];
size_t actions_len = actions_end - p;
if(actions_len >= sizeof(actions_buf)) {
actions_len = sizeof(actions_buf) - 1;
}
memcpy(actions_buf, p, actions_len);
actions_buf[actions_len] = '\0';
page->action_count = parse_items(actions_buf, page->actions, MAX_ACTIONS);
p = actions_end + 1;
// Parse selected index
page->selected_index = (uint8_t)strtoul(p, NULL, 10);
page->data_valid = true;
page->last_update_tick = furi_get_tick();
return true;
}
// Parse ACK message
static bool parse_ack(const char* line, char* ip_out, size_t ip_size) {
if(strncmp(line, "ACK:", 4) != 0) {
return false;
}
const char* p = line + 4;
// Check for rpi5 identifier
if(strncmp(p, "rpi5", 4) != 0) {
return false;
}
// Parse ip=
const char* ip = strstr(p, "ip=");
const char* ip = strstr(line, "ip=");
if(ip) {
ip += 3;
// Find end (comma, newline, or end of string)
size_t len = 0;
while(ip[len] && ip[len] != ',' && ip[len] != '\n' && ip[len] != '\r') {
len++;
}
if(len > 0 && len < IP_BUFFER_SIZE) {
memcpy(stats->ip_address, ip, len);
stats->ip_address[len] = '\0';
if(len > 0 && len < ip_size) {
memcpy(ip_out, ip, len);
ip_out[len] = '\0';
}
}
return true;
}
// Parse STATS: STATS:ip=x.x.x.x,total=N,pending=N,processed=N
static bool parse_stats(const char* line, CanStats* stats) {
if(strncmp(line, "STATS:", 6) != 0) {
// Parse RESULT message
static bool parse_result(const char* line, ResultMessage* result) {
if(strncmp(line, "RESULT:", 7) != 0) {
return false;
}
const char* p = line + 6;
const char* p = line + 7;
// Parse ip=
const char* ip = strstr(p, "ip=");
if(ip) {
ip += 3;
const char* end = strchr(ip, ',');
if(end && (size_t)(end - ip) < IP_BUFFER_SIZE) {
size_t len = end - ip;
memcpy(stats->ip_address, ip, len);
stats->ip_address[len] = '\0';
}
// Parse status
if(strncmp(p, "OK|", 3) == 0) {
result->success = true;
p += 3;
} else if(strncmp(p, "ERROR|", 6) == 0) {
result->success = false;
p += 6;
} else {
return false;
}
// Parse total=
const char* total = strstr(p, "total=");
if(total) {
stats->total_frames = strtoul(total + 6, NULL, 10);
// Parse message
size_t len = strlen(p);
if(len >= MAX_LINE_LENGTH) {
len = MAX_LINE_LENGTH - 1;
}
memcpy(result->message, p, len);
result->message[len] = '\0';
// Parse pending=
const char* pending = strstr(p, "pending=");
if(pending) {
stats->pending_frames = strtoul(pending + 8, NULL, 10);
}
// Parse processed=
const char* processed = strstr(p, "processed=");
if(processed) {
stats->processed_frames = strtoul(processed + 10, NULL, 10);
}
stats->data_received = true;
stats->last_update_tick = furi_get_tick();
result->has_result = true;
result->show_until_tick = furi_get_tick() + 3000; // Show for 3 seconds
return true;
}
@@ -161,13 +315,17 @@ static void process_line(CanMonitorApp* app, const char* line) {
if(app->conn_state == StateConnecting) {
// Waiting for ACK
if(parse_ack(line, &app->stats)) {
FURI_LOG_I(TAG, "ACK received, IP: %s", app->stats.ip_address);
if(parse_ack(line, app->ip_address, sizeof(app->ip_address))) {
FURI_LOG_I(TAG, "ACK received, IP: %s", app->ip_address);
app->conn_state = StateConnected;
}
} else if(app->conn_state == StateConnected) {
// Parse stats
parse_stats(line, &app->stats);
// Parse PAGE or RESULT
if(line[0] == 'P') {
parse_page(line, &app->page);
} else if(line[0] == 'R') {
parse_result(line, &app->result);
}
}
furi_mutex_release(app->mutex);
@@ -201,6 +359,7 @@ static int32_t worker_thread(void* ctx) {
furi_mutex_acquire(app->mutex, FuriWaitForever);
app->conn_state = StateConnecting;
clear_page_content(&app->page);
furi_mutex_release(app->mutex);
view_port_update(app->view_port);
}
@@ -232,12 +391,18 @@ static int32_t worker_thread(void* ctx) {
// Check data timeout (5 sec) - only when connected
furi_mutex_acquire(app->mutex, FuriWaitForever);
if(app->conn_state == StateConnected && app->stats.data_received) {
if((furi_get_tick() - app->stats.last_update_tick) > 5000) {
app->stats.data_received = false;
if(app->conn_state == StateConnected && app->page.data_valid) {
if((furi_get_tick() - app->page.last_update_tick) > 5000) {
app->page.data_valid = false;
view_port_update(app->view_port);
}
}
// Clear result message after timeout
if(app->result.has_result && furi_get_tick() > app->result.show_until_tick) {
app->result.has_result = false;
view_port_update(app->view_port);
}
furi_mutex_release(app->mutex);
}
@@ -245,24 +410,166 @@ static int32_t worker_thread(void* ctx) {
return 0;
}
// Draw welcome page
// Draw header with page indicator
static void draw_header(Canvas* canvas, CanMonitorApp* app) {
// Page indicator
char page_str[16];
snprintf(
page_str,
sizeof(page_str),
"[%d/%d]",
app->page.page_index + 1,
app->page.total_pages);
canvas_draw_str(canvas, 0, 8, page_str);
// Title (centered)
uint16_t title_width = canvas_string_width(canvas, app->page.title);
uint16_t title_x = (128 - title_width) / 2;
canvas_draw_str(canvas, title_x, 8, app->page.title);
// Connection indicator (right side)
if(app->page.data_valid) {
canvas_draw_disc(canvas, 122, 4, 3);
} else {
canvas_draw_circle(canvas, 122, 4, 3);
}
// Separator line
canvas_draw_line(canvas, 0, 11, 128, 11);
}
// Draw footer with navigation hints
static void draw_footer(Canvas* canvas, PageType page_type, uint8_t total_pages, uint8_t current_page) {
canvas_draw_line(canvas, 0, 54, 128, 54);
canvas_set_font(canvas, FontSecondary);
// Left arrow (if not first page)
if(current_page > 0) {
canvas_draw_str(canvas, 2, 63, "<");
}
// Center action hint
const char* hint = "";
switch(page_type) {
case PageTypeInfo:
hint = "";
break;
case PageTypeMenu:
hint = "OK=Select";
break;
case PageTypeConfirm:
hint = "OK=Yes Back=No";
break;
}
uint16_t hint_width = canvas_string_width(canvas, hint);
canvas_draw_str(canvas, (128 - hint_width) / 2, 63, hint);
// Right arrow (if not last page)
if(current_page < total_pages - 1) {
canvas_draw_str(canvas, 122, 63, ">");
}
}
// Draw info page
static void draw_info_page(Canvas* canvas, CanMonitorApp* app) {
canvas_set_font(canvas, FontSecondary);
uint8_t y = 22;
for(uint8_t i = 0; i < app->page.line_count && i < MAX_LINES; i++) {
canvas_draw_str(canvas, 2, y, app->page.lines[i]);
y += 10;
}
}
// Draw menu page
static void draw_menu_page(Canvas* canvas, CanMonitorApp* app) {
canvas_set_font(canvas, FontSecondary);
uint8_t y = 22;
for(uint8_t i = 0; i < app->page.action_count && i < MAX_ACTIONS; i++) {
// Selection indicator
if(i == app->page.selected_index) {
canvas_draw_box(canvas, 0, y - 8, 128, 10);
canvas_set_color(canvas, ColorWhite);
}
canvas_draw_str(canvas, 4, y, app->page.actions[i]);
if(i == app->page.selected_index) {
canvas_set_color(canvas, ColorBlack);
}
y += 10;
}
}
// Draw confirm page
static void draw_confirm_page(Canvas* canvas, CanMonitorApp* app) {
canvas_set_font(canvas, FontSecondary);
// Message (centered)
if(app->page.line_count > 0) {
uint16_t msg_width = canvas_string_width(canvas, app->page.lines[0]);
canvas_draw_str(canvas, (128 - msg_width) / 2, 30, app->page.lines[0]);
}
// Yes/No options
uint8_t btn_y = 44;
uint8_t yes_x = 32;
uint8_t no_x = 80;
// Draw Yes
if(app->page.selected_index == 0) {
canvas_draw_box(canvas, yes_x - 2, btn_y - 8, 24, 12);
canvas_set_color(canvas, ColorWhite);
}
canvas_draw_str(canvas, yes_x, btn_y, "Yes");
if(app->page.selected_index == 0) {
canvas_set_color(canvas, ColorBlack);
}
// Draw No
if(app->page.selected_index == 1) {
canvas_draw_box(canvas, no_x - 2, btn_y - 8, 20, 12);
canvas_set_color(canvas, ColorWhite);
}
canvas_draw_str(canvas, no_x, btn_y, "No");
if(app->page.selected_index == 1) {
canvas_set_color(canvas, ColorBlack);
}
}
// Draw result overlay
static void draw_result_overlay(Canvas* canvas, ResultMessage* result) {
// Semi-transparent background
canvas_set_color(canvas, ColorWhite);
canvas_draw_box(canvas, 10, 20, 108, 28);
canvas_set_color(canvas, ColorBlack);
canvas_draw_frame(canvas, 10, 20, 108, 28);
canvas_set_font(canvas, FontPrimary);
const char* status = result->success ? "OK" : "Error";
canvas_draw_str(canvas, 14, 32, status);
canvas_set_font(canvas, FontSecondary);
canvas_draw_str(canvas, 14, 44, result->message);
}
// Draw welcome/connecting screen
static void draw_welcome(Canvas* canvas, CanMonitorApp* app) {
canvas_clear(canvas);
// Title
canvas_set_font(canvas, FontPrimary);
canvas_draw_str_aligned(canvas, 64, 8, AlignCenter, AlignCenter, "CAN Monitor");
canvas_set_font(canvas, FontSecondary);
canvas_draw_str_aligned(canvas, 64, 22, AlignCenter, AlignCenter, "RPI5 Dynamic UI");
// Subtitle
canvas_draw_str_aligned(canvas, 64, 22, AlignCenter, AlignCenter, "RPI5 CAN Sniffer");
// Status based on connection state
furi_mutex_acquire(app->mutex, FuriWaitForever);
ConnectionState state = app->conn_state;
char ip_buf[IP_BUFFER_SIZE];
strncpy(ip_buf, app->stats.ip_address, IP_BUFFER_SIZE);
char ip_buf[32];
strncpy(ip_buf, app->ip_address, sizeof(ip_buf));
furi_mutex_release(app->mutex);
switch(state) {
@@ -283,139 +590,167 @@ static void draw_welcome(Canvas* canvas, CanMonitorApp* app) {
snprintf(buf, sizeof(buf), "RPI5: %s", ip_buf);
canvas_draw_str_aligned(canvas, 64, 46, AlignCenter, AlignCenter, buf);
}
canvas_draw_str_aligned(canvas, 64, 58, AlignCenter, AlignCenter, "Press RIGHT >");
canvas_draw_str_aligned(canvas, 64, 58, AlignCenter, AlignCenter, "Waiting for data...");
break;
}
}
// Draw stats page
static void draw_stats(Canvas* canvas, CanMonitorApp* app) {
char buf[64];
canvas_clear(canvas);
// Header
canvas_set_font(canvas, FontPrimary);
canvas_draw_str(canvas, 10, 10, "CAN Monitor");
// Navigation hint
canvas_draw_str(canvas, 0, 10, "<");
// Connection indicator
furi_mutex_acquire(app->mutex, FuriWaitForever);
bool data_ok = app->stats.data_received;
furi_mutex_release(app->mutex);
if(data_ok) {
canvas_draw_disc(canvas, 120, 6, 4);
} else {
canvas_draw_circle(canvas, 120, 6, 4);
}
// Separator
canvas_draw_line(canvas, 0, 14, 128, 14);
canvas_set_font(canvas, FontSecondary);
furi_mutex_acquire(app->mutex, FuriWaitForever);
// IP Address
if(strlen(app->stats.ip_address) > 0) {
snprintf(buf, sizeof(buf), "IP: %s", app->stats.ip_address);
} else {
snprintf(buf, sizeof(buf), "IP: ---");
}
canvas_draw_str(canvas, 0, 26, buf);
// Total frames
snprintf(buf, sizeof(buf), "Total frames: %lu", (unsigned long)app->stats.total_frames);
canvas_draw_str(canvas, 0, 38, buf);
// Pending frames
snprintf(buf, sizeof(buf), "Pending: %lu", (unsigned long)app->stats.pending_frames);
canvas_draw_str(canvas, 0, 50, buf);
// Processed frames
snprintf(buf, sizeof(buf), "Processed: %lu", (unsigned long)app->stats.processed_frames);
canvas_draw_str(canvas, 4, 62, buf);
furi_mutex_release(app->mutex);
}
// Draw callback
static void draw_callback(Canvas* canvas, void* ctx) {
CanMonitorApp* app = ctx;
switch(app->current_page) {
case PageWelcome:
draw_welcome(canvas, app);
furi_mutex_acquire(app->mutex, FuriWaitForever);
// Check if we should show welcome screen
if(app->conn_state != StateConnected || !app->page.data_valid) {
furi_mutex_release(app->mutex);
draw_welcome(canvas, app);
return;
}
canvas_clear(canvas);
canvas_set_font(canvas, FontPrimary);
// Draw header
draw_header(canvas, app);
// Draw content based on page type
switch(app->page.page_type) {
case PageTypeInfo:
draw_info_page(canvas, app);
break;
case PageStats:
draw_stats(canvas, app);
case PageTypeMenu:
draw_menu_page(canvas, app);
break;
case PageTypeConfirm:
draw_confirm_page(canvas, app);
break;
}
// Draw footer
draw_footer(canvas, app->page.page_type, app->page.total_pages, app->page.page_index);
// Draw result overlay if present
if(app->result.has_result) {
draw_result_overlay(canvas, &app->result);
}
furi_mutex_release(app->mutex);
}
// Input callback
static void input_callback(InputEvent* event, void* ctx) {
CanMonitorApp* app = ctx;
if(event->type == InputTypeShort) {
furi_mutex_acquire(app->mutex, FuriWaitForever);
if(event->type != InputTypeShort && event->type != InputTypeRepeat) {
return;
}
furi_mutex_acquire(app->mutex, FuriWaitForever);
// Handle welcome screen input
if(app->conn_state == StateDisconnected) {
if(event->key == InputKeyOk) {
app->send_init = true;
} else if(event->key == InputKeyBack) {
app->running = false;
}
furi_mutex_release(app->mutex);
return;
}
// Handle connected state input
if(app->conn_state == StateConnected && app->page.data_valid) {
switch(event->key) {
case InputKeyOk:
if(app->current_page == PageWelcome && app->conn_state == StateDisconnected) {
// Start handshake
app->send_init = true;
case InputKeyUp:
if(app->page.page_type == PageTypeMenu && app->page.selected_index > 0) {
app->page.selected_index--;
view_port_update(app->view_port);
} else if(app->page.page_type == PageTypeConfirm) {
app->page.selected_index = 0; // Yes
view_port_update(app->view_port);
}
break;
case InputKeyRight:
// Only allow if connected
if(app->current_page == PageWelcome && app->conn_state == StateConnected) {
app->current_page = PageStats;
case InputKeyDown:
if(app->page.page_type == PageTypeMenu &&
app->page.selected_index < app->page.action_count - 1) {
app->page.selected_index++;
view_port_update(app->view_port);
} else if(app->page.page_type == PageTypeConfirm) {
app->page.selected_index = 1; // No
view_port_update(app->view_port);
}
break;
case InputKeyLeft:
if(app->current_page == PageStats) {
app->current_page = PageWelcome;
view_port_update(app->view_port);
// Navigate to previous page
uart_send(app, "CMD:NAV:prev\n");
break;
case InputKeyRight:
// Navigate to next page
uart_send(app, "CMD:NAV:next\n");
break;
case InputKeyOk:
if(app->page.page_type == PageTypeMenu) {
// Send selection
char cmd[32];
snprintf(cmd, sizeof(cmd), "CMD:SELECT:%d\n", app->page.selected_index);
uart_send(app, cmd);
} else if(app->page.page_type == PageTypeConfirm) {
if(app->page.selected_index == 0) {
uart_send(app, "CMD:CONFIRM\n");
} else {
uart_send(app, "CMD:CANCEL\n");
}
}
break;
case InputKeyBack:
if(app->current_page == PageWelcome) {
app->running = false;
} else {
app->current_page = PageWelcome;
if(app->page.page_type == PageTypeConfirm) {
// Cancel confirmation
uart_send(app, "CMD:CANCEL\n");
} else if(app->page.page_index == 0) {
// On first page, disconnect
uart_send(app, "STOP:flipper\n");
app->conn_state = StateDisconnected;
clear_page_content(&app->page);
view_port_update(app->view_port);
} else {
// Go to previous page
uart_send(app, "CMD:NAV:prev\n");
}
break;
default:
break;
}
furi_mutex_release(app->mutex);
} else if(event->key == InputKeyBack) {
// Disconnect and exit
if(app->conn_state != StateDisconnected) {
uart_send(app, "STOP:flipper\n");
}
app->running = false;
}
furi_mutex_release(app->mutex);
}
// Main entry point
int32_t can_monitor_app(void* p) {
UNUSED(p);
FURI_LOG_I(TAG, "Starting CAN Monitor");
FURI_LOG_I(TAG, "Starting CAN Monitor (Dynamic UI)");
// Allocate app
CanMonitorApp* app = malloc(sizeof(CanMonitorApp));
memset(app, 0, sizeof(CanMonitorApp));
app->running = true;
app->current_page = PageWelcome;
app->conn_state = StateDisconnected;
app->send_init = false;
clear_page_content(&app->page);
// Mutex
app->mutex = furi_mutex_alloc(FuriMutexTypeNormal);
@@ -440,7 +775,7 @@ int32_t can_monitor_app(void* p) {
}
// Start worker
app->worker_thread = furi_thread_alloc_ex("CanMonitorWorker", 1024, worker_thread, app);
app->worker_thread = furi_thread_alloc_ex("CanMonitorWorker", 2048, worker_thread, app);
furi_thread_start(app->worker_thread);
// Init GUI
@@ -450,7 +785,7 @@ int32_t can_monitor_app(void* p) {
view_port_input_callback_set(app->view_port, input_callback, app);
gui_add_view_port(app->gui, app->view_port, GuiLayerFullscreen);
FURI_LOG_I(TAG, "GUI initialized, waiting for user input");
FURI_LOG_I(TAG, "GUI initialized");
// Main loop
while(app->running) {
@@ -460,7 +795,7 @@ int32_t can_monitor_app(void* p) {
FURI_LOG_I(TAG, "Shutting down");
// Send disconnect signal
if(app->conn_state == StateConnected) {
if(app->conn_state != StateDisconnected) {
uart_send(app, "STOP:flipper\n");
}