"""
UPS Provider for X120x UPS on Raspberry Pi.

Reads battery status, voltage, and charging state via I2C (SMBus).
Based on https://github.com/suptronics/x120x
"""

import struct
from typing import Optional, Tuple
from dataclasses import dataclass

from .base import BaseProvider
from ...logger import get_logger

logger = get_logger(__name__)


@dataclass
class UPSData:
    """UPS status data."""

    # Battery voltage in Volts, as read from the fuel gauge.
    voltage: float = 0.0
    # Battery state of charge in percent (clamped to at most 100).
    capacity: float = 0.0
    # True when the provider considers the battery to be charging.
    is_charging: bool = False
    # True when external power is reported as lost.
    power_loss: bool = False
    # Input (EXT5V) voltage in Volts as reported by vcgencmd.
    input_voltage: float = 0.0


class UPSProvider(BaseProvider):
    """
    Provider for X120x UPS data.

    Reads from MAX17048 fuel gauge via I2C (address 0x36).
    Also monitors power loss detection pin (GPIO 6).

    Supported UPS boards:
    - X120x series (X1201, X1202, etc.)
    - Other boards with MAX17048 fuel gauge
    """

    I2C_ADDRESS = 0x36
    I2C_BUS = 1
    # NOTE(review): not referenced anywhere in this file; presumably the
    # charge on/off control pin of the X120x board - confirm before use.
    CHG_ONOFF_PIN = 16
    PLD_PIN = 6  # Power Loss Detection

    def __init__(self):
        """Create the provider and try to open the I2C bus and the GPIO pin."""
        super().__init__(name="ups", cache_ttl=2.0)

        self._bus = None
        self._pld_button = None
        self._data = UPSData()

        self._init_hardware()

    def _init_hardware(self) -> None:
        """
        Initialize I2C and GPIO.

        Both steps are best-effort: a failure is logged (and, for I2C,
        recorded in ``_last_error`` with ``_available`` set to False) but
        never raised. Only the I2C result decides ``_available``; a missing
        GPIO merely disables power loss detection.
        """
        # Try to initialize I2C
        try:
            import smbus2

            self._bus = smbus2.SMBus(self.I2C_BUS)
            self._available = True
            logger.info(f"UPS I2C initialized on bus {self.I2C_BUS}")
        except ImportError:
            self._last_error = "smbus2 not installed"
            self._available = False
            logger.warning("smbus2 not installed - UPS monitoring disabled")
        except PermissionError as e:
            self._last_error = f"I2C permission denied: {e}"
            self._available = False
            logger.warning(f"I2C permission denied: {e}")
        except FileNotFoundError as e:
            self._last_error = f"I2C bus not found: {e}"
            self._available = False
            logger.debug(f"I2C bus not found (UPS not connected): {e}")
        except Exception as e:
            self._last_error = f"I2C init failed: {e}"
            self._available = False
            logger.warning(f"I2C initialization failed: {e}")

        # Try to initialize GPIO for power loss detection
        try:
            from gpiozero import Button

            self._pld_button = Button(self.PLD_PIN)
            logger.debug("GPIO for power loss detection initialized")
        except ImportError:
            logger.debug("gpiozero not installed - power loss detection disabled")
        except Exception as e:
            logger.debug(f"GPIO init failed: {e}")

    @staticmethod
    def _swap_bytes(word: int) -> int:
        """
        Swap the two bytes of a 16-bit word.

        Args:
            word: Unsigned 16-bit value (0..65535).

        Returns:
            The value with its high and low bytes exchanged.

        Raises:
            struct.error: If ``word`` does not fit in an unsigned 16-bit field.
        """
        # Pack big-endian, reinterpret little-endian: a pure byte swap.
        return struct.unpack("<H", struct.pack(">H", word))[0]

    def _read_voltage_and_capacity(self) -> Tuple[float, float]:
        """
        Read voltage and capacity from MAX17048 fuel gauge.

        Any read or conversion error is recorded in ``_last_error``, logged
        at debug level, and reported as ``(0.0, 0.0)``.

        Returns:
            Tuple of (voltage, capacity); capacity is capped at 100.0.
            ``(0.0, 0.0)`` when no bus is open or the read fails.
        """
        if not self._bus:
            return 0.0, 0.0

        try:
            # Read voltage register (0x02)
            voltage_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x02)
            # Read capacity register (0x04)
            capacity_read = self._bus.read_word_data(self.I2C_ADDRESS, 0x04)

            # Swap bytes (SMBus returns LSB first, but MAX17048 is MSB first)
            voltage_swapped = self._swap_bytes(voltage_read)
            capacity_swapped = self._swap_bytes(capacity_read)

            # Convert to actual values
            # Voltage: 78.125 µV per bit (1.25 mV / 16)
            voltage = voltage_swapped * 1.25 / 1000 / 16
            # Capacity: 1/256% per bit
            capacity = capacity_swapped / 256

            return voltage, min(capacity, 100.0)

        except Exception as e:
            self._last_error = f"Read error: {e}"
            logger.debug(f"Failed to read UPS data: {e}")
            return 0.0, 0.0

    def _get_power_loss_state(self) -> bool:
        """
        Check power loss detection pin.

        Returns:
            True if external power is lost (running on battery).
            False when the GPIO is unavailable or reading it fails.
        """
        if self._pld_button is None:
            return False

        try:
            # Pin is LOW when power is lost
            # NOTE(review): gpiozero's Button defaults to pull_up=True, where
            # is_pressed is True while the pin is LOW. Confirm on hardware
            # that this negation matches the board's PLD polarity.
            return not self._pld_button.is_pressed
        except Exception:
            return False

    def _read_input_voltage(self) -> float:
        """
        Read input voltage using vcgencmd.

        Each candidate vcgencmd location is tried in order; a candidate that
        is missing, exits with an error, or prints unparsable output is
        skipped in favour of the next one.

        Returns:
            Input voltage in Volts, or 0.0 when no candidate yields a value.
        """
        try:
            from subprocess import CalledProcessError, check_output, DEVNULL

            vcgencmd_paths = [
                "/usr/bin/vcgencmd",
                "/opt/vc/bin/vcgencmd",
                "vcgencmd",
            ]

            for vcgencmd in vcgencmd_paths:
                try:
                    output = check_output(
                        [vcgencmd, "pmic_read_adc", "EXT5V_V"],
                        stderr=DEVNULL,
                    ).decode("utf-8")

                    # Output: "EXT5V_V volt=4.9234V"
                    value_str = output.split("=")[1].strip().rstrip("V")
                    return float(value_str)
                except FileNotFoundError:
                    continue
                except CalledProcessError:
                    # This binary exists but failed; another path may work.
                    continue
                except (IndexError, ValueError):
                    continue

            return 0.0

        except Exception:
            return 0.0

    def refresh(self) -> bool:
        """
        Refresh UPS data from hardware.

        Note that a failed fuel gauge read is reported by
        ``_read_voltage_and_capacity`` as zeros rather than an exception, so
        this method still stores the result and returns True in that case.

        Returns:
            True if refresh successful, False when the provider is not
            available or an unexpected error occurs.
        """
        if not self._available:
            return False

        try:
            voltage, capacity = self._read_voltage_and_capacity()
            power_loss = self._get_power_loss_state()
            input_voltage = self._read_input_voltage()

            # Determine charging state
            # Charging if capacity < 100% and we have external power
            is_charging = capacity < 100.0 and not power_loss

            self._data = UPSData(
                voltage=voltage,
                capacity=capacity,
                is_charging=is_charging,
                power_loss=power_loss,
                input_voltage=input_voltage,
            )

            self._set_cached("data", self._data)
            return True

        except Exception as e:
            self._last_error = str(e)
            logger.debug(f"UPS refresh failed: {e}")
            return False

    def get_data(self) -> UPSData:
        """
        Get current UPS data.

        Returns the cached value when one is present; otherwise triggers a
        refresh and returns the most recently stored data (which is the
        default ``UPSData()`` if no refresh has ever succeeded).
        """
        cached = self._get_cached("data")
        if cached is not None:
            return cached

        self.refresh()
        return self._data

    def get_voltage(self) -> float:
        """Get battery voltage in Volts."""
        return self.get_data().voltage

    def get_capacity(self) -> float:
        """Get battery capacity in percent (0-100)."""
        return self.get_data().capacity

    def is_charging(self) -> bool:
        """Check if battery is charging."""
        return self.get_data().is_charging

    def has_power_loss(self) -> bool:
        """Check if external power is lost."""
        return self.get_data().power_loss

    def get_input_voltage(self) -> float:
        """Get input voltage in Volts."""
        return self.get_data().input_voltage

    def get_status_string(self) -> str:
        """
        Get human-readable status string.

        Returns:
            On power loss: "CRITICAL" below 15% capacity, "LOW" below 25%,
            otherwise "Battery". With external power: "Charging" while the
            battery is charging, otherwise "OK".
        """
        data = self.get_data()

        if data.power_loss:
            if data.capacity < 15:
                return "CRITICAL"
            elif data.capacity < 25:
                return "LOW"
            else:
                return "Battery"
        elif data.is_charging:
            return "Charging"
        else:
            return "OK"

    def shutdown(self) -> None:
        """
        Cleanup resources.

        Closes the I2C bus and releases the power loss detection GPIO.
        Cleanup is best-effort: close failures are logged at debug level
        and never raised.
        """
        super().shutdown()

        if self._bus:
            try:
                self._bus.close()
            except Exception as e:
                logger.debug(f"Failed to close I2C bus: {e}")
            self._bus = None

        if self._pld_button is not None:
            # Release the GPIO pin so it can be claimed again later.
            try:
                self._pld_button.close()
            except Exception as e:
                logger.debug(f"Failed to release GPIO: {e}")
            self._pld_button = None