From 8ba620aa6cb93eb55137f52af82d8badd5e741bf Mon Sep 17 00:00:00 2001 From: qsethuk Date: Wed, 7 Jan 2026 14:37:10 +0300 Subject: [PATCH] delete unused influxdb --- can_sniffer/src/influxdb_handler/__init__.py | 6 - .../src/influxdb_handler/influxdb_client.py | 688 ------------------ 2 files changed, 694 deletions(-) delete mode 100644 can_sniffer/src/influxdb_handler/__init__.py delete mode 100644 can_sniffer/src/influxdb_handler/influxdb_client.py diff --git a/can_sniffer/src/influxdb_handler/__init__.py b/can_sniffer/src/influxdb_handler/__init__.py deleted file mode 100644 index faf2955..0000000 --- a/can_sniffer/src/influxdb_handler/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Модуль для работы с InfluxDB.""" - -from .influxdb_client import InfluxDBClient, get_influxdb_client - -__all__ = ['InfluxDBClient', 'get_influxdb_client'] - diff --git a/can_sniffer/src/influxdb_handler/influxdb_client.py b/can_sniffer/src/influxdb_handler/influxdb_client.py deleted file mode 100644 index a1a35fd..0000000 --- a/can_sniffer/src/influxdb_handler/influxdb_client.py +++ /dev/null @@ -1,688 +0,0 @@ -""" -Модуль для работы с InfluxDB. - -Предоставляет singleton класс для отправки CAN сообщений в InfluxDB -с поддержкой пакетной отправки, store-and-forward механизма, -retry с backoff и health-check. -""" - -import threading -import time -from queue import Queue, Empty -from typing import Optional, List, Tuple, Dict, Any, Callable -from enum import Enum - -from config import config -from logger import get_logger - -logger = get_logger(__name__) - -# Импортируем InfluxDB клиент -try: - from influxdb_client import InfluxDBClient as InfluxClient, Point, WritePrecision - from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions - INFLUXDB_AVAILABLE = True -except ImportError: - INFLUXDB_AVAILABLE = False - InfluxClient = None - Point = None - WritePrecision = None - SYNCHRONOUS = None - ASYNCHRONOUS = None - WriteOptions = None - logger.warning("influxdb-client not installed. Install with: pip install influxdb-client") - - -class ConnectionStatus(Enum): - """Статус соединения с InfluxDB.""" - DISCONNECTED = "disconnected" - CONNECTING = "connecting" - CONNECTED = "connected" - ERROR = "error" - - -class InfluxDBClient: - """Singleton класс для работы с InfluxDB.""" - - _instance: Optional['InfluxDBClient'] = None - _lock = threading.Lock() - - def __new__(cls): - """Singleton паттерн для единого экземпляра клиента.""" - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = super().__new__(cls) - return cls._instance - - def __init__(self): - """Инициализация клиента InfluxDB.""" - # Проверяем, что инициализация выполняется только один раз - if hasattr(self, '_initialized'): - return - - self.config = config.influxdb - self.logger = logger - - # Инициализируем атрибуты по умолчанию - self.client: Optional[InfluxClient] = None - self.write_api = None - # Очередь с ограничением размера для предотвращения переполнения памяти - # Размер = batch_size * 10 (для буферизации нескольких батчей) - queue_maxsize = self.config.batch_size * 10 - self.message_queue: Queue[Dict[str, Any]] = Queue(maxsize=queue_maxsize) - self.running = False - self.forwarder_thread: Optional[threading.Thread] = None - self.health_check_thread: Optional[threading.Thread] = None - self.connection_status = ConnectionStatus.DISCONNECTED - self.last_health_check: Optional[float] = None - - # Статистика - self.sent_count = 0 - self.failed_count = 0 - self.retry_count = 0 - self.reconnect_count = 0 - self._initialized = False - - if not INFLUXDB_AVAILABLE: - self.logger.error("InfluxDB client library not available") - return - - # Инициализируем клиент - self._init_client() - self._initialized = True - - def _init_client(self) -> None: - """Инициализация клиента InfluxDB.""" - if not self.config.enabled: - self.logger.info("InfluxDB is disabled in configuration") - return - - if not INFLUXDB_AVAILABLE: - self.logger.error("InfluxDB client library not available") - return - - try: - self.connection_status = ConnectionStatus.CONNECTING - self.logger.info( - "Initializing InfluxDB client", - extra={ - "url": self.config.url, - "org": self.config.org, - "bucket": self.config.bucket - } - ) - - # Создаем клиент InfluxDB - self.client = InfluxClient( - url=self.config.url, - token=self.config.token, - org=self.config.org, - timeout=self.config.timeout * 1000 # Конвертируем в миллисекунды - ) - - # Создаем write API для асинхронной записи с callbacks - write_options = ASYNCHRONOUS - - # Устанавливаем callbacks для обработки успешных и неудачных записей - self.write_api = self.client.write_api( - write_options=write_options, - success_callback=self._on_write_success, - error_callback=self._on_write_error - ) - - # Проверяем соединение - if self._health_check(): - self.connection_status = ConnectionStatus.CONNECTED - self.logger.info("InfluxDB client initialized successfully") - else: - self.connection_status = ConnectionStatus.ERROR - self.logger.warning("InfluxDB client initialized but health check failed") - - except Exception as e: - self.connection_status = ConnectionStatus.ERROR - self.logger.error( - f"Failed to initialize InfluxDB client: {e}", - exc_info=True - ) - self.client = None - self.write_api = None - - def _on_write_success(self, conf: Tuple[str, str, str], data: str) -> None: - """ - Callback для успешной записи в InfluxDB. - - Args: - conf: Конфигурация записи (bucket, org, precision) - data: Данные, которые были записаны - """ - self.sent_count += 1 - self.logger.debug( - "Successfully wrote to InfluxDB", - extra={"bucket": conf[0], "org": conf[1]} - ) - - def _on_write_error(self, conf: Tuple[str, str, str], data: str, exception: Exception) -> None: - """ - Callback для ошибки записи в InfluxDB. - - Args: - conf: Конфигурация записи (bucket, org, precision) - data: Данные, которые не удалось записать - exception: Исключение, которое произошло - """ - self.failed_count += 1 - self.logger.error( - f"Failed to write to InfluxDB: {exception}", - exc_info=True, - extra={"bucket": conf[0], "org": conf[1]} - ) - - # Если ошибка сети, помечаем соединение как проблемное - if isinstance(exception, (ConnectionError, TimeoutError)): - self.connection_status = ConnectionStatus.ERROR - - def _health_check(self) -> bool: - """ - Проверка здоровья соединения с InfluxDB. - - Returns: - True если соединение работает, False иначе - """ - if not self.client: - return False - - try: - # Простая проверка - пытаемся получить информацию о buckets - # Это легковесная операция, которая проверяет соединение - buckets_api = self.client.buckets_api() - buckets_api.find_buckets() # Проверяем доступность API - self.last_health_check = time.time() - return True - except Exception as e: - self.logger.debug(f"InfluxDB health check failed: {e}") - return False - - def _health_check_loop(self) -> None: - """Цикл проверки здоровья соединения.""" - self.logger.info("InfluxDB health check loop started") - - while self.running: - try: - time.sleep(self.config.health_check_interval) - - if not self.running: - break - - # Проверяем здоровье соединения - is_healthy = self._health_check() - - if is_healthy: - if self.connection_status != ConnectionStatus.CONNECTED: - self.logger.info("InfluxDB connection restored") - self.connection_status = ConnectionStatus.CONNECTED - else: - if self.connection_status == ConnectionStatus.CONNECTED: - self.logger.warning("InfluxDB connection lost, attempting reconnect...") - self.connection_status = ConnectionStatus.ERROR - self._reconnect() - - except Exception as e: - self.logger.error( - f"Error in health check loop: {e}", - exc_info=True - ) - - self.logger.info("InfluxDB health check loop stopped") - - def _reconnect(self) -> None: - """Переподключение к InfluxDB.""" - if self.connection_status == ConnectionStatus.CONNECTING: - return - - self.connection_status = ConnectionStatus.CONNECTING - self.reconnect_count += 1 - - try: - # Закрываем старое соединение - if self.write_api: - try: - self.write_api.close() - except Exception: - pass - - if self.client: - try: - self.client.close() - except Exception: - pass - - # Пересоздаем клиент - self._init_client() - - if self.connection_status == ConnectionStatus.CONNECTED: - self.logger.info("InfluxDB reconnected successfully") - else: - self.logger.warning("InfluxDB reconnection failed") - - except Exception as e: - self.connection_status = ConnectionStatus.ERROR - self.logger.error( - f"Failed to reconnect to InfluxDB: {e}", - exc_info=True - ) - - def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> bool: - """ - Добавление сообщения в очередь для отправки в InfluxDB. - - Args: - interface: Имя интерфейса (например, 'can0') - can_id: CAN ID сообщения - dlc: Data Length Code - data: Данные сообщения (bytes) - timestamp: Временная метка сообщения - - Returns: - True если сообщение добавлено в очередь - """ - if not self.write_api: - return False - - try: - # Добавляем сообщение в очередь для пакетной отправки - # Используем неблокирующий режим, если block=False - if block: - self.message_queue.put({ - "interface": interface, - "can_id": can_id, - "dlc": dlc, - "data": data, - "timestamp": timestamp - }) - else: - # Неблокирующий режим - если очередь полна, пропускаем сообщение - try: - self.message_queue.put_nowait({ - "interface": interface, - "can_id": can_id, - "dlc": dlc, - "data": data, - "timestamp": timestamp - }) - except: - # Очередь переполнена - пропускаем сообщение - self.failed_count += 1 - return False - return True - except Exception as e: - self.logger.error( - f"Failed to queue message for InfluxDB: {e}", - exc_info=True - ) - self.failed_count += 1 - return False - - def _create_point(self, msg: Dict[str, Any]) -> Optional[Point]: - """ - Создание точки InfluxDB из сообщения. - - Args: - msg: Словарь с данными сообщения - - Returns: - Точка InfluxDB или None при ошибке - """ - if not Point: - return None - - try: - data_hex = msg["data"].hex() if msg["data"] else "" - - point = Point("can_message") \ - .tag("interface", msg["interface"]) \ - .tag("can_id", hex(msg["can_id"])) \ - .field("can_id_int", msg["can_id"]) \ - .field("dlc", msg["dlc"]) \ - .field("data_hex", data_hex) \ - .field("data_bytes", len(msg["data"])) \ - .time(int(msg["timestamp"] * 1e9), WritePrecision.NS) # Наносекунды - - return point - except Exception as e: - self.logger.error( - f"Failed to create InfluxDB point: {e}", - exc_info=True - ) - return None - - def _write_with_retry(self, points: List[Point], retry_count: int = 0) -> bool: - """ - Отправка точек в InfluxDB с retry и backoff. - - Args: - points: Список точек для отправки - retry_count: Текущее количество попыток - - Returns: - True если отправка успешна - """ - if not self.write_api or not points: - return False - - try: - # Отправляем батч - self.write_api.write( - bucket=self.config.bucket, - org=self.config.org, - record=points - ) - return True - - except Exception as e: - # Если это ошибка сети и есть попытки - повторяем - if isinstance(e, (ConnectionError, TimeoutError)) and retry_count < self.config.max_retries: - self.retry_count += 1 - backoff_time = self.config.retry_backoff * (2 ** retry_count) # Exponential backoff - - self.logger.warning( - f"InfluxDB write failed, retrying in {backoff_time:.2f}s (attempt {retry_count + 1}/{self.config.max_retries})", - extra={"error": str(e), "batch_size": len(points)} - ) - - time.sleep(backoff_time) - - # Пытаемся переподключиться перед повтором - if self.connection_status != ConnectionStatus.CONNECTED: - self._reconnect() - - # Повторяем отправку - return self._write_with_retry(points, retry_count + 1) - else: - # Ошибка не связана с сетью или закончились попытки - self.failed_count += len(points) - self.logger.error( - f"Failed to send messages batch to InfluxDB: {e}", - exc_info=True, - extra={"batch_size": len(points), "retry_count": retry_count} - ) - return False - - def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int: - """ - Пакетная отправка сообщений в InfluxDB. - - Добавляет сообщения в очередь для асинхронной отправки через forwarder loop. - - Args: - messages: Список словарей с данными сообщений - block: Блокировать ли при переполнении очереди - - Returns: - Количество успешно добавленных в очередь сообщений - """ - if not self.write_api or not messages: - return 0 - - # Проверяем соединение перед добавлением в очередь - if self.connection_status != ConnectionStatus.CONNECTED: - if not self._health_check(): - # Соединение недоступно - пропускаем батч без ошибки - self.failed_count += len(messages) - return 0 - else: - self.connection_status = ConnectionStatus.CONNECTED - - # Проверяем заполненность очереди перед добавлением - queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0 - if queue_usage > 0.9 and not block: - # Очередь почти переполнена - пропускаем батч - self.failed_count += len(messages) - return 0 - - # Добавляем сообщения в очередь для асинхронной отправки - added_count = 0 - for msg in messages: - try: - if block: - self.message_queue.put(msg) - else: - try: - self.message_queue.put_nowait(msg) - except: - # Очередь переполнена - пропускаем оставшиеся сообщения - break - added_count += 1 - except Exception as e: - self.logger.debug(f"Failed to queue message: {e}") - break - - if added_count < len(messages): - self.failed_count += (len(messages) - added_count) - - return added_count - - def _send_messages_batch(self, messages: List[Dict[str, Any]]) -> int: - """ - Непосредственная отправка батча сообщений в InfluxDB. - - Этот метод вызывается из forwarder loop для реальной отправки данных. - - Args: - messages: Список словарей с данными сообщений - - Returns: - Количество успешно отправленных сообщений - """ - if not self.write_api or not messages: - return 0 - - # Проверяем соединение перед отправкой - if self.connection_status != ConnectionStatus.CONNECTED: - if not self._health_check(): - self.logger.warning("InfluxDB connection not available, skipping batch") - self.failed_count += len(messages) - return 0 - else: - self.connection_status = ConnectionStatus.CONNECTED - - try: - # Создаем точки для InfluxDB - points = [] - for msg in messages: - point = self._create_point(msg) - if point: - points.append(point) - - if not points: - self.logger.warning("No valid points created from messages") - return 0 - - # Отправляем с retry - if self._write_with_retry(points): - sent = len(points) - self.sent_count += sent - self.logger.debug( - f"Sent {sent} messages to InfluxDB", - extra={"batch_size": sent} - ) - return sent - else: - return 0 - - except Exception as e: - self.failed_count += len(messages) - self.logger.error( - f"Failed to send messages batch to InfluxDB: {e}", - exc_info=True, - extra={"batch_size": len(messages)} - ) - return 0 - - def _forwarder_loop(self) -> None: - """Основной цикл для отправки сообщений в InfluxDB.""" - self.logger.info("InfluxDB forwarder loop started") - - batch = [] - last_flush_time = time.time() - - while self.running or not self.message_queue.empty(): - try: - # Собираем сообщения в батч - try: - message = self.message_queue.get(timeout=0.1) - batch.append(message) - except Empty: - pass - - # Отправляем батч если он заполнен или прошло достаточно времени - current_time = time.time() - should_flush = ( - len(batch) >= self.config.batch_size or - (batch and (current_time - last_flush_time) >= self.config.flush_interval) - ) - - if should_flush: - if batch: - # Отправляем батч напрямую в InfluxDB (не добавляем в очередь!) - self._send_messages_batch(batch) - batch = [] - last_flush_time = current_time - - except Exception as e: - self.logger.error( - f"Error in forwarder loop: {e}", - exc_info=True - ) - time.sleep(0.1) - - # Отправляем оставшиеся сообщения - if batch: - self._send_messages_batch(batch) - - self.logger.info( - "InfluxDB forwarder loop stopped", - extra={ - "sent_count": self.sent_count, - "failed_count": self.failed_count - } - ) - - def start(self) -> None: - """Запуск forwarder потока для отправки сообщений.""" - if not self.config.enabled or not self.write_api: - return - - if hasattr(self, 'running') and self.running: - self.logger.warning("InfluxDB forwarder is already running") - return - - self.running = True - - # Запускаем forwarder поток - # НЕ используем daemon=True для корректного завершения - self.forwarder_thread = threading.Thread( - target=self._forwarder_loop, - name="InfluxDB-Forwarder", - daemon=False - ) - self.forwarder_thread.start() - - # Запускаем health check поток - # Health check может быть daemon, так как он не критичен при shutdown - self.health_check_thread = threading.Thread( - target=self._health_check_loop, - name="InfluxDB-HealthCheck", - daemon=True - ) - self.health_check_thread.start() - - self.logger.info("InfluxDB forwarder started") - - def stop(self) -> None: - """Остановка forwarder потока.""" - if not hasattr(self, 'running') or not self.running: - return - - self.logger.info("Stopping InfluxDB forwarder...") - self.running = False - - # Даем время на обработку оставшихся сообщений в очереди - max_wait_time = 5.0 - wait_start = time.time() - while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time: - time.sleep(0.1) - - if not self.message_queue.empty(): - remaining = self.message_queue.qsize() - self.logger.warning( - f"InfluxDB queue not empty after shutdown, {remaining} messages remaining" - ) - - # Ждем завершения потоков - if self.forwarder_thread and self.forwarder_thread.is_alive(): - self.forwarder_thread.join(timeout=10.0) - if self.forwarder_thread.is_alive(): - self.logger.warning("Forwarder thread did not stop gracefully") - - if self.health_check_thread and self.health_check_thread.is_alive(): - self.health_check_thread.join(timeout=2.0) - if self.health_check_thread.is_alive(): - self.logger.warning("Health check thread did not stop gracefully") - - # Закрываем write API и клиент - # Важно: закрываем в правильном порядке - if self.write_api: - try: - self.write_api.close() - self.write_api = None - except Exception as e: - self.logger.error(f"Error closing write API: {e}", exc_info=True) - - if self.client: - try: - self.client.close() - self.client = None - except Exception as e: - self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True) - - self.connection_status = ConnectionStatus.DISCONNECTED - self.logger.info("InfluxDB forwarder stopped") - - def get_stats(self) -> Dict[str, Any]: - """Получение статистики клиента.""" - return { - "enabled": self.config.enabled, - "initialized": self._initialized and self.client is not None, - "running": getattr(self, 'running', False), - "connection_status": self.connection_status.value, - "sent_count": self.sent_count, - "failed_count": self.failed_count, - "retry_count": self.retry_count, - "reconnect_count": self.reconnect_count, - "queue_size": self.message_queue.qsize(), - "last_health_check": self.last_health_check, - "url": self.config.url if self.config.enabled else None, - "bucket": self.config.bucket if self.config.enabled else None - } - - def close(self) -> None: - """Закрытие соединения с InfluxDB.""" - self.stop() - - -# Глобальный экземпляр клиента -_influxdb_instance: Optional[InfluxDBClient] = None - - -def get_influxdb_client() -> InfluxDBClient: - """ - Получение глобального экземпляра клиента InfluxDB. - - Returns: - Экземпляр InfluxDBClient - """ - global _influxdb_instance - if _influxdb_instance is None: - _influxdb_instance = InfluxDBClient() - return _influxdb_instance