add influxdb client

This commit is contained in:
2026-01-06 23:10:06 +03:00
parent d72ea56772
commit 09ff5c4ec4
5 changed files with 391 additions and 112 deletions

View File

@@ -1,4 +1,5 @@
pydantic>=2.0.0
pydantic-settings>=2.0.0
python-can>=4.0.0
influxdb-client>=1.36.0

View File

@@ -0,0 +1,6 @@
"""Модуль для работы с InfluxDB."""
from .influxdb_client import InfluxDBClient, get_influxdb_client
__all__ = ['InfluxDBClient', 'get_influxdb_client']

View File

@@ -0,0 +1,332 @@
"""
Модуль для работы с InfluxDB.
Предоставляет singleton класс для отправки CAN сообщений в InfluxDB
с поддержкой пакетной отправки и store-and-forward механизма.
"""
import threading
import time
from queue import Queue, Empty
from typing import Optional, List, Tuple, Dict, Any
from config import config
from logger import get_logger
logger = get_logger(__name__)
# Импортируем InfluxDB клиент
try:
from influxdb_client import InfluxDBClient as InfluxClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
INFLUXDB_AVAILABLE = True
except ImportError:
INFLUXDB_AVAILABLE = False
logger.warning("influxdb-client not installed. Install with: pip install influxdb-client")
class InfluxDBClient:
"""Singleton класс для работы с InfluxDB."""
_instance: Optional['InfluxDBClient'] = None
_lock = threading.Lock()
def __new__(cls):
"""Singleton паттерн для единого экземпляра клиента."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Инициализация клиента InfluxDB."""
# Проверяем, что инициализация выполняется только один раз
if hasattr(self, '_initialized'):
return
if not INFLUXDB_AVAILABLE:
self.logger = logger
self.client = None
self.write_api = None
self._initialized = False
self.logger.error("InfluxDB client library not available")
return
self.config = config.influxdb
self.logger = logger
self.client: Optional[InfluxClient] = None
self.write_api = None
# Очередь для пакетной отправки
self.message_queue: Queue[Dict[str, Any]] = Queue()
self.running = False
self.forwarder_thread: Optional[threading.Thread] = None
# Статистика
self.sent_count = 0
self.failed_count = 0
self.retry_count = 0
# Инициализируем клиент
self._init_client()
self._initialized = True
def _init_client(self) -> None:
"""Инициализация клиента InfluxDB."""
if not self.config.enabled:
self.logger.info("InfluxDB is disabled in configuration")
return
if not INFLUXDB_AVAILABLE:
self.logger.error("InfluxDB client library not available")
return
try:
self.logger.info(
"Initializing InfluxDB client",
extra={
"url": self.config.url,
"org": self.config.org,
"bucket": self.config.bucket
}
)
# Создаем клиент InfluxDB
self.client = InfluxClient(
url=self.config.url,
token=self.config.token,
org=self.config.org,
timeout=self.config.timeout * 1000 # Конвертируем в миллисекунды
)
# Создаем write API для асинхронной записи
self.write_api = self.client.write_api(
write_options=ASYNCHRONOUS
)
self.logger.info("InfluxDB client initialized successfully")
except Exception as e:
self.logger.error(
f"Failed to initialize InfluxDB client: {e}",
exc_info=True
)
self.client = None
self.write_api = None
def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> bool:
"""
Добавление сообщения в очередь для отправки в InfluxDB.
Args:
interface: Имя интерфейса (например, 'can0')
can_id: CAN ID сообщения
dlc: Data Length Code
data: Данные сообщения (bytes)
timestamp: Временная метка сообщения
Returns:
True если сообщение добавлено в очередь
"""
if not self.write_api:
return False
try:
# Добавляем сообщение в очередь для пакетной отправки
self.message_queue.put({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
})
return True
except Exception as e:
self.logger.error(
f"Failed to queue message for InfluxDB: {e}",
exc_info=True
)
return False
def write_messages_batch(self, messages: List[Dict[str, Any]]) -> int:
"""
Пакетная отправка сообщений в InfluxDB.
Args:
messages: Список словарей с данными сообщений
Returns:
Количество успешно отправленных сообщений
"""
if not self.write_api or not messages:
return 0
try:
# Создаем точки для InfluxDB
points = []
for msg in messages:
point = Point("can_message") \
.tag("interface", msg["interface"]) \
.tag("can_id", hex(msg["can_id"])) \
.field("can_id_int", msg["can_id"]) \
.field("dlc", msg["dlc"]) \
.field("data", msg["data"].hex()) \
.field("data_bytes", len(msg["data"])) \
.time(int(msg["timestamp"] * 1e9), WritePrecision.NS) # Наносекунды
points.append(point)
# Отправляем батч
self.write_api.write(
bucket=self.config.bucket,
org=self.config.org,
record=points
)
sent = len(points)
self.sent_count += sent
self.logger.debug(
f"Sent {sent} messages to InfluxDB",
extra={"batch_size": sent}
)
return sent
except Exception as e:
self.failed_count += len(messages)
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
)
return 0
def _forwarder_loop(self) -> None:
"""Основной цикл для отправки сообщений в InfluxDB."""
self.logger.info("InfluxDB forwarder loop started")
batch = []
last_flush_time = time.time()
while self.running or not self.message_queue.empty():
try:
# Собираем сообщения в батч
try:
message = self.message_queue.get(timeout=0.1)
batch.append(message)
except Empty:
pass
# Отправляем батч если он заполнен или прошло достаточно времени
current_time = time.time()
should_flush = (
len(batch) >= self.config.batch_size or
(batch and (current_time - last_flush_time) >= self.config.flush_interval)
)
if should_flush:
if batch:
self.write_messages_batch(batch)
batch = []
last_flush_time = current_time
except Exception as e:
self.logger.error(
f"Error in forwarder loop: {e}",
exc_info=True
)
time.sleep(0.1)
# Отправляем оставшиеся сообщения
if batch:
self.write_messages_batch(batch)
self.logger.info(
"InfluxDB forwarder loop stopped",
extra={
"sent_count": self.sent_count,
"failed_count": self.failed_count
}
)
def start(self) -> None:
"""Запуск forwarder потока для отправки сообщений."""
if not self.config.enabled or not self.write_api:
return
if self.running:
self.logger.warning("InfluxDB forwarder is already running")
return
self.running = True
self.forwarder_thread = threading.Thread(
target=self._forwarder_loop,
name="InfluxDB-Forwarder",
daemon=True
)
self.forwarder_thread.start()
self.logger.info("InfluxDB forwarder started")
def stop(self) -> None:
"""Остановка forwarder потока."""
if not self.running:
return
self.logger.info("Stopping InfluxDB forwarder...")
self.running = False
if self.forwarder_thread and self.forwarder_thread.is_alive():
self.forwarder_thread.join(timeout=10.0)
if self.forwarder_thread.is_alive():
self.logger.warning("Forwarder thread did not stop gracefully")
# Закрываем write API и клиент
if self.write_api:
try:
self.write_api.close()
except Exception as e:
self.logger.error(f"Error closing write API: {e}", exc_info=True)
if self.client:
try:
self.client.close()
except Exception as e:
self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True)
self.logger.info("InfluxDB forwarder stopped")
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики клиента."""
return {
"enabled": self.config.enabled,
"initialized": self._initialized and self.client is not None,
"running": self.running,
"sent_count": self.sent_count,
"failed_count": self.failed_count,
"queue_size": self.message_queue.qsize(),
"url": self.config.url if self.config.enabled else None,
"bucket": self.config.bucket if self.config.enabled else None
}
def close(self) -> None:
"""Закрытие соединения с InfluxDB."""
self.stop()
# Глобальный экземпляр клиента
_influxdb_instance: Optional[InfluxDBClient] = None
def get_influxdb_client() -> InfluxDBClient:
"""
Получение глобального экземпляра клиента InfluxDB.
Returns:
Экземпляр InfluxDBClient
"""
global _influxdb_instance
if _influxdb_instance is None:
_influxdb_instance = InfluxDBClient()
return _influxdb_instance

View File

@@ -13,6 +13,7 @@ from typing import Optional, Tuple
from logger import get_logger
from config import config
from storage import get_storage
from influxdb_client import get_influxdb_client
logger = get_logger(__name__)
@@ -72,16 +73,22 @@ class MessageProcessor:
self.logger.info("InfluxDB is disabled in configuration")
return
# TODO: Инициализация InfluxDB клиента
# from influxdb_client import InfluxDBClient
# self.influxdb_client = InfluxDBClient(config.influxdb)
self.logger.info(
"InfluxDB initialization",
extra={
"url": config.influxdb.url,
"bucket": config.influxdb.bucket
}
)
try:
self.influxdb_client = get_influxdb_client()
self.logger.info(
"InfluxDB initialized",
extra={
"url": config.influxdb.url,
"bucket": config.influxdb.bucket,
"org": config.influxdb.org
}
)
except Exception as e:
self.logger.error(
f"Failed to initialize InfluxDB client: {e}",
exc_info=True
)
self.influxdb_client = None
def enqueue(self, interface: str, message: can.Message) -> bool:
"""
@@ -236,10 +243,20 @@ class MessageProcessor:
extra={"batch_size": len(messages_to_save)}
)
# Отправляем в InfluxDB (если включено)
# Отправляем в InfluxDB (если включено) - пакетно
if self.influxdb_client:
influx_messages = []
for interface, message in batch:
self._send_to_influxdb(interface, message)
influx_messages.append({
"interface": interface,
"can_id": message.arbitration_id,
"dlc": message.dlc,
"data": message.data if message.data else b'',
"timestamp": message.timestamp
})
if influx_messages:
self.influxdb_client.write_messages_batch(influx_messages)
# Обновляем счетчик обработанных сообщений
self.processed_count += len(batch)
@@ -340,7 +357,7 @@ class MessageProcessor:
def _send_to_influxdb(self, interface: str, message: can.Message) -> None:
"""
Отправка сообщения в InfluxDB.
Отправка сообщения в InfluxDB (для одиночных сообщений).
Args:
interface: Имя интерфейса
@@ -350,9 +367,13 @@ class MessageProcessor:
return
try:
# TODO: Реализовать отправку в InfluxDB
# self.influxdb_client.write_message(interface, message)
pass
self.influxdb_client.write_message(
interface=interface,
can_id=message.arbitration_id,
dlc=message.dlc,
data=message.data if message.data else b'',
timestamp=message.timestamp
)
except Exception as e:
self.logger.error(
f"Failed to send message to InfluxDB: {e}",
@@ -397,8 +418,13 @@ class MessageProcessor:
)
if self.influxdb_client:
# self.influxdb_client.close()
pass
try:
self.influxdb_client.close()
except Exception as e:
self.logger.error(
f"Error closing InfluxDB client: {e}",
exc_info=True
)
self.logger.info(
"Message processor stopped",
@@ -425,4 +451,12 @@ class MessageProcessor:
except Exception as e:
self.logger.debug(f"Failed to get storage stats: {e}")
# Добавляем статистику InfluxDB, если он инициализирован
if self.influxdb_client:
try:
influx_stats = self.influxdb_client.get_stats()
stats["influxdb"] = influx_stats
except Exception as e:
self.logger.debug(f"Failed to get InfluxDB stats: {e}")
return stats

View File

@@ -1,94 +0,0 @@
"""
Пример использования модуля socket_can.
"""
import time
import signal
import sys
from socket_can import CANSniffer
from logger import get_logger
logger = get_logger(__name__)
# Глобальная переменная для graceful shutdown
sniffer = None
def message_handler(interface: str, message):
"""
Обработчик CAN сообщений.
Args:
interface: Имя интерфейса (например, 'can0')
message: CAN сообщение (can.Message)
"""
# Здесь можно добавить логику сохранения в SQLite/InfluxDB
logger.info(
"CAN message",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
def signal_handler(sig, frame):
"""Обработчик сигналов для graceful shutdown."""
logger.info("Received shutdown signal, stopping...")
if sniffer:
sniffer.stop()
sys.exit(0)
def main():
"""Пример использования CANSniffer."""
global sniffer
# Регистрируем обработчики сигналов
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Создаем sniffer с callback функцией
sniffer = CANSniffer(message_callback=message_handler)
try:
# Запускаем чтение (можно использовать context manager)
sniffer.start()
logger.info("CANSniffer started. Press Ctrl+C to stop.")
# Основной цикл - просто ждем
while True:
time.sleep(1)
# Периодически выводим статистику
stats = sniffer.get_stats()
logger.debug("Statistics", extra=stats)
except KeyboardInterrupt:
logger.info("Keyboard interrupt received")
finally:
if sniffer:
sniffer.stop()
logger.info("Application stopped")
def example_with_context_manager():
"""Пример использования с context manager."""
def message_handler(interface: str, message):
print(f"[{interface}] ID: {hex(message.arbitration_id)}, Data: {message.data.hex()}")
# Использование с context manager (автоматический start/stop)
with CANSniffer(message_callback=message_handler) as sniffer:
# Читаем сообщения в течение 10 секунд
time.sleep(10)
# После выхода из блока with, sniffer автоматически остановится
if __name__ == '__main__':
main()