Fix problem with postgres connect and autoreconnect
This commit is contained in:
@@ -1,17 +1,17 @@
|
|||||||
[Unit]
|
[Unit]
|
||||||
Description=CAN Bus Sniffer
|
Description=CAN Bus Sniffer
|
||||||
After=network.target can-setup.service
|
# Ждем полной готовности сети (важно для PostgreSQL)
|
||||||
Wants=can-setup.service
|
After=network-online.target can-setup.service
|
||||||
|
Wants=network-online.target can-setup.service
|
||||||
|
|
||||||
[Service]
|
[Service]
|
||||||
Type=simple
|
Type=simple
|
||||||
User=root
|
User=root
|
||||||
WorkingDirectory=/opt/can_sniffer
|
WorkingDirectory=/opt/can_sniffer
|
||||||
|
|
||||||
# Только PYTHONUNBUFFERED для вывода логов
|
|
||||||
Environment=PYTHONUNBUFFERED=1
|
Environment=PYTHONUNBUFFERED=1
|
||||||
|
|
||||||
# Запуск - config.json читается из /opt/can_sniffer/src/
|
# Запуск
|
||||||
ExecStart=/opt/can_sniffer/venv/bin/python /opt/can_sniffer/src/main.py
|
ExecStart=/opt/can_sniffer/venv/bin/python /opt/can_sniffer/src/main.py
|
||||||
|
|
||||||
# Graceful shutdown
|
# Graceful shutdown
|
||||||
|
|||||||
@@ -144,8 +144,9 @@ class PostgreSQLClient:
|
|||||||
def _init_client(self) -> None:
|
def _init_client(self) -> None:
|
||||||
"""Инициализация пула соединений PostgreSQL."""
|
"""Инициализация пула соединений PostgreSQL."""
|
||||||
if not POSTGRESQL_AVAILABLE:
|
if not POSTGRESQL_AVAILABLE:
|
||||||
|
self.connection_status = ConnectionStatus.ERROR
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Создаем пул соединений
|
# Создаем пул соединений
|
||||||
self.connection_pool = pool.ThreadedConnectionPool(
|
self.connection_pool = pool.ThreadedConnectionPool(
|
||||||
@@ -158,7 +159,7 @@ class PostgreSQLClient:
|
|||||||
password=self.config.password,
|
password=self.config.password,
|
||||||
connect_timeout=self.config.connection_timeout
|
connect_timeout=self.config.connection_timeout
|
||||||
)
|
)
|
||||||
|
|
||||||
# Проверяем соединение
|
# Проверяем соединение
|
||||||
conn = self.connection_pool.getconn()
|
conn = self.connection_pool.getconn()
|
||||||
if conn:
|
if conn:
|
||||||
@@ -166,17 +167,18 @@ class PostgreSQLClient:
|
|||||||
self._create_table(conn)
|
self._create_table(conn)
|
||||||
self.connection_pool.putconn(conn)
|
self.connection_pool.putconn(conn)
|
||||||
self.connection_status = ConnectionStatus.CONNECTED
|
self.connection_status = ConnectionStatus.CONNECTED
|
||||||
self.logger.info("PostgreSQL connection pool initialized")
|
self.logger.info(
|
||||||
|
f"PostgreSQL connected: {self.config.host}:{self.config.port}/{self.config.database}"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
|
self.connection_pool = None
|
||||||
self.connection_status = ConnectionStatus.ERROR
|
self.connection_status = ConnectionStatus.ERROR
|
||||||
self.logger.error("Failed to get connection from pool")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
self.connection_pool = None
|
||||||
self.connection_status = ConnectionStatus.ERROR
|
self.connection_status = ConnectionStatus.ERROR
|
||||||
self.logger.error(
|
# Используем warning вместо error - это нормальная ситуация при старте без сети
|
||||||
f"Failed to initialize PostgreSQL connection pool: {e}",
|
self.logger.warning(f"PostgreSQL not available: {e}")
|
||||||
exc_info=True
|
|
||||||
)
|
|
||||||
|
|
||||||
def _create_table(self, conn) -> None:
|
def _create_table(self, conn) -> None:
|
||||||
"""Создание таблицы для CAN сообщений если её нет."""
|
"""Создание таблицы для CAN сообщений если её нет."""
|
||||||
@@ -536,28 +538,41 @@ class PostgreSQLClient:
|
|||||||
)
|
)
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def _reconnect(self) -> None:
|
def _reconnect(self) -> bool:
|
||||||
"""Переподключение к PostgreSQL."""
|
"""
|
||||||
|
Переподключение к PostgreSQL.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True если подключение успешно
|
||||||
|
"""
|
||||||
if self.connection_status == ConnectionStatus.CONNECTING:
|
if self.connection_status == ConnectionStatus.CONNECTING:
|
||||||
return
|
return False
|
||||||
|
|
||||||
self.connection_status = ConnectionStatus.CONNECTING
|
self.connection_status = ConnectionStatus.CONNECTING
|
||||||
self._increment_reconnect()
|
self._increment_reconnect()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Закрываем старый пул
|
# Закрываем старый пул если есть
|
||||||
if self.connection_pool:
|
if self.connection_pool:
|
||||||
self.connection_pool.closeall()
|
try:
|
||||||
|
self.connection_pool.closeall()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self.connection_pool = None
|
||||||
|
|
||||||
# Создаем новый пул
|
# Создаем новый пул
|
||||||
self._init_client()
|
self._init_client()
|
||||||
|
|
||||||
|
if self.connection_status == ConnectionStatus.CONNECTED:
|
||||||
|
self.logger.info("Successfully connected to PostgreSQL")
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.connection_status = ConnectionStatus.ERROR
|
self.connection_status = ConnectionStatus.ERROR
|
||||||
self.logger.error(
|
self.logger.warning(f"Failed to connect to PostgreSQL: {e}")
|
||||||
f"Failed to reconnect to PostgreSQL: {e}",
|
return False
|
||||||
exc_info=True
|
|
||||||
)
|
|
||||||
|
|
||||||
def _forwarder_loop(self) -> None:
|
def _forwarder_loop(self) -> None:
|
||||||
"""Основной цикл для отправки сообщений в PostgreSQL."""
|
"""Основной цикл для отправки сообщений в PostgreSQL."""
|
||||||
@@ -565,12 +580,21 @@ class PostgreSQLClient:
|
|||||||
|
|
||||||
batch = []
|
batch = []
|
||||||
last_flush_time = time.time()
|
last_flush_time = time.time()
|
||||||
|
last_reconnect_attempt = 0.0
|
||||||
|
reconnect_interval = 10.0 # Интервал между попытками подключения
|
||||||
was_connected = self.connection_status == ConnectionStatus.CONNECTED
|
was_connected = self.connection_status == ConnectionStatus.CONNECTED
|
||||||
|
|
||||||
while self.running or not self.message_queue.empty():
|
while self.running or not self.message_queue.empty():
|
||||||
try:
|
try:
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
|
|
||||||
|
# Если нет соединения - пытаемся подключиться периодически
|
||||||
|
if not self.connection_pool or self.connection_status != ConnectionStatus.CONNECTED:
|
||||||
|
if current_time - last_reconnect_attempt >= reconnect_interval:
|
||||||
|
last_reconnect_attempt = current_time
|
||||||
|
self.logger.info("Attempting to connect to PostgreSQL...")
|
||||||
|
self._reconnect()
|
||||||
|
|
||||||
# Проверяем восстановление соединения и запускаем синхронизацию
|
# Проверяем восстановление соединения и запускаем синхронизацию
|
||||||
is_connected = self.connection_status == ConnectionStatus.CONNECTED
|
is_connected = self.connection_status == ConnectionStatus.CONNECTED
|
||||||
if is_connected:
|
if is_connected:
|
||||||
@@ -634,25 +658,29 @@ class PostgreSQLClient:
|
|||||||
|
|
||||||
def start(self) -> None:
|
def start(self) -> None:
|
||||||
"""Запуск forwarder потока для отправки сообщений."""
|
"""Запуск forwarder потока для отправки сообщений."""
|
||||||
if not self.config.enabled or not self.connection_pool:
|
if not self.config.enabled:
|
||||||
|
self.logger.info("PostgreSQL is disabled in config")
|
||||||
return
|
return
|
||||||
|
|
||||||
if hasattr(self, 'running') and self.running:
|
if hasattr(self, 'running') and self.running:
|
||||||
self.logger.warning("PostgreSQL forwarder is already running")
|
self.logger.warning("PostgreSQL forwarder is already running")
|
||||||
return
|
return
|
||||||
|
|
||||||
self.running = True
|
self.running = True
|
||||||
|
|
||||||
# Запускаем forwarder поток
|
# Запускаем forwarder поток даже если соединения нет
|
||||||
# НЕ используем daemon=True для корректного завершения
|
# Он будет периодически пытаться подключиться
|
||||||
self.forwarder_thread = threading.Thread(
|
self.forwarder_thread = threading.Thread(
|
||||||
target=self._forwarder_loop,
|
target=self._forwarder_loop,
|
||||||
name="PostgreSQL-Forwarder",
|
name="PostgreSQL-Forwarder",
|
||||||
daemon=False
|
daemon=False
|
||||||
)
|
)
|
||||||
self.forwarder_thread.start()
|
self.forwarder_thread.start()
|
||||||
|
|
||||||
self.logger.info("PostgreSQL forwarder started")
|
if self.connection_pool:
|
||||||
|
self.logger.info("PostgreSQL forwarder started (connected)")
|
||||||
|
else:
|
||||||
|
self.logger.info("PostgreSQL forwarder started (will retry connection)")
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
"""Остановка forwarder потока."""
|
"""Остановка forwarder потока."""
|
||||||
|
|||||||
Reference in New Issue
Block a user