From 09ff5c4ec438f72052cf8abd6bc6ec19a71b23e5 Mon Sep 17 00:00:00 2001 From: qsethuk Date: Tue, 6 Jan 2026 23:10:06 +0300 Subject: [PATCH] add influxdb client --- can_sniffer/requirements.txt | 1 + can_sniffer/src/influxdb_client/__init__.py | 6 + .../src/influxdb_client/influxdb_client.py | 332 ++++++++++++++++++ .../src/socket_can/message_processor.py | 70 +++- can_sniffer/src/socket_can_example.py | 94 ----- 5 files changed, 391 insertions(+), 112 deletions(-) create mode 100644 can_sniffer/src/influxdb_client/__init__.py create mode 100644 can_sniffer/src/influxdb_client/influxdb_client.py delete mode 100644 can_sniffer/src/socket_can_example.py diff --git a/can_sniffer/requirements.txt b/can_sniffer/requirements.txt index 0ab1435..4277c2a 100644 --- a/can_sniffer/requirements.txt +++ b/can_sniffer/requirements.txt @@ -1,4 +1,5 @@ pydantic>=2.0.0 pydantic-settings>=2.0.0 python-can>=4.0.0 +influxdb-client>=1.36.0 diff --git a/can_sniffer/src/influxdb_client/__init__.py b/can_sniffer/src/influxdb_client/__init__.py new file mode 100644 index 0000000..faf2955 --- /dev/null +++ b/can_sniffer/src/influxdb_client/__init__.py @@ -0,0 +1,6 @@ +"""Модуль для работы с InfluxDB.""" + +from .influxdb_client import InfluxDBClient, get_influxdb_client + +__all__ = ['InfluxDBClient', 'get_influxdb_client'] + diff --git a/can_sniffer/src/influxdb_client/influxdb_client.py b/can_sniffer/src/influxdb_client/influxdb_client.py new file mode 100644 index 0000000..4a9ca75 --- /dev/null +++ b/can_sniffer/src/influxdb_client/influxdb_client.py @@ -0,0 +1,332 @@ +""" +Модуль для работы с InfluxDB. + +Предоставляет singleton класс для отправки CAN сообщений в InfluxDB +с поддержкой пакетной отправки и store-and-forward механизма. +""" + +import threading +import time +from queue import Queue, Empty +from typing import Optional, List, Tuple, Dict, Any + +from config import config +from logger import get_logger + +logger = get_logger(__name__) + +# Импортируем InfluxDB клиент +try: + from influxdb_client import InfluxDBClient as InfluxClient, Point, WritePrecision + from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS + INFLUXDB_AVAILABLE = True +except ImportError: + INFLUXDB_AVAILABLE = False + logger.warning("influxdb-client not installed. Install with: pip install influxdb-client") + + +class InfluxDBClient: + """Singleton класс для работы с InfluxDB.""" + + _instance: Optional['InfluxDBClient'] = None + _lock = threading.Lock() + + def __new__(cls): + """Singleton паттерн для единого экземпляра клиента.""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """Инициализация клиента InfluxDB.""" + # Проверяем, что инициализация выполняется только один раз + if hasattr(self, '_initialized'): + return + + if not INFLUXDB_AVAILABLE: + self.logger = logger + self.client = None + self.write_api = None + self._initialized = False + self.logger.error("InfluxDB client library not available") + return + + self.config = config.influxdb + self.logger = logger + self.client: Optional[InfluxClient] = None + self.write_api = None + + # Очередь для пакетной отправки + self.message_queue: Queue[Dict[str, Any]] = Queue() + self.running = False + self.forwarder_thread: Optional[threading.Thread] = None + + # Статистика + self.sent_count = 0 + self.failed_count = 0 + self.retry_count = 0 + + # Инициализируем клиент + self._init_client() + self._initialized = True + + def _init_client(self) -> None: + """Инициализация клиента InfluxDB.""" + if not self.config.enabled: + self.logger.info("InfluxDB is disabled in configuration") + return + + if not INFLUXDB_AVAILABLE: + self.logger.error("InfluxDB client library not available") + return + + try: + self.logger.info( + "Initializing InfluxDB client", + extra={ + "url": self.config.url, + "org": self.config.org, + "bucket": self.config.bucket + } + ) + + # Создаем клиент InfluxDB + self.client = InfluxClient( + url=self.config.url, + token=self.config.token, + org=self.config.org, + timeout=self.config.timeout * 1000 # Конвертируем в миллисекунды + ) + + # Создаем write API для асинхронной записи + self.write_api = self.client.write_api( + write_options=ASYNCHRONOUS + ) + + self.logger.info("InfluxDB client initialized successfully") + + except Exception as e: + self.logger.error( + f"Failed to initialize InfluxDB client: {e}", + exc_info=True + ) + self.client = None + self.write_api = None + + def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> bool: + """ + Добавление сообщения в очередь для отправки в InfluxDB. + + Args: + interface: Имя интерфейса (например, 'can0') + can_id: CAN ID сообщения + dlc: Data Length Code + data: Данные сообщения (bytes) + timestamp: Временная метка сообщения + + Returns: + True если сообщение добавлено в очередь + """ + if not self.write_api: + return False + + try: + # Добавляем сообщение в очередь для пакетной отправки + self.message_queue.put({ + "interface": interface, + "can_id": can_id, + "dlc": dlc, + "data": data, + "timestamp": timestamp + }) + return True + except Exception as e: + self.logger.error( + f"Failed to queue message for InfluxDB: {e}", + exc_info=True + ) + return False + + def write_messages_batch(self, messages: List[Dict[str, Any]]) -> int: + """ + Пакетная отправка сообщений в InfluxDB. + + Args: + messages: Список словарей с данными сообщений + + Returns: + Количество успешно отправленных сообщений + """ + if not self.write_api or not messages: + return 0 + + try: + # Создаем точки для InfluxDB + points = [] + for msg in messages: + point = Point("can_message") \ + .tag("interface", msg["interface"]) \ + .tag("can_id", hex(msg["can_id"])) \ + .field("can_id_int", msg["can_id"]) \ + .field("dlc", msg["dlc"]) \ + .field("data", msg["data"].hex()) \ + .field("data_bytes", len(msg["data"])) \ + .time(int(msg["timestamp"] * 1e9), WritePrecision.NS) # Наносекунды + + points.append(point) + + # Отправляем батч + self.write_api.write( + bucket=self.config.bucket, + org=self.config.org, + record=points + ) + + sent = len(points) + self.sent_count += sent + self.logger.debug( + f"Sent {sent} messages to InfluxDB", + extra={"batch_size": sent} + ) + + return sent + + except Exception as e: + self.failed_count += len(messages) + self.logger.error( + f"Failed to send messages batch to InfluxDB: {e}", + exc_info=True, + extra={"batch_size": len(messages)} + ) + return 0 + + def _forwarder_loop(self) -> None: + """Основной цикл для отправки сообщений в InfluxDB.""" + self.logger.info("InfluxDB forwarder loop started") + + batch = [] + last_flush_time = time.time() + + while self.running or not self.message_queue.empty(): + try: + # Собираем сообщения в батч + try: + message = self.message_queue.get(timeout=0.1) + batch.append(message) + except Empty: + pass + + # Отправляем батч если он заполнен или прошло достаточно времени + current_time = time.time() + should_flush = ( + len(batch) >= self.config.batch_size or + (batch and (current_time - last_flush_time) >= self.config.flush_interval) + ) + + if should_flush: + if batch: + self.write_messages_batch(batch) + batch = [] + last_flush_time = current_time + + except Exception as e: + self.logger.error( + f"Error in forwarder loop: {e}", + exc_info=True + ) + time.sleep(0.1) + + # Отправляем оставшиеся сообщения + if batch: + self.write_messages_batch(batch) + + self.logger.info( + "InfluxDB forwarder loop stopped", + extra={ + "sent_count": self.sent_count, + "failed_count": self.failed_count + } + ) + + def start(self) -> None: + """Запуск forwarder потока для отправки сообщений.""" + if not self.config.enabled or not self.write_api: + return + + if self.running: + self.logger.warning("InfluxDB forwarder is already running") + return + + self.running = True + self.forwarder_thread = threading.Thread( + target=self._forwarder_loop, + name="InfluxDB-Forwarder", + daemon=True + ) + self.forwarder_thread.start() + self.logger.info("InfluxDB forwarder started") + + def stop(self) -> None: + """Остановка forwarder потока.""" + if not self.running: + return + + self.logger.info("Stopping InfluxDB forwarder...") + self.running = False + + if self.forwarder_thread and self.forwarder_thread.is_alive(): + self.forwarder_thread.join(timeout=10.0) + if self.forwarder_thread.is_alive(): + self.logger.warning("Forwarder thread did not stop gracefully") + + # Закрываем write API и клиент + if self.write_api: + try: + self.write_api.close() + except Exception as e: + self.logger.error(f"Error closing write API: {e}", exc_info=True) + + if self.client: + try: + self.client.close() + except Exception as e: + self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True) + + self.logger.info("InfluxDB forwarder stopped") + + def get_stats(self) -> Dict[str, Any]: + """Получение статистики клиента.""" + return { + "enabled": self.config.enabled, + "initialized": self._initialized and self.client is not None, + "running": self.running, + "sent_count": self.sent_count, + "failed_count": self.failed_count, + "queue_size": self.message_queue.qsize(), + "url": self.config.url if self.config.enabled else None, + "bucket": self.config.bucket if self.config.enabled else None + } + + def close(self) -> None: + """Закрытие соединения с InfluxDB.""" + self.stop() + + +# Глобальный экземпляр клиента +_influxdb_instance: Optional[InfluxDBClient] = None + + +def get_influxdb_client() -> InfluxDBClient: + """ + Получение глобального экземпляра клиента InfluxDB. + + Returns: + Экземпляр InfluxDBClient + """ + global _influxdb_instance + if _influxdb_instance is None: + _influxdb_instance = InfluxDBClient() + return _influxdb_instance + diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index 7bd4ac0..a8a2293 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -13,6 +13,7 @@ from typing import Optional, Tuple from logger import get_logger from config import config from storage import get_storage +from influxdb_client import get_influxdb_client logger = get_logger(__name__) @@ -72,16 +73,22 @@ class MessageProcessor: self.logger.info("InfluxDB is disabled in configuration") return - # TODO: Инициализация InfluxDB клиента - # from influxdb_client import InfluxDBClient - # self.influxdb_client = InfluxDBClient(config.influxdb) - self.logger.info( - "InfluxDB initialization", - extra={ - "url": config.influxdb.url, - "bucket": config.influxdb.bucket - } - ) + try: + self.influxdb_client = get_influxdb_client() + self.logger.info( + "InfluxDB initialized", + extra={ + "url": config.influxdb.url, + "bucket": config.influxdb.bucket, + "org": config.influxdb.org + } + ) + except Exception as e: + self.logger.error( + f"Failed to initialize InfluxDB client: {e}", + exc_info=True + ) + self.influxdb_client = None def enqueue(self, interface: str, message: can.Message) -> bool: """ @@ -236,10 +243,20 @@ class MessageProcessor: extra={"batch_size": len(messages_to_save)} ) - # Отправляем в InfluxDB (если включено) + # Отправляем в InfluxDB (если включено) - пакетно if self.influxdb_client: + influx_messages = [] for interface, message in batch: - self._send_to_influxdb(interface, message) + influx_messages.append({ + "interface": interface, + "can_id": message.arbitration_id, + "dlc": message.dlc, + "data": message.data if message.data else b'', + "timestamp": message.timestamp + }) + + if influx_messages: + self.influxdb_client.write_messages_batch(influx_messages) # Обновляем счетчик обработанных сообщений self.processed_count += len(batch) @@ -340,7 +357,7 @@ class MessageProcessor: def _send_to_influxdb(self, interface: str, message: can.Message) -> None: """ - Отправка сообщения в InfluxDB. + Отправка сообщения в InfluxDB (для одиночных сообщений). Args: interface: Имя интерфейса @@ -350,9 +367,13 @@ class MessageProcessor: return try: - # TODO: Реализовать отправку в InfluxDB - # self.influxdb_client.write_message(interface, message) - pass + self.influxdb_client.write_message( + interface=interface, + can_id=message.arbitration_id, + dlc=message.dlc, + data=message.data if message.data else b'', + timestamp=message.timestamp + ) except Exception as e: self.logger.error( f"Failed to send message to InfluxDB: {e}", @@ -397,8 +418,13 @@ class MessageProcessor: ) if self.influxdb_client: - # self.influxdb_client.close() - pass + try: + self.influxdb_client.close() + except Exception as e: + self.logger.error( + f"Error closing InfluxDB client: {e}", + exc_info=True + ) self.logger.info( "Message processor stopped", @@ -425,4 +451,12 @@ class MessageProcessor: except Exception as e: self.logger.debug(f"Failed to get storage stats: {e}") + # Добавляем статистику InfluxDB, если он инициализирован + if self.influxdb_client: + try: + influx_stats = self.influxdb_client.get_stats() + stats["influxdb"] = influx_stats + except Exception as e: + self.logger.debug(f"Failed to get InfluxDB stats: {e}") + return stats diff --git a/can_sniffer/src/socket_can_example.py b/can_sniffer/src/socket_can_example.py deleted file mode 100644 index d6f6a8a..0000000 --- a/can_sniffer/src/socket_can_example.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -Пример использования модуля socket_can. -""" - -import time -import signal -import sys -from socket_can import CANSniffer -from logger import get_logger - -logger = get_logger(__name__) - -# Глобальная переменная для graceful shutdown -sniffer = None - - -def message_handler(interface: str, message): - """ - Обработчик CAN сообщений. - - Args: - interface: Имя интерфейса (например, 'can0') - message: CAN сообщение (can.Message) - """ - # Здесь можно добавить логику сохранения в SQLite/InfluxDB - logger.info( - "CAN message", - extra={ - "interface": interface, - "can_id": hex(message.arbitration_id), - "dlc": message.dlc, - "data": message.data.hex() if message.data else "", - "timestamp": message.timestamp - } - ) - - -def signal_handler(sig, frame): - """Обработчик сигналов для graceful shutdown.""" - logger.info("Received shutdown signal, stopping...") - if sniffer: - sniffer.stop() - sys.exit(0) - - -def main(): - """Пример использования CANSniffer.""" - global sniffer - - # Регистрируем обработчики сигналов - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) - - # Создаем sniffer с callback функцией - sniffer = CANSniffer(message_callback=message_handler) - - try: - # Запускаем чтение (можно использовать context manager) - sniffer.start() - - logger.info("CANSniffer started. Press Ctrl+C to stop.") - - # Основной цикл - просто ждем - while True: - time.sleep(1) - - # Периодически выводим статистику - stats = sniffer.get_stats() - logger.debug("Statistics", extra=stats) - - except KeyboardInterrupt: - logger.info("Keyboard interrupt received") - finally: - if sniffer: - sniffer.stop() - logger.info("Application stopped") - - -def example_with_context_manager(): - """Пример использования с context manager.""" - def message_handler(interface: str, message): - print(f"[{interface}] ID: {hex(message.arbitration_id)}, Data: {message.data.hex()}") - - # Использование с context manager (автоматический start/stop) - with CANSniffer(message_callback=message_handler) as sniffer: - # Читаем сообщения в течение 10 секунд - time.sleep(10) - - # После выхода из блока with, sniffer автоматически остановится - - -if __name__ == '__main__': - main() -