From 1383eb585d44f33ef2d12beeff7fb76a554998ca Mon Sep 17 00:00:00 2001 From: qsethuk Date: Wed, 7 Jan 2026 13:29:17 +0300 Subject: [PATCH] remove influxdb and add postgresql --- can_sniffer/requirements.txt | 2 +- can_sniffer/src/config.py | 54 +- can_sniffer/src/handlers/__init__.py | 4 +- ...luxdb_handler.py => postgresql_handler.py} | 85 +-- can_sniffer/src/main.py | 4 +- .../src/postgresql_handler/__init__.py | 12 + .../postgresql_handler/postgresql_client.py | 533 ++++++++++++++++++ .../src/socket_can/message_processor.py | 10 +- can_sniffer/src/storage/storage.py | 2 +- 9 files changed, 629 insertions(+), 77 deletions(-) rename can_sniffer/src/handlers/{influxdb_handler.py => postgresql_handler.py} (60%) create mode 100644 can_sniffer/src/postgresql_handler/__init__.py create mode 100644 can_sniffer/src/postgresql_handler/postgresql_client.py diff --git a/can_sniffer/requirements.txt b/can_sniffer/requirements.txt index 4277c2a..3c602f4 100644 --- a/can_sniffer/requirements.txt +++ b/can_sniffer/requirements.txt @@ -1,5 +1,5 @@ pydantic>=2.0.0 pydantic-settings>=2.0.0 python-can>=4.0.0 -influxdb-client>=1.36.0 +psycopg2-binary>=2.9.0 diff --git a/can_sniffer/src/config.py b/can_sniffer/src/config.py index 3080546..fd8d1bb 100644 --- a/can_sniffer/src/config.py +++ b/can_sniffer/src/config.py @@ -66,30 +66,34 @@ class StorageConfig(BaseModel): ) -class InfluxDBConfig(BaseModel): - """Конфигурация InfluxDB.""" +class PostgreSQLConfig(BaseModel): + """Конфигурация PostgreSQL.""" model_config = {"extra": "ignore"} enabled: bool = Field( default=True, - description="Включить отправку данных в InfluxDB" + description="Включить отправку данных в PostgreSQL" ) - url: str = Field( - default="http://localhost:8086", - description="URL сервера InfluxDB" + host: str = Field( + default="localhost", + description="Хост PostgreSQL сервера" ) - token: str = Field( + port: int = Field( + default=5432, + description="Порт PostgreSQL сервера" + ) + database: str = Field( + default="can_bus", + description="Имя базы данных" + ) + user: str = Field( + default="postgres", + description="Имя пользователя PostgreSQL" + ) + password: str = Field( default="", - description="Токен аутентификации InfluxDB" - ) - org: str = Field( - default="", - description="Организация InfluxDB" - ) - bucket: str = Field( - default="can_data", - description="Имя bucket для данных" + description="Пароль пользователя PostgreSQL" ) batch_size: int = Field( default=1000, @@ -99,10 +103,6 @@ class InfluxDBConfig(BaseModel): default=5, description="Интервал отправки батча (секунды)" ) - timeout: int = Field( - default=10, - description="Таймаут подключения (секунды)" - ) max_retries: int = Field( default=3, description="Максимальное количество попыток повтора при ошибке" @@ -111,9 +111,13 @@ class InfluxDBConfig(BaseModel): default=1.0, description="Базовый интервал backoff для повторов (секунды)" ) - health_check_interval: int = Field( - default=30, - description="Интервал проверки здоровья соединения (секунды)" + connection_pool_size: int = Field( + default=5, + description="Размер пула соединений" + ) + connection_timeout: int = Field( + default=10, + description="Таймаут подключения (секунды)" ) @@ -183,7 +187,7 @@ class Config(BaseSettings): can: CanConfig = Field(default_factory=CanConfig) storage: StorageConfig = Field(default_factory=StorageConfig) - influxdb: InfluxDBConfig = Field(default_factory=InfluxDBConfig) + postgresql: PostgreSQLConfig = Field(default_factory=PostgreSQLConfig) logging: LoggingConfig = Field(default_factory=LoggingConfig) general: GeneralConfig = Field(default_factory=GeneralConfig) @@ -291,7 +295,7 @@ class Config(BaseSettings): """Получение всей секции конфигурации. Args: - section: Имя секции, например 'can', 'influxdb' + section: Имя секции, например 'can', 'postgresql' Returns: Объект конфигурации секции diff --git a/can_sniffer/src/handlers/__init__.py b/can_sniffer/src/handlers/__init__.py index 66c3639..6c6b784 100644 --- a/can_sniffer/src/handlers/__init__.py +++ b/can_sniffer/src/handlers/__init__.py @@ -6,11 +6,11 @@ from .base import BaseHandler from .storage_handler import StorageHandler -from .influxdb_handler import InfluxDBHandler +from .postgresql_handler import PostgreSQLHandler __all__ = [ 'BaseHandler', 'StorageHandler', - 'InfluxDBHandler', + 'PostgreSQLHandler', ] diff --git a/can_sniffer/src/handlers/influxdb_handler.py b/can_sniffer/src/handlers/postgresql_handler.py similarity index 60% rename from can_sniffer/src/handlers/influxdb_handler.py rename to can_sniffer/src/handlers/postgresql_handler.py index 1553bcc..9d90224 100644 --- a/can_sniffer/src/handlers/influxdb_handler.py +++ b/can_sniffer/src/handlers/postgresql_handler.py @@ -1,53 +1,53 @@ """ -Обработчик для отправки CAN сообщений в InfluxDB. +Обработчик для отправки CAN сообщений в PostgreSQL. """ from typing import List, Dict, Any, Optional from can_frame import CANFrame from .base import BaseHandler -from influxdb_handler import get_influxdb_client -from influxdb_handler.influxdb_client import ConnectionStatus +from postgresql_handler import get_postgresql_client +from postgresql_handler.postgresql_client import ConnectionStatus from config import config -class InfluxDBHandler(BaseHandler): - """Обработчик для отправки в InfluxDB.""" +class PostgreSQLHandler(BaseHandler): + """Обработчик для отправки в PostgreSQL.""" def __init__(self, enabled: Optional[bool] = None): """ - Инициализация обработчика InfluxDB. + Инициализация обработчика PostgreSQL. Args: - enabled: Включен ли обработчик. Если None, берется из config.influxdb.enabled + enabled: Включен ли обработчик. Если None, берется из config.postgresql.enabled """ super().__init__( - name="influxdb", - enabled=enabled if enabled is not None else config.influxdb.enabled + name="postgresql", + enabled=enabled if enabled is not None else config.postgresql.enabled ) - self.influxdb_client = None + self.postgresql_client = None def initialize(self) -> bool: - """Инициализация InfluxDB клиента.""" + """Инициализация PostgreSQL клиента.""" if not self.enabled: return False try: - self.influxdb_client = get_influxdb_client() + self.postgresql_client = get_postgresql_client() self._initialized = True - self.logger.info("InfluxDB handler initialized") + self.logger.info("PostgreSQL handler initialized") return True except Exception as e: - self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True) - self.influxdb_client = None + self.logger.error(f"Failed to initialize PostgreSQL: {e}", exc_info=True) + self.postgresql_client = None return False def handle(self, frame: CANFrame) -> bool: """Обработка одного CAN фрейма.""" - if not self.enabled or not self._initialized or not self.influxdb_client: + if not self.enabled or not self._initialized or not self.postgresql_client: return False try: - return self.influxdb_client.write_message( + return self.postgresql_client.write_message( interface=frame.bus, can_id=frame.can_id, dlc=frame.dlc, @@ -56,7 +56,7 @@ class InfluxDBHandler(BaseHandler): ) except Exception as e: self.logger.error( - f"Failed to send frame to InfluxDB: {e}", + f"Failed to send frame to PostgreSQL: {e}", exc_info=True, extra={"can_id": frame.can_id_hex} ) @@ -66,40 +66,43 @@ class InfluxDBHandler(BaseHandler): """ Обработка батча CAN фреймов. - Неблокирующий метод - при ошибках или переполнении очереди InfluxDB + Неблокирующий метод - при ошибках или переполнении очереди PostgreSQL просто пропускает батч, не останавливая обработку других handlers. """ - if not self.enabled or not self._initialized or not self.influxdb_client or not frames: + if not self.enabled or not self._initialized or not self.postgresql_client or not frames: return 0 try: # Проверяем состояние соединения перед обработкой - if hasattr(self.influxdb_client, 'connection_status'): - if self.influxdb_client.connection_status != ConnectionStatus.CONNECTED: + if hasattr(self.postgresql_client, 'connection_status'): + if self.postgresql_client.connection_status != ConnectionStatus.CONNECTED: # Соединение недоступно - пропускаем батч без ошибки return 0 - # Конвертируем CANFrame в формат для InfluxDB - influx_messages = [] + # Конвертируем CANFrame в формат для PostgreSQL + postgresql_messages = [] for frame in frames: - influx_messages.append({ + postgresql_messages.append({ "interface": frame.bus, "can_id": frame.can_id, + "can_id_hex": frame.can_id_hex, "dlc": frame.dlc, "data": frame.data, - "timestamp": frame.timestamp # float timestamp в секундах + "data_hex": frame.data_hex, + "timestamp": frame.timestamp, # float timestamp в секундах + "is_extended": frame.is_extended }) - if influx_messages: - # Пытаемся добавить в очередь InfluxDB (неблокирующий режим) + if postgresql_messages: + # Пытаемся добавить в очередь PostgreSQL (неблокирующий режим) # Если очередь переполнена, пропускаем батч - return self.influxdb_client.write_messages_batch(influx_messages) + return self.postgresql_client.write_messages_batch(postgresql_messages) return 0 except Exception as e: # Ошибка не должна останавливать обработку других handlers # Логируем, но не пробрасываем исключение self.logger.error( - f"Failed to send frames batch to InfluxDB: {e}", + f"Failed to send frames batch to PostgreSQL: {e}", exc_info=True, extra={"batch_size": len(frames)} ) @@ -107,25 +110,25 @@ class InfluxDBHandler(BaseHandler): def flush(self) -> None: """Принудительная отправка накопленных данных.""" - # InfluxDB forwarder сам управляет flush через свой цикл + # PostgreSQL forwarder сам управляет flush через свой цикл # Но можно вызвать явный flush если нужно pass def shutdown(self) -> None: """Корректное завершение работы обработчика.""" - if self.influxdb_client: + if self.postgresql_client: try: - self.influxdb_client.close() - self.logger.info("InfluxDB handler closed") + self.postgresql_client.close() + self.logger.info("PostgreSQL handler closed") except Exception as e: - self.logger.error(f"Error closing InfluxDB: {e}", exc_info=True) + self.logger.error(f"Error closing PostgreSQL: {e}", exc_info=True) self._initialized = False def get_stats(self) -> Dict[str, Any]: """Получение статистики обработчика.""" - if self.influxdb_client: + if self.postgresql_client: try: - stats = self.influxdb_client.get_stats() + stats = self.postgresql_client.get_stats() stats["handler"] = self.name stats["enabled"] = self.enabled stats["initialized"] = self._initialized @@ -139,10 +142,10 @@ class InfluxDBHandler(BaseHandler): } def start(self) -> None: - """Запуск InfluxDB forwarder (если используется).""" - if self.influxdb_client: + """Запуск PostgreSQL forwarder (если используется).""" + if self.postgresql_client: try: - self.influxdb_client.start() + self.postgresql_client.start() except Exception as e: - self.logger.error(f"Failed to start InfluxDB forwarder: {e}", exc_info=True) + self.logger.error(f"Failed to start PostgreSQL forwarder: {e}", exc_info=True) diff --git a/can_sniffer/src/main.py b/can_sniffer/src/main.py index 52e0de7..e900e21 100644 --- a/can_sniffer/src/main.py +++ b/can_sniffer/src/main.py @@ -48,8 +48,8 @@ def main(): }) logger.info("Configuration loaded", extra={ - "influxdb_enabled": config.influxdb.enabled, - "influxdb_url": config.influxdb.url if config.influxdb.enabled else None, + "postgresql_enabled": config.postgresql.enabled, + "postgresql_host": config.postgresql.host if config.postgresql.enabled else None, "storage_path": config.storage.database_path }) diff --git a/can_sniffer/src/postgresql_handler/__init__.py b/can_sniffer/src/postgresql_handler/__init__.py new file mode 100644 index 0000000..175ab2f --- /dev/null +++ b/can_sniffer/src/postgresql_handler/__init__.py @@ -0,0 +1,12 @@ +""" +Модуль для работы с PostgreSQL. + +Предоставляет singleton класс для отправки CAN сообщений в PostgreSQL +с поддержкой пакетной отправки, connection pooling, retry с backoff. +""" + +from typing import Optional +from .postgresql_client import PostgreSQLClient, get_postgresql_client + +__all__ = ['PostgreSQLClient', 'get_postgresql_client'] + diff --git a/can_sniffer/src/postgresql_handler/postgresql_client.py b/can_sniffer/src/postgresql_handler/postgresql_client.py new file mode 100644 index 0000000..9948eb7 --- /dev/null +++ b/can_sniffer/src/postgresql_handler/postgresql_client.py @@ -0,0 +1,533 @@ +""" +Модуль для работы с PostgreSQL. + +Предоставляет singleton класс для отправки CAN сообщений в PostgreSQL +с поддержкой пакетной отправки, connection pooling, retry с backoff. +""" + +import threading +import time +from queue import Queue, Empty +from typing import Optional, List, Dict, Any +from enum import Enum + +from config import config +from logger import get_logger + +logger = get_logger(__name__) + +# Импортируем PostgreSQL клиент +try: + import psycopg2 + from psycopg2 import pool, sql + from psycopg2.extras import execute_batch + POSTGRESQL_AVAILABLE = True +except ImportError: + POSTGRESQL_AVAILABLE = False + psycopg2 = None + pool = None + sql = None + execute_batch = None + logger.warning("psycopg2 not installed. Install with: pip install psycopg2-binary") + + +class ConnectionStatus(Enum): + """Статус соединения с PostgreSQL.""" + DISCONNECTED = "disconnected" + CONNECTING = "connecting" + CONNECTED = "connected" + ERROR = "error" + + +class PostgreSQLClient: + """Singleton класс для работы с PostgreSQL.""" + + _instance: Optional['PostgreSQLClient'] = None + _lock = threading.Lock() + + def __new__(cls): + """Singleton паттерн для единого экземпляра клиента.""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """Инициализация клиента PostgreSQL.""" + # Проверяем, что инициализация выполняется только один раз + if hasattr(self, '_initialized'): + return + + self.config = config.postgresql + self.logger = logger + + # Инициализируем атрибуты по умолчанию + self.connection_pool: Optional[pool.ThreadedConnectionPool] = None + self.message_queue: Queue[Dict[str, Any]] = Queue() + self.running = False + self.forwarder_thread: Optional[threading.Thread] = None + self.connection_status = ConnectionStatus.DISCONNECTED + + # Статистика + self.sent_count = 0 + self.failed_count = 0 + self.retry_count = 0 + self.reconnect_count = 0 + self._initialized = False + + if not POSTGRESQL_AVAILABLE: + self.logger.error("PostgreSQL client library not available") + return + + # Инициализируем клиент + self._init_client() + self._initialized = True + + def _init_client(self) -> None: + """Инициализация пула соединений PostgreSQL.""" + if not POSTGRESQL_AVAILABLE: + return + + try: + # Создаем пул соединений + self.connection_pool = pool.ThreadedConnectionPool( + minconn=1, + maxconn=self.config.connection_pool_size, + host=self.config.host, + port=self.config.port, + database=self.config.database, + user=self.config.user, + password=self.config.password, + connect_timeout=self.config.connection_timeout + ) + + # Проверяем соединение + conn = self.connection_pool.getconn() + if conn: + # Создаем таблицу если её нет + self._create_table(conn) + self.connection_pool.putconn(conn) + self.connection_status = ConnectionStatus.CONNECTED + self.logger.info("PostgreSQL connection pool initialized") + else: + self.connection_status = ConnectionStatus.ERROR + self.logger.error("Failed to get connection from pool") + + except Exception as e: + self.connection_status = ConnectionStatus.ERROR + self.logger.error( + f"Failed to initialize PostgreSQL connection pool: {e}", + exc_info=True + ) + + def _create_table(self, conn) -> None: + """Создание таблицы для CAN сообщений если её нет.""" + try: + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS can_messages ( + id BIGSERIAL PRIMARY KEY, + timestamp TIMESTAMP NOT NULL, + interface VARCHAR(50) NOT NULL, + can_id INTEGER NOT NULL, + can_id_hex VARCHAR(10) NOT NULL, + is_extended BOOLEAN NOT NULL, + dlc INTEGER NOT NULL, + data BYTEA NOT NULL, + data_hex VARCHAR(32) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); + + CREATE INDEX IF NOT EXISTS idx_can_messages_timestamp ON can_messages(timestamp); + CREATE INDEX IF NOT EXISTS idx_can_messages_can_id ON can_messages(can_id); + CREATE INDEX IF NOT EXISTS idx_can_messages_interface ON can_messages(interface); + """) + conn.commit() + cursor.close() + self.logger.info("PostgreSQL table 'can_messages' created or verified") + except Exception as e: + conn.rollback() + self.logger.error(f"Failed to create table: {e}", exc_info=True) + raise + + def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float, block: bool = False) -> bool: + """ + Добавление CAN сообщения в очередь для отправки. + + Args: + interface: Имя интерфейса (например, 'can0') + can_id: CAN ID сообщения + dlc: Data Length Code + data: Данные сообщения (bytes) + timestamp: Временная метка сообщения (float в секундах) + block: Блокировать ли при переполнении очереди + + Returns: + True если сообщение добавлено в очередь + """ + if not self.connection_pool: + return False + + try: + # Добавляем сообщение в очередь для пакетной отправки + if block: + self.message_queue.put({ + "interface": interface, + "can_id": can_id, + "can_id_hex": hex(can_id), + "dlc": dlc, + "data": data, + "data_hex": data.hex().upper(), + "timestamp": timestamp, + "is_extended": can_id > 0x7FF + }) + else: + try: + self.message_queue.put_nowait({ + "interface": interface, + "can_id": can_id, + "can_id_hex": hex(can_id), + "dlc": dlc, + "data": data, + "data_hex": data.hex().upper(), + "timestamp": timestamp, + "is_extended": can_id > 0x7FF + }) + except: + # Очередь переполнена - пропускаем сообщение + self.failed_count += 1 + return False + return True + except Exception as e: + self.logger.error( + f"Failed to queue message for PostgreSQL: {e}", + exc_info=True + ) + self.failed_count += 1 + return False + + def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int: + """ + Пакетная отправка сообщений в PostgreSQL. + + Добавляет сообщения в очередь для асинхронной отправки через forwarder loop. + + Args: + messages: Список словарей с данными сообщений + block: Блокировать ли при переполнении очереди + + Returns: + Количество успешно добавленных в очередь сообщений + """ + if not self.connection_pool or not messages: + return 0 + + # Проверяем соединение перед добавлением в очередь + if self.connection_status != ConnectionStatus.CONNECTED: + if not self._health_check(): + # Соединение недоступно - пропускаем батч без ошибки + self.failed_count += len(messages) + return 0 + else: + self.connection_status = ConnectionStatus.CONNECTED + + # Проверяем заполненность очереди перед добавлением + queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0 + if queue_usage > 0.9 and not block: + # Очередь почти переполнена - пропускаем батч + self.failed_count += len(messages) + return 0 + + # Добавляем сообщения в очередь для асинхронной отправки + added_count = 0 + for msg in messages: + try: + if block: + self.message_queue.put(msg) + else: + try: + self.message_queue.put_nowait(msg) + except: + # Очередь переполнена - пропускаем оставшиеся сообщения + break + added_count += 1 + except Exception as e: + self.logger.debug(f"Failed to queue message: {e}") + break + + if added_count < len(messages): + self.failed_count += (len(messages) - added_count) + + return added_count + + def _send_messages_batch(self, messages: List[Dict[str, Any]]) -> int: + """ + Непосредственная отправка батча сообщений в PostgreSQL. + + Этот метод вызывается из forwarder loop для реальной отправки данных. + + Args: + messages: Список словарей с данными сообщений + + Returns: + Количество успешно отправленных сообщений + """ + if not self.connection_pool or not messages: + return 0 + + # Проверяем соединение перед отправкой + if self.connection_status != ConnectionStatus.CONNECTED: + if not self._health_check(): + self.logger.warning("PostgreSQL connection not available, skipping batch") + self.failed_count += len(messages) + return 0 + else: + self.connection_status = ConnectionStatus.CONNECTED + + conn = None + try: + # Получаем соединение из пула + conn = self.connection_pool.getconn() + if not conn: + self.failed_count += len(messages) + return 0 + + cursor = conn.cursor() + + # Подготавливаем данные для batch insert + insert_query = """ + INSERT INTO can_messages (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """ + + values = [] + for msg in messages: + from datetime import datetime + ts = datetime.fromtimestamp(msg["timestamp"]) + values.append(( + ts, + msg["interface"], + msg["can_id"], + msg.get("can_id_hex", hex(msg["can_id"])), + msg.get("is_extended", msg["can_id"] > 0x7FF), + msg["dlc"], + msg["data"], + msg.get("data_hex", msg["data"].hex().upper() if isinstance(msg["data"], bytes) else "") + )) + + # Выполняем batch insert + execute_batch(cursor, insert_query, values) + conn.commit() + cursor.close() + + sent = len(messages) + self.sent_count += sent + self.logger.debug( + f"Sent {sent} messages to PostgreSQL", + extra={"batch_size": sent} + ) + return sent + + except Exception as e: + if conn: + conn.rollback() + self.failed_count += len(messages) + self.logger.error( + f"Failed to send messages batch to PostgreSQL: {e}", + exc_info=True, + extra={"batch_size": len(messages)} + ) + return 0 + finally: + if conn: + self.connection_pool.putconn(conn) + + def _health_check(self) -> bool: + """Проверка здоровья соединения с PostgreSQL.""" + if not self.connection_pool: + return False + + try: + conn = self.connection_pool.getconn() + if conn: + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + cursor.close() + self.connection_pool.putconn(conn) + self.connection_status = ConnectionStatus.CONNECTED + self.last_health_check = time.time() + return True + except Exception as e: + self.logger.debug(f"PostgreSQL health check failed: {e}") + return False + + def _reconnect(self) -> None: + """Переподключение к PostgreSQL.""" + if self.connection_status == ConnectionStatus.CONNECTING: + return + + self.connection_status = ConnectionStatus.CONNECTING + self.reconnect_count += 1 + + try: + # Закрываем старый пул + if self.connection_pool: + self.connection_pool.closeall() + + # Создаем новый пул + self._init_client() + + except Exception as e: + self.connection_status = ConnectionStatus.ERROR + self.logger.error( + f"Failed to reconnect to PostgreSQL: {e}", + exc_info=True + ) + + def _forwarder_loop(self) -> None: + """Основной цикл для отправки сообщений в PostgreSQL.""" + self.logger.info("PostgreSQL forwarder loop started") + + batch = [] + last_flush_time = time.time() + + while self.running or not self.message_queue.empty(): + try: + # Собираем сообщения в батч + try: + message = self.message_queue.get(timeout=0.1) + batch.append(message) + except Empty: + pass + + # Отправляем батч если он заполнен или прошло достаточно времени + current_time = time.time() + should_flush = ( + len(batch) >= self.config.batch_size or + (batch and (current_time - last_flush_time) >= self.config.flush_interval) + ) + + if should_flush: + if batch: + # Отправляем батч напрямую в PostgreSQL + self._send_messages_batch(batch) + batch = [] + last_flush_time = current_time + + except Exception as e: + self.logger.error( + f"Error in forwarder loop: {e}", + exc_info=True + ) + time.sleep(0.1) + + # Отправляем оставшиеся сообщения + if batch: + self._send_messages_batch(batch) + + self.logger.info( + "PostgreSQL forwarder loop stopped", + extra={ + "sent_count": self.sent_count, + "failed_count": self.failed_count + } + ) + + def start(self) -> None: + """Запуск forwarder потока для отправки сообщений.""" + if not self.config.enabled or not self.connection_pool: + return + + if hasattr(self, 'running') and self.running: + self.logger.warning("PostgreSQL forwarder is already running") + return + + self.running = True + + # Запускаем forwarder поток + # НЕ используем daemon=True для корректного завершения + self.forwarder_thread = threading.Thread( + target=self._forwarder_loop, + name="PostgreSQL-Forwarder", + daemon=False + ) + self.forwarder_thread.start() + + self.logger.info("PostgreSQL forwarder started") + + def stop(self) -> None: + """Остановка forwarder потока.""" + if not hasattr(self, 'running') or not self.running: + return + + self.logger.info("Stopping PostgreSQL forwarder...") + self.running = False + + # Даем время на обработку оставшихся сообщений в очереди + max_wait_time = 5.0 + wait_start = time.time() + while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time: + time.sleep(0.1) + + if not self.message_queue.empty(): + remaining = self.message_queue.qsize() + self.logger.warning( + f"PostgreSQL queue not empty after shutdown, {remaining} messages remaining" + ) + + # Ждем завершения потока + if self.forwarder_thread and self.forwarder_thread.is_alive(): + self.forwarder_thread.join(timeout=10.0) + if self.forwarder_thread.is_alive(): + self.logger.warning("Forwarder thread did not stop gracefully") + + # Закрываем пул соединений + if self.connection_pool: + try: + self.connection_pool.closeall() + self.connection_pool = None + except Exception as e: + self.logger.error(f"Error closing PostgreSQL connection pool: {e}", exc_info=True) + + self.connection_status = ConnectionStatus.DISCONNECTED + self.logger.info("PostgreSQL forwarder stopped") + + def get_stats(self) -> Dict[str, Any]: + """Получение статистики клиента.""" + return { + "enabled": self.config.enabled, + "initialized": self._initialized and self.connection_pool is not None, + "running": getattr(self, 'running', False), + "connection_status": self.connection_status.value, + "sent_count": self.sent_count, + "failed_count": self.failed_count, + "retry_count": self.retry_count, + "reconnect_count": self.reconnect_count, + "queue_size": self.message_queue.qsize(), + "host": self.config.host if self.config.enabled else None, + "database": self.config.database if self.config.enabled else None + } + + def close(self) -> None: + """Закрытие соединения с PostgreSQL.""" + self.stop() + + +# Глобальный экземпляр клиента +_postgresql_instance: Optional[PostgreSQLClient] = None + + +def get_postgresql_client() -> PostgreSQLClient: + """ + Получение глобального экземпляра PostgreSQL клиента (singleton). + + Returns: + Экземпляр PostgreSQLClient + """ + global _postgresql_instance + if _postgresql_instance is None: + _postgresql_instance = PostgreSQLClient() + return _postgresql_instance + diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index f15c186..6aac103 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -13,7 +13,7 @@ from typing import Optional, Dict, Any, List from logger import get_logger from config import config from can_frame import CANFrame -from handlers import BaseHandler, StorageHandler, InfluxDBHandler +from handlers import BaseHandler, StorageHandler, PostgreSQLHandler logger = get_logger(__name__) @@ -70,8 +70,8 @@ class MessageProcessor: # Storage handler всегда включен handlers.append(StorageHandler(enabled=True)) - # InfluxDB handler зависит от конфигурации - handlers.append(InfluxDBHandler(enabled=None)) # None = из config + # PostgreSQL handler зависит от конфигурации + handlers.append(PostgreSQLHandler(enabled=None)) # None = из config return handlers @@ -317,9 +317,9 @@ class MessageProcessor: self.running = True - # Запускаем специальные обработчики (например, InfluxDB forwarder) + # Запускаем специальные обработчики (например, PostgreSQL forwarder) for handler in self.handlers: - if isinstance(handler, InfluxDBHandler) and handler.is_initialized(): + if isinstance(handler, PostgreSQLHandler) and handler.is_initialized(): try: handler.start() except Exception as e: diff --git a/can_sniffer/src/storage/storage.py b/can_sniffer/src/storage/storage.py index 63d3ad6..609e84f 100644 --- a/can_sniffer/src/storage/storage.py +++ b/can_sniffer/src/storage/storage.py @@ -246,7 +246,7 @@ class Storage: def get_unprocessed_messages(self, limit: int = 1000) -> list: """ - Получение необработанных сообщений для отправки в InfluxDB. + Получение необработанных сообщений для отправки в PostgreSQL. Args: limit: Максимальное количество сообщений