From 932c1a45048caf2258a5ad0ae28e178e1b5470e2 Mon Sep 17 00:00:00 2001 From: Alexander Poletaev Date: Wed, 28 Jan 2026 00:32:31 +0300 Subject: [PATCH] Fix problem with postgres connect and autoreconnect --- can_sniffer/deploy/can-sniffer.service | 8 +- .../postgresql_handler/postgresql_client.py | 88 ++++++++++++------- 2 files changed, 62 insertions(+), 34 deletions(-) diff --git a/can_sniffer/deploy/can-sniffer.service b/can_sniffer/deploy/can-sniffer.service index cee10e1..b742282 100644 --- a/can_sniffer/deploy/can-sniffer.service +++ b/can_sniffer/deploy/can-sniffer.service @@ -1,17 +1,17 @@ [Unit] Description=CAN Bus Sniffer -After=network.target can-setup.service -Wants=can-setup.service +# Ждем полной готовности сети (важно для PostgreSQL) +After=network-online.target can-setup.service +Wants=network-online.target can-setup.service [Service] Type=simple User=root WorkingDirectory=/opt/can_sniffer -# Только PYTHONUNBUFFERED для вывода логов Environment=PYTHONUNBUFFERED=1 -# Запуск - config.json читается из /opt/can_sniffer/src/ +# Запуск ExecStart=/opt/can_sniffer/venv/bin/python /opt/can_sniffer/src/main.py # Graceful shutdown diff --git a/can_sniffer/src/postgresql_handler/postgresql_client.py b/can_sniffer/src/postgresql_handler/postgresql_client.py index 25e1495..dd4e4c0 100644 --- a/can_sniffer/src/postgresql_handler/postgresql_client.py +++ b/can_sniffer/src/postgresql_handler/postgresql_client.py @@ -144,8 +144,9 @@ class PostgreSQLClient: def _init_client(self) -> None: """Инициализация пула соединений PostgreSQL.""" if not POSTGRESQL_AVAILABLE: + self.connection_status = ConnectionStatus.ERROR return - + try: # Создаем пул соединений self.connection_pool = pool.ThreadedConnectionPool( @@ -158,7 +159,7 @@ class PostgreSQLClient: password=self.config.password, connect_timeout=self.config.connection_timeout ) - + # Проверяем соединение conn = self.connection_pool.getconn() if conn: @@ -166,17 +167,18 @@ class PostgreSQLClient: self._create_table(conn) self.connection_pool.putconn(conn) self.connection_status = ConnectionStatus.CONNECTED - self.logger.info("PostgreSQL connection pool initialized") + self.logger.info( + f"PostgreSQL connected: {self.config.host}:{self.config.port}/{self.config.database}" + ) else: + self.connection_pool = None self.connection_status = ConnectionStatus.ERROR - self.logger.error("Failed to get connection from pool") - + except Exception as e: + self.connection_pool = None self.connection_status = ConnectionStatus.ERROR - self.logger.error( - f"Failed to initialize PostgreSQL connection pool: {e}", - exc_info=True - ) + # Используем warning вместо error - это нормальная ситуация при старте без сети + self.logger.warning(f"PostgreSQL not available: {e}") def _create_table(self, conn) -> None: """Создание таблицы для CAN сообщений если её нет.""" @@ -536,28 +538,41 @@ class PostgreSQLClient: ) return 0 - def _reconnect(self) -> None: - """Переподключение к PostgreSQL.""" + def _reconnect(self) -> bool: + """ + Переподключение к PostgreSQL. + + Returns: + True если подключение успешно + """ if self.connection_status == ConnectionStatus.CONNECTING: - return - + return False + self.connection_status = ConnectionStatus.CONNECTING self._increment_reconnect() - + try: - # Закрываем старый пул + # Закрываем старый пул если есть if self.connection_pool: - self.connection_pool.closeall() - + try: + self.connection_pool.closeall() + except Exception: + pass + self.connection_pool = None + # Создаем новый пул self._init_client() - + + if self.connection_status == ConnectionStatus.CONNECTED: + self.logger.info("Successfully connected to PostgreSQL") + return True + else: + return False + except Exception as e: self.connection_status = ConnectionStatus.ERROR - self.logger.error( - f"Failed to reconnect to PostgreSQL: {e}", - exc_info=True - ) + self.logger.warning(f"Failed to connect to PostgreSQL: {e}") + return False def _forwarder_loop(self) -> None: """Основной цикл для отправки сообщений в PostgreSQL.""" @@ -565,12 +580,21 @@ class PostgreSQLClient: batch = [] last_flush_time = time.time() + last_reconnect_attempt = 0.0 + reconnect_interval = 10.0 # Интервал между попытками подключения was_connected = self.connection_status == ConnectionStatus.CONNECTED while self.running or not self.message_queue.empty(): try: current_time = time.time() + # Если нет соединения - пытаемся подключиться периодически + if not self.connection_pool or self.connection_status != ConnectionStatus.CONNECTED: + if current_time - last_reconnect_attempt >= reconnect_interval: + last_reconnect_attempt = current_time + self.logger.info("Attempting to connect to PostgreSQL...") + self._reconnect() + # Проверяем восстановление соединения и запускаем синхронизацию is_connected = self.connection_status == ConnectionStatus.CONNECTED if is_connected: @@ -634,25 +658,29 @@ class PostgreSQLClient: def start(self) -> None: """Запуск forwarder потока для отправки сообщений.""" - if not self.config.enabled or not self.connection_pool: + if not self.config.enabled: + self.logger.info("PostgreSQL is disabled in config") return - + if hasattr(self, 'running') and self.running: self.logger.warning("PostgreSQL forwarder is already running") return - + self.running = True - - # Запускаем forwarder поток - # НЕ используем daemon=True для корректного завершения + + # Запускаем forwarder поток даже если соединения нет + # Он будет периодически пытаться подключиться self.forwarder_thread = threading.Thread( target=self._forwarder_loop, name="PostgreSQL-Forwarder", daemon=False ) self.forwarder_thread.start() - - self.logger.info("PostgreSQL forwarder started") + + if self.connection_pool: + self.logger.info("PostgreSQL forwarder started (connected)") + else: + self.logger.info("PostgreSQL forwarder started (will retry connection)") def stop(self) -> None: """Остановка forwarder потока."""