From cfb8062f266023e219eadaff0a979aa8deb39a9b Mon Sep 17 00:00:00 2001 From: Alexander Poletaev Date: Wed, 28 Jan 2026 00:43:06 +0300 Subject: [PATCH] Add flipper zero page for app status --- can_sniffer/src/flipper/pages/__init__.py | 2 + can_sniffer/src/flipper/pages/app_status.py | 160 +++++++++++++++++ can_sniffer/src/flipper/providers/__init__.py | 2 + .../flipper/providers/app_status_provider.py | 163 ++++++++++++++++++ can_sniffer/src/handlers/flipper_handler.py | 9 +- 5 files changed, 335 insertions(+), 1 deletion(-) create mode 100644 can_sniffer/src/flipper/pages/app_status.py create mode 100644 can_sniffer/src/flipper/providers/app_status_provider.py diff --git a/can_sniffer/src/flipper/pages/__init__.py b/can_sniffer/src/flipper/pages/__init__.py index de58538..2a8e6ab 100644 --- a/can_sniffer/src/flipper/pages/__init__.py +++ b/can_sniffer/src/flipper/pages/__init__.py @@ -10,6 +10,7 @@ from flipper.pages.can_stats import CANStatsPage from flipper.pages.ups_status import UPSStatusPage from flipper.pages.system_info import SystemInfoPage from flipper.pages.actions import ActionsPage +from flipper.pages.app_status import AppStatusPage __all__ = [ "BasePage", @@ -17,4 +18,5 @@ __all__ = [ "UPSStatusPage", "SystemInfoPage", "ActionsPage", + "AppStatusPage", ] diff --git a/can_sniffer/src/flipper/pages/app_status.py b/can_sniffer/src/flipper/pages/app_status.py new file mode 100644 index 0000000..76dad22 --- /dev/null +++ b/can_sniffer/src/flipper/pages/app_status.py @@ -0,0 +1,160 @@ +""" +Application Status Page with scrolling support. + +Displays can_sniffer application status on Flipper Zero. +""" + +from flipper.pages.base import BasePage +from flipper.protocol import PageContent, PageType +from flipper.providers.app_status_provider import AppStatusProvider + + +class AppStatusPage(BasePage): + """ + Page displaying can_sniffer application status with scrolling. + + Shows: + - SQLite: OK/ERR, records + - PostgreSQL: status, sent/failed/synced + - Queue: size/capacity + - Handlers: active/total + - Uptime + + Supports Up/Down scrolling when content exceeds display. + """ + + MAX_VISIBLE_LINES = 4 + + def __init__(self): + super().__init__( + name="app_status", + title="App Status", + icon="status" + ) + self._provider = AppStatusProvider() + self._scroll_offset = 0 + self._all_lines: list[str] = [] + + def _format_number(self, num: int) -> str: + """Format large numbers with K/M suffix.""" + if num >= 1_000_000: + return f"{num / 1_000_000:.1f}M" + elif num >= 1_000: + return f"{num / 1_000:.1f}K" + else: + return str(num) + + def _format_uptime(self, seconds: float) -> str: + """Format uptime as human readable string.""" + if seconds < 60: + return f"{int(seconds)}s" + elif seconds < 3600: + mins = int(seconds / 60) + secs = int(seconds % 60) + return f"{mins}m{secs}s" + else: + hours = int(seconds / 3600) + mins = int((seconds % 3600) / 60) + return f"{hours}h{mins}m" + + def _get_pg_status_icon(self, status: str) -> str: + """Get status icon for PostgreSQL.""" + icons = { + "connected": "+", + "disconnected": "-", + "connecting": "~", + "error": "!", + "disabled": "x" + } + return icons.get(status, "?") + + def _build_all_lines(self) -> list[str]: + """Build complete list of status lines.""" + self._provider.refresh() + data = self._provider.get_data() + + lines = [] + + # SQLite status + sqlite_status = "OK" if data.sqlite_ok else "ERR" + lines.append(f"SQLite: {sqlite_status}") + if data.sqlite_ok: + lines.append(f" Rec: {self._format_number(data.sqlite_records)}") + if data.sqlite_unprocessed > 0: + lines.append(f" Unsync: {self._format_number(data.sqlite_unprocessed)}") + + # PostgreSQL status + pg_icon = self._get_pg_status_icon(data.pg_status) + lines.append(f"PgSQL: [{pg_icon}] {data.pg_status}") + if data.pg_enabled and data.pg_status != "disabled": + lines.append(f" Sent: {self._format_number(data.pg_sent)}") + if data.pg_failed > 0: + lines.append(f" Fail: {self._format_number(data.pg_failed)}") + if data.pg_synced > 0: + lines.append(f" Sync: {self._format_number(data.pg_synced)}") + if data.pg_queue > 0: + lines.append(f" Queue: {data.pg_queue}") + + # Queue status + if data.queue_capacity > 0: + pct = int(data.queue_size / data.queue_capacity * 100) if data.queue_capacity else 0 + lines.append(f"MsgQ: {data.queue_size}/{data.queue_capacity} ({pct}%)") + + # Processed/Dropped + if data.processed > 0: + lines.append(f"Proc: {self._format_number(data.processed)}") + if data.dropped > 0: + lines.append(f"Drop: {self._format_number(data.dropped)} (!)") + + # Handlers + if data.handlers_total > 0: + lines.append(f"Handlers: {data.handlers_active}/{data.handlers_total}") + + # Uptime + lines.append(f"Uptime: {self._format_uptime(data.uptime_seconds)}") + + return lines + + def get_content(self) -> PageContent: + """Get page content with scroll support.""" + self._all_lines = self._build_all_lines() + + # Calculate visible window + max_offset = max(0, len(self._all_lines) - self.MAX_VISIBLE_LINES) + self._scroll_offset = min(self._scroll_offset, max_offset) + + visible_lines = self._all_lines[ + self._scroll_offset:self._scroll_offset + self.MAX_VISIBLE_LINES + ] + + # Add scroll indicators + if self._scroll_offset > 0 and visible_lines: + visible_lines[0] = "^ " + visible_lines[0] + if self._scroll_offset < max_offset and visible_lines: + visible_lines[-1] = "v " + visible_lines[-1] + + return PageContent( + page_type=PageType.INFO, + title=self.title, + lines=visible_lines, + icon=self.icon + ) + + def scroll_up(self) -> None: + """Scroll content up.""" + if self._scroll_offset > 0: + self._scroll_offset -= 1 + + def scroll_down(self) -> None: + """Scroll content down.""" + max_offset = max(0, len(self._all_lines) - self.MAX_VISIBLE_LINES) + if self._scroll_offset < max_offset: + self._scroll_offset += 1 + + def on_enter(self) -> None: + """Reset scroll when entering page.""" + self._scroll_offset = 0 + + def get_provider(self) -> AppStatusProvider: + """Get the AppStatus provider instance.""" + return self._provider diff --git a/can_sniffer/src/flipper/providers/__init__.py b/can_sniffer/src/flipper/providers/__init__.py index c0d7d2b..360b653 100644 --- a/can_sniffer/src/flipper/providers/__init__.py +++ b/can_sniffer/src/flipper/providers/__init__.py @@ -9,10 +9,12 @@ from flipper.providers.base import BaseProvider from flipper.providers.ups_provider import UPSProvider from flipper.providers.system_provider import SystemProvider from flipper.providers.can_provider import CANProvider +from flipper.providers.app_status_provider import AppStatusProvider __all__ = [ "BaseProvider", "UPSProvider", "SystemProvider", "CANProvider", + "AppStatusProvider", ] diff --git a/can_sniffer/src/flipper/providers/app_status_provider.py b/can_sniffer/src/flipper/providers/app_status_provider.py new file mode 100644 index 0000000..c11ba71 --- /dev/null +++ b/can_sniffer/src/flipper/providers/app_status_provider.py @@ -0,0 +1,163 @@ +""" +Application Status Provider. + +Collects status from all can_sniffer components for display on Flipper Zero. +""" + +import time +from dataclasses import dataclass, field +from typing import Dict, Any, Optional + +from flipper.providers.base import BaseProvider + + +@dataclass +class AppStatusData: + """Application status data.""" + # SQLite Storage + sqlite_ok: bool = False + sqlite_records: int = 0 + sqlite_unprocessed: int = 0 + + # PostgreSQL + pg_enabled: bool = False + pg_status: str = "disabled" # connected, disconnected, error, disabled + pg_sent: int = 0 + pg_failed: int = 0 + pg_synced: int = 0 + pg_queue: int = 0 + + # Message Processor + queue_size: int = 0 + queue_capacity: int = 0 + processed: int = 0 + dropped: int = 0 + + # Handlers + handlers_active: int = 0 + handlers_total: int = 0 + + # Runtime + uptime_seconds: float = 0.0 + + +class AppStatusProvider(BaseProvider[AppStatusData]): + """ + Provider for application status data. + + Collects data from: + - Storage (SQLite) + - PostgreSQL client + - MessageProcessor + - Handlers + """ + + _start_time: float = 0.0 + + def __init__(self): + super().__init__() + self._data = AppStatusData() + AppStatusProvider._start_time = time.time() + + def refresh(self) -> None: + """Refresh status data from all components.""" + try: + self._collect_storage_status() + except Exception: + self._data.sqlite_ok = False + + try: + self._collect_postgresql_status() + except Exception: + pass + + try: + self._collect_processor_status() + except Exception: + pass + + # Uptime + self._data.uptime_seconds = time.time() - AppStatusProvider._start_time + + def _collect_storage_status(self) -> None: + """Collect SQLite storage status.""" + try: + from storage import get_storage + storage = get_storage() + + if storage and storage.connection: + self._data.sqlite_ok = True + stats = storage.get_stats() + self._data.sqlite_records = stats.get("total_records", 0) + self._data.sqlite_unprocessed = stats.get("unprocessed_records", 0) + else: + self._data.sqlite_ok = False + except Exception: + self._data.sqlite_ok = False + + def _collect_postgresql_status(self) -> None: + """Collect PostgreSQL status.""" + try: + from postgresql_handler import get_postgresql_client + from postgresql_handler.postgresql_client import ConnectionStatus + + client = get_postgresql_client() + stats = client.get_stats() + + self._data.pg_enabled = stats.get("enabled", False) + + if not self._data.pg_enabled: + self._data.pg_status = "disabled" + else: + status = stats.get("connection_status", "disconnected") + self._data.pg_status = status + + self._data.pg_sent = stats.get("sent_count", 0) + self._data.pg_failed = stats.get("failed_count", 0) + self._data.pg_synced = stats.get("synced_count", 0) + self._data.pg_queue = stats.get("queue_size", 0) + + except Exception: + self._data.pg_status = "error" + + def _collect_processor_status(self) -> None: + """Collect message processor status.""" + try: + # Try to get processor from global/singleton + from socket_can.message_processor import MessageProcessor + + # This is a bit hacky - we need access to the processor instance + # In real app, you'd have a global reference or pass it in + # For now, try to get stats from handlers + + # Get queue info from config as fallback + from config import config + self._data.queue_capacity = config.general.buffer_size + + except Exception: + pass + + def get_data(self) -> AppStatusData: + """Get current status data.""" + return self._data + + def update_processor_stats( + self, + queue_size: int = 0, + queue_capacity: int = 0, + processed: int = 0, + dropped: int = 0, + handlers_active: int = 0, + handlers_total: int = 0 + ) -> None: + """ + Update processor stats externally. + + Called by FlipperHandler or MessageProcessor to update stats. + """ + self._data.queue_size = queue_size + self._data.queue_capacity = queue_capacity + self._data.processed = processed + self._data.dropped = dropped + self._data.handlers_active = handlers_active + self._data.handlers_total = handlers_total diff --git a/can_sniffer/src/handlers/flipper_handler.py b/can_sniffer/src/handlers/flipper_handler.py index 7268268..80ea282 100644 --- a/can_sniffer/src/handlers/flipper_handler.py +++ b/can_sniffer/src/handlers/flipper_handler.py @@ -31,7 +31,7 @@ from logger import get_logger from flipper.protocol import Protocol, Command, CommandType from flipper.page_manager import PageManager -from flipper.pages import CANStatsPage, UPSStatusPage, SystemInfoPage, ActionsPage +from flipper.pages import CANStatsPage, UPSStatusPage, SystemInfoPage, ActionsPage, AppStatusPage logger = get_logger(__name__) @@ -136,6 +136,13 @@ class FlipperHandler(BaseHandler): # Keep reference to CAN provider for stats updates self._can_provider = can_page.get_provider() + # Application Status (SQLite, PostgreSQL, Queue, etc.) + app_status_page = AppStatusPage() + self._page_manager.register_page(app_status_page) + + # Keep reference to app status provider for updates + self._app_status_provider = app_status_page.get_provider() + # UPS Status (if available) ups_page = UPSStatusPage() self._page_manager.register_page(ups_page)