From cd89e87e772f2cd8c91ace61ad1f0db9826d587d Mon Sep 17 00:00:00 2001 From: qsethuk Date: Wed, 7 Jan 2026 03:08:01 +0300 Subject: [PATCH] update influxdb --- can_sniffer/src/config.py | 12 + .../src/influxdb_handler/influxdb_client.py | 324 ++++++++++++++++-- 2 files changed, 305 insertions(+), 31 deletions(-) diff --git a/can_sniffer/src/config.py b/can_sniffer/src/config.py index 03c0ae5..2d52cf9 100644 --- a/can_sniffer/src/config.py +++ b/can_sniffer/src/config.py @@ -103,6 +103,18 @@ class InfluxDBConfig(BaseModel): default=10, description="Таймаут подключения (секунды)" ) + max_retries: int = Field( + default=3, + description="Максимальное количество попыток повтора при ошибке" + ) + retry_backoff: float = Field( + default=1.0, + description="Базовый интервал backoff для повторов (секунды)" + ) + health_check_interval: int = Field( + default=30, + description="Интервал проверки здоровья соединения (секунды)" + ) class LoggingConfig(BaseModel): diff --git a/can_sniffer/src/influxdb_handler/influxdb_client.py b/can_sniffer/src/influxdb_handler/influxdb_client.py index 838b0ae..c4a6172 100644 --- a/can_sniffer/src/influxdb_handler/influxdb_client.py +++ b/can_sniffer/src/influxdb_handler/influxdb_client.py @@ -2,13 +2,15 @@ Модуль для работы с InfluxDB. Предоставляет singleton класс для отправки CAN сообщений в InfluxDB -с поддержкой пакетной отправки и store-and-forward механизма. +с поддержкой пакетной отправки, store-and-forward механизма, +retry с backoff и health-check. """ import threading import time from queue import Queue, Empty -from typing import Optional, List, Tuple, Dict, Any +from typing import Optional, List, Tuple, Dict, Any, Callable +from enum import Enum from config import config from logger import get_logger @@ -18,7 +20,7 @@ logger = get_logger(__name__) # Импортируем InfluxDB клиент try: from influxdb_client import InfluxDBClient as InfluxClient, Point, WritePrecision - from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS + from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions INFLUXDB_AVAILABLE = True except ImportError: INFLUXDB_AVAILABLE = False @@ -27,9 +29,18 @@ except ImportError: WritePrecision = None SYNCHRONOUS = None ASYNCHRONOUS = None + WriteOptions = None logger.warning("influxdb-client not installed. Install with: pip install influxdb-client") +class ConnectionStatus(Enum): + """Статус соединения с InfluxDB.""" + DISCONNECTED = "disconnected" + CONNECTING = "connecting" + CONNECTED = "connected" + ERROR = "error" + + class InfluxDBClient: """Singleton класс для работы с InfluxDB.""" @@ -59,9 +70,15 @@ class InfluxDBClient: self.message_queue: Queue[Dict[str, Any]] = Queue() self.running = False self.forwarder_thread: Optional[threading.Thread] = None + self.health_check_thread: Optional[threading.Thread] = None + self.connection_status = ConnectionStatus.DISCONNECTED + self.last_health_check: Optional[float] = None + + # Статистика self.sent_count = 0 self.failed_count = 0 self.retry_count = 0 + self.reconnect_count = 0 self._initialized = False if not INFLUXDB_AVAILABLE: @@ -83,6 +100,7 @@ class InfluxDBClient: return try: + self.connection_status = ConnectionStatus.CONNECTING self.logger.info( "Initializing InfluxDB client", extra={ @@ -100,14 +118,26 @@ class InfluxDBClient: timeout=self.config.timeout * 1000 # Конвертируем в миллисекунды ) - # Создаем write API для асинхронной записи + # Создаем write API для асинхронной записи с callbacks + write_options = ASYNCHRONOUS + + # Устанавливаем callbacks для обработки успешных и неудачных записей self.write_api = self.client.write_api( - write_options=ASYNCHRONOUS + write_options=write_options, + success_callback=self._on_write_success, + error_callback=self._on_write_error ) - self.logger.info("InfluxDB client initialized successfully") + # Проверяем соединение + if self._health_check(): + self.connection_status = ConnectionStatus.CONNECTED + self.logger.info("InfluxDB client initialized successfully") + else: + self.connection_status = ConnectionStatus.ERROR + self.logger.warning("InfluxDB client initialized but health check failed") except Exception as e: + self.connection_status = ConnectionStatus.ERROR self.logger.error( f"Failed to initialize InfluxDB client: {e}", exc_info=True @@ -115,6 +145,130 @@ class InfluxDBClient: self.client = None self.write_api = None + def _on_write_success(self, conf: Tuple[str, str, str], data: str) -> None: + """ + Callback для успешной записи в InfluxDB. + + Args: + conf: Конфигурация записи (bucket, org, precision) + data: Данные, которые были записаны + """ + self.sent_count += 1 + self.logger.debug( + "Successfully wrote to InfluxDB", + extra={"bucket": conf[0], "org": conf[1]} + ) + + def _on_write_error(self, conf: Tuple[str, str, str], data: str, exception: Exception) -> None: + """ + Callback для ошибки записи в InfluxDB. + + Args: + conf: Конфигурация записи (bucket, org, precision) + data: Данные, которые не удалось записать + exception: Исключение, которое произошло + """ + self.failed_count += 1 + self.logger.error( + f"Failed to write to InfluxDB: {exception}", + exc_info=True, + extra={"bucket": conf[0], "org": conf[1]} + ) + + # Если ошибка сети, помечаем соединение как проблемное + if isinstance(exception, (ConnectionError, TimeoutError)): + self.connection_status = ConnectionStatus.ERROR + + def _health_check(self) -> bool: + """ + Проверка здоровья соединения с InfluxDB. + + Returns: + True если соединение работает, False иначе + """ + if not self.client: + return False + + try: + # Простая проверка - пытаемся получить информацию о buckets + # Это легковесная операция, которая проверяет соединение + buckets_api = self.client.buckets_api() + buckets_api.find_buckets() # Проверяем доступность API + self.last_health_check = time.time() + return True + except Exception as e: + self.logger.debug(f"InfluxDB health check failed: {e}") + return False + + def _health_check_loop(self) -> None: + """Цикл проверки здоровья соединения.""" + self.logger.info("InfluxDB health check loop started") + + while self.running: + try: + time.sleep(self.config.health_check_interval) + + if not self.running: + break + + # Проверяем здоровье соединения + is_healthy = self._health_check() + + if is_healthy: + if self.connection_status != ConnectionStatus.CONNECTED: + self.logger.info("InfluxDB connection restored") + self.connection_status = ConnectionStatus.CONNECTED + else: + if self.connection_status == ConnectionStatus.CONNECTED: + self.logger.warning("InfluxDB connection lost, attempting reconnect...") + self.connection_status = ConnectionStatus.ERROR + self._reconnect() + + except Exception as e: + self.logger.error( + f"Error in health check loop: {e}", + exc_info=True + ) + + self.logger.info("InfluxDB health check loop stopped") + + def _reconnect(self) -> None: + """Переподключение к InfluxDB.""" + if self.connection_status == ConnectionStatus.CONNECTING: + return + + self.connection_status = ConnectionStatus.CONNECTING + self.reconnect_count += 1 + + try: + # Закрываем старое соединение + if self.write_api: + try: + self.write_api.close() + except Exception: + pass + + if self.client: + try: + self.client.close() + except Exception: + pass + + # Пересоздаем клиент + self._init_client() + + if self.connection_status == ConnectionStatus.CONNECTED: + self.logger.info("InfluxDB reconnected successfully") + else: + self.logger.warning("InfluxDB reconnection failed") + + except Exception as e: + self.connection_status = ConnectionStatus.ERROR + self.logger.error( + f"Failed to reconnect to InfluxDB: {e}", + exc_info=True + ) + def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> bool: """ Добавление сообщения в очередь для отправки в InfluxDB. @@ -149,6 +303,91 @@ class InfluxDBClient: ) return False + def _create_point(self, msg: Dict[str, Any]) -> Optional[Point]: + """ + Создание точки InfluxDB из сообщения. + + Args: + msg: Словарь с данными сообщения + + Returns: + Точка InfluxDB или None при ошибке + """ + if not Point: + return None + + try: + data_hex = msg["data"].hex() if msg["data"] else "" + + point = Point("can_message") \ + .tag("interface", msg["interface"]) \ + .tag("can_id", hex(msg["can_id"])) \ + .field("can_id_int", msg["can_id"]) \ + .field("dlc", msg["dlc"]) \ + .field("data_hex", data_hex) \ + .field("data_bytes", len(msg["data"])) \ + .time(int(msg["timestamp"] * 1e9), WritePrecision.NS) # Наносекунды + + return point + except Exception as e: + self.logger.error( + f"Failed to create InfluxDB point: {e}", + exc_info=True + ) + return None + + def _write_with_retry(self, points: List[Point], retry_count: int = 0) -> bool: + """ + Отправка точек в InfluxDB с retry и backoff. + + Args: + points: Список точек для отправки + retry_count: Текущее количество попыток + + Returns: + True если отправка успешна + """ + if not self.write_api or not points: + return False + + try: + # Отправляем батч + self.write_api.write( + bucket=self.config.bucket, + org=self.config.org, + record=points + ) + return True + + except Exception as e: + # Если это ошибка сети и есть попытки - повторяем + if isinstance(e, (ConnectionError, TimeoutError)) and retry_count < self.config.max_retries: + self.retry_count += 1 + backoff_time = self.config.retry_backoff * (2 ** retry_count) # Exponential backoff + + self.logger.warning( + f"InfluxDB write failed, retrying in {backoff_time:.2f}s (attempt {retry_count + 1}/{self.config.max_retries})", + extra={"error": str(e), "batch_size": len(points)} + ) + + time.sleep(backoff_time) + + # Пытаемся переподключиться перед повтором + if self.connection_status != ConnectionStatus.CONNECTED: + self._reconnect() + + # Повторяем отправку + return self._write_with_retry(points, retry_count + 1) + else: + # Ошибка не связана с сетью или закончились попытки + self.failed_count += len(points) + self.logger.error( + f"Failed to send messages batch to InfluxDB: {e}", + exc_info=True, + extra={"batch_size": len(points), "retry_count": retry_count} + ) + return False + def write_messages_batch(self, messages: List[Dict[str, Any]]) -> int: """ Пакетная отправка сообщений в InfluxDB. @@ -162,36 +401,38 @@ class InfluxDBClient: if not self.write_api or not messages: return 0 + # Проверяем соединение перед отправкой + if self.connection_status != ConnectionStatus.CONNECTED: + if not self._health_check(): + self.logger.warning("InfluxDB connection not available, skipping batch") + self.failed_count += len(messages) + return 0 + else: + self.connection_status = ConnectionStatus.CONNECTED + try: # Создаем точки для InfluxDB points = [] for msg in messages: - point = Point("can_message") \ - .tag("interface", msg["interface"]) \ - .tag("can_id", hex(msg["can_id"])) \ - .field("can_id_int", msg["can_id"]) \ - .field("dlc", msg["dlc"]) \ - .field("data", msg["data"].hex()) \ - .field("data_bytes", len(msg["data"])) \ - .time(int(msg["timestamp"] * 1e9), WritePrecision.NS) # Наносекунды - - points.append(point) + point = self._create_point(msg) + if point: + points.append(point) - # Отправляем батч - self.write_api.write( - bucket=self.config.bucket, - org=self.config.org, - record=points - ) + if not points: + self.logger.warning("No valid points created from messages") + return 0 - sent = len(points) - self.sent_count += sent - self.logger.debug( - f"Sent {sent} messages to InfluxDB", - extra={"batch_size": sent} - ) - - return sent + # Отправляем с retry + if self._write_with_retry(points): + sent = len(points) + self.sent_count += sent + self.logger.debug( + f"Sent {sent} messages to InfluxDB", + extra={"batch_size": sent} + ) + return sent + else: + return 0 except Exception as e: self.failed_count += len(messages) @@ -260,12 +501,23 @@ class InfluxDBClient: return self.running = True + + # Запускаем forwarder поток self.forwarder_thread = threading.Thread( target=self._forwarder_loop, name="InfluxDB-Forwarder", daemon=True ) self.forwarder_thread.start() + + # Запускаем health check поток + self.health_check_thread = threading.Thread( + target=self._health_check_loop, + name="InfluxDB-HealthCheck", + daemon=True + ) + self.health_check_thread.start() + self.logger.info("InfluxDB forwarder started") def stop(self) -> None: @@ -276,11 +528,17 @@ class InfluxDBClient: self.logger.info("Stopping InfluxDB forwarder...") self.running = False + # Ждем завершения потоков if self.forwarder_thread and self.forwarder_thread.is_alive(): self.forwarder_thread.join(timeout=10.0) if self.forwarder_thread.is_alive(): self.logger.warning("Forwarder thread did not stop gracefully") + if self.health_check_thread and self.health_check_thread.is_alive(): + self.health_check_thread.join(timeout=5.0) + if self.health_check_thread.is_alive(): + self.logger.warning("Health check thread did not stop gracefully") + # Закрываем write API и клиент if self.write_api: try: @@ -294,6 +552,7 @@ class InfluxDBClient: except Exception as e: self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True) + self.connection_status = ConnectionStatus.DISCONNECTED self.logger.info("InfluxDB forwarder stopped") def get_stats(self) -> Dict[str, Any]: @@ -302,9 +561,13 @@ class InfluxDBClient: "enabled": self.config.enabled, "initialized": self._initialized and self.client is not None, "running": getattr(self, 'running', False), + "connection_status": self.connection_status.value, "sent_count": self.sent_count, "failed_count": self.failed_count, + "retry_count": self.retry_count, + "reconnect_count": self.reconnect_count, "queue_size": self.message_queue.qsize(), + "last_health_check": self.last_health_check, "url": self.config.url if self.config.enabled else None, "bucket": self.config.bucket if self.config.enabled else None } @@ -329,4 +592,3 @@ def get_influxdb_client() -> InfluxDBClient: if _influxdb_instance is None: _influxdb_instance = InfluxDBClient() return _influxdb_instance -