Update flipper and can_sniffer

This commit is contained in:
2026-01-26 15:32:23 +03:00
parent 9044e96685
commit 9f38bbcf9d
2 changed files with 495 additions and 243 deletions

View File

@@ -1,8 +1,15 @@
"""
Flipper Zero UART Handler.
Flipper Zero UART Handler with Handshake Protocol.
Sends CAN sniffer statistics to Flipper Zero via UART.
Provides real-time monitoring on Flipper Zero display.
Waits for INIT command from Flipper Zero before sending statistics.
Supports handshake protocol for secure connection establishment.
Protocol:
1. RPI5 waits in passive mode, listening for commands
2. Flipper sends: INIT:flipper\n
3. RPI5 responds: ACK:rpi5,ip=x.x.x.x\n
4. RPI5 starts sending: STATS:ip=...,total=...,pending=...,processed=...\n
5. Flipper sends: STOP:flipper\n to disconnect
"""
import socket
@@ -26,10 +33,8 @@ def get_ip_address() -> str:
IP address string or "0.0.0.0" if unable to determine
"""
try:
# Create a socket to determine the outgoing IP
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0.1)
# Connect to a public address (doesn't actually send data)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
@@ -37,7 +42,6 @@ def get_ip_address() -> str:
except Exception:
pass
# Fallback: try to get any non-localhost IP
try:
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
@@ -51,15 +55,18 @@ def get_ip_address() -> str:
class FlipperHandler(BaseHandler):
"""
Handler that sends statistics to Flipper Zero via UART.
Handler that communicates with Flipper Zero via UART.
Implements handshake protocol:
- Waits for INIT:flipper command
- Responds with ACK:rpi5,ip=x.x.x.x
- Sends STATS periodically while connected
- Stops on STOP:flipper command
UART Configuration:
- Device: /dev/ttyAMA0 (or configured device)
- Baud: 115200
- Format: 8N1
Protocol:
Sends text line: STATS:ip=<ip>,total=<n>,pending=<n>,processed=<n>\n
"""
def __init__(self, enabled: Optional[bool] = None):
@@ -69,7 +76,6 @@ class FlipperHandler(BaseHandler):
Args:
enabled: Whether handler is enabled. If None, reads from config.
"""
# Check config for enabled status
if enabled is None:
enabled = getattr(config, "flipper", None) is not None
if enabled:
@@ -80,15 +86,18 @@ class FlipperHandler(BaseHandler):
self.serial_port: Optional[Any] = None
self.device = "/dev/ttyAMA0"
self.baudrate = 115200
self.send_interval = 1.0 # Send stats every 1 second
self.send_interval = 1.0
# Load config if available
if hasattr(config, "flipper"):
flipper_cfg = config.flipper
self.device = getattr(flipper_cfg, "device", self.device)
self.baudrate = getattr(flipper_cfg, "baudrate", self.baudrate)
self.send_interval = getattr(flipper_cfg, "send_interval", self.send_interval)
# Connection state
self._connected = False
self._running = False
# Statistics
self._stats_lock = threading.Lock()
self._total_frames = 0
@@ -97,17 +106,16 @@ class FlipperHandler(BaseHandler):
self._sent_count = 0
self._error_count = 0
# Background sender thread
self._sender_thread: Optional[threading.Thread] = None
self._running = False
# Threads
self._rx_thread: Optional[threading.Thread] = None
self._tx_thread: Optional[threading.Thread] = None
# IP address cache
# IP address
self._ip_address = "0.0.0.0"
self._last_ip_check = 0
def initialize(self) -> bool:
"""
Initialize UART connection to Flipper Zero.
Initialize UART connection.
Returns:
True if initialization successful
@@ -124,13 +132,12 @@ class FlipperHandler(BaseHandler):
timeout=0.1,
)
# Get initial IP address
self._ip_address = get_ip_address()
self._last_ip_check = time.time()
self._initialized = True
self.logger.info(
f"Flipper handler initialized on {self.device} @ {self.baudrate} baud"
f"Flipper handler initialized on {self.device} @ {self.baudrate} baud, "
f"IP: {self._ip_address}"
)
return True
@@ -142,71 +149,145 @@ class FlipperHandler(BaseHandler):
return False
def start(self) -> None:
"""Start the background sender thread."""
"""Start the RX listener and TX sender threads."""
if self._running:
return
self._running = True
self._sender_thread = threading.Thread(
target=self._sender_loop, name="FlipperSender", daemon=True
)
self._sender_thread.start()
self.logger.info("Flipper sender thread started")
self._connected = False
# Start RX thread (listens for commands)
self._rx_thread = threading.Thread(
target=self._rx_loop, name="FlipperRX", daemon=True
)
self._rx_thread.start()
# Start TX thread (sends stats when connected)
self._tx_thread = threading.Thread(
target=self._tx_loop, name="FlipperTX", daemon=True
)
self._tx_thread.start()
self.logger.info("Flipper handler started, waiting for connection...")
def _rx_loop(self) -> None:
"""Receive loop - listens for commands from Flipper."""
buffer = ""
def _sender_loop(self) -> None:
"""Background loop that sends stats periodically."""
while self._running:
try:
self._send_stats()
if not self.serial_port or not self.serial_port.is_open:
time.sleep(0.1)
continue
# Read available data
if self.serial_port.in_waiting > 0:
data = self.serial_port.read(self.serial_port.in_waiting)
buffer += data.decode("utf-8", errors="ignore")
# Process complete lines
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.strip()
if line:
self._process_command(line)
else:
time.sleep(0.05)
except Exception as e:
self.logger.debug(f"Error sending stats to Flipper: {e}")
self.logger.debug(f"RX error: {e}")
time.sleep(0.1)
def _process_command(self, command: str) -> None:
"""
Process received command from Flipper.
Args:
command: Received command string
"""
self.logger.info(f"Received command: {command}")
if command.startswith("INIT:"):
# Handshake initiation
client_id = command[5:].strip()
self.logger.info(f"Handshake request from: {client_id}")
# Send ACK with IP address
self._ip_address = get_ip_address()
ack_msg = f"ACK:rpi5,ip={self._ip_address}\n"
self._send_raw(ack_msg)
self._connected = True
self.logger.info(f"Connected to Flipper, IP: {self._ip_address}")
elif command.startswith("STOP:"):
# Disconnect request
client_id = command[5:].strip()
self.logger.info(f"Disconnect request from: {client_id}")
self._connected = False
self.logger.info("Disconnected from Flipper")
def _tx_loop(self) -> None:
"""Transmit loop - sends stats when connected."""
while self._running:
try:
if self._connected:
self._send_stats()
time.sleep(self.send_interval)
except Exception as e:
self.logger.debug(f"TX error: {e}")
with self._stats_lock:
self._error_count += 1
time.sleep(self.send_interval)
def _send_raw(self, message: str) -> bool:
"""
Send raw message via UART.
Args:
message: Message to send
Returns:
True if sent successfully
"""
if not self.serial_port or not self.serial_port.is_open:
return False
try:
self.serial_port.write(message.encode("utf-8"))
self.serial_port.flush()
self.logger.debug(f"TX: {message.strip()}")
return True
except Exception as e:
self.logger.debug(f"Send error: {e}")
return False
def _send_stats(self) -> None:
"""Send current statistics to Flipper Zero."""
if not self.serial_port or not self.serial_port.is_open:
if not self._connected:
return
# Refresh IP address every 60 seconds
current_time = time.time()
if current_time - self._last_ip_check > 60:
self._ip_address = get_ip_address()
self._last_ip_check = current_time
with self._stats_lock:
total = self._total_frames
pending = self._pending_frames
processed = self._processed_frames
# Build stats message
message = f"STATS:ip={self._ip_address},total={total},pending={pending},processed={processed}\n"
try:
self.serial_port.write(message.encode("utf-8"))
self.serial_port.flush()
if self._send_raw(message):
with self._stats_lock:
self._sent_count += 1
except Exception as e:
self.logger.debug(f"UART write error: {e}")
with self._stats_lock:
self._error_count += 1
def handle(self, frame: CANFrame) -> bool:
"""
Handle a single CAN frame.
Updates frame counters for statistics.
Args:
frame: CANFrame to handle
Returns:
True (always succeeds, just updates counters)
True (always succeeds)
"""
with self._stats_lock:
self._total_frames += 1
@@ -221,14 +302,12 @@ class FlipperHandler(BaseHandler):
frames: List of CANFrame objects
Returns:
Number of frames processed (all of them)
Number of frames processed
"""
count = len(frames)
with self._stats_lock:
self._total_frames += count
# After batch processing, frames are processed
self._processed_frames += count
# Reduce pending by batch count
self._pending_frames = max(0, self._pending_frames - count)
return count
@@ -236,8 +315,6 @@ class FlipperHandler(BaseHandler):
"""
Update pending frame count.
Called externally to sync with actual queue size.
Args:
pending_count: Current number of pending frames
"""
@@ -245,26 +322,30 @@ class FlipperHandler(BaseHandler):
self._pending_frames = pending_count
def flush(self) -> None:
"""Flush - send immediate stats update."""
try:
self._send_stats()
except Exception as e:
self.logger.debug(f"Error in flush: {e}")
"""Flush - send immediate stats if connected."""
if self._connected:
try:
self._send_stats()
except Exception as e:
self.logger.debug(f"Flush error: {e}")
def shutdown(self) -> None:
"""Shutdown the handler."""
self.logger.info("Shutting down Flipper handler...")
self._running = False
self._connected = False
if self._sender_thread and self._sender_thread.is_alive():
self._sender_thread.join(timeout=2.0)
# Wait for threads
if self._rx_thread and self._rx_thread.is_alive():
self._rx_thread.join(timeout=2.0)
if self._tx_thread and self._tx_thread.is_alive():
self._tx_thread.join(timeout=2.0)
# Close serial port
if self.serial_port and self.serial_port.is_open:
try:
# Send final "disconnected" message
self.serial_port.write(b"STATS:ip=---,total=0,pending=0,processed=0\n")
self.serial_port.flush()
self.serial_port.close()
except Exception as e:
self.logger.debug(f"Error closing serial port: {e}")
@@ -288,6 +369,10 @@ class FlipperHandler(BaseHandler):
"error_count": self._error_count,
"device": self.device,
"baudrate": self.baudrate,
"connected": self.serial_port.is_open if self.serial_port else False,
"connected": self._connected,
"ip_address": self._ip_address,
}
def is_connected(self) -> bool:
"""Check if Flipper is connected."""
return self._connected