delete unused influxdb

This commit is contained in:
2026-01-07 14:37:10 +03:00
parent 1383eb585d
commit 8ba620aa6c
2 changed files with 0 additions and 694 deletions

View File

@@ -1,6 +0,0 @@
"""Модуль для работы с InfluxDB."""
from .influxdb_client import InfluxDBClient, get_influxdb_client
__all__ = ['InfluxDBClient', 'get_influxdb_client']

View File

@@ -1,688 +0,0 @@
"""
Модуль для работы с InfluxDB.
Предоставляет singleton класс для отправки CAN сообщений в InfluxDB
с поддержкой пакетной отправки, store-and-forward механизма,
retry с backoff и health-check.
"""
import threading
import time
from queue import Queue, Empty
from typing import Optional, List, Tuple, Dict, Any, Callable
from enum import Enum
from config import config
from logger import get_logger
logger = get_logger(__name__)
# Импортируем InfluxDB клиент
try:
from influxdb_client import InfluxDBClient as InfluxClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS, WriteOptions
INFLUXDB_AVAILABLE = True
except ImportError:
INFLUXDB_AVAILABLE = False
InfluxClient = None
Point = None
WritePrecision = None
SYNCHRONOUS = None
ASYNCHRONOUS = None
WriteOptions = None
logger.warning("influxdb-client not installed. Install with: pip install influxdb-client")
class ConnectionStatus(Enum):
"""Статус соединения с InfluxDB."""
DISCONNECTED = "disconnected"
CONNECTING = "connecting"
CONNECTED = "connected"
ERROR = "error"
class InfluxDBClient:
"""Singleton класс для работы с InfluxDB."""
_instance: Optional['InfluxDBClient'] = None
_lock = threading.Lock()
def __new__(cls):
"""Singleton паттерн для единого экземпляра клиента."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Инициализация клиента InfluxDB."""
# Проверяем, что инициализация выполняется только один раз
if hasattr(self, '_initialized'):
return
self.config = config.influxdb
self.logger = logger
# Инициализируем атрибуты по умолчанию
self.client: Optional[InfluxClient] = None
self.write_api = None
# Очередь с ограничением размера для предотвращения переполнения памяти
# Размер = batch_size * 10 (для буферизации нескольких батчей)
queue_maxsize = self.config.batch_size * 10
self.message_queue: Queue[Dict[str, Any]] = Queue(maxsize=queue_maxsize)
self.running = False
self.forwarder_thread: Optional[threading.Thread] = None
self.health_check_thread: Optional[threading.Thread] = None
self.connection_status = ConnectionStatus.DISCONNECTED
self.last_health_check: Optional[float] = None
# Статистика
self.sent_count = 0
self.failed_count = 0
self.retry_count = 0
self.reconnect_count = 0
self._initialized = False
if not INFLUXDB_AVAILABLE:
self.logger.error("InfluxDB client library not available")
return
# Инициализируем клиент
self._init_client()
self._initialized = True
def _init_client(self) -> None:
"""Инициализация клиента InfluxDB."""
if not self.config.enabled:
self.logger.info("InfluxDB is disabled in configuration")
return
if not INFLUXDB_AVAILABLE:
self.logger.error("InfluxDB client library not available")
return
try:
self.connection_status = ConnectionStatus.CONNECTING
self.logger.info(
"Initializing InfluxDB client",
extra={
"url": self.config.url,
"org": self.config.org,
"bucket": self.config.bucket
}
)
# Создаем клиент InfluxDB
self.client = InfluxClient(
url=self.config.url,
token=self.config.token,
org=self.config.org,
timeout=self.config.timeout * 1000 # Конвертируем в миллисекунды
)
# Создаем write API для асинхронной записи с callbacks
write_options = ASYNCHRONOUS
# Устанавливаем callbacks для обработки успешных и неудачных записей
self.write_api = self.client.write_api(
write_options=write_options,
success_callback=self._on_write_success,
error_callback=self._on_write_error
)
# Проверяем соединение
if self._health_check():
self.connection_status = ConnectionStatus.CONNECTED
self.logger.info("InfluxDB client initialized successfully")
else:
self.connection_status = ConnectionStatus.ERROR
self.logger.warning("InfluxDB client initialized but health check failed")
except Exception as e:
self.connection_status = ConnectionStatus.ERROR
self.logger.error(
f"Failed to initialize InfluxDB client: {e}",
exc_info=True
)
self.client = None
self.write_api = None
def _on_write_success(self, conf: Tuple[str, str, str], data: str) -> None:
"""
Callback для успешной записи в InfluxDB.
Args:
conf: Конфигурация записи (bucket, org, precision)
data: Данные, которые были записаны
"""
self.sent_count += 1
self.logger.debug(
"Successfully wrote to InfluxDB",
extra={"bucket": conf[0], "org": conf[1]}
)
def _on_write_error(self, conf: Tuple[str, str, str], data: str, exception: Exception) -> None:
"""
Callback для ошибки записи в InfluxDB.
Args:
conf: Конфигурация записи (bucket, org, precision)
data: Данные, которые не удалось записать
exception: Исключение, которое произошло
"""
self.failed_count += 1
self.logger.error(
f"Failed to write to InfluxDB: {exception}",
exc_info=True,
extra={"bucket": conf[0], "org": conf[1]}
)
# Если ошибка сети, помечаем соединение как проблемное
if isinstance(exception, (ConnectionError, TimeoutError)):
self.connection_status = ConnectionStatus.ERROR
def _health_check(self) -> bool:
"""
Проверка здоровья соединения с InfluxDB.
Returns:
True если соединение работает, False иначе
"""
if not self.client:
return False
try:
# Простая проверка - пытаемся получить информацию о buckets
# Это легковесная операция, которая проверяет соединение
buckets_api = self.client.buckets_api()
buckets_api.find_buckets() # Проверяем доступность API
self.last_health_check = time.time()
return True
except Exception as e:
self.logger.debug(f"InfluxDB health check failed: {e}")
return False
def _health_check_loop(self) -> None:
"""Цикл проверки здоровья соединения."""
self.logger.info("InfluxDB health check loop started")
while self.running:
try:
time.sleep(self.config.health_check_interval)
if not self.running:
break
# Проверяем здоровье соединения
is_healthy = self._health_check()
if is_healthy:
if self.connection_status != ConnectionStatus.CONNECTED:
self.logger.info("InfluxDB connection restored")
self.connection_status = ConnectionStatus.CONNECTED
else:
if self.connection_status == ConnectionStatus.CONNECTED:
self.logger.warning("InfluxDB connection lost, attempting reconnect...")
self.connection_status = ConnectionStatus.ERROR
self._reconnect()
except Exception as e:
self.logger.error(
f"Error in health check loop: {e}",
exc_info=True
)
self.logger.info("InfluxDB health check loop stopped")
def _reconnect(self) -> None:
"""Переподключение к InfluxDB."""
if self.connection_status == ConnectionStatus.CONNECTING:
return
self.connection_status = ConnectionStatus.CONNECTING
self.reconnect_count += 1
try:
# Закрываем старое соединение
if self.write_api:
try:
self.write_api.close()
except Exception:
pass
if self.client:
try:
self.client.close()
except Exception:
pass
# Пересоздаем клиент
self._init_client()
if self.connection_status == ConnectionStatus.CONNECTED:
self.logger.info("InfluxDB reconnected successfully")
else:
self.logger.warning("InfluxDB reconnection failed")
except Exception as e:
self.connection_status = ConnectionStatus.ERROR
self.logger.error(
f"Failed to reconnect to InfluxDB: {e}",
exc_info=True
)
def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> bool:
"""
Добавление сообщения в очередь для отправки в InfluxDB.
Args:
interface: Имя интерфейса (например, 'can0')
can_id: CAN ID сообщения
dlc: Data Length Code
data: Данные сообщения (bytes)
timestamp: Временная метка сообщения
Returns:
True если сообщение добавлено в очередь
"""
if not self.write_api:
return False
try:
# Добавляем сообщение в очередь для пакетной отправки
# Используем неблокирующий режим, если block=False
if block:
self.message_queue.put({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
})
else:
# Неблокирующий режим - если очередь полна, пропускаем сообщение
try:
self.message_queue.put_nowait({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
})
except:
# Очередь переполнена - пропускаем сообщение
self.failed_count += 1
return False
return True
except Exception as e:
self.logger.error(
f"Failed to queue message for InfluxDB: {e}",
exc_info=True
)
self.failed_count += 1
return False
def _create_point(self, msg: Dict[str, Any]) -> Optional[Point]:
"""
Создание точки InfluxDB из сообщения.
Args:
msg: Словарь с данными сообщения
Returns:
Точка InfluxDB или None при ошибке
"""
if not Point:
return None
try:
data_hex = msg["data"].hex() if msg["data"] else ""
point = Point("can_message") \
.tag("interface", msg["interface"]) \
.tag("can_id", hex(msg["can_id"])) \
.field("can_id_int", msg["can_id"]) \
.field("dlc", msg["dlc"]) \
.field("data_hex", data_hex) \
.field("data_bytes", len(msg["data"])) \
.time(int(msg["timestamp"] * 1e9), WritePrecision.NS) # Наносекунды
return point
except Exception as e:
self.logger.error(
f"Failed to create InfluxDB point: {e}",
exc_info=True
)
return None
def _write_with_retry(self, points: List[Point], retry_count: int = 0) -> bool:
"""
Отправка точек в InfluxDB с retry и backoff.
Args:
points: Список точек для отправки
retry_count: Текущее количество попыток
Returns:
True если отправка успешна
"""
if not self.write_api or not points:
return False
try:
# Отправляем батч
self.write_api.write(
bucket=self.config.bucket,
org=self.config.org,
record=points
)
return True
except Exception as e:
# Если это ошибка сети и есть попытки - повторяем
if isinstance(e, (ConnectionError, TimeoutError)) and retry_count < self.config.max_retries:
self.retry_count += 1
backoff_time = self.config.retry_backoff * (2 ** retry_count) # Exponential backoff
self.logger.warning(
f"InfluxDB write failed, retrying in {backoff_time:.2f}s (attempt {retry_count + 1}/{self.config.max_retries})",
extra={"error": str(e), "batch_size": len(points)}
)
time.sleep(backoff_time)
# Пытаемся переподключиться перед повтором
if self.connection_status != ConnectionStatus.CONNECTED:
self._reconnect()
# Повторяем отправку
return self._write_with_retry(points, retry_count + 1)
else:
# Ошибка не связана с сетью или закончились попытки
self.failed_count += len(points)
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(points), "retry_count": retry_count}
)
return False
def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int:
"""
Пакетная отправка сообщений в InfluxDB.
Добавляет сообщения в очередь для асинхронной отправки через forwarder loop.
Args:
messages: Список словарей с данными сообщений
block: Блокировать ли при переполнении очереди
Returns:
Количество успешно добавленных в очередь сообщений
"""
if not self.write_api or not messages:
return 0
# Проверяем соединение перед добавлением в очередь
if self.connection_status != ConnectionStatus.CONNECTED:
if not self._health_check():
# Соединение недоступно - пропускаем батч без ошибки
self.failed_count += len(messages)
return 0
else:
self.connection_status = ConnectionStatus.CONNECTED
# Проверяем заполненность очереди перед добавлением
queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0
if queue_usage > 0.9 and not block:
# Очередь почти переполнена - пропускаем батч
self.failed_count += len(messages)
return 0
# Добавляем сообщения в очередь для асинхронной отправки
added_count = 0
for msg in messages:
try:
if block:
self.message_queue.put(msg)
else:
try:
self.message_queue.put_nowait(msg)
except:
# Очередь переполнена - пропускаем оставшиеся сообщения
break
added_count += 1
except Exception as e:
self.logger.debug(f"Failed to queue message: {e}")
break
if added_count < len(messages):
self.failed_count += (len(messages) - added_count)
return added_count
def _send_messages_batch(self, messages: List[Dict[str, Any]]) -> int:
"""
Непосредственная отправка батча сообщений в InfluxDB.
Этот метод вызывается из forwarder loop для реальной отправки данных.
Args:
messages: Список словарей с данными сообщений
Returns:
Количество успешно отправленных сообщений
"""
if not self.write_api or not messages:
return 0
# Проверяем соединение перед отправкой
if self.connection_status != ConnectionStatus.CONNECTED:
if not self._health_check():
self.logger.warning("InfluxDB connection not available, skipping batch")
self.failed_count += len(messages)
return 0
else:
self.connection_status = ConnectionStatus.CONNECTED
try:
# Создаем точки для InfluxDB
points = []
for msg in messages:
point = self._create_point(msg)
if point:
points.append(point)
if not points:
self.logger.warning("No valid points created from messages")
return 0
# Отправляем с retry
if self._write_with_retry(points):
sent = len(points)
self.sent_count += sent
self.logger.debug(
f"Sent {sent} messages to InfluxDB",
extra={"batch_size": sent}
)
return sent
else:
return 0
except Exception as e:
self.failed_count += len(messages)
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
)
return 0
def _forwarder_loop(self) -> None:
"""Основной цикл для отправки сообщений в InfluxDB."""
self.logger.info("InfluxDB forwarder loop started")
batch = []
last_flush_time = time.time()
while self.running or not self.message_queue.empty():
try:
# Собираем сообщения в батч
try:
message = self.message_queue.get(timeout=0.1)
batch.append(message)
except Empty:
pass
# Отправляем батч если он заполнен или прошло достаточно времени
current_time = time.time()
should_flush = (
len(batch) >= self.config.batch_size or
(batch and (current_time - last_flush_time) >= self.config.flush_interval)
)
if should_flush:
if batch:
# Отправляем батч напрямую в InfluxDB (не добавляем в очередь!)
self._send_messages_batch(batch)
batch = []
last_flush_time = current_time
except Exception as e:
self.logger.error(
f"Error in forwarder loop: {e}",
exc_info=True
)
time.sleep(0.1)
# Отправляем оставшиеся сообщения
if batch:
self._send_messages_batch(batch)
self.logger.info(
"InfluxDB forwarder loop stopped",
extra={
"sent_count": self.sent_count,
"failed_count": self.failed_count
}
)
def start(self) -> None:
"""Запуск forwarder потока для отправки сообщений."""
if not self.config.enabled or not self.write_api:
return
if hasattr(self, 'running') and self.running:
self.logger.warning("InfluxDB forwarder is already running")
return
self.running = True
# Запускаем forwarder поток
# НЕ используем daemon=True для корректного завершения
self.forwarder_thread = threading.Thread(
target=self._forwarder_loop,
name="InfluxDB-Forwarder",
daemon=False
)
self.forwarder_thread.start()
# Запускаем health check поток
# Health check может быть daemon, так как он не критичен при shutdown
self.health_check_thread = threading.Thread(
target=self._health_check_loop,
name="InfluxDB-HealthCheck",
daemon=True
)
self.health_check_thread.start()
self.logger.info("InfluxDB forwarder started")
def stop(self) -> None:
"""Остановка forwarder потока."""
if not hasattr(self, 'running') or not self.running:
return
self.logger.info("Stopping InfluxDB forwarder...")
self.running = False
# Даем время на обработку оставшихся сообщений в очереди
max_wait_time = 5.0
wait_start = time.time()
while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time:
time.sleep(0.1)
if not self.message_queue.empty():
remaining = self.message_queue.qsize()
self.logger.warning(
f"InfluxDB queue not empty after shutdown, {remaining} messages remaining"
)
# Ждем завершения потоков
if self.forwarder_thread and self.forwarder_thread.is_alive():
self.forwarder_thread.join(timeout=10.0)
if self.forwarder_thread.is_alive():
self.logger.warning("Forwarder thread did not stop gracefully")
if self.health_check_thread and self.health_check_thread.is_alive():
self.health_check_thread.join(timeout=2.0)
if self.health_check_thread.is_alive():
self.logger.warning("Health check thread did not stop gracefully")
# Закрываем write API и клиент
# Важно: закрываем в правильном порядке
if self.write_api:
try:
self.write_api.close()
self.write_api = None
except Exception as e:
self.logger.error(f"Error closing write API: {e}", exc_info=True)
if self.client:
try:
self.client.close()
self.client = None
except Exception as e:
self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True)
self.connection_status = ConnectionStatus.DISCONNECTED
self.logger.info("InfluxDB forwarder stopped")
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики клиента."""
return {
"enabled": self.config.enabled,
"initialized": self._initialized and self.client is not None,
"running": getattr(self, 'running', False),
"connection_status": self.connection_status.value,
"sent_count": self.sent_count,
"failed_count": self.failed_count,
"retry_count": self.retry_count,
"reconnect_count": self.reconnect_count,
"queue_size": self.message_queue.qsize(),
"last_health_check": self.last_health_check,
"url": self.config.url if self.config.enabled else None,
"bucket": self.config.bucket if self.config.enabled else None
}
def close(self) -> None:
"""Закрытие соединения с InfluxDB."""
self.stop()
# Глобальный экземпляр клиента
_influxdb_instance: Optional[InfluxDBClient] = None
def get_influxdb_client() -> InfluxDBClient:
"""
Получение глобального экземпляра клиента InfluxDB.
Returns:
Экземпляр InfluxDBClient
"""
global _influxdb_instance
if _influxdb_instance is None:
_influxdb_instance = InfluxDBClient()
return _influxdb_instance