From dc7fd19022df7b135ddb1c808df4d92bcf087c57 Mon Sep 17 00:00:00 2001 From: Alexander Poletaev Date: Fri, 30 Jan 2026 12:20:34 +0300 Subject: [PATCH] add systemd services and handlers pipeline --- obd2_client/config.json.example | 60 ++ obd2_client/requirements.txt | 8 + obd2_client/src/config.py | 111 ++++ obd2_client/src/flipper/pages/__init__.py | 16 + obd2_client/src/flipper/pages/base.py | 312 +++++++++ obd2_client/src/flipper/pages/ups_status.py | 94 +++ obd2_client/src/flipper/providers/__init__.py | 10 + obd2_client/src/flipper/providers/base.py | 116 ++++ .../src/flipper/providers/ups_provider.py | 259 ++++++++ obd2_client/src/handlers/__init__.py | 18 + obd2_client/src/handlers/base.py | 156 +++++ obd2_client/src/handlers/pipeline.py | 211 +++++++ .../src/handlers/postgresql_handler.py | 427 +++++++++++++ obd2_client/src/handlers/storage_handler.py | 228 +++++++ obd2_client/src/main.py | 103 ++- obd2_client/src/storage/__init__.py | 9 + obd2_client/src/storage/storage.py | 593 ++++++++++++++++++ obd2_client/src/vehicle/poller.py | 37 +- systemd/can0-link.service | 14 + systemd/carpibord.service | 21 + systemd/install.sh | 48 ++ systemd/uninstall.sh | 27 + 22 files changed, 2875 insertions(+), 3 deletions(-) create mode 100644 obd2_client/config.json.example create mode 100644 obd2_client/src/flipper/pages/__init__.py create mode 100644 obd2_client/src/flipper/pages/base.py create mode 100644 obd2_client/src/flipper/pages/ups_status.py create mode 100644 obd2_client/src/flipper/providers/__init__.py create mode 100644 obd2_client/src/flipper/providers/base.py create mode 100644 obd2_client/src/flipper/providers/ups_provider.py create mode 100644 obd2_client/src/handlers/__init__.py create mode 100644 obd2_client/src/handlers/base.py create mode 100644 obd2_client/src/handlers/pipeline.py create mode 100644 obd2_client/src/handlers/postgresql_handler.py create mode 100644 obd2_client/src/handlers/storage_handler.py create mode 100644 obd2_client/src/storage/__init__.py create mode 100644 obd2_client/src/storage/storage.py create mode 100644 systemd/can0-link.service create mode 100644 systemd/carpibord.service create mode 100644 systemd/install.sh create mode 100644 systemd/uninstall.sh diff --git a/obd2_client/config.json.example b/obd2_client/config.json.example new file mode 100644 index 0000000..d13d85a --- /dev/null +++ b/obd2_client/config.json.example @@ -0,0 +1,60 @@ +{ + "_comment": "Carpibord OBD2 Client Configuration", + + "can": { + "interface": "can0", + "bitrate": 500000, + "virtual": false + }, + + "obd2": { + "request_id": "0x7DF", + "response_id": "0x7E8", + "timeout": 0.2 + }, + + "scan": { + "retries": 5, + "retry_delay": 1.5, + "initial_delay": 1.0 + }, + + "polling": { + "interval_fast": 0.1, + "interval_slow": 1.0, + "fast_pids": ["0x0C", "0x0D", "0x11", "0x04"], + "slow_pids": ["0x05", "0x5C", "0x0F", "0x2F", "0x10"] + }, + + "storage": { + "_comment": "Offline-first SQLite storage", + "enabled": true, + "db_path": null, + "wal_mode": true, + "retention_days": 30, + "batch_size": 50, + "flush_interval": 1.0 + }, + + "postgresql": { + "_comment": "Remote PostgreSQL sync (offline-first)", + "enabled": false, + "host": "your-postgres-server.com", + "port": 5432, + "database": "carpibord", + "user": "carpibord", + "password": "your-secure-password", + "sync_interval": 30.0, + "batch_size": 500, + "max_retries": 3, + "retry_delay": 5.0 + }, + + "flipper": { + "_comment": "Flipper Zero UART display", + "enabled": true, + "port": "/dev/serial0", + "baudrate": 115200, + "show_ups": true + } +} diff --git a/obd2_client/requirements.txt b/obd2_client/requirements.txt index 892c2f5..ae40894 100644 --- a/obd2_client/requirements.txt +++ b/obd2_client/requirements.txt @@ -1,2 +1,10 @@ +# Core dependencies python-can>=4.0.0 pyserial>=3.5 + +# PostgreSQL support (optional) +psycopg2-binary>=2.9.0 + +# UPS monitoring (optional, RPi only) +smbus2>=0.4.0 +gpiozero>=2.0.0 diff --git a/obd2_client/src/config.py b/obd2_client/src/config.py index 9dea51c..40bb313 100644 --- a/obd2_client/src/config.py +++ b/obd2_client/src/config.py @@ -44,6 +44,44 @@ class PollingConfig: slow_pids: List[int] = field(default_factory=lambda: [0x05, 0x2F, 0x5C]) +@dataclass +class StorageConfig: + """SQLite storage configuration.""" + + enabled: bool = True + db_path: Optional[str] = None # None = default location + wal_mode: bool = True + retention_days: int = 30 + batch_size: int = 50 + flush_interval: float = 1.0 + + +@dataclass +class PostgreSQLConfig: + """PostgreSQL sync configuration.""" + + enabled: bool = False + host: str = "localhost" + port: int = 5432 + database: str = "carpibord" + user: str = "carpibord" + password: str = "" + sync_interval: float = 30.0 + batch_size: int = 500 + max_retries: int = 3 + retry_delay: float = 5.0 + + +@dataclass +class FlipperConfig: + """Flipper Zero configuration.""" + + enabled: bool = False + port: str = "/dev/serial0" + baudrate: int = 115200 + show_ups: bool = True + + @dataclass class Config: """Main configuration container.""" @@ -52,6 +90,9 @@ class Config: obd2: OBD2Config = field(default_factory=OBD2Config) scan: ScanConfig = field(default_factory=ScanConfig) polling: PollingConfig = field(default_factory=PollingConfig) + storage: StorageConfig = field(default_factory=StorageConfig) + postgresql: PostgreSQLConfig = field(default_factory=PostgreSQLConfig) + flipper: FlipperConfig = field(default_factory=FlipperConfig) @classmethod def load(cls, config_path: Optional[str] = None) -> "Config": @@ -120,8 +161,52 @@ class Config: self._parse_hex(p) for p in poll_data["slow_pids"] ] + if "storage" in data: + storage_data = data["storage"] + self.storage.enabled = storage_data.get("enabled", self.storage.enabled) + self.storage.db_path = storage_data.get("db_path", self.storage.db_path) + self.storage.wal_mode = storage_data.get("wal_mode", self.storage.wal_mode) + self.storage.retention_days = storage_data.get( + "retention_days", self.storage.retention_days + ) + self.storage.batch_size = storage_data.get( + "batch_size", self.storage.batch_size + ) + self.storage.flush_interval = storage_data.get( + "flush_interval", self.storage.flush_interval + ) + + if "postgresql" in data: + pg_data = data["postgresql"] + self.postgresql.enabled = pg_data.get("enabled", self.postgresql.enabled) + self.postgresql.host = pg_data.get("host", self.postgresql.host) + self.postgresql.port = pg_data.get("port", self.postgresql.port) + self.postgresql.database = pg_data.get("database", self.postgresql.database) + self.postgresql.user = pg_data.get("user", self.postgresql.user) + self.postgresql.password = pg_data.get("password", self.postgresql.password) + self.postgresql.sync_interval = pg_data.get( + "sync_interval", self.postgresql.sync_interval + ) + self.postgresql.batch_size = pg_data.get( + "batch_size", self.postgresql.batch_size + ) + self.postgresql.max_retries = pg_data.get( + "max_retries", self.postgresql.max_retries + ) + self.postgresql.retry_delay = pg_data.get( + "retry_delay", self.postgresql.retry_delay + ) + + if "flipper" in data: + flipper_data = data["flipper"] + self.flipper.enabled = flipper_data.get("enabled", self.flipper.enabled) + self.flipper.port = flipper_data.get("port", self.flipper.port) + self.flipper.baudrate = flipper_data.get("baudrate", self.flipper.baudrate) + self.flipper.show_ups = flipper_data.get("show_ups", self.flipper.show_ups) + def _load_from_env(self) -> None: """Load configuration from environment variables.""" + # CAN config if "OBD2_CAN_INTERFACE" in os.environ: self.can.interface = os.environ["OBD2_CAN_INTERFACE"] if "OBD2_CAN_BITRATE" in os.environ: @@ -131,6 +216,32 @@ class Config: if "OBD2_TIMEOUT" in os.environ: self.obd2.timeout = float(os.environ["OBD2_TIMEOUT"]) + # Storage config + if "OBD2_STORAGE_ENABLED" in os.environ: + self.storage.enabled = os.environ["OBD2_STORAGE_ENABLED"].lower() == "true" + if "OBD2_STORAGE_PATH" in os.environ: + self.storage.db_path = os.environ["OBD2_STORAGE_PATH"] + + # PostgreSQL config + if "OBD2_PG_ENABLED" in os.environ: + self.postgresql.enabled = os.environ["OBD2_PG_ENABLED"].lower() == "true" + if "OBD2_PG_HOST" in os.environ: + self.postgresql.host = os.environ["OBD2_PG_HOST"] + if "OBD2_PG_PORT" in os.environ: + self.postgresql.port = int(os.environ["OBD2_PG_PORT"]) + if "OBD2_PG_DATABASE" in os.environ: + self.postgresql.database = os.environ["OBD2_PG_DATABASE"] + if "OBD2_PG_USER" in os.environ: + self.postgresql.user = os.environ["OBD2_PG_USER"] + if "OBD2_PG_PASSWORD" in os.environ: + self.postgresql.password = os.environ["OBD2_PG_PASSWORD"] + + # Flipper config + if "OBD2_FLIPPER_ENABLED" in os.environ: + self.flipper.enabled = os.environ["OBD2_FLIPPER_ENABLED"].lower() == "true" + if "OBD2_FLIPPER_PORT" in os.environ: + self.flipper.port = os.environ["OBD2_FLIPPER_PORT"] + @staticmethod def _parse_hex(value) -> int: """Parse hex string or int to int.""" diff --git a/obd2_client/src/flipper/pages/__init__.py b/obd2_client/src/flipper/pages/__init__.py new file mode 100644 index 0000000..b7ac79d --- /dev/null +++ b/obd2_client/src/flipper/pages/__init__.py @@ -0,0 +1,16 @@ +""" +Flipper Zero UI Pages. + +Each page represents a screen on the Flipper Zero display. +""" + +from .base import BasePage, InfoPage, MenuPage, ConfirmPage +from .ups_status import UPSStatusPage + +__all__ = [ + "BasePage", + "InfoPage", + "MenuPage", + "ConfirmPage", + "UPSStatusPage", +] diff --git a/obd2_client/src/flipper/pages/base.py b/obd2_client/src/flipper/pages/base.py new file mode 100644 index 0000000..6387b71 --- /dev/null +++ b/obd2_client/src/flipper/pages/base.py @@ -0,0 +1,312 @@ +""" +Base Page Interface for Flipper Zero UI. + +All pages must inherit from BasePage and implement required methods. +""" + +from abc import ABC, abstractmethod +from typing import Optional, Callable, List +from dataclasses import dataclass, field +from enum import Enum + + +class PageType(Enum): + """Types of pages for different UI layouts.""" + INFO = "info" + MENU = "menu" + CONFIRM = "confirm" + + +@dataclass +class PageContent: + """ + Content structure for a page to be displayed on Flipper Zero. + + Attributes: + page_type: Type of page (info, menu, confirm) + title: Page title (max ~20 chars for display) + lines: Content lines to display (max 4-5 lines) + actions: Available actions/menu items (for menu type) + selected: Currently selected item index (for menu type) + icon: Optional icon identifier for the page + """ + page_type: PageType + title: str + lines: List[str] = field(default_factory=list) + actions: List[str] = field(default_factory=list) + selected: int = 0 + icon: str = "" + + def __post_init__(self): + """Truncate title if too long.""" + if len(self.title) > 20: + self.title = self.title[:17] + "..." + + +class BasePage(ABC): + """ + Abstract base class for Flipper Zero pages. + + Each page represents a screen on Flipper Zero display. + Pages can be informational (read-only), interactive menus, + or confirmation dialogs. + + Attributes: + name: Unique identifier for the page + title: Display title (shown in header) + icon: Icon identifier for visual representation + enabled: Whether the page is active + """ + + def __init__( + self, + name: str, + title: str, + icon: str = "", + enabled: bool = True, + ): + """ + Initialize base page. + + Args: + name: Unique page identifier + title: Display title + icon: Icon identifier + enabled: Whether page is enabled + """ + self.name = name + self.title = title + self.icon = icon + self.enabled = enabled + self._selected_index = 0 + self._scroll_offset = 0 + self._pending_action: Optional[str] = None + + @abstractmethod + def get_content(self) -> PageContent: + """ + Get current page content for display. + + Returns: + PageContent object with type, title, lines, and actions + """ + pass + + def handle_select(self, index: int) -> Optional[str]: + """ + Handle menu item selection. + + Called when user presses OK on a menu item. + + Args: + index: Selected item index + + Returns: + Optional result message to display, or None + """ + self._selected_index = index + return None + + def handle_confirm(self) -> Optional[str]: + """ + Handle confirmation action. + + Called when user confirms a pending action. + + Returns: + Optional result message, or None + """ + return None + + def handle_cancel(self) -> Optional[str]: + """ + Handle cancel action. + + Called when user cancels a pending action. + + Returns: + Optional result message, or None + """ + self._pending_action = None + return None + + def handle_scroll(self, direction: str) -> None: + """ + Handle scroll up/down. + + Args: + direction: "up" or "down" + """ + if direction == "up": + self._scroll_offset = max(0, self._scroll_offset - 1) + elif direction == "down": + self._scroll_offset += 1 + + def get_selected_index(self) -> int: + """Get currently selected index for menu pages.""" + return self._selected_index + + def set_selected_index(self, index: int) -> None: + """Set selected index for menu pages.""" + self._selected_index = index + + def get_scroll_offset(self) -> int: + """Get current scroll offset.""" + return self._scroll_offset + + def reset_scroll(self) -> None: + """Reset scroll position to top.""" + self._scroll_offset = 0 + + def has_pending_action(self) -> bool: + """Check if there's a pending action requiring confirmation.""" + return self._pending_action is not None + + def get_pending_action(self) -> Optional[str]: + """Get pending action name.""" + return self._pending_action + + def is_enabled(self) -> bool: + """Check if page is enabled.""" + return self.enabled + + def on_enter(self) -> None: + """Called when user navigates to this page.""" + pass + + def on_leave(self) -> None: + """Called when user navigates away from this page.""" + pass + + +class InfoPage(BasePage): + """ + Base class for information-only pages. + + Info pages display read-only data that updates periodically. + They have no interactive elements. + """ + + def __init__( + self, + name: str, + title: str, + icon: str = "", + enabled: bool = True, + ): + super().__init__(name, title, icon, enabled) + + @abstractmethod + def get_lines(self) -> List[str]: + """ + Get content lines for display. + + Returns: + List of strings to display (max 4-5 lines) + """ + pass + + def get_content(self) -> PageContent: + """Get page content as INFO type.""" + return PageContent( + page_type=PageType.INFO, + title=self.title, + lines=self.get_lines(), + icon=self.icon, + ) + + +class MenuPage(BasePage): + """ + Base class for menu pages. + + Menu pages display selectable items that can trigger actions. + """ + + def __init__( + self, + name: str, + title: str, + icon: str = "", + enabled: bool = True, + ): + super().__init__(name, title, icon, enabled) + self._menu_items: List[tuple[str, Callable[[], Optional[str]]]] = [] + + def add_item(self, label: str, action: Callable[[], Optional[str]]) -> None: + """ + Add menu item. + + Args: + label: Display label for the item + action: Callback function when item is selected + """ + self._menu_items.append((label, action)) + + def clear_items(self) -> None: + """Clear all menu items.""" + self._menu_items.clear() + self._selected_index = 0 + + def get_content(self) -> PageContent: + """Get page content as MENU type.""" + labels = [item[0] for item in self._menu_items] + return PageContent( + page_type=PageType.MENU, + title=self.title, + lines=[], + actions=labels, + selected=self._selected_index, + icon=self.icon, + ) + + def handle_select(self, index: int) -> Optional[str]: + """Execute action for selected menu item.""" + self._selected_index = index + if 0 <= index < len(self._menu_items): + _, action = self._menu_items[index] + return action() + return None + + +class ConfirmPage(BasePage): + """ + Confirmation dialog page. + + Used to confirm dangerous actions like shutdown or reboot. + """ + + def __init__( + self, + name: str, + title: str, + message: str, + on_confirm: Callable[[], Optional[str]], + on_cancel: Optional[Callable[[], Optional[str]]] = None, + icon: str = "", + ): + super().__init__(name, title, icon, enabled=True) + self.message = message + self._on_confirm = on_confirm + self._on_cancel = on_cancel + + def get_content(self) -> PageContent: + """Get page content as CONFIRM type.""" + return PageContent( + page_type=PageType.CONFIRM, + title=self.title, + lines=[self.message], + actions=["Yes", "No"], + selected=1, # Default to "No" for safety + icon=self.icon, + ) + + def handle_confirm(self) -> Optional[str]: + """Execute confirm action.""" + return self._on_confirm() + + def handle_cancel(self) -> Optional[str]: + """Execute cancel action.""" + if self._on_cancel: + return self._on_cancel() + return None diff --git a/obd2_client/src/flipper/pages/ups_status.py b/obd2_client/src/flipper/pages/ups_status.py new file mode 100644 index 0000000..a879f74 --- /dev/null +++ b/obd2_client/src/flipper/pages/ups_status.py @@ -0,0 +1,94 @@ +""" +UPS Status Page. + +Displays X120x UPS battery and power status on Flipper Zero. +""" + +from typing import List + +from .base import InfoPage +from ..providers.ups_provider import UPSProvider + + +class UPSStatusPage(InfoPage): + """ + Page displaying UPS (X120x) status. + + Shows: + - Battery percentage and voltage + - Charging status + - Power loss indicator + - Input voltage + + Visual battery bar for quick status check. + """ + + def __init__(self): + super().__init__( + name="ups_status", + title="UPS Status", + icon="battery", + ) + self._provider = UPSProvider() + + def get_lines(self) -> List[str]: + """Get UPS status lines for display.""" + if not self._provider.is_available(): + return [ + "UPS not available", + self._provider.get_last_error() or "Check I2C connection", + ] + + # Force refresh to get fresh data + self._provider.refresh() + data = self._provider.get_data() + + # Battery bar visualization [=====] + battery_bar = self._get_battery_bar(data.capacity) + + lines = [ + f"Bat: {data.capacity:.1f}% {battery_bar}", + f"Voltage: {data.voltage:.2f}V", + ] + + # Charging/power status + if data.power_loss: + lines.append("Power: BATTERY MODE") + if data.capacity < 15: + lines.append("!! CRITICAL !!") + elif data.capacity < 25: + lines.append("! LOW BATTERY !") + else: + status = "Charging" if data.is_charging else "Full" + lines.append(f"Status: {status}") + if data.input_voltage > 0: + lines.append(f"Input: {data.input_voltage:.2f}V") + + return lines + + def _get_battery_bar(self, percent: float) -> str: + """ + Create ASCII battery bar. + + Args: + percent: Battery percentage (0-100) + + Returns: + Battery visualization string like "[==== ]" + """ + filled = int(percent / 16.67) # 6 segments + empty = 6 - filled + bar = "[" + "=" * filled + " " * empty + "]" + return bar + + def is_enabled(self) -> bool: + """Check if page should be shown.""" + return self.enabled and self._provider.is_available() + + def get_provider(self) -> UPSProvider: + """Get the UPS provider instance.""" + return self._provider + + def on_enter(self) -> None: + """Refresh data when page is shown.""" + self._provider.refresh() diff --git a/obd2_client/src/flipper/providers/__init__.py b/obd2_client/src/flipper/providers/__init__.py new file mode 100644 index 0000000..0e3ecc4 --- /dev/null +++ b/obd2_client/src/flipper/providers/__init__.py @@ -0,0 +1,10 @@ +""" +Data Providers for Flipper Zero pages. + +Providers are singleton data sources with caching. +""" + +from .base import BaseProvider +from .ups_provider import UPSProvider, UPSData + +__all__ = ["BaseProvider", "UPSProvider", "UPSData"] diff --git a/obd2_client/src/flipper/providers/base.py b/obd2_client/src/flipper/providers/base.py new file mode 100644 index 0000000..bb9edd6 --- /dev/null +++ b/obd2_client/src/flipper/providers/base.py @@ -0,0 +1,116 @@ +""" +Base Provider Interface. + +Providers are singleton data sources that pages can query. +They handle caching and thread-safe data access. +""" + +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional +import threading +import time + + +class BaseProvider(ABC): + """ + Abstract base class for data providers. + + Providers: + - Are singletons (one instance per type) + - Cache data with configurable TTL + - Are thread-safe + - Handle errors gracefully + + Attributes: + name: Provider identifier + cache_ttl: Cache time-to-live in seconds + """ + + _instances: Dict[str, "BaseProvider"] = {} + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + """Singleton pattern implementation.""" + with cls._lock: + if cls.__name__ not in cls._instances: + instance = super().__new__(cls) + cls._instances[cls.__name__] = instance + return cls._instances[cls.__name__] + + def __init__(self, name: str, cache_ttl: float = 1.0): + """ + Initialize provider. + + Args: + name: Provider identifier + cache_ttl: Cache TTL in seconds + """ + if hasattr(self, "_initialized") and self._initialized: + return + + self.name = name + self.cache_ttl = cache_ttl + self._cache: Dict[str, Any] = {} + self._cache_timestamps: Dict[str, float] = {} + self._data_lock = threading.Lock() + self._initialized = True + self._available = True + self._last_error: Optional[str] = None + + def _get_cached(self, key: str) -> Optional[Any]: + """ + Get cached value if not expired. + + Args: + key: Cache key + + Returns: + Cached value or None if expired/missing + """ + with self._data_lock: + if key not in self._cache: + return None + timestamp = self._cache_timestamps.get(key, 0) + if time.time() - timestamp > self.cache_ttl: + return None + return self._cache[key] + + def _set_cached(self, key: str, value: Any) -> None: + """ + Set cached value. + + Args: + key: Cache key + value: Value to cache + """ + with self._data_lock: + self._cache[key] = value + self._cache_timestamps[key] = time.time() + + def _clear_cache(self) -> None: + """Clear all cached values.""" + with self._data_lock: + self._cache.clear() + self._cache_timestamps.clear() + + @abstractmethod + def refresh(self) -> bool: + """ + Refresh provider data from source. + + Returns: + True if refresh successful + """ + pass + + def is_available(self) -> bool: + """Check if provider is available.""" + return self._available + + def get_last_error(self) -> Optional[str]: + """Get last error message.""" + return self._last_error + + def shutdown(self) -> None: + """Cleanup provider resources.""" + self._clear_cache() diff --git a/obd2_client/src/flipper/providers/ups_provider.py b/obd2_client/src/flipper/providers/ups_provider.py new file mode 100644 index 0000000..6d5836b --- /dev/null +++ b/obd2_client/src/flipper/providers/ups_provider.py @@ -0,0 +1,259 @@ +""" +UPS Provider for X120x UPS on Raspberry Pi. + +Reads battery status, voltage, and charging state via I2C (SMBus). +Based on https://github.com/suptronics/x120x +""" + +import struct +from typing import Optional, Tuple +from dataclasses import dataclass + +from .base import BaseProvider +from ...logger import get_logger + +logger = get_logger(__name__) + + +@dataclass +class UPSData: + """UPS status data.""" + voltage: float = 0.0 + capacity: float = 0.0 + is_charging: bool = False + power_loss: bool = False + input_voltage: float = 0.0 + + +class UPSProvider(BaseProvider): + """ + Provider for X120x UPS data. + + Reads from MAX17048 fuel gauge via I2C (address 0x36). + Also monitors power loss detection pin (GPIO 6). + + Supported UPS boards: + - X120x series (X1201, X1202, etc.) + - Other boards with MAX17048 fuel gauge + """ + + I2C_ADDRESS = 0x36 + I2C_BUS = 1 + CHG_ONOFF_PIN = 16 + PLD_PIN = 6 # Power Loss Detection + + def __init__(self): + super().__init__(name="ups", cache_ttl=2.0) + self._bus = None + self._pld_button = None + self._data = UPSData() + self._init_hardware() + + def _init_hardware(self) -> None: + """Initialize I2C and GPIO.""" + # Try to initialize I2C + try: + import smbus2 + self._bus = smbus2.SMBus(self.I2C_BUS) + self._available = True + logger.info(f"UPS I2C initialized on bus {self.I2C_BUS}") + except ImportError: + self._last_error = "smbus2 not installed" + self._available = False + logger.warning("smbus2 not installed - UPS monitoring disabled") + except PermissionError as e: + self._last_error = f"I2C permission denied: {e}" + self._available = False + logger.warning(f"I2C permission denied: {e}") + except FileNotFoundError as e: + self._last_error = f"I2C bus not found: {e}" + self._available = False + logger.debug(f"I2C bus not found (UPS not connected): {e}") + except Exception as e: + self._last_error = f"I2C init failed: {e}" + self._available = False + logger.warning(f"I2C initialization failed: {e}") + + # Try to initialize GPIO for power loss detection + try: + from gpiozero import Button + self._pld_button = Button(self.PLD_PIN) + logger.debug("GPIO for power loss detection initialized") + except ImportError: + logger.debug("gpiozero not installed - power loss detection disabled") + except Exception as e: + logger.debug(f"GPIO init failed: {e}") + + def _read_voltage_and_capacity(self) -> Tuple[float, float]: + """ + Read voltage and capacity from MAX17048 fuel gauge. + + Returns: + Tuple of (voltage, capacity) + """ + if not self._bus: + return 0.0, 0.0 + + try: + # Read voltage register (0x02) + voltage_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x02) + # Read capacity register (0x04) + capacity_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x04) + + # Swap bytes (SMBus returns LSB first, but MAX17048 is MSB first) + voltage_swapped = struct.unpack("H", voltage_read))[0] + capacity_swapped = struct.unpack("H", capacity_read))[0] + + # Convert to actual values + # Voltage: 78.125 µV per bit + voltage = voltage_swapped * 1.25 / 1000 / 16 + # Capacity: 1/256% per bit + capacity = capacity_swapped / 256 + + return voltage, min(capacity, 100.0) + + except Exception as e: + self._last_error = f"Read error: {e}" + logger.debug(f"Failed to read UPS data: {e}") + return 0.0, 0.0 + + def _get_power_loss_state(self) -> bool: + """ + Check power loss detection pin. + + Returns: + True if external power is lost (running on battery) + """ + if self._pld_button is None: + return False + + try: + # Pin is LOW when power is lost + return not self._pld_button.is_pressed + except Exception: + return False + + def _read_input_voltage(self) -> float: + """ + Read input voltage using vcgencmd. + + Returns: + Input voltage in Volts + """ + try: + from subprocess import check_output, DEVNULL + + vcgencmd_paths = [ + "/usr/bin/vcgencmd", + "/opt/vc/bin/vcgencmd", + "vcgencmd" + ] + + for vcgencmd in vcgencmd_paths: + try: + output = check_output( + [vcgencmd, "pmic_read_adc", "EXT5V_V"], + stderr=DEVNULL + ).decode("utf-8") + # Output: "EXT5V_V volt=4.9234V" + value_str = output.split("=")[1].strip().rstrip("V") + return float(value_str) + except FileNotFoundError: + continue + except (IndexError, ValueError): + continue + + return 0.0 + + except Exception: + return 0.0 + + def refresh(self) -> bool: + """ + Refresh UPS data from hardware. + + Returns: + True if refresh successful + """ + if not self._available: + return False + + try: + voltage, capacity = self._read_voltage_and_capacity() + power_loss = self._get_power_loss_state() + input_voltage = self._read_input_voltage() + + # Determine charging state + # Charging if capacity < 100% and we have external power + is_charging = capacity < 100.0 and not power_loss + + self._data = UPSData( + voltage=voltage, + capacity=capacity, + is_charging=is_charging, + power_loss=power_loss, + input_voltage=input_voltage, + ) + + self._set_cached("data", self._data) + return True + + except Exception as e: + self._last_error = str(e) + logger.debug(f"UPS refresh failed: {e}") + return False + + def get_data(self) -> UPSData: + """Get current UPS data.""" + cached = self._get_cached("data") + if cached is not None: + return cached + + self.refresh() + return self._data + + def get_voltage(self) -> float: + """Get battery voltage in Volts.""" + return self.get_data().voltage + + def get_capacity(self) -> float: + """Get battery capacity in percent (0-100).""" + return self.get_data().capacity + + def is_charging(self) -> bool: + """Check if battery is charging.""" + return self.get_data().is_charging + + def has_power_loss(self) -> bool: + """Check if external power is lost.""" + return self.get_data().power_loss + + def get_input_voltage(self) -> float: + """Get input voltage in Volts.""" + return self.get_data().input_voltage + + def get_status_string(self) -> str: + """Get human-readable status string.""" + data = self.get_data() + + if data.power_loss: + if data.capacity < 15: + return "CRITICAL" + elif data.capacity < 25: + return "LOW" + else: + return "Battery" + elif data.is_charging: + return "Charging" + else: + return "OK" + + def shutdown(self) -> None: + """Cleanup resources.""" + super().shutdown() + if self._bus: + try: + self._bus.close() + except Exception: + pass + self._bus = None diff --git a/obd2_client/src/handlers/__init__.py b/obd2_client/src/handlers/__init__.py new file mode 100644 index 0000000..359b4f4 --- /dev/null +++ b/obd2_client/src/handlers/__init__.py @@ -0,0 +1,18 @@ +""" +Handler Pipeline for OBD2 data processing. + +Provides modular, pluggable handlers for processing OBD2 readings. +""" + +from .base import BaseHandler, OBD2Reading +from .pipeline import HandlerPipeline +from .storage_handler import StorageHandler +from .postgresql_handler import PostgreSQLHandler + +__all__ = [ + "BaseHandler", + "OBD2Reading", + "HandlerPipeline", + "StorageHandler", + "PostgreSQLHandler", +] diff --git a/obd2_client/src/handlers/base.py b/obd2_client/src/handlers/base.py new file mode 100644 index 0000000..b0f4289 --- /dev/null +++ b/obd2_client/src/handlers/base.py @@ -0,0 +1,156 @@ +""" +Base Handler Interface for OBD2 data processing. + +All handlers must inherit from BaseHandler and implement required methods. +""" + +from abc import ABC, abstractmethod +from typing import List, Dict, Any, Optional +from dataclasses import dataclass +from datetime import datetime + +from ..logger import get_logger + + +@dataclass +class OBD2Reading: + """ + Single OBD2 reading for handler processing. + + Attributes: + pid: PID code (e.g., 0x0C for RPM) + name: Human-readable PID name + value: Decoded value + unit: Unit of measurement + timestamp: Reading timestamp + session_id: Optional session identifier + """ + pid: int + name: str + value: float + unit: str + timestamp: datetime + session_id: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for storage.""" + return { + "pid": self.pid, + "name": self.name, + "value": self.value, + "unit": self.unit, + "timestamp": self.timestamp.isoformat(), + "session_id": self.session_id, + } + + +class BaseHandler(ABC): + """ + Abstract base class for OBD2 data handlers. + + Handlers form a pipeline for processing OBD2 readings. + Each handler can: + - Store data locally + - Sync to remote databases + - Send notifications + - Perform analytics + + Attributes: + name: Handler identifier + enabled: Whether handler is active + """ + + def __init__(self, name: str, enabled: bool = True): + """ + Initialize handler. + + Args: + name: Handler identifier (for logging) + enabled: Whether handler is enabled + """ + self.name = name + self.enabled = enabled + self._logger = get_logger(f"handler.{name}") + self._initialized = False + + @abstractmethod + def initialize(self) -> bool: + """ + Initialize handler resources. + + Called once at startup. Should set up connections, + create tables, etc. + + Returns: + True if initialization successful + """ + pass + + @abstractmethod + def handle(self, reading: OBD2Reading) -> bool: + """ + Process a single OBD2 reading. + + Args: + reading: OBD2Reading to process + + Returns: + True if reading was accepted + """ + pass + + @abstractmethod + def handle_batch(self, readings: List[OBD2Reading]) -> int: + """ + Process multiple OBD2 readings. + + More efficient than calling handle() repeatedly. + + Args: + readings: List of readings to process + + Returns: + Number of readings successfully processed + """ + pass + + @abstractmethod + def flush(self) -> None: + """ + Flush pending data. + + Force write any buffered data to storage. + """ + pass + + @abstractmethod + def shutdown(self) -> None: + """ + Clean shutdown. + + Flush pending data and release resources. + """ + pass + + @abstractmethod + def get_stats(self) -> Dict[str, Any]: + """ + Get handler statistics. + + Returns: + Dict with handler-specific stats + """ + pass + + def is_enabled(self) -> bool: + """Check if handler is enabled.""" + return self.enabled + + def is_initialized(self) -> bool: + """Check if handler is initialized.""" + return self._initialized + + def set_enabled(self, enabled: bool) -> None: + """Enable or disable handler.""" + self.enabled = enabled + self._logger.info(f"Handler {'enabled' if enabled else 'disabled'}") diff --git a/obd2_client/src/handlers/pipeline.py b/obd2_client/src/handlers/pipeline.py new file mode 100644 index 0000000..89b3a05 --- /dev/null +++ b/obd2_client/src/handlers/pipeline.py @@ -0,0 +1,211 @@ +""" +Handler Pipeline - orchestrates multiple handlers. + +The pipeline distributes OBD2 readings to all registered handlers. +""" + +import threading +from typing import List, Dict, Any, Optional +from .base import BaseHandler, OBD2Reading +from ..logger import get_logger + + +class HandlerPipeline: + """ + Manages and orchestrates multiple OBD2 data handlers. + + The pipeline: + - Initializes all handlers at startup + - Distributes readings to enabled handlers + - Provides unified stats across handlers + - Handles graceful shutdown + + Thread-safe for concurrent access. + """ + + def __init__(self): + """Initialize the pipeline.""" + self._handlers: List[BaseHandler] = [] + self._lock = threading.RLock() + self._logger = get_logger("handler.pipeline") + self._initialized = False + self._total_readings = 0 + self._failed_readings = 0 + + def register(self, handler: BaseHandler) -> None: + """ + Register a handler in the pipeline. + + Args: + handler: Handler to add + """ + with self._lock: + self._handlers.append(handler) + self._logger.info(f"Registered handler: {handler.name}") + + def unregister(self, handler_name: str) -> bool: + """ + Remove a handler from the pipeline. + + Args: + handler_name: Name of handler to remove + + Returns: + True if handler was found and removed + """ + with self._lock: + for i, h in enumerate(self._handlers): + if h.name == handler_name: + self._handlers.pop(i) + self._logger.info(f"Unregistered handler: {handler_name}") + return True + return False + + def get_handler(self, name: str) -> Optional[BaseHandler]: + """ + Get handler by name. + + Args: + name: Handler name + + Returns: + Handler instance or None + """ + with self._lock: + for h in self._handlers: + if h.name == name: + return h + return None + + def initialize(self) -> bool: + """ + Initialize all handlers. + + Returns: + True if all handlers initialized successfully + """ + with self._lock: + success = True + for handler in self._handlers: + if handler.is_enabled(): + try: + if handler.initialize(): + self._logger.info(f"Initialized handler: {handler.name}") + else: + self._logger.error(f"Failed to initialize: {handler.name}") + success = False + except Exception as e: + self._logger.error(f"Error initializing {handler.name}: {e}") + success = False + + self._initialized = success + return success + + def handle(self, reading: OBD2Reading) -> int: + """ + Send reading to all enabled handlers. + + Args: + reading: OBD2Reading to process + + Returns: + Number of handlers that accepted the reading + """ + with self._lock: + self._total_readings += 1 + accepted = 0 + + for handler in self._handlers: + if handler.is_enabled() and handler.is_initialized(): + try: + if handler.handle(reading): + accepted += 1 + except Exception as e: + self._logger.error(f"Error in {handler.name}: {e}") + + if accepted == 0: + self._failed_readings += 1 + + return accepted + + def handle_batch(self, readings: List[OBD2Reading]) -> Dict[str, int]: + """ + Send batch of readings to all handlers. + + Args: + readings: List of readings to process + + Returns: + Dict mapping handler name to number of readings processed + """ + with self._lock: + self._total_readings += len(readings) + results = {} + + for handler in self._handlers: + if handler.is_enabled() and handler.is_initialized(): + try: + count = handler.handle_batch(readings) + results[handler.name] = count + except Exception as e: + self._logger.error(f"Error in {handler.name}: {e}") + results[handler.name] = 0 + + return results + + def flush(self) -> None: + """Flush all handlers.""" + with self._lock: + for handler in self._handlers: + if handler.is_enabled() and handler.is_initialized(): + try: + handler.flush() + except Exception as e: + self._logger.error(f"Error flushing {handler.name}: {e}") + + def shutdown(self) -> None: + """Shutdown all handlers.""" + self._logger.info("Shutting down handler pipeline...") + + with self._lock: + for handler in self._handlers: + if handler.is_initialized(): + try: + handler.shutdown() + self._logger.info(f"Shutdown handler: {handler.name}") + except Exception as e: + self._logger.error(f"Error shutting down {handler.name}: {e}") + + self._initialized = False + + def get_stats(self) -> Dict[str, Any]: + """ + Get combined statistics from all handlers. + + Returns: + Dict with pipeline and per-handler stats + """ + with self._lock: + handler_stats = {} + for handler in self._handlers: + try: + handler_stats[handler.name] = handler.get_stats() + except Exception as e: + handler_stats[handler.name] = {"error": str(e)} + + return { + "total_readings": self._total_readings, + "failed_readings": self._failed_readings, + "handler_count": len(self._handlers), + "enabled_count": sum(1 for h in self._handlers if h.is_enabled()), + "handlers": handler_stats, + } + + def get_handler_names(self) -> List[str]: + """Get list of registered handler names.""" + with self._lock: + return [h.name for h in self._handlers] + + def is_initialized(self) -> bool: + """Check if pipeline is initialized.""" + return self._initialized diff --git a/obd2_client/src/handlers/postgresql_handler.py b/obd2_client/src/handlers/postgresql_handler.py new file mode 100644 index 0000000..efcb09e --- /dev/null +++ b/obd2_client/src/handlers/postgresql_handler.py @@ -0,0 +1,427 @@ +""" +PostgreSQL Handler - syncs OBD2 data to remote PostgreSQL. + +Implements offline-first architecture: +- Data is first saved to SQLite (via StorageHandler) +- This handler periodically syncs unsynced data to PostgreSQL +- Handles connection failures gracefully +""" + +import threading +import time +from typing import Dict, Any, List, Optional +from datetime import datetime + +from .base import BaseHandler, OBD2Reading +from ..storage.storage import get_storage, Storage +from ..logger import get_logger + +logger = get_logger(__name__) + + +class PostgreSQLHandler(BaseHandler): + """ + Handler that syncs OBD2 data to PostgreSQL. + + Features: + - Background sync thread + - Automatic retry on failure + - Batch sync for efficiency + - Connection health monitoring + - Graceful degradation when offline + + Requires StorageHandler to be active. + """ + + def __init__( + self, + enabled: bool = True, + host: str = "localhost", + port: int = 5432, + database: str = "carpibord", + user: str = "carpibord", + password: str = "", + sync_interval: float = 30.0, + batch_size: int = 500, + max_retries: int = 3, + retry_delay: float = 5.0, + ): + """ + Initialize PostgreSQL handler. + + Args: + enabled: Whether handler is active + host: PostgreSQL host + port: PostgreSQL port + database: Database name + user: Database user + password: Database password + sync_interval: Seconds between sync attempts + batch_size: Number of readings per sync batch + max_retries: Max retries on failure + retry_delay: Seconds between retries + """ + super().__init__(name="postgresql", enabled=enabled) + self._host = host + self._port = port + self._database = database + self._user = user + self._password = password + self._sync_interval = sync_interval + self._batch_size = batch_size + self._max_retries = max_retries + self._retry_delay = retry_delay + + self._conn = None + self._storage: Optional[Storage] = None + self._sync_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._connected = False + self._last_sync_time: Optional[datetime] = None + self._synced_count = 0 + self._failed_count = 0 + self._last_error: Optional[str] = None + + def initialize(self) -> bool: + """Initialize the handler.""" + try: + # Get storage instance for reading unsynced data + self._storage = get_storage() + + # Test PostgreSQL connection + if not self._connect(): + logger.warning( + "PostgreSQL connection failed - will retry in background" + ) + + # Create tables if connected + if self._connected: + self._create_tables() + + # Start background sync thread + self._stop_event.clear() + self._sync_thread = threading.Thread( + target=self._sync_loop, + name="postgresql-sync", + daemon=True, + ) + self._sync_thread.start() + + self._initialized = True + logger.info( + f"PostgreSQL handler initialized: {self._host}:{self._port}/{self._database}" + ) + return True + + except Exception as e: + logger.error(f"Failed to initialize PostgreSQL handler: {e}") + self._last_error = str(e) + return False + + def _connect(self) -> bool: + """Establish connection to PostgreSQL.""" + try: + import psycopg2 + + self._conn = psycopg2.connect( + host=self._host, + port=self._port, + database=self._database, + user=self._user, + password=self._password, + connect_timeout=10, + ) + self._conn.autocommit = False + self._connected = True + self._last_error = None + logger.info("Connected to PostgreSQL") + return True + + except ImportError: + self._last_error = "psycopg2 not installed" + logger.error("psycopg2 not installed - run: pip install psycopg2-binary") + return False + except Exception as e: + self._last_error = str(e) + self._connected = False + logger.warning(f"PostgreSQL connection failed: {e}") + return False + + def _disconnect(self) -> None: + """Close PostgreSQL connection.""" + if self._conn: + try: + self._conn.close() + except Exception: + pass + self._conn = None + self._connected = False + + def _create_tables(self) -> None: + """Create tables in PostgreSQL if they don't exist.""" + if not self._conn: + return + + try: + with self._conn.cursor() as cur: + cur.execute(""" + CREATE TABLE IF NOT EXISTS obd2_readings ( + id SERIAL PRIMARY KEY, + device_id TEXT NOT NULL DEFAULT 'rpi5', + session_id TEXT, + pid INTEGER NOT NULL, + name TEXT NOT NULL, + value REAL NOT NULL, + unit TEXT NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_pg_readings_device + ON obd2_readings(device_id); + CREATE INDEX IF NOT EXISTS idx_pg_readings_session + ON obd2_readings(session_id); + CREATE INDEX IF NOT EXISTS idx_pg_readings_timestamp + ON obd2_readings(timestamp); + CREATE INDEX IF NOT EXISTS idx_pg_readings_pid + ON obd2_readings(pid); + + CREATE TABLE IF NOT EXISTS obd2_sessions ( + id TEXT PRIMARY KEY, + device_id TEXT NOT NULL DEFAULT 'rpi5', + start_time TIMESTAMPTZ NOT NULL, + end_time TIMESTAMPTZ, + duration_seconds REAL, + reading_count INTEGER, + avg_speed REAL, + max_speed REAL, + avg_rpm REAL, + max_rpm REAL, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ); + """) + self._conn.commit() + logger.info("PostgreSQL tables created") + + except Exception as e: + self._conn.rollback() + logger.error(f"Failed to create tables: {e}") + + def _sync_loop(self) -> None: + """Background sync loop.""" + while not self._stop_event.is_set(): + try: + if self.enabled: + self._do_sync() + except Exception as e: + logger.error(f"Sync error: {e}") + self._last_error = str(e) + + # Wait for next sync interval + self._stop_event.wait(self._sync_interval) + + def _do_sync(self) -> None: + """Perform sync operation.""" + if not self._storage: + return + + # Reconnect if needed + if not self._connected: + if not self._connect(): + return + self._create_tables() + + # Sync readings + synced_readings = self._sync_readings() + + # Sync sessions + synced_sessions = self._sync_sessions() + + if synced_readings > 0 or synced_sessions > 0: + self._last_sync_time = datetime.now() + logger.info( + f"Synced {synced_readings} readings, {synced_sessions} sessions" + ) + + def _sync_readings(self) -> int: + """Sync unsynced readings to PostgreSQL.""" + if not self._conn or not self._storage: + return 0 + + readings = self._storage.get_unsynced_readings(limit=self._batch_size) + if not readings: + return 0 + + synced_ids = [] + retries = 0 + + while retries < self._max_retries: + try: + with self._conn.cursor() as cur: + for reading in readings: + cur.execute( + """INSERT INTO obd2_readings + (session_id, pid, name, value, unit, timestamp) + VALUES (%s, %s, %s, %s, %s, %s)""", + ( + reading["session_id"], + reading["pid"], + reading["name"], + reading["value"], + reading["unit"], + reading["timestamp"], + ), + ) + synced_ids.append(reading["id"]) + + self._conn.commit() + break + + except Exception as e: + self._conn.rollback() + retries += 1 + self._last_error = str(e) + logger.warning(f"Sync failed (attempt {retries}): {e}") + + if retries < self._max_retries: + time.sleep(self._retry_delay) + self._disconnect() + self._connect() + else: + self._failed_count += len(readings) + return 0 + + # Mark as synced in SQLite + if synced_ids: + self._storage.mark_readings_synced(synced_ids) + self._synced_count += len(synced_ids) + + return len(synced_ids) + + def _sync_sessions(self) -> int: + """Sync unsynced sessions to PostgreSQL.""" + if not self._conn or not self._storage: + return 0 + + sessions = self._storage.get_unsynced_sessions() + if not sessions: + return 0 + + synced_ids = [] + + try: + with self._conn.cursor() as cur: + for session in sessions: + cur.execute( + """INSERT INTO obd2_sessions + (id, start_time, end_time, duration_seconds, + reading_count, avg_speed, max_speed, avg_rpm, max_rpm) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (id) DO UPDATE SET + end_time = EXCLUDED.end_time, + duration_seconds = EXCLUDED.duration_seconds, + reading_count = EXCLUDED.reading_count, + avg_speed = EXCLUDED.avg_speed, + max_speed = EXCLUDED.max_speed, + avg_rpm = EXCLUDED.avg_rpm, + max_rpm = EXCLUDED.max_rpm""", + ( + session["id"], + session["start_time"], + session["end_time"], + session["duration_seconds"], + session["reading_count"], + session["avg_speed"], + session["max_speed"], + session["avg_rpm"], + session["max_rpm"], + ), + ) + synced_ids.append(session["id"]) + + self._conn.commit() + + if synced_ids: + self._storage.mark_sessions_synced(synced_ids) + + except Exception as e: + self._conn.rollback() + logger.error(f"Session sync failed: {e}") + return 0 + + return len(synced_ids) + + def handle(self, reading: OBD2Reading) -> bool: + """ + Handle reading - this handler doesn't process individual readings. + + Data is synced from SQLite in batches by background thread. + """ + # PostgreSQL handler syncs from SQLite, not directly + return True + + def handle_batch(self, readings: List[OBD2Reading]) -> int: + """Handle batch - syncs happen in background.""" + return len(readings) + + def flush(self) -> None: + """Force immediate sync.""" + if self._initialized and self.enabled: + self._do_sync() + + def shutdown(self) -> None: + """Shutdown the handler.""" + logger.info("Shutting down PostgreSQL handler...") + + # Stop sync thread + self._stop_event.set() + if self._sync_thread and self._sync_thread.is_alive(): + self._sync_thread.join(timeout=5.0) + + # Final sync attempt + if self._connected: + try: + self._do_sync() + except Exception: + pass + + self._disconnect() + + logger.info( + f"PostgreSQL handler shutdown: synced={self._synced_count}, " + f"failed={self._failed_count}" + ) + self._initialized = False + + def get_stats(self) -> Dict[str, Any]: + """Get handler statistics.""" + return { + "name": self.name, + "enabled": self.enabled, + "initialized": self._initialized, + "connected": self._connected, + "host": self._host, + "port": self._port, + "database": self._database, + "synced_count": self._synced_count, + "failed_count": self._failed_count, + "last_sync_time": ( + self._last_sync_time.isoformat() if self._last_sync_time else None + ), + "last_error": self._last_error, + "sync_interval": self._sync_interval, + "batch_size": self._batch_size, + } + + def is_connected(self) -> bool: + """Check if connected to PostgreSQL.""" + return self._connected + + def force_sync(self) -> int: + """Force immediate sync and return count.""" + if not self._initialized: + return 0 + + before = self._synced_count + self._do_sync() + return self._synced_count - before diff --git a/obd2_client/src/handlers/storage_handler.py b/obd2_client/src/handlers/storage_handler.py new file mode 100644 index 0000000..b505656 --- /dev/null +++ b/obd2_client/src/handlers/storage_handler.py @@ -0,0 +1,228 @@ +""" +Storage Handler - persists OBD2 readings to SQLite. + +Implements batching for efficient database writes. +""" + +import threading +import time +from typing import Dict, Any, List, Optional +from datetime import datetime + +from .base import BaseHandler, OBD2Reading +from ..storage.storage import get_storage, Storage +from ..logger import get_logger + +logger = get_logger(__name__) + + +class StorageHandler(BaseHandler): + """ + Handler that saves OBD2 readings to SQLite. + + Features: + - Configurable batch size for efficient writes + - Automatic flush on interval + - Session management integration + - Thread-safe batching + """ + + def __init__( + self, + enabled: bool = True, + batch_size: int = 50, + flush_interval: float = 1.0, + db_path: Optional[str] = None, + auto_session: bool = True, + ): + """ + Initialize storage handler. + + Args: + enabled: Whether handler is active + batch_size: Number of readings to batch before write + flush_interval: Maximum time before flushing batch (seconds) + db_path: Path to SQLite database + auto_session: Automatically start session on first reading + """ + super().__init__(name="storage", enabled=enabled) + self._storage: Optional[Storage] = None + self._batch_size = batch_size + self._flush_interval = flush_interval + self._db_path = db_path + self._auto_session = auto_session + self._batch: List[Dict[str, Any]] = [] + self._batch_lock = threading.Lock() + self._last_flush_time = time.time() + self._saved_count = 0 + self._batch_count = 0 + self._session_started = False + + def initialize(self) -> bool: + """Initialize the handler.""" + try: + self._storage = get_storage(db_path=self._db_path) + self._initialized = True + logger.info( + f"Storage handler initialized: batch_size={self._batch_size}, " + f"flush_interval={self._flush_interval}s" + ) + return True + except Exception as e: + logger.error(f"Failed to initialize storage handler: {e}") + return False + + def _ensure_session(self) -> None: + """Ensure a session is started.""" + if self._auto_session and not self._session_started: + if self._storage and not self._storage.get_current_session_id(): + self._storage.start_session() + self._session_started = True + + def handle(self, reading: OBD2Reading) -> bool: + """ + Add reading to batch. + + Args: + reading: OBD2Reading to save + + Returns: + True if reading was accepted + """ + if not self._initialized or not self.enabled: + return False + + self._ensure_session() + + with self._batch_lock: + self._batch.append({ + "pid": reading.pid, + "name": reading.name, + "value": reading.value, + "unit": reading.unit, + "timestamp": reading.timestamp.isoformat(), + "session_id": reading.session_id, + }) + + should_flush = ( + len(self._batch) >= self._batch_size or + (time.time() - self._last_flush_time) >= self._flush_interval + ) + + if should_flush: + self._do_flush() + + return True + + def handle_batch(self, readings: List[OBD2Reading]) -> int: + """ + Add multiple readings to batch. + + Args: + readings: List of OBD2Reading objects + + Returns: + Number of readings accepted + """ + if not self._initialized or not self.enabled: + return 0 + + self._ensure_session() + + with self._batch_lock: + for reading in readings: + self._batch.append({ + "pid": reading.pid, + "name": reading.name, + "value": reading.value, + "unit": reading.unit, + "timestamp": reading.timestamp.isoformat(), + "session_id": reading.session_id, + }) + + should_flush = ( + len(self._batch) >= self._batch_size or + (time.time() - self._last_flush_time) >= self._flush_interval + ) + + if should_flush: + self._do_flush() + + return len(readings) + + def _do_flush(self) -> None: + """Internal flush - must be called with lock held.""" + if not self._storage or not self._batch: + return + + batch_to_save = self._batch + self._batch = [] + self._last_flush_time = time.time() + + try: + saved = self._storage.save_readings_batch(batch_to_save) + self._saved_count += saved + self._batch_count += 1 + logger.debug(f"Flushed {saved} readings (batch #{self._batch_count})") + except Exception as e: + logger.error(f"Failed to flush readings: {e}") + # Put back failed batch + self._batch = batch_to_save + self._batch + + def flush(self) -> None: + """Flush pending readings to database.""" + with self._batch_lock: + self._do_flush() + + def shutdown(self) -> None: + """Shutdown the handler.""" + self.flush() + + if self._storage and self._session_started: + self._storage.end_session() + + logger.info( + f"Storage handler shutdown: saved={self._saved_count}, " + f"batches={self._batch_count}" + ) + self._initialized = False + + def get_stats(self) -> Dict[str, Any]: + """Get handler statistics.""" + storage_stats = {} + if self._storage: + storage_stats = self._storage.get_stats() + + with self._batch_lock: + pending = len(self._batch) + + return { + "name": self.name, + "enabled": self.enabled, + "initialized": self._initialized, + "saved_count": self._saved_count, + "batch_count": self._batch_count, + "pending_in_batch": pending, + "batch_size": self._batch_size, + "flush_interval": self._flush_interval, + **storage_stats, + } + + @property + def storage(self) -> Optional[Storage]: + """Get the Storage instance.""" + return self._storage + + def start_session(self) -> Optional[str]: + """Manually start a new session.""" + if self._storage: + session_id = self._storage.start_session() + self._session_started = True + return session_id + return None + + def end_session(self) -> None: + """Manually end current session.""" + if self._storage: + self._storage.end_session() + self._session_started = False diff --git a/obd2_client/src/main.py b/obd2_client/src/main.py index f9c2a72..77fedba 100644 --- a/obd2_client/src/main.py +++ b/obd2_client/src/main.py @@ -5,6 +5,7 @@ import argparse import signal import sys import time +from datetime import datetime from pathlib import Path from .config import Config @@ -16,6 +17,8 @@ from .vehicle.state import VehicleState from .vehicle.poller import VehiclePoller from .flipper.server import FlipperServer from .flipper.pages import ActionID +from .handlers import HandlerPipeline, StorageHandler, PostgreSQLHandler, OBD2Reading +from .flipper.pages.ups_status import UPSStatusPage class OBD2Client: @@ -54,12 +57,73 @@ class OBD2Client: slow_pids=config.polling.slow_pids, ) + # Handler Pipeline for data processing + self.pipeline = HandlerPipeline() + self._setup_handlers() + + # Connect poller to pipeline + if self.pipeline.get_handler_names(): + self.poller.add_reading_callback(self._on_reading) + # Flipper Zero server self.flipper_server = None - if flipper_port: - self.flipper_server = FlipperServer(port=flipper_port) + if flipper_port or config.flipper.enabled: + port = flipper_port or config.flipper.port + self.flipper_server = FlipperServer(port=port) self._setup_flipper_integration() + def _setup_handlers(self) -> None: + """Set up data processing handlers.""" + # Storage handler (SQLite, offline-first) + if self.config.storage.enabled: + storage_handler = StorageHandler( + enabled=True, + batch_size=self.config.storage.batch_size, + flush_interval=self.config.storage.flush_interval, + db_path=self.config.storage.db_path, + auto_session=True, + ) + self.pipeline.register(storage_handler) + self._logger.info("Storage handler registered") + + # PostgreSQL handler (sync from SQLite) + if self.config.postgresql.enabled: + pg_handler = PostgreSQLHandler( + enabled=True, + host=self.config.postgresql.host, + port=self.config.postgresql.port, + database=self.config.postgresql.database, + user=self.config.postgresql.user, + password=self.config.postgresql.password, + sync_interval=self.config.postgresql.sync_interval, + batch_size=self.config.postgresql.batch_size, + max_retries=self.config.postgresql.max_retries, + retry_delay=self.config.postgresql.retry_delay, + ) + self.pipeline.register(pg_handler) + self._logger.info("PostgreSQL handler registered") + + def _on_reading( + self, + pid: int, + name: str, + value: float, + unit: str, + timestamp: datetime, + ) -> None: + """Handle new reading from poller - send to pipeline.""" + if not self.pipeline.is_initialized(): + return + + reading = OBD2Reading( + pid=pid, + name=name, + value=value, + unit=unit, + timestamp=timestamp, + ) + self.pipeline.handle(reading) + def _setup_flipper_integration(self) -> None: """Set up Flipper Zero data providers and action handlers.""" pm = self.flipper_server.page_manager @@ -69,9 +133,34 @@ class OBD2Client: pm.set_data_provider("poller_stats", lambda: self.poller.get_stats()) pm.set_data_provider("uptime", lambda: time.time() - self._start_time) pm.set_data_provider("can_interface", lambda: self.config.can.interface) + pm.set_data_provider("pipeline_stats", lambda: self.pipeline.get_stats()) + + # Add UPS status page if enabled + if self.config.flipper.show_ups: + try: + ups_page = UPSStatusPage() + if ups_page.is_enabled(): + pm.add_page(ups_page) + self._logger.info("UPS status page added") + except Exception as e: + self._logger.debug(f"UPS page not available: {e}") # Action handlers pm.set_action_handler(ActionID.RECONNECT_OBD, self._action_reconnect_obd) + pm.set_action_handler("force_sync", self._action_force_sync) + + def _action_force_sync(self) -> bool: + """Force sync to PostgreSQL.""" + pg_handler = self.pipeline.get_handler("postgresql") + if pg_handler and hasattr(pg_handler, "force_sync"): + try: + synced = pg_handler.force_sync() + self._logger.info(f"Force sync: {synced} readings synced") + return synced > 0 + except Exception as e: + self._logger.error(f"Force sync failed: {e}") + return False + return False def _action_reconnect_obd(self) -> bool: """Reconnect to OBD2 and rescan PIDs.""" @@ -126,6 +215,11 @@ class OBD2Client: signal.signal(signal.SIGTERM, self._signal_handler) try: + # Initialize handler pipeline + if self.pipeline.get_handler_names(): + self._logger.info("Initializing handler pipeline...") + self.pipeline.initialize() + # Start Flipper server (if configured) if self.flipper_server: self._logger.info("Starting Flipper Zero server...") @@ -275,6 +369,11 @@ class OBD2Client: if self.poller.is_running: self.poller.stop() + # Shutdown handler pipeline (flushes pending data) + if self.pipeline.is_initialized(): + self._logger.info("Shutting down handler pipeline...") + self.pipeline.shutdown() + if self.can_interface.is_connected: self.can_interface.disconnect() diff --git a/obd2_client/src/storage/__init__.py b/obd2_client/src/storage/__init__.py new file mode 100644 index 0000000..a68fa4c --- /dev/null +++ b/obd2_client/src/storage/__init__.py @@ -0,0 +1,9 @@ +""" +Storage module for offline-first data persistence. + +Provides SQLite local storage with PostgreSQL synchronization. +""" + +from .storage import Storage, get_storage + +__all__ = ["Storage", "get_storage"] diff --git a/obd2_client/src/storage/storage.py b/obd2_client/src/storage/storage.py new file mode 100644 index 0000000..df9bc91 --- /dev/null +++ b/obd2_client/src/storage/storage.py @@ -0,0 +1,593 @@ +""" +SQLite Storage for OBD2 readings. + +Provides offline-first local storage with sync support. +Uses WAL mode for better concurrent performance. +""" + +import sqlite3 +import threading +import time +from dataclasses import dataclass +from datetime import datetime, timedelta +from pathlib import Path +from typing import List, Dict, Any, Optional, Tuple +from contextlib import contextmanager + +from ..logger import get_logger + +logger = get_logger(__name__) + + +@dataclass +class SessionSummary: + """Summary of a driving session.""" + session_id: str + start_time: datetime + end_time: Optional[datetime] + duration_seconds: float + reading_count: int + avg_speed: Optional[float] + max_speed: Optional[float] + avg_rpm: Optional[float] + max_rpm: Optional[float] + + +class Storage: + """ + SQLite storage for OBD2 readings. + + Features: + - WAL mode for concurrent access + - Session management + - Sync status tracking for PostgreSQL + - Automatic cleanup of old synced data + - Thread-safe operations + + Tables: + - obd2_readings: Individual PID readings + - obd2_sessions: Driving sessions + - obd2_aggregated: Hourly aggregates + """ + + _instance: Optional["Storage"] = None + _lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + """Singleton pattern.""" + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__( + self, + db_path: Optional[str] = None, + wal_mode: bool = True, + retention_days: int = 30, + ): + """ + Initialize storage. + + Args: + db_path: Path to SQLite database file + wal_mode: Enable WAL mode for better concurrency + retention_days: Days to keep synced readings + """ + if hasattr(self, "_initialized") and self._initialized: + return + + if db_path is None: + db_path = str(Path(__file__).parent.parent.parent / "obd2_data.db") + + self.db_path = db_path + self.wal_mode = wal_mode + self.retention_days = retention_days + self._write_lock = threading.Lock() + self._local = threading.local() + self._current_session_id: Optional[str] = None + self._initialized = False + + self._init_database() + self._initialized = True + + def _get_connection(self) -> sqlite3.Connection: + """Get thread-local database connection.""" + if not hasattr(self._local, "conn") or self._local.conn is None: + self._local.conn = sqlite3.connect( + self.db_path, + check_same_thread=False, + timeout=30.0, + ) + self._local.conn.row_factory = sqlite3.Row + if self.wal_mode: + self._local.conn.execute("PRAGMA journal_mode=WAL") + self._local.conn.execute("PRAGMA foreign_keys=ON") + return self._local.conn + + @contextmanager + def _transaction(self): + """Context manager for write transactions.""" + conn = self._get_connection() + with self._write_lock: + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + + def _init_database(self) -> None: + """Create database tables if they don't exist.""" + conn = self._get_connection() + + conn.executescript(""" + CREATE TABLE IF NOT EXISTS obd2_sessions ( + id TEXT PRIMARY KEY, + start_time TEXT NOT NULL, + end_time TEXT, + duration_seconds REAL, + reading_count INTEGER DEFAULT 0, + avg_speed REAL, + max_speed REAL, + avg_rpm REAL, + max_rpm REAL, + synced INTEGER DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP + ); + + CREATE TABLE IF NOT EXISTS obd2_readings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT, + pid INTEGER NOT NULL, + name TEXT NOT NULL, + value REAL NOT NULL, + unit TEXT NOT NULL, + timestamp TEXT NOT NULL, + synced INTEGER DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (session_id) REFERENCES obd2_sessions(id) + ); + + CREATE INDEX IF NOT EXISTS idx_readings_session + ON obd2_readings(session_id); + CREATE INDEX IF NOT EXISTS idx_readings_timestamp + ON obd2_readings(timestamp); + CREATE INDEX IF NOT EXISTS idx_readings_synced + ON obd2_readings(synced); + CREATE INDEX IF NOT EXISTS idx_readings_pid + ON obd2_readings(pid); + + CREATE TABLE IF NOT EXISTS obd2_aggregated ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + hour TEXT NOT NULL, + pid INTEGER NOT NULL, + name TEXT NOT NULL, + unit TEXT NOT NULL, + min_value REAL, + max_value REAL, + avg_value REAL, + count INTEGER, + synced INTEGER DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + UNIQUE(hour, pid) + ); + + CREATE INDEX IF NOT EXISTS idx_aggregated_hour + ON obd2_aggregated(hour); + CREATE INDEX IF NOT EXISTS idx_aggregated_synced + ON obd2_aggregated(synced); + """) + conn.commit() + + logger.info(f"Database initialized: {self.db_path}") + + # Session Management + + def start_session(self) -> str: + """ + Start a new driving session. + + Returns: + Session ID + """ + session_id = datetime.now().strftime("%Y%m%d_%H%M%S") + start_time = datetime.now().isoformat() + + with self._transaction() as conn: + conn.execute( + "INSERT INTO obd2_sessions (id, start_time) VALUES (?, ?)", + (session_id, start_time), + ) + + self._current_session_id = session_id + logger.info(f"Started session: {session_id}") + return session_id + + def end_session(self, session_id: Optional[str] = None) -> Optional[SessionSummary]: + """ + End a driving session and calculate summary. + + Args: + session_id: Session to end (default: current) + + Returns: + Session summary or None + """ + session_id = session_id or self._current_session_id + if not session_id: + return None + + end_time = datetime.now() + + with self._transaction() as conn: + # Get session start time + row = conn.execute( + "SELECT start_time FROM obd2_sessions WHERE id = ?", + (session_id,), + ).fetchone() + + if not row: + return None + + start_time = datetime.fromisoformat(row["start_time"]) + duration = (end_time - start_time).total_seconds() + + # Calculate stats from readings + stats = conn.execute(""" + SELECT + COUNT(*) as count, + AVG(CASE WHEN pid = 13 THEN value END) as avg_speed, + MAX(CASE WHEN pid = 13 THEN value END) as max_speed, + AVG(CASE WHEN pid = 12 THEN value END) as avg_rpm, + MAX(CASE WHEN pid = 12 THEN value END) as max_rpm + FROM obd2_readings + WHERE session_id = ? + """, (session_id,)).fetchone() + + conn.execute(""" + UPDATE obd2_sessions SET + end_time = ?, + duration_seconds = ?, + reading_count = ?, + avg_speed = ?, + max_speed = ?, + avg_rpm = ?, + max_rpm = ? + WHERE id = ? + """, ( + end_time.isoformat(), + duration, + stats["count"], + stats["avg_speed"], + stats["max_speed"], + stats["avg_rpm"], + stats["max_rpm"], + session_id, + )) + + if session_id == self._current_session_id: + self._current_session_id = None + + logger.info(f"Ended session: {session_id}, {stats['count']} readings") + + return SessionSummary( + session_id=session_id, + start_time=start_time, + end_time=end_time, + duration_seconds=duration, + reading_count=stats["count"], + avg_speed=stats["avg_speed"], + max_speed=stats["max_speed"], + avg_rpm=stats["avg_rpm"], + max_rpm=stats["max_rpm"], + ) + + def get_current_session_id(self) -> Optional[str]: + """Get current session ID.""" + return self._current_session_id + + # Reading Storage + + def save_reading( + self, + pid: int, + name: str, + value: float, + unit: str, + timestamp: Optional[datetime] = None, + session_id: Optional[str] = None, + ) -> int: + """ + Save a single OBD2 reading. + + Args: + pid: PID code + name: PID name + value: Decoded value + unit: Unit of measurement + timestamp: Reading time (default: now) + session_id: Session ID (default: current) + + Returns: + Row ID of inserted reading + """ + if timestamp is None: + timestamp = datetime.now() + if session_id is None: + session_id = self._current_session_id + + with self._transaction() as conn: + cursor = conn.execute( + """INSERT INTO obd2_readings + (session_id, pid, name, value, unit, timestamp) + VALUES (?, ?, ?, ?, ?, ?)""", + (session_id, pid, name, value, unit, timestamp.isoformat()), + ) + return cursor.lastrowid + + def save_readings_batch(self, readings: List[Dict[str, Any]]) -> int: + """ + Save multiple readings in a batch. + + Args: + readings: List of reading dicts with keys: + pid, name, value, unit, timestamp, session_id + + Returns: + Number of readings saved + """ + if not readings: + return 0 + + session_id = self._current_session_id + + with self._transaction() as conn: + cursor = conn.executemany( + """INSERT INTO obd2_readings + (session_id, pid, name, value, unit, timestamp) + VALUES (?, ?, ?, ?, ?, ?)""", + [ + ( + r.get("session_id", session_id), + r["pid"], + r["name"], + r["value"], + r["unit"], + r.get("timestamp", datetime.now().isoformat()), + ) + for r in readings + ], + ) + return cursor.rowcount + + def get_readings( + self, + session_id: Optional[str] = None, + pid: Optional[int] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + limit: int = 1000, + ) -> List[Dict[str, Any]]: + """ + Get readings with optional filters. + + Args: + session_id: Filter by session + pid: Filter by PID + start_time: Filter by start time + end_time: Filter by end time + limit: Maximum number of readings + + Returns: + List of reading dicts + """ + conn = self._get_connection() + query = "SELECT * FROM obd2_readings WHERE 1=1" + params = [] + + if session_id: + query += " AND session_id = ?" + params.append(session_id) + if pid is not None: + query += " AND pid = ?" + params.append(pid) + if start_time: + query += " AND timestamp >= ?" + params.append(start_time.isoformat()) + if end_time: + query += " AND timestamp <= ?" + params.append(end_time.isoformat()) + + query += " ORDER BY timestamp DESC LIMIT ?" + params.append(limit) + + rows = conn.execute(query, params).fetchall() + return [dict(row) for row in rows] + + # Sync Support + + def get_unsynced_readings(self, limit: int = 500) -> List[Dict[str, Any]]: + """ + Get readings not yet synced to PostgreSQL. + + Args: + limit: Maximum number of readings + + Returns: + List of unsynced readings + """ + conn = self._get_connection() + rows = conn.execute( + """SELECT * FROM obd2_readings + WHERE synced = 0 + ORDER BY timestamp + LIMIT ?""", + (limit,), + ).fetchall() + return [dict(row) for row in rows] + + def mark_readings_synced(self, reading_ids: List[int]) -> int: + """ + Mark readings as synced. + + Args: + reading_ids: IDs of synced readings + + Returns: + Number of readings updated + """ + if not reading_ids: + return 0 + + placeholders = ",".join("?" * len(reading_ids)) + with self._transaction() as conn: + cursor = conn.execute( + f"UPDATE obd2_readings SET synced = 1 WHERE id IN ({placeholders})", + reading_ids, + ) + return cursor.rowcount + + def get_unsynced_sessions(self, limit: int = 50) -> List[Dict[str, Any]]: + """Get sessions not yet synced.""" + conn = self._get_connection() + rows = conn.execute( + """SELECT * FROM obd2_sessions + WHERE synced = 0 AND end_time IS NOT NULL + ORDER BY start_time + LIMIT ?""", + (limit,), + ).fetchall() + return [dict(row) for row in rows] + + def mark_sessions_synced(self, session_ids: List[str]) -> int: + """Mark sessions as synced.""" + if not session_ids: + return 0 + + placeholders = ",".join("?" * len(session_ids)) + with self._transaction() as conn: + cursor = conn.execute( + f"UPDATE obd2_sessions SET synced = 1 WHERE id IN ({placeholders})", + session_ids, + ) + return cursor.rowcount + + # Aggregation & Cleanup + + def aggregate_hour(self, hour: Optional[datetime] = None) -> int: + """ + Create hourly aggregates from readings. + + Args: + hour: Hour to aggregate (default: previous hour) + + Returns: + Number of aggregates created + """ + if hour is None: + hour = datetime.now().replace(minute=0, second=0, microsecond=0) + hour = hour - timedelta(hours=1) + + hour_str = hour.strftime("%Y-%m-%d %H:00:00") + next_hour = (hour + timedelta(hours=1)).strftime("%Y-%m-%d %H:00:00") + + with self._transaction() as conn: + cursor = conn.execute(""" + INSERT OR REPLACE INTO obd2_aggregated + (hour, pid, name, unit, min_value, max_value, avg_value, count) + SELECT + ?, + pid, + name, + unit, + MIN(value), + MAX(value), + AVG(value), + COUNT(*) + FROM obd2_readings + WHERE timestamp >= ? AND timestamp < ? + GROUP BY pid + """, (hour_str, hour_str, next_hour)) + return cursor.rowcount + + def cleanup_old_readings(self, days: Optional[int] = None) -> int: + """ + Delete old synced readings. + + Args: + days: Retention period (default: from config) + + Returns: + Number of readings deleted + """ + days = days or self.retention_days + cutoff = (datetime.now() - timedelta(days=days)).isoformat() + + with self._transaction() as conn: + cursor = conn.execute( + "DELETE FROM obd2_readings WHERE synced = 1 AND timestamp < ?", + (cutoff,), + ) + deleted = cursor.rowcount + + if deleted > 0: + logger.info(f"Cleaned up {deleted} old readings") + + return deleted + + # Statistics + + def get_stats(self) -> Dict[str, Any]: + """Get storage statistics.""" + conn = self._get_connection() + + total = conn.execute("SELECT COUNT(*) FROM obd2_readings").fetchone()[0] + unsynced = conn.execute( + "SELECT COUNT(*) FROM obd2_readings WHERE synced = 0" + ).fetchone()[0] + sessions = conn.execute("SELECT COUNT(*) FROM obd2_sessions").fetchone()[0] + + db_size = Path(self.db_path).stat().st_size if Path(self.db_path).exists() else 0 + + return { + "total_readings": total, + "unsynced_readings": unsynced, + "synced_readings": total - unsynced, + "total_sessions": sessions, + "current_session": self._current_session_id, + "db_size_mb": round(db_size / 1024 / 1024, 2), + "db_path": self.db_path, + } + + def close(self) -> None: + """Close database connection.""" + if hasattr(self._local, "conn") and self._local.conn: + self._local.conn.close() + self._local.conn = None + + +# Singleton accessor +_storage_instance: Optional[Storage] = None + + +def get_storage( + db_path: Optional[str] = None, + wal_mode: bool = True, + retention_days: int = 30, +) -> Storage: + """ + Get or create storage singleton. + + Args: + db_path: Database file path + wal_mode: Enable WAL mode + retention_days: Days to keep synced data + + Returns: + Storage instance + """ + global _storage_instance + if _storage_instance is None: + _storage_instance = Storage(db_path, wal_mode, retention_days) + return _storage_instance diff --git a/obd2_client/src/vehicle/poller.py b/obd2_client/src/vehicle/poller.py index 19abef0..35e0b32 100644 --- a/obd2_client/src/vehicle/poller.py +++ b/obd2_client/src/vehicle/poller.py @@ -2,7 +2,8 @@ import threading import time -from typing import List, Optional, Set +from datetime import datetime +from typing import List, Optional, Set, Callable, Any from enum import Enum from ..obd2.protocol import OBD2Protocol @@ -11,6 +12,10 @@ from ..logger import get_logger from .state import VehicleState +# Type alias for reading callback +ReadingCallback = Callable[[int, str, float, str, datetime], None] + + class PollerState(Enum): """Poller state enum.""" @@ -62,6 +67,20 @@ class VehiclePoller: self._logger = get_logger("obd2_client.poller") self._stats = {"queries": 0, "successes": 0, "failures": 0} + self._reading_callbacks: List[ReadingCallback] = [] + + def add_reading_callback(self, callback: ReadingCallback) -> None: + """Add callback to be called on each successful reading. + + Args: + callback: Function(pid, name, value, unit, timestamp) + """ + self._reading_callbacks.append(callback) + + def remove_reading_callback(self, callback: ReadingCallback) -> None: + """Remove a reading callback.""" + if callback in self._reading_callbacks: + self._reading_callbacks.remove(callback) @property def poller_state(self) -> PollerState: @@ -184,10 +203,26 @@ class VehiclePoller: return False self._stats["successes"] += 1 + timestamp = datetime.now() + self.state.update( pid_code=pid_code, name=response.pid.name, value=response.value, unit=response.unit, ) + + # Notify callbacks + for callback in self._reading_callbacks: + try: + callback( + pid_code, + response.pid.name, + response.value, + response.unit, + timestamp, + ) + except Exception as e: + self._logger.debug(f"Reading callback error: {e}") + return True diff --git a/systemd/can0-link.service b/systemd/can0-link.service new file mode 100644 index 0000000..2a7421b --- /dev/null +++ b/systemd/can0-link.service @@ -0,0 +1,14 @@ +[Unit] +Description=CAN0 Interface Setup (500kbps for OBD2) +After=network.target +Before=carpibord.service + +[Service] +Type=oneshot +RemainAfterExit=yes +ExecStart=/sbin/ip link set can0 type can bitrate 500000 +ExecStart=/sbin/ip link set can0 up +ExecStop=/sbin/ip link set can0 down + +[Install] +WantedBy=multi-user.target diff --git a/systemd/carpibord.service b/systemd/carpibord.service new file mode 100644 index 0000000..10e3cdc --- /dev/null +++ b/systemd/carpibord.service @@ -0,0 +1,21 @@ +[Unit] +Description=Carpibord OBD2 Client with Flipper Zero +After=network-online.target can0-link.service +Wants=network-online.target +Requires=can0-link.service + +[Service] +Type=simple +User=pi +WorkingDirectory=/home/pi/carpibord/obd2_client +ExecStart=/usr/bin/python3 -m src.main --interface can0 --flipper /dev/serial0 +Restart=on-failure +RestartSec=5 +StandardOutput=journal +StandardError=journal + +# Переменные окружения +Environment=PYTHONUNBUFFERED=1 + +[Install] +WantedBy=multi-user.target diff --git a/systemd/install.sh b/systemd/install.sh new file mode 100644 index 0000000..41d4950 --- /dev/null +++ b/systemd/install.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# Скрипт установки systemd сервисов для Carpibord + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$(dirname "$SCRIPT_DIR")" + +echo "=== Установка Carpibord сервисов ===" + +# Проверка root +if [ "$EUID" -ne 0 ]; then + echo "Запусти с sudo: sudo ./install.sh" + exit 1 +fi + +# Копируем сервисы +echo "[1/4] Копирование сервисов..." +cp "$SCRIPT_DIR/can0-link.service" /etc/systemd/system/ +cp "$SCRIPT_DIR/carpibord.service" /etc/systemd/system/ + +# Обновляем путь к проекту в сервисе +echo "[2/4] Настройка путей..." +sed -i "s|/home/pi/carpibord|$PROJECT_DIR|g" /etc/systemd/system/carpibord.service + +# Определяем пользователя (кто запустил sudo) +ACTUAL_USER="${SUDO_USER:-pi}" +sed -i "s|User=pi|User=$ACTUAL_USER|g" /etc/systemd/system/carpibord.service + +# Перезагружаем systemd +echo "[3/4] Перезагрузка systemd..." +systemctl daemon-reload + +# Включаем сервисы +echo "[4/4] Включение сервисов..." +systemctl enable can0-link.service +systemctl enable carpibord.service + +echo "" +echo "=== Готово! ===" +echo "" +echo "Команды управления:" +echo " sudo systemctl start carpibord # Запустить (can0 поднимется автоматически)" +echo " sudo systemctl stop carpibord # Остановить" +echo " sudo systemctl status carpibord # Статус" +echo " journalctl -u carpibord -f # Логи в реальном времени" +echo "" +echo "Сервисы запустятся автоматически при следующей загрузке." diff --git a/systemd/uninstall.sh b/systemd/uninstall.sh new file mode 100644 index 0000000..a46ed60 --- /dev/null +++ b/systemd/uninstall.sh @@ -0,0 +1,27 @@ +#!/bin/bash +# Скрипт удаления systemd сервисов Carpibord + +set -e + +echo "=== Удаление Carpibord сервисов ===" + +if [ "$EUID" -ne 0 ]; then + echo "Запусти с sudo: sudo ./uninstall.sh" + exit 1 +fi + +echo "[1/3] Остановка сервисов..." +systemctl stop carpibord.service 2>/dev/null || true +systemctl stop can0-link.service 2>/dev/null || true + +echo "[2/3] Отключение автозапуска..." +systemctl disable carpibord.service 2>/dev/null || true +systemctl disable can0-link.service 2>/dev/null || true + +echo "[3/3] Удаление файлов..." +rm -f /etc/systemd/system/carpibord.service +rm -f /etc/systemd/system/can0-link.service +systemctl daemon-reload + +echo "" +echo "=== Сервисы удалены ==="