update influxdb

This commit is contained in:
2026-01-07 03:08:01 +03:00
parent 681021394c
commit cd89e87e77
2 changed files with 305 additions and 31 deletions

View File

@@ -103,6 +103,18 @@ class InfluxDBConfig(BaseModel):
default=10,
description="Таймаут подключения (секунды)"
)
# Retry budget for a failed write; 0 disables retries, negatives are rejected.
max_retries: int = Field(
    default=3,
    ge=0,
    description="Максимальное количество попыток повтора при ошибке"
)
# Base delay (seconds) of the exponential backoff between retries.
retry_backoff: float = Field(
    default=1.0,
    ge=0,
    description="Базовый интервал backoff для повторов (секунды)"
)
# Period (seconds) of the background health check; must be positive because
# it is used as a sleep duration between checks.
health_check_interval: int = Field(
    default=30,
    gt=0,
    description="Интервал проверки здоровья соединения (секунды)"
)
class LoggingConfig(BaseModel):

View File

@@ -2,13 +2,15 @@
Модуль для работы с InfluxDB.
Предоставляет singleton класс для отправки CAN сообщений в InfluxDB
с поддержкой пакетной отправки и store-and-forward механизма.
с поддержкой пакетной отправки, store-and-forward механизма,
retry с backoff и health-check.
"""
import threading
import time
from queue import Queue, Empty
from typing import Optional, List, Tuple, Dict, Any
from typing import Optional, List, Tuple, Dict, Any, Callable
from enum import Enum
from config import config
from logger import get_logger
@@ -18,7 +20,7 @@ logger = get_logger(__name__)
# Импортируем InfluxDB клиент
try:
from influxdb_client import InfluxDBClient as InfluxClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions
INFLUXDB_AVAILABLE = True
except ImportError:
INFLUXDB_AVAILABLE = False
@@ -27,9 +29,18 @@ except ImportError:
WritePrecision = None
SYNCHRONOUS = None
ASYNCHRONOUS = None
WriteOptions = None
logger.warning("influxdb-client not installed. Install with: pip install influxdb-client")
class ConnectionStatus(Enum):
    """Connection state of the InfluxDB client."""
    # No connection: the initial state, also restored by stop().
    DISCONNECTED = "disconnected"
    # Client (re)initialisation is in progress.
    CONNECTING = "connecting"
    # The last health check succeeded.
    CONNECTED = "connected"
    # Initialisation, a health check or a write failed.
    ERROR = "error"
class InfluxDBClient:
"""Singleton класс для работы с InfluxDB."""
@@ -59,9 +70,15 @@ class InfluxDBClient:
self.message_queue: Queue[Dict[str, Any]] = Queue()
self.running = False
self.forwarder_thread: Optional[threading.Thread] = None
self.health_check_thread: Optional[threading.Thread] = None
self.connection_status = ConnectionStatus.DISCONNECTED
self.last_health_check: Optional[float] = None
# Статистика
self.sent_count = 0
self.failed_count = 0
self.retry_count = 0
self.reconnect_count = 0
self._initialized = False
if not INFLUXDB_AVAILABLE:
@@ -83,6 +100,7 @@ class InfluxDBClient:
return
try:
self.connection_status = ConnectionStatus.CONNECTING
self.logger.info(
"Initializing InfluxDB client",
extra={
@@ -100,14 +118,26 @@ class InfluxDBClient:
timeout=self.config.timeout * 1000 # Конвертируем в миллисекунды
)
# Создаем write API для асинхронной записи
# Создаем write API для асинхронной записи с callbacks
write_options = ASYNCHRONOUS
# Устанавливаем callbacks для обработки успешных и неудачных записей
self.write_api = self.client.write_api(
write_options=ASYNCHRONOUS
write_options=write_options,
success_callback=self._on_write_success,
error_callback=self._on_write_error
)
self.logger.info("InfluxDB client initialized successfully")
# Проверяем соединение
if self._health_check():
self.connection_status = ConnectionStatus.CONNECTED
self.logger.info("InfluxDB client initialized successfully")
else:
self.connection_status = ConnectionStatus.ERROR
self.logger.warning("InfluxDB client initialized but health check failed")
except Exception as e:
self.connection_status = ConnectionStatus.ERROR
self.logger.error(
f"Failed to initialize InfluxDB client: {e}",
exc_info=True
@@ -115,6 +145,130 @@ class InfluxDBClient:
self.client = None
self.write_api = None
def _on_write_success(self, conf: Tuple[str, str, str], data: str) -> None:
"""
Callback для успешной записи в InfluxDB.
Args:
conf: Конфигурация записи (bucket, org, precision)
data: Данные, которые были записаны
"""
self.sent_count += 1
self.logger.debug(
"Successfully wrote to InfluxDB",
extra={"bucket": conf[0], "org": conf[1]}
)
def _on_write_error(self, conf: Tuple[str, str, str], data: str, exception: Exception) -> None:
"""
Callback для ошибки записи в InfluxDB.
Args:
conf: Конфигурация записи (bucket, org, precision)
data: Данные, которые не удалось записать
exception: Исключение, которое произошло
"""
self.failed_count += 1
self.logger.error(
f"Failed to write to InfluxDB: {exception}",
exc_info=True,
extra={"bucket": conf[0], "org": conf[1]}
)
# Если ошибка сети, помечаем соединение как проблемное
if isinstance(exception, (ConnectionError, TimeoutError)):
self.connection_status = ConnectionStatus.ERROR
def _health_check(self) -> bool:
"""
Проверка здоровья соединения с InfluxDB.
Returns:
True если соединение работает, False иначе
"""
if not self.client:
return False
try:
# Простая проверка - пытаемся получить информацию о buckets
# Это легковесная операция, которая проверяет соединение
buckets_api = self.client.buckets_api()
buckets_api.find_buckets() # Проверяем доступность API
self.last_health_check = time.time()
return True
except Exception as e:
self.logger.debug(f"InfluxDB health check failed: {e}")
return False
def _health_check_loop(self) -> None:
"""Цикл проверки здоровья соединения."""
self.logger.info("InfluxDB health check loop started")
while self.running:
try:
time.sleep(self.config.health_check_interval)
if not self.running:
break
# Проверяем здоровье соединения
is_healthy = self._health_check()
if is_healthy:
if self.connection_status != ConnectionStatus.CONNECTED:
self.logger.info("InfluxDB connection restored")
self.connection_status = ConnectionStatus.CONNECTED
else:
if self.connection_status == ConnectionStatus.CONNECTED:
self.logger.warning("InfluxDB connection lost, attempting reconnect...")
self.connection_status = ConnectionStatus.ERROR
self._reconnect()
except Exception as e:
self.logger.error(
f"Error in health check loop: {e}",
exc_info=True
)
self.logger.info("InfluxDB health check loop stopped")
def _reconnect(self) -> None:
    """
    Re-establish the connection to InfluxDB.

    Closes the current write API and client (best effort), re-runs the
    client initialisation and logs the outcome. Does nothing when a
    (re)connect is already flagged as in progress.
    """
    if self.connection_status == ConnectionStatus.CONNECTING:
        return

    self.connection_status = ConnectionStatus.CONNECTING
    self.reconnect_count += 1

    try:
        # Close the old handles, write API first. Close errors are not
        # fatal for a reconnect, but are logged instead of being dropped.
        for label, handle in (("write API", self.write_api), ("client", self.client)):
            if not handle:
                continue
            try:
                handle.close()
            except Exception as close_error:
                self.logger.debug(f"Ignoring error while closing InfluxDB {label}: {close_error}")

        # Do not leave closed handles published while re-initialising.
        self.write_api = None
        self.client = None

        # Recreate the client.
        self._init_client()

        if self.connection_status == ConnectionStatus.CONNECTED:
            self.logger.info("InfluxDB reconnected successfully")
        else:
            # If initialisation returned without updating the status, it is
            # still CONNECTING; left that way, the guard at the top would
            # skip every future reconnect attempt.
            if self.connection_status == ConnectionStatus.CONNECTING:
                self.connection_status = ConnectionStatus.ERROR
            self.logger.warning("InfluxDB reconnection failed")

    except Exception as e:
        self.connection_status = ConnectionStatus.ERROR
        self.logger.error(
            f"Failed to reconnect to InfluxDB: {e}",
            exc_info=True
        )
def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> bool:
"""
Добавление сообщения в очередь для отправки в InfluxDB.
@@ -149,6 +303,91 @@ class InfluxDBClient:
)
return False
def _create_point(self, msg: Dict[str, Any]) -> Optional[Point]:
    """
    Build an InfluxDB point from a queued CAN message.

    Args:
        msg: Message dictionary; the keys "interface", "can_id", "dlc",
            "data" and "timestamp" are read. "timestamp" is multiplied by
            1e9 and written with nanosecond precision.

    Returns:
        The point, or None when the InfluxDB library is unavailable or the
        message could not be converted (the error is logged).
    """
    if not Point:
        return None

    try:
        # Normalise a missing/empty payload once, so that both the hex
        # string and the byte count are derived from the same value and a
        # None payload no longer fails in len().
        payload = msg["data"] or b""

        point = (
            Point("can_message")
            .tag("interface", msg["interface"])
            .tag("can_id", hex(msg["can_id"]))
            .field("can_id_int", msg["can_id"])
            .field("dlc", msg["dlc"])
            .field("data_hex", payload.hex())
            .field("data_bytes", len(payload))
            .time(int(msg["timestamp"] * 1e9), WritePrecision.NS)  # nanoseconds
        )
        return point
    except Exception as e:
        self.logger.error(
            f"Failed to create InfluxDB point: {e}",
            exc_info=True
        )
        return None
def _write_with_retry(self, points: List[Point], retry_count: int = 0) -> bool:
"""
Отправка точек в InfluxDB с retry и backoff.
Args:
points: Список точек для отправки
retry_count: Текущее количество попыток
Returns:
True если отправка успешна
"""
if not self.write_api or not points:
return False
try:
# Отправляем батч
self.write_api.write(
bucket=self.config.bucket,
org=self.config.org,
record=points
)
return True
except Exception as e:
# Если это ошибка сети и есть попытки - повторяем
if isinstance(e, (ConnectionError, TimeoutError)) and retry_count < self.config.max_retries:
self.retry_count += 1
backoff_time = self.config.retry_backoff * (2 ** retry_count) # Exponential backoff
self.logger.warning(
f"InfluxDB write failed, retrying in {backoff_time:.2f}s (attempt {retry_count + 1}/{self.config.max_retries})",
extra={"error": str(e), "batch_size": len(points)}
)
time.sleep(backoff_time)
# Пытаемся переподключиться перед повтором
if self.connection_status != ConnectionStatus.CONNECTED:
self._reconnect()
# Повторяем отправку
return self._write_with_retry(points, retry_count + 1)
else:
# Ошибка не связана с сетью или закончились попытки
self.failed_count += len(points)
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(points), "retry_count": retry_count}
)
return False
def write_messages_batch(self, messages: List[Dict[str, Any]]) -> int:
"""
Пакетная отправка сообщений в InfluxDB.
@@ -162,36 +401,38 @@ class InfluxDBClient:
if not self.write_api or not messages:
return 0
# Проверяем соединение перед отправкой
if self.connection_status != ConnectionStatus.CONNECTED:
if not self._health_check():
self.logger.warning("InfluxDB connection not available, skipping batch")
self.failed_count += len(messages)
return 0
else:
self.connection_status = ConnectionStatus.CONNECTED
try:
# Создаем точки для InfluxDB
points = []
for msg in messages:
point = Point("can_message") \
.tag("interface", msg["interface"]) \
.tag("can_id", hex(msg["can_id"])) \
.field("can_id_int", msg["can_id"]) \
.field("dlc", msg["dlc"]) \
.field("data", msg["data"].hex()) \
.field("data_bytes", len(msg["data"])) \
.time(int(msg["timestamp"] * 1e9), WritePrecision.NS) # Наносекунды
points.append(point)
point = self._create_point(msg)
if point:
points.append(point)
# Отправляем батч
self.write_api.write(
bucket=self.config.bucket,
org=self.config.org,
record=points
)
if not points:
self.logger.warning("No valid points created from messages")
return 0
sent = len(points)
self.sent_count += sent
self.logger.debug(
f"Sent {sent} messages to InfluxDB",
extra={"batch_size": sent}
)
return sent
# Отправляем с retry
if self._write_with_retry(points):
sent = len(points)
self.sent_count += sent
self.logger.debug(
f"Sent {sent} messages to InfluxDB",
extra={"batch_size": sent}
)
return sent
else:
return 0
except Exception as e:
self.failed_count += len(messages)
@@ -260,12 +501,23 @@ class InfluxDBClient:
return
self.running = True
# Запускаем forwarder поток
self.forwarder_thread = threading.Thread(
target=self._forwarder_loop,
name="InfluxDB-Forwarder",
daemon=True
)
self.forwarder_thread.start()
# Запускаем health check поток
self.health_check_thread = threading.Thread(
target=self._health_check_loop,
name="InfluxDB-HealthCheck",
daemon=True
)
self.health_check_thread.start()
self.logger.info("InfluxDB forwarder started")
def stop(self) -> None:
@@ -276,11 +528,17 @@ class InfluxDBClient:
self.logger.info("Stopping InfluxDB forwarder...")
self.running = False
# Ждем завершения потоков
if self.forwarder_thread and self.forwarder_thread.is_alive():
self.forwarder_thread.join(timeout=10.0)
if self.forwarder_thread.is_alive():
self.logger.warning("Forwarder thread did not stop gracefully")
if self.health_check_thread and self.health_check_thread.is_alive():
self.health_check_thread.join(timeout=5.0)
if self.health_check_thread.is_alive():
self.logger.warning("Health check thread did not stop gracefully")
# Закрываем write API и клиент
if self.write_api:
try:
@@ -294,6 +552,7 @@ class InfluxDBClient:
except Exception as e:
self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True)
self.connection_status = ConnectionStatus.DISCONNECTED
self.logger.info("InfluxDB forwarder stopped")
def get_stats(self) -> Dict[str, Any]:
@@ -302,9 +561,13 @@ class InfluxDBClient:
"enabled": self.config.enabled,
"initialized": self._initialized and self.client is not None,
"running": getattr(self, 'running', False),
"connection_status": self.connection_status.value,
"sent_count": self.sent_count,
"failed_count": self.failed_count,
"retry_count": self.retry_count,
"reconnect_count": self.reconnect_count,
"queue_size": self.message_queue.qsize(),
"last_health_check": self.last_health_check,
"url": self.config.url if self.config.enabled else None,
"bucket": self.config.bucket if self.config.enabled else None
}
@@ -329,4 +592,3 @@ def get_influxdb_client() -> InfluxDBClient:
if _influxdb_instance is None:
_influxdb_instance = InfluxDBClient()
return _influxdb_instance