Add flipper zero page for app status
This commit is contained in:
@@ -10,6 +10,7 @@ from flipper.pages.can_stats import CANStatsPage
|
||||
from flipper.pages.ups_status import UPSStatusPage
|
||||
from flipper.pages.system_info import SystemInfoPage
|
||||
from flipper.pages.actions import ActionsPage
|
||||
from flipper.pages.app_status import AppStatusPage
|
||||
|
||||
__all__ = [
|
||||
"BasePage",
|
||||
@@ -17,4 +18,5 @@ __all__ = [
|
||||
"UPSStatusPage",
|
||||
"SystemInfoPage",
|
||||
"ActionsPage",
|
||||
"AppStatusPage",
|
||||
]
|
||||
|
||||
160
can_sniffer/src/flipper/pages/app_status.py
Normal file
160
can_sniffer/src/flipper/pages/app_status.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""
|
||||
Application Status Page with scrolling support.
|
||||
|
||||
Displays can_sniffer application status on Flipper Zero.
|
||||
"""
|
||||
|
||||
from flipper.pages.base import BasePage
|
||||
from flipper.protocol import PageContent, PageType
|
||||
from flipper.providers.app_status_provider import AppStatusProvider
|
||||
|
||||
|
||||
class AppStatusPage(BasePage):
|
||||
"""
|
||||
Page displaying can_sniffer application status with scrolling.
|
||||
|
||||
Shows:
|
||||
- SQLite: OK/ERR, records
|
||||
- PostgreSQL: status, sent/failed/synced
|
||||
- Queue: size/capacity
|
||||
- Handlers: active/total
|
||||
- Uptime
|
||||
|
||||
Supports Up/Down scrolling when content exceeds display.
|
||||
"""
|
||||
|
||||
MAX_VISIBLE_LINES = 4
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
name="app_status",
|
||||
title="App Status",
|
||||
icon="status"
|
||||
)
|
||||
self._provider = AppStatusProvider()
|
||||
self._scroll_offset = 0
|
||||
self._all_lines: list[str] = []
|
||||
|
||||
def _format_number(self, num: int) -> str:
|
||||
"""Format large numbers with K/M suffix."""
|
||||
if num >= 1_000_000:
|
||||
return f"{num / 1_000_000:.1f}M"
|
||||
elif num >= 1_000:
|
||||
return f"{num / 1_000:.1f}K"
|
||||
else:
|
||||
return str(num)
|
||||
|
||||
def _format_uptime(self, seconds: float) -> str:
|
||||
"""Format uptime as human readable string."""
|
||||
if seconds < 60:
|
||||
return f"{int(seconds)}s"
|
||||
elif seconds < 3600:
|
||||
mins = int(seconds / 60)
|
||||
secs = int(seconds % 60)
|
||||
return f"{mins}m{secs}s"
|
||||
else:
|
||||
hours = int(seconds / 3600)
|
||||
mins = int((seconds % 3600) / 60)
|
||||
return f"{hours}h{mins}m"
|
||||
|
||||
def _get_pg_status_icon(self, status: str) -> str:
|
||||
"""Get status icon for PostgreSQL."""
|
||||
icons = {
|
||||
"connected": "+",
|
||||
"disconnected": "-",
|
||||
"connecting": "~",
|
||||
"error": "!",
|
||||
"disabled": "x"
|
||||
}
|
||||
return icons.get(status, "?")
|
||||
|
||||
def _build_all_lines(self) -> list[str]:
|
||||
"""Build complete list of status lines."""
|
||||
self._provider.refresh()
|
||||
data = self._provider.get_data()
|
||||
|
||||
lines = []
|
||||
|
||||
# SQLite status
|
||||
sqlite_status = "OK" if data.sqlite_ok else "ERR"
|
||||
lines.append(f"SQLite: {sqlite_status}")
|
||||
if data.sqlite_ok:
|
||||
lines.append(f" Rec: {self._format_number(data.sqlite_records)}")
|
||||
if data.sqlite_unprocessed > 0:
|
||||
lines.append(f" Unsync: {self._format_number(data.sqlite_unprocessed)}")
|
||||
|
||||
# PostgreSQL status
|
||||
pg_icon = self._get_pg_status_icon(data.pg_status)
|
||||
lines.append(f"PgSQL: [{pg_icon}] {data.pg_status}")
|
||||
if data.pg_enabled and data.pg_status != "disabled":
|
||||
lines.append(f" Sent: {self._format_number(data.pg_sent)}")
|
||||
if data.pg_failed > 0:
|
||||
lines.append(f" Fail: {self._format_number(data.pg_failed)}")
|
||||
if data.pg_synced > 0:
|
||||
lines.append(f" Sync: {self._format_number(data.pg_synced)}")
|
||||
if data.pg_queue > 0:
|
||||
lines.append(f" Queue: {data.pg_queue}")
|
||||
|
||||
# Queue status
|
||||
if data.queue_capacity > 0:
|
||||
pct = int(data.queue_size / data.queue_capacity * 100) if data.queue_capacity else 0
|
||||
lines.append(f"MsgQ: {data.queue_size}/{data.queue_capacity} ({pct}%)")
|
||||
|
||||
# Processed/Dropped
|
||||
if data.processed > 0:
|
||||
lines.append(f"Proc: {self._format_number(data.processed)}")
|
||||
if data.dropped > 0:
|
||||
lines.append(f"Drop: {self._format_number(data.dropped)} (!)")
|
||||
|
||||
# Handlers
|
||||
if data.handlers_total > 0:
|
||||
lines.append(f"Handlers: {data.handlers_active}/{data.handlers_total}")
|
||||
|
||||
# Uptime
|
||||
lines.append(f"Uptime: {self._format_uptime(data.uptime_seconds)}")
|
||||
|
||||
return lines
|
||||
|
||||
def get_content(self) -> PageContent:
|
||||
"""Get page content with scroll support."""
|
||||
self._all_lines = self._build_all_lines()
|
||||
|
||||
# Calculate visible window
|
||||
max_offset = max(0, len(self._all_lines) - self.MAX_VISIBLE_LINES)
|
||||
self._scroll_offset = min(self._scroll_offset, max_offset)
|
||||
|
||||
visible_lines = self._all_lines[
|
||||
self._scroll_offset:self._scroll_offset + self.MAX_VISIBLE_LINES
|
||||
]
|
||||
|
||||
# Add scroll indicators
|
||||
if self._scroll_offset > 0 and visible_lines:
|
||||
visible_lines[0] = "^ " + visible_lines[0]
|
||||
if self._scroll_offset < max_offset and visible_lines:
|
||||
visible_lines[-1] = "v " + visible_lines[-1]
|
||||
|
||||
return PageContent(
|
||||
page_type=PageType.INFO,
|
||||
title=self.title,
|
||||
lines=visible_lines,
|
||||
icon=self.icon
|
||||
)
|
||||
|
||||
def scroll_up(self) -> None:
|
||||
"""Scroll content up."""
|
||||
if self._scroll_offset > 0:
|
||||
self._scroll_offset -= 1
|
||||
|
||||
def scroll_down(self) -> None:
|
||||
"""Scroll content down."""
|
||||
max_offset = max(0, len(self._all_lines) - self.MAX_VISIBLE_LINES)
|
||||
if self._scroll_offset < max_offset:
|
||||
self._scroll_offset += 1
|
||||
|
||||
def on_enter(self) -> None:
|
||||
"""Reset scroll when entering page."""
|
||||
self._scroll_offset = 0
|
||||
|
||||
def get_provider(self) -> AppStatusProvider:
|
||||
"""Get the AppStatus provider instance."""
|
||||
return self._provider
|
||||
@@ -9,10 +9,12 @@ from flipper.providers.base import BaseProvider
|
||||
from flipper.providers.ups_provider import UPSProvider
|
||||
from flipper.providers.system_provider import SystemProvider
|
||||
from flipper.providers.can_provider import CANProvider
|
||||
from flipper.providers.app_status_provider import AppStatusProvider
|
||||
|
||||
__all__ = [
|
||||
"BaseProvider",
|
||||
"UPSProvider",
|
||||
"SystemProvider",
|
||||
"CANProvider",
|
||||
"AppStatusProvider",
|
||||
]
|
||||
|
||||
163
can_sniffer/src/flipper/providers/app_status_provider.py
Normal file
163
can_sniffer/src/flipper/providers/app_status_provider.py
Normal file
@@ -0,0 +1,163 @@
|
||||
"""
|
||||
Application Status Provider.
|
||||
|
||||
Collects status from all can_sniffer components for display on Flipper Zero.
|
||||
"""
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
from flipper.providers.base import BaseProvider
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppStatusData:
|
||||
"""Application status data."""
|
||||
# SQLite Storage
|
||||
sqlite_ok: bool = False
|
||||
sqlite_records: int = 0
|
||||
sqlite_unprocessed: int = 0
|
||||
|
||||
# PostgreSQL
|
||||
pg_enabled: bool = False
|
||||
pg_status: str = "disabled" # connected, disconnected, error, disabled
|
||||
pg_sent: int = 0
|
||||
pg_failed: int = 0
|
||||
pg_synced: int = 0
|
||||
pg_queue: int = 0
|
||||
|
||||
# Message Processor
|
||||
queue_size: int = 0
|
||||
queue_capacity: int = 0
|
||||
processed: int = 0
|
||||
dropped: int = 0
|
||||
|
||||
# Handlers
|
||||
handlers_active: int = 0
|
||||
handlers_total: int = 0
|
||||
|
||||
# Runtime
|
||||
uptime_seconds: float = 0.0
|
||||
|
||||
|
||||
class AppStatusProvider(BaseProvider[AppStatusData]):
|
||||
"""
|
||||
Provider for application status data.
|
||||
|
||||
Collects data from:
|
||||
- Storage (SQLite)
|
||||
- PostgreSQL client
|
||||
- MessageProcessor
|
||||
- Handlers
|
||||
"""
|
||||
|
||||
_start_time: float = 0.0
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._data = AppStatusData()
|
||||
AppStatusProvider._start_time = time.time()
|
||||
|
||||
def refresh(self) -> None:
|
||||
"""Refresh status data from all components."""
|
||||
try:
|
||||
self._collect_storage_status()
|
||||
except Exception:
|
||||
self._data.sqlite_ok = False
|
||||
|
||||
try:
|
||||
self._collect_postgresql_status()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
self._collect_processor_status()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Uptime
|
||||
self._data.uptime_seconds = time.time() - AppStatusProvider._start_time
|
||||
|
||||
def _collect_storage_status(self) -> None:
|
||||
"""Collect SQLite storage status."""
|
||||
try:
|
||||
from storage import get_storage
|
||||
storage = get_storage()
|
||||
|
||||
if storage and storage.connection:
|
||||
self._data.sqlite_ok = True
|
||||
stats = storage.get_stats()
|
||||
self._data.sqlite_records = stats.get("total_records", 0)
|
||||
self._data.sqlite_unprocessed = stats.get("unprocessed_records", 0)
|
||||
else:
|
||||
self._data.sqlite_ok = False
|
||||
except Exception:
|
||||
self._data.sqlite_ok = False
|
||||
|
||||
def _collect_postgresql_status(self) -> None:
|
||||
"""Collect PostgreSQL status."""
|
||||
try:
|
||||
from postgresql_handler import get_postgresql_client
|
||||
from postgresql_handler.postgresql_client import ConnectionStatus
|
||||
|
||||
client = get_postgresql_client()
|
||||
stats = client.get_stats()
|
||||
|
||||
self._data.pg_enabled = stats.get("enabled", False)
|
||||
|
||||
if not self._data.pg_enabled:
|
||||
self._data.pg_status = "disabled"
|
||||
else:
|
||||
status = stats.get("connection_status", "disconnected")
|
||||
self._data.pg_status = status
|
||||
|
||||
self._data.pg_sent = stats.get("sent_count", 0)
|
||||
self._data.pg_failed = stats.get("failed_count", 0)
|
||||
self._data.pg_synced = stats.get("synced_count", 0)
|
||||
self._data.pg_queue = stats.get("queue_size", 0)
|
||||
|
||||
except Exception:
|
||||
self._data.pg_status = "error"
|
||||
|
||||
def _collect_processor_status(self) -> None:
|
||||
"""Collect message processor status."""
|
||||
try:
|
||||
# Try to get processor from global/singleton
|
||||
from socket_can.message_processor import MessageProcessor
|
||||
|
||||
# This is a bit hacky - we need access to the processor instance
|
||||
# In real app, you'd have a global reference or pass it in
|
||||
# For now, try to get stats from handlers
|
||||
|
||||
# Get queue info from config as fallback
|
||||
from config import config
|
||||
self._data.queue_capacity = config.general.buffer_size
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def get_data(self) -> AppStatusData:
|
||||
"""Get current status data."""
|
||||
return self._data
|
||||
|
||||
def update_processor_stats(
|
||||
self,
|
||||
queue_size: int = 0,
|
||||
queue_capacity: int = 0,
|
||||
processed: int = 0,
|
||||
dropped: int = 0,
|
||||
handlers_active: int = 0,
|
||||
handlers_total: int = 0
|
||||
) -> None:
|
||||
"""
|
||||
Update processor stats externally.
|
||||
|
||||
Called by FlipperHandler or MessageProcessor to update stats.
|
||||
"""
|
||||
self._data.queue_size = queue_size
|
||||
self._data.queue_capacity = queue_capacity
|
||||
self._data.processed = processed
|
||||
self._data.dropped = dropped
|
||||
self._data.handlers_active = handlers_active
|
||||
self._data.handlers_total = handlers_total
|
||||
@@ -31,7 +31,7 @@ from logger import get_logger
|
||||
|
||||
from flipper.protocol import Protocol, Command, CommandType
|
||||
from flipper.page_manager import PageManager
|
||||
from flipper.pages import CANStatsPage, UPSStatusPage, SystemInfoPage, ActionsPage
|
||||
from flipper.pages import CANStatsPage, UPSStatusPage, SystemInfoPage, ActionsPage, AppStatusPage
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -136,6 +136,13 @@ class FlipperHandler(BaseHandler):
|
||||
# Keep reference to CAN provider for stats updates
|
||||
self._can_provider = can_page.get_provider()
|
||||
|
||||
# Application Status (SQLite, PostgreSQL, Queue, etc.)
|
||||
app_status_page = AppStatusPage()
|
||||
self._page_manager.register_page(app_status_page)
|
||||
|
||||
# Keep reference to app status provider for updates
|
||||
self._app_status_provider = app_status_page.get_provider()
|
||||
|
||||
# UPS Status (if available)
|
||||
ups_page = UPSStatusPage()
|
||||
self._page_manager.register_page(ups_page)
|
||||
|
||||
Reference in New Issue
Block a user