add scroll up/down flipper zero

This commit is contained in:
2026-01-30 00:23:40 +03:00
parent 649a993779
commit a995b9e79a
3 changed files with 48 additions and 11 deletions

View File

@@ -5,9 +5,12 @@ from dataclasses import dataclass
from enum import Enum from enum import Enum
import socket import socket
import os import os
import logging
from .protocol import Page, PageType from .protocol import Page, PageType
_logger = logging.getLogger("obd2_client.pages")
class ActionID(Enum): class ActionID(Enum):
"""Action identifiers for menu items.""" """Action identifiers for menu items."""
@@ -31,7 +34,7 @@ class PageDefinition:
class PageManager: class PageManager:
"""Manages page content and navigation.""" """Manages page content and navigation."""
MAX_VISIBLE_LINES = 5 # Flipper display limit MAX_VISIBLE_LINES = 4 # Flipper display limit (5 lines overlap with footer)
def __init__(self): def __init__(self):
self._pages: List[PageDefinition] = [] self._pages: List[PageDefinition] = []
@@ -170,16 +173,21 @@ class PageManager:
True if scroll occurred True if scroll occurred
""" """
if self._current_index not in range(len(self._pages)): if self._current_index not in range(len(self._pages)):
_logger.debug(f"scroll_up: invalid page index {self._current_index}")
return False return False
page_def = self._pages[self._current_index] page_def = self._pages[self._current_index]
if page_def.page_type != PageType.INFO: if page_def.page_type != PageType.INFO:
_logger.debug(f"scroll_up: page type is {page_def.page_type}, not INFO")
return False return False
current_offset = self._scroll_offsets.get(self._current_index, 0) current_offset = self._scroll_offsets.get(self._current_index, 0)
_logger.debug(f"scroll_up: current_offset={current_offset}")
if current_offset > 0: if current_offset > 0:
self._scroll_offsets[self._current_index] = current_offset - 1 self._scroll_offsets[self._current_index] = current_offset - 1
_logger.debug(f"scroll_up: new offset={current_offset - 1}")
return True return True
_logger.debug("scroll_up: already at top")
return False return False
def scroll_down(self) -> bool: def scroll_down(self) -> bool:
@@ -189,20 +197,27 @@ class PageManager:
True if scroll occurred True if scroll occurred
""" """
if self._current_index not in range(len(self._pages)): if self._current_index not in range(len(self._pages)):
_logger.debug(f"scroll_down: invalid page index {self._current_index}")
return False return False
page_def = self._pages[self._current_index] page_def = self._pages[self._current_index]
if page_def.page_type != PageType.INFO: if page_def.page_type != PageType.INFO:
_logger.debug(f"scroll_down: page type is {page_def.page_type}, not INFO")
return False return False
# Get full page content to know total lines # Get full page content to know total lines
page = page_def.generator(self) page = page_def.generator(self)
max_offset = max(0, len(page.lines) - self.MAX_VISIBLE_LINES) total_lines = len(page.lines)
max_offset = max(0, total_lines - self.MAX_VISIBLE_LINES)
current_offset = self._scroll_offsets.get(self._current_index, 0) current_offset = self._scroll_offsets.get(self._current_index, 0)
_logger.debug(f"scroll_down: total_lines={total_lines}, max_offset={max_offset}, current_offset={current_offset}")
if current_offset < max_offset: if current_offset < max_offset:
self._scroll_offsets[self._current_index] = current_offset + 1 self._scroll_offsets[self._current_index] = current_offset + 1
_logger.debug(f"scroll_down: new offset={current_offset + 1}")
return True return True
_logger.debug("scroll_down: already at bottom")
return False return False
def navigate_next(self) -> bool: def navigate_next(self) -> bool:
@@ -318,6 +333,7 @@ class PageManager:
if state: if state:
# Get all values - returns dict of PID -> PIDValue # Get all values - returns dict of PID -> PIDValue
all_values = state.get_all() all_values = state.get_all()
_logger.debug(f"Live data: got {len(all_values)} PIDs from state")
if all_values: if all_values:
# Priority order for display # Priority order for display
@@ -339,8 +355,9 @@ class PageManager:
if not lines: if not lines:
lines = ["Waiting for data...", "", "Polling active"] lines = ["Waiting for data...", "", "Polling active"]
else: else:
lines = ["No connection", "to OBD2", "", "Use Actions menu", "to reconnect"] lines = ["No OBD2 connection", "", "Use Actions menu", "to reconnect"]
_logger.debug(f"Live data page: {len(lines)} lines")
return Page(PageType.INFO, "Live Data", lines) return Page(PageType.INFO, "Live Data", lines)
def _generate_stats_page(self, mgr: "PageManager") -> Page: def _generate_stats_page(self, mgr: "PageManager") -> Page:

View File

@@ -199,12 +199,13 @@ class FlipperServer:
def _handle_line(self, line: str) -> None: def _handle_line(self, line: str) -> None:
"""Handle received line.""" """Handle received line."""
self._logger.debug(f"RX raw: '{line.strip()}'")
cmd = FlipperProtocol.parse_command(line) cmd = FlipperProtocol.parse_command(line)
if cmd is None: if cmd is None:
self._logger.debug(f"Unknown command: {line.strip()}") self._logger.debug(f"Unknown command: {line.strip()}")
return return
self._logger.debug(f"Received: {cmd.cmd_type.name}") self._logger.info(f"CMD: {cmd.cmd_type.name}" + (f" param={cmd.param}" if cmd.param else ""))
# INIT can be received at any time (reconnection support) # INIT can be received at any time (reconnection support)
if cmd.cmd_type == CommandType.INIT: if cmd.cmd_type == CommandType.INIT:
@@ -310,13 +311,21 @@ class FlipperServer:
def _handle_scroll_up(self) -> None: def _handle_scroll_up(self) -> None:
"""Handle scroll up on info pages.""" """Handle scroll up on info pages."""
self._logger.debug(f"Scroll UP requested, page {self._page_manager.current_index}")
if self._page_manager.scroll_up(): if self._page_manager.scroll_up():
self._logger.debug(f"Scroll UP success, offset now {self._page_manager._scroll_offsets.get(self._page_manager.current_index, 0)}")
self._send_current_page() self._send_current_page()
else:
self._logger.debug("Scroll UP - already at top")
def _handle_scroll_down(self) -> None: def _handle_scroll_down(self) -> None:
"""Handle scroll down on info pages.""" """Handle scroll down on info pages."""
self._logger.debug(f"Scroll DOWN requested, page {self._page_manager.current_index}")
if self._page_manager.scroll_down(): if self._page_manager.scroll_down():
self._logger.debug(f"Scroll DOWN success, offset now {self._page_manager._scroll_offsets.get(self._page_manager.current_index, 0)}")
self._send_current_page() self._send_current_page()
else:
self._logger.debug("Scroll DOWN - already at bottom")
def _handle_refresh(self) -> None: def _handle_refresh(self) -> None:
"""Handle refresh request.""" """Handle refresh request."""

View File

@@ -179,24 +179,29 @@ class OBD2Client:
if monitor: if monitor:
self._monitor_loop() self._monitor_loop()
else:
# Keep running without monitor display
self._logger.info("Running without monitor (use Ctrl+C to stop)")
while self._running:
time.sleep(1.0)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally: finally:
self._shutdown() self._shutdown()
def _monitor_loop(self) -> None: def _monitor_loop(self, clear_screen: bool = True) -> None:
"""Display live values in console.""" """Display live values in console."""
print("\n" + "=" * 50) print("\n" + "=" * 50)
print(" OBD2 Live Monitor (Ctrl+C to stop)") print(" OBD2 Live Monitor (Ctrl+C to stop)")
print("=" * 50 + "\n") print("=" * 50 + "\n")
while self._running: while self._running:
if clear_screen:
self._clear_screen() self._clear_screen()
self._print_header() self._print_header()
self._print_values() self._print_values()
self._print_stats() self._print_stats()
time.sleep(0.5) time.sleep(0.5)
def _clear_screen(self) -> None: def _clear_screen(self) -> None:
@@ -314,6 +319,12 @@ def main():
help="Enable Flipper Zero server on specified serial port (e.g., /dev/serial0)", help="Enable Flipper Zero server on specified serial port (e.g., /dev/serial0)",
) )
parser.add_argument(
"--no-monitor",
action="store_true",
help="Disable live monitor display (useful with --debug)",
)
parser.add_argument( parser.add_argument(
"--debug", "--debug",
action="store_true", action="store_true",
@@ -334,7 +345,7 @@ def main():
config.can.virtual = True config.can.virtual = True
client = OBD2Client(config, flipper_port=args.flipper) client = OBD2Client(config, flipper_port=args.flipper)
client.run(scan_only=args.scan_only) client.run(scan_only=args.scan_only, monitor=not args.no_monitor)
if __name__ == "__main__": if __name__ == "__main__":