add scroll up/down flipper zero
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
"""Page definitions and dynamic content generators."""
|
||||
|
||||
from typing import List, Dict, Callable, Optional, Any
|
||||
from typing import List, Dict, Callable, Optional, Any, Tuple
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
import socket
|
||||
@@ -31,9 +31,12 @@ class PageDefinition:
|
||||
class PageManager:
|
||||
"""Manages page content and navigation."""
|
||||
|
||||
MAX_VISIBLE_LINES = 5 # Flipper display limit
|
||||
|
||||
def __init__(self):
|
||||
self._pages: List[PageDefinition] = []
|
||||
self._current_index: int = 0
|
||||
self._scroll_offsets: Dict[int, int] = {} # page_index -> scroll offset
|
||||
self._pending_action: Optional[ActionID] = None
|
||||
self._data_providers: Dict[str, Callable[[], Any]] = {}
|
||||
self._action_handlers: Dict[ActionID, Callable[[], bool]] = {}
|
||||
@@ -132,12 +135,76 @@ class PageManager:
|
||||
return default
|
||||
|
||||
def get_current_page(self) -> Page:
|
||||
"""Get current page content."""
|
||||
"""Get current page content with scroll offset applied."""
|
||||
if 0 <= self._current_index < len(self._pages):
|
||||
page_def = self._pages[self._current_index]
|
||||
return page_def.generator(self)
|
||||
page = page_def.generator(self)
|
||||
|
||||
# Apply scrolling for info pages with more than MAX_VISIBLE_LINES
|
||||
if page.page_type == PageType.INFO and len(page.lines) > self.MAX_VISIBLE_LINES:
|
||||
offset = self._scroll_offsets.get(self._current_index, 0)
|
||||
max_offset = len(page.lines) - self.MAX_VISIBLE_LINES
|
||||
|
||||
# Clamp offset
|
||||
offset = max(0, min(offset, max_offset))
|
||||
self._scroll_offsets[self._current_index] = offset
|
||||
|
||||
# Apply scroll - show lines[offset:offset+5]
|
||||
visible_lines = page.lines[offset:offset + self.MAX_VISIBLE_LINES]
|
||||
|
||||
# Add scroll indicators
|
||||
if offset > 0:
|
||||
visible_lines[0] = "^ " + visible_lines[0][:30]
|
||||
if offset < max_offset:
|
||||
visible_lines[-1] = "v " + visible_lines[-1][:30]
|
||||
|
||||
page.lines = visible_lines
|
||||
|
||||
return page
|
||||
return Page(PageType.INFO, "Error", ["Invalid page index"])
|
||||
|
||||
def scroll_up(self) -> bool:
|
||||
"""Scroll current page up.
|
||||
|
||||
Returns:
|
||||
True if scroll occurred
|
||||
"""
|
||||
if self._current_index not in range(len(self._pages)):
|
||||
return False
|
||||
|
||||
page_def = self._pages[self._current_index]
|
||||
if page_def.page_type != PageType.INFO:
|
||||
return False
|
||||
|
||||
current_offset = self._scroll_offsets.get(self._current_index, 0)
|
||||
if current_offset > 0:
|
||||
self._scroll_offsets[self._current_index] = current_offset - 1
|
||||
return True
|
||||
return False
|
||||
|
||||
def scroll_down(self) -> bool:
|
||||
"""Scroll current page down.
|
||||
|
||||
Returns:
|
||||
True if scroll occurred
|
||||
"""
|
||||
if self._current_index not in range(len(self._pages)):
|
||||
return False
|
||||
|
||||
page_def = self._pages[self._current_index]
|
||||
if page_def.page_type != PageType.INFO:
|
||||
return False
|
||||
|
||||
# Get full page content to know total lines
|
||||
page = page_def.generator(self)
|
||||
max_offset = max(0, len(page.lines) - self.MAX_VISIBLE_LINES)
|
||||
|
||||
current_offset = self._scroll_offsets.get(self._current_index, 0)
|
||||
if current_offset < max_offset:
|
||||
self._scroll_offsets[self._current_index] = current_offset + 1
|
||||
return True
|
||||
return False
|
||||
|
||||
def navigate_next(self) -> bool:
|
||||
"""Navigate to next page.
|
||||
|
||||
@@ -146,6 +213,8 @@ class PageManager:
|
||||
"""
|
||||
if self._current_index < self.total_pages - 1:
|
||||
self._current_index += 1
|
||||
# Reset scroll for new page
|
||||
self._scroll_offsets[self._current_index] = 0
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -157,6 +226,8 @@ class PageManager:
|
||||
"""
|
||||
if self._current_index > 0:
|
||||
self._current_index -= 1
|
||||
# Reset scroll for new page
|
||||
self._scroll_offsets[self._current_index] = 0
|
||||
return True
|
||||
return False
|
||||
|
||||
@@ -240,24 +311,35 @@ class PageManager:
|
||||
# Page generators
|
||||
|
||||
def _generate_live_data_page(self, mgr: "PageManager") -> Page:
|
||||
"""Generate live vehicle data page."""
|
||||
"""Generate live vehicle data page with all available PIDs."""
|
||||
state = mgr.get_data("vehicle_state")
|
||||
lines = []
|
||||
|
||||
if state:
|
||||
rpm = state.rpm
|
||||
speed = state.speed
|
||||
coolant = state.coolant_temp
|
||||
throttle = state.throttle
|
||||
fuel = state.fuel_level
|
||||
# Get all values - returns dict of PID -> PIDValue
|
||||
all_values = state.get_all()
|
||||
|
||||
lines.append(f"RPM: {rpm:.0f}" if rpm is not None else "RPM: ---")
|
||||
lines.append(f"Speed: {speed:.0f} km/h" if speed is not None else "Speed: --- km/h")
|
||||
lines.append(f"Coolant: {coolant:.0f} C" if coolant is not None else "Coolant: --- C")
|
||||
lines.append(f"Throttle: {throttle:.1f}%" if throttle is not None else "Throttle: ---%")
|
||||
lines.append(f"Fuel: {fuel:.1f}%" if fuel is not None else "Fuel: ---%")
|
||||
if all_values:
|
||||
# Priority order for display
|
||||
priority_pids = [0x0C, 0x0D, 0x05, 0x11, 0x2F, 0x5C, 0x04, 0x0F, 0x10]
|
||||
|
||||
# Add priority PIDs first
|
||||
shown_pids = set()
|
||||
for pid_code in priority_pids:
|
||||
if pid_code in all_values:
|
||||
val = all_values[pid_code]
|
||||
lines.append(f"{val.name}: {val.value:.1f} {val.unit}")
|
||||
shown_pids.add(pid_code)
|
||||
|
||||
# Add any remaining PIDs
|
||||
for pid_code, val in sorted(all_values.items()):
|
||||
if pid_code not in shown_pids:
|
||||
lines.append(f"{val.name}: {val.value:.1f} {val.unit}")
|
||||
|
||||
if not lines:
|
||||
lines = ["Waiting for data...", "", "Polling active"]
|
||||
else:
|
||||
lines = ["No connection", "to OBD2", "", "Check CAN bus", "connection"]
|
||||
lines = ["No connection", "to OBD2", "", "Use Actions menu", "to reconnect"]
|
||||
|
||||
return Page(PageType.INFO, "Live Data", lines)
|
||||
|
||||
@@ -268,17 +350,23 @@ class PageManager:
|
||||
|
||||
queries = stats.get("queries", 0)
|
||||
successes = stats.get("successes", 0)
|
||||
failures = stats.get("failures", 0)
|
||||
rate = (successes / queries * 100) if queries > 0 else 0
|
||||
|
||||
hours = int(uptime // 3600)
|
||||
minutes = int((uptime % 3600) // 60)
|
||||
seconds = int(uptime % 60)
|
||||
|
||||
# Calculate queries per second
|
||||
qps = queries / uptime if uptime > 0 else 0
|
||||
|
||||
lines = [
|
||||
f"Queries: {queries}",
|
||||
f"Success: {successes}",
|
||||
f"Failures: {failures}",
|
||||
f"Rate: {rate:.1f}%",
|
||||
f"Failures: {stats.get('failures', 0)}",
|
||||
f"Uptime: {hours}h {minutes}m",
|
||||
f"Q/sec: {qps:.1f}",
|
||||
f"Uptime: {hours}h {minutes}m {seconds}s",
|
||||
]
|
||||
|
||||
return Page(PageType.INFO, "Statistics", lines)
|
||||
@@ -287,17 +375,23 @@ class PageManager:
|
||||
"""Generate system info page."""
|
||||
ip = self._get_ip_address()
|
||||
cpu_temp = self._get_cpu_temp()
|
||||
mem = self._get_memory_usage()
|
||||
mem_percent, mem_used, mem_total = self._get_memory_info()
|
||||
load = self._get_load_average()
|
||||
|
||||
lines = [
|
||||
f"IP: {ip}",
|
||||
f"CPU: {cpu_temp:.1f} C" if cpu_temp else "CPU: --- C",
|
||||
f"Mem: {mem:.1f}%" if mem else "Mem: ---%",
|
||||
f"CPU Temp: {cpu_temp:.1f} C" if cpu_temp else "CPU Temp: ---",
|
||||
f"CPU Load: {load}" if load else "CPU Load: ---",
|
||||
f"Mem: {mem_percent:.0f}%" if mem_percent else "Mem: ---",
|
||||
f"Mem: {mem_used}/{mem_total}MB" if mem_used else "",
|
||||
f"CAN: {mgr.get_data('can_interface', 'can0')}",
|
||||
f"Debug: {'ON' if mgr._debug_enabled else 'OFF'}",
|
||||
f"Flipper: Connected",
|
||||
]
|
||||
|
||||
return Page(PageType.INFO, "System Info", lines)
|
||||
# Remove empty lines
|
||||
lines = [l for l in lines if l]
|
||||
|
||||
return Page(PageType.INFO, "System", lines)
|
||||
|
||||
def _generate_actions_page(self, mgr: "PageManager") -> Page:
|
||||
"""Generate actions menu page."""
|
||||
@@ -345,13 +439,25 @@ class PageManager:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _get_memory_usage() -> Optional[float]:
|
||||
"""Get memory usage percentage."""
|
||||
def _get_memory_info() -> Tuple[Optional[float], Optional[int], Optional[int]]:
|
||||
"""Get memory info (percent, used_mb, total_mb)."""
|
||||
try:
|
||||
with open("/proc/meminfo", "r") as f:
|
||||
lines = f.readlines()
|
||||
total = int(lines[0].split()[1])
|
||||
available = int(lines[2].split()[1])
|
||||
return (1 - available / total) * 100
|
||||
total_kb = int(lines[0].split()[1])
|
||||
available_kb = int(lines[2].split()[1])
|
||||
used_kb = total_kb - available_kb
|
||||
percent = (used_kb / total_kb) * 100
|
||||
return percent, used_kb // 1024, total_kb // 1024
|
||||
except Exception:
|
||||
return None, None, None
|
||||
|
||||
@staticmethod
|
||||
def _get_load_average() -> Optional[str]:
|
||||
"""Get system load average."""
|
||||
try:
|
||||
with open("/proc/loadavg", "r") as f:
|
||||
parts = f.read().split()
|
||||
return f"{parts[0]} {parts[1]} {parts[2]}"
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@@ -310,13 +310,13 @@ class FlipperServer:
|
||||
|
||||
def _handle_scroll_up(self) -> None:
|
||||
"""Handle scroll up on info pages."""
|
||||
# Currently info pages don't have scrolling, but protocol supports it
|
||||
self._logger.debug("Scroll up (not implemented)")
|
||||
if self._page_manager.scroll_up():
|
||||
self._send_current_page()
|
||||
|
||||
def _handle_scroll_down(self) -> None:
|
||||
"""Handle scroll down on info pages."""
|
||||
# Currently info pages don't have scrolling, but protocol supports it
|
||||
self._logger.debug("Scroll down (not implemented)")
|
||||
if self._page_manager.scroll_down():
|
||||
self._send_current_page()
|
||||
|
||||
def _handle_refresh(self) -> None:
|
||||
"""Handle refresh request."""
|
||||
|
||||
Reference in New Issue
Block a user