remove influxdb and add postgresql
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
pydantic>=2.0.0
|
||||
pydantic-settings>=2.0.0
|
||||
python-can>=4.0.0
|
||||
influxdb-client>=1.36.0
|
||||
psycopg2-binary>=2.9.0
|
||||
|
||||
|
||||
@@ -66,30 +66,34 @@ class StorageConfig(BaseModel):
|
||||
)
|
||||
|
||||
|
||||
class InfluxDBConfig(BaseModel):
|
||||
"""Конфигурация InfluxDB."""
|
||||
class PostgreSQLConfig(BaseModel):
|
||||
"""Конфигурация PostgreSQL."""
|
||||
|
||||
model_config = {"extra": "ignore"}
|
||||
|
||||
enabled: bool = Field(
|
||||
default=True,
|
||||
description="Включить отправку данных в InfluxDB"
|
||||
description="Включить отправку данных в PostgreSQL"
|
||||
)
|
||||
url: str = Field(
|
||||
default="http://localhost:8086",
|
||||
description="URL сервера InfluxDB"
|
||||
host: str = Field(
|
||||
default="localhost",
|
||||
description="Хост PostgreSQL сервера"
|
||||
)
|
||||
token: str = Field(
|
||||
port: int = Field(
|
||||
default=5432,
|
||||
description="Порт PostgreSQL сервера"
|
||||
)
|
||||
database: str = Field(
|
||||
default="can_bus",
|
||||
description="Имя базы данных"
|
||||
)
|
||||
user: str = Field(
|
||||
default="postgres",
|
||||
description="Имя пользователя PostgreSQL"
|
||||
)
|
||||
password: str = Field(
|
||||
default="",
|
||||
description="Токен аутентификации InfluxDB"
|
||||
)
|
||||
org: str = Field(
|
||||
default="",
|
||||
description="Организация InfluxDB"
|
||||
)
|
||||
bucket: str = Field(
|
||||
default="can_data",
|
||||
description="Имя bucket для данных"
|
||||
description="Пароль пользователя PostgreSQL"
|
||||
)
|
||||
batch_size: int = Field(
|
||||
default=1000,
|
||||
@@ -99,10 +103,6 @@ class InfluxDBConfig(BaseModel):
|
||||
default=5,
|
||||
description="Интервал отправки батча (секунды)"
|
||||
)
|
||||
timeout: int = Field(
|
||||
default=10,
|
||||
description="Таймаут подключения (секунды)"
|
||||
)
|
||||
max_retries: int = Field(
|
||||
default=3,
|
||||
description="Максимальное количество попыток повтора при ошибке"
|
||||
@@ -111,9 +111,13 @@ class InfluxDBConfig(BaseModel):
|
||||
default=1.0,
|
||||
description="Базовый интервал backoff для повторов (секунды)"
|
||||
)
|
||||
health_check_interval: int = Field(
|
||||
default=30,
|
||||
description="Интервал проверки здоровья соединения (секунды)"
|
||||
connection_pool_size: int = Field(
|
||||
default=5,
|
||||
description="Размер пула соединений"
|
||||
)
|
||||
connection_timeout: int = Field(
|
||||
default=10,
|
||||
description="Таймаут подключения (секунды)"
|
||||
)
|
||||
|
||||
|
||||
@@ -183,7 +187,7 @@ class Config(BaseSettings):
|
||||
|
||||
can: CanConfig = Field(default_factory=CanConfig)
|
||||
storage: StorageConfig = Field(default_factory=StorageConfig)
|
||||
influxdb: InfluxDBConfig = Field(default_factory=InfluxDBConfig)
|
||||
postgresql: PostgreSQLConfig = Field(default_factory=PostgreSQLConfig)
|
||||
logging: LoggingConfig = Field(default_factory=LoggingConfig)
|
||||
general: GeneralConfig = Field(default_factory=GeneralConfig)
|
||||
|
||||
@@ -291,7 +295,7 @@ class Config(BaseSettings):
|
||||
"""Получение всей секции конфигурации.
|
||||
|
||||
Args:
|
||||
section: Имя секции, например 'can', 'influxdb'
|
||||
section: Имя секции, например 'can', 'postgresql'
|
||||
|
||||
Returns:
|
||||
Объект конфигурации секции
|
||||
|
||||
@@ -6,11 +6,11 @@
|
||||
|
||||
from .base import BaseHandler
|
||||
from .storage_handler import StorageHandler
|
||||
from .influxdb_handler import InfluxDBHandler
|
||||
from .postgresql_handler import PostgreSQLHandler
|
||||
|
||||
__all__ = [
|
||||
'BaseHandler',
|
||||
'StorageHandler',
|
||||
'InfluxDBHandler',
|
||||
'PostgreSQLHandler',
|
||||
]
|
||||
|
||||
|
||||
@@ -1,53 +1,53 @@
|
||||
"""
|
||||
Обработчик для отправки CAN сообщений в InfluxDB.
|
||||
Обработчик для отправки CAN сообщений в PostgreSQL.
|
||||
"""
|
||||
|
||||
from typing import List, Dict, Any, Optional
|
||||
from can_frame import CANFrame
|
||||
from .base import BaseHandler
|
||||
from influxdb_handler import get_influxdb_client
|
||||
from influxdb_handler.influxdb_client import ConnectionStatus
|
||||
from postgresql_handler import get_postgresql_client
|
||||
from postgresql_handler.postgresql_client import ConnectionStatus
|
||||
from config import config
|
||||
|
||||
|
||||
class InfluxDBHandler(BaseHandler):
|
||||
"""Обработчик для отправки в InfluxDB."""
|
||||
class PostgreSQLHandler(BaseHandler):
|
||||
"""Обработчик для отправки в PostgreSQL."""
|
||||
|
||||
def __init__(self, enabled: Optional[bool] = None):
|
||||
"""
|
||||
Инициализация обработчика InfluxDB.
|
||||
Инициализация обработчика PostgreSQL.
|
||||
|
||||
Args:
|
||||
enabled: Включен ли обработчик. Если None, берется из config.influxdb.enabled
|
||||
enabled: Включен ли обработчик. Если None, берется из config.postgresql.enabled
|
||||
"""
|
||||
super().__init__(
|
||||
name="influxdb",
|
||||
enabled=enabled if enabled is not None else config.influxdb.enabled
|
||||
name="postgresql",
|
||||
enabled=enabled if enabled is not None else config.postgresql.enabled
|
||||
)
|
||||
self.influxdb_client = None
|
||||
self.postgresql_client = None
|
||||
|
||||
def initialize(self) -> bool:
|
||||
"""Инициализация InfluxDB клиента."""
|
||||
"""Инициализация PostgreSQL клиента."""
|
||||
if not self.enabled:
|
||||
return False
|
||||
|
||||
try:
|
||||
self.influxdb_client = get_influxdb_client()
|
||||
self.postgresql_client = get_postgresql_client()
|
||||
self._initialized = True
|
||||
self.logger.info("InfluxDB handler initialized")
|
||||
self.logger.info("PostgreSQL handler initialized")
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True)
|
||||
self.influxdb_client = None
|
||||
self.logger.error(f"Failed to initialize PostgreSQL: {e}", exc_info=True)
|
||||
self.postgresql_client = None
|
||||
return False
|
||||
|
||||
def handle(self, frame: CANFrame) -> bool:
|
||||
"""Обработка одного CAN фрейма."""
|
||||
if not self.enabled or not self._initialized or not self.influxdb_client:
|
||||
if not self.enabled or not self._initialized or not self.postgresql_client:
|
||||
return False
|
||||
|
||||
try:
|
||||
return self.influxdb_client.write_message(
|
||||
return self.postgresql_client.write_message(
|
||||
interface=frame.bus,
|
||||
can_id=frame.can_id,
|
||||
dlc=frame.dlc,
|
||||
@@ -56,7 +56,7 @@ class InfluxDBHandler(BaseHandler):
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
f"Failed to send frame to InfluxDB: {e}",
|
||||
f"Failed to send frame to PostgreSQL: {e}",
|
||||
exc_info=True,
|
||||
extra={"can_id": frame.can_id_hex}
|
||||
)
|
||||
@@ -66,40 +66,43 @@ class InfluxDBHandler(BaseHandler):
|
||||
"""
|
||||
Обработка батча CAN фреймов.
|
||||
|
||||
Неблокирующий метод - при ошибках или переполнении очереди InfluxDB
|
||||
Неблокирующий метод - при ошибках или переполнении очереди PostgreSQL
|
||||
просто пропускает батч, не останавливая обработку других handlers.
|
||||
"""
|
||||
if not self.enabled or not self._initialized or not self.influxdb_client or not frames:
|
||||
if not self.enabled or not self._initialized or not self.postgresql_client or not frames:
|
||||
return 0
|
||||
|
||||
try:
|
||||
# Проверяем состояние соединения перед обработкой
|
||||
if hasattr(self.influxdb_client, 'connection_status'):
|
||||
if self.influxdb_client.connection_status != ConnectionStatus.CONNECTED:
|
||||
if hasattr(self.postgresql_client, 'connection_status'):
|
||||
if self.postgresql_client.connection_status != ConnectionStatus.CONNECTED:
|
||||
# Соединение недоступно - пропускаем батч без ошибки
|
||||
return 0
|
||||
|
||||
# Конвертируем CANFrame в формат для InfluxDB
|
||||
influx_messages = []
|
||||
# Конвертируем CANFrame в формат для PostgreSQL
|
||||
postgresql_messages = []
|
||||
for frame in frames:
|
||||
influx_messages.append({
|
||||
postgresql_messages.append({
|
||||
"interface": frame.bus,
|
||||
"can_id": frame.can_id,
|
||||
"can_id_hex": frame.can_id_hex,
|
||||
"dlc": frame.dlc,
|
||||
"data": frame.data,
|
||||
"timestamp": frame.timestamp # float timestamp в секундах
|
||||
"data_hex": frame.data_hex,
|
||||
"timestamp": frame.timestamp, # float timestamp в секундах
|
||||
"is_extended": frame.is_extended
|
||||
})
|
||||
|
||||
if influx_messages:
|
||||
# Пытаемся добавить в очередь InfluxDB (неблокирующий режим)
|
||||
if postgresql_messages:
|
||||
# Пытаемся добавить в очередь PostgreSQL (неблокирующий режим)
|
||||
# Если очередь переполнена, пропускаем батч
|
||||
return self.influxdb_client.write_messages_batch(influx_messages)
|
||||
return self.postgresql_client.write_messages_batch(postgresql_messages)
|
||||
return 0
|
||||
except Exception as e:
|
||||
# Ошибка не должна останавливать обработку других handlers
|
||||
# Логируем, но не пробрасываем исключение
|
||||
self.logger.error(
|
||||
f"Failed to send frames batch to InfluxDB: {e}",
|
||||
f"Failed to send frames batch to PostgreSQL: {e}",
|
||||
exc_info=True,
|
||||
extra={"batch_size": len(frames)}
|
||||
)
|
||||
@@ -107,25 +110,25 @@ class InfluxDBHandler(BaseHandler):
|
||||
|
||||
def flush(self) -> None:
|
||||
"""Принудительная отправка накопленных данных."""
|
||||
# InfluxDB forwarder сам управляет flush через свой цикл
|
||||
# PostgreSQL forwarder сам управляет flush через свой цикл
|
||||
# Но можно вызвать явный flush если нужно
|
||||
pass
|
||||
|
||||
def shutdown(self) -> None:
|
||||
"""Корректное завершение работы обработчика."""
|
||||
if self.influxdb_client:
|
||||
if self.postgresql_client:
|
||||
try:
|
||||
self.influxdb_client.close()
|
||||
self.logger.info("InfluxDB handler closed")
|
||||
self.postgresql_client.close()
|
||||
self.logger.info("PostgreSQL handler closed")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error closing InfluxDB: {e}", exc_info=True)
|
||||
self.logger.error(f"Error closing PostgreSQL: {e}", exc_info=True)
|
||||
self._initialized = False
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""Получение статистики обработчика."""
|
||||
if self.influxdb_client:
|
||||
if self.postgresql_client:
|
||||
try:
|
||||
stats = self.influxdb_client.get_stats()
|
||||
stats = self.postgresql_client.get_stats()
|
||||
stats["handler"] = self.name
|
||||
stats["enabled"] = self.enabled
|
||||
stats["initialized"] = self._initialized
|
||||
@@ -139,10 +142,10 @@ class InfluxDBHandler(BaseHandler):
|
||||
}
|
||||
|
||||
def start(self) -> None:
|
||||
"""Запуск InfluxDB forwarder (если используется)."""
|
||||
if self.influxdb_client:
|
||||
"""Запуск PostgreSQL forwarder (если используется)."""
|
||||
if self.postgresql_client:
|
||||
try:
|
||||
self.influxdb_client.start()
|
||||
self.postgresql_client.start()
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to start InfluxDB forwarder: {e}", exc_info=True)
|
||||
self.logger.error(f"Failed to start PostgreSQL forwarder: {e}", exc_info=True)
|
||||
|
||||
@@ -48,8 +48,8 @@ def main():
|
||||
})
|
||||
|
||||
logger.info("Configuration loaded", extra={
|
||||
"influxdb_enabled": config.influxdb.enabled,
|
||||
"influxdb_url": config.influxdb.url if config.influxdb.enabled else None,
|
||||
"postgresql_enabled": config.postgresql.enabled,
|
||||
"postgresql_host": config.postgresql.host if config.postgresql.enabled else None,
|
||||
"storage_path": config.storage.database_path
|
||||
})
|
||||
|
||||
|
||||
12
can_sniffer/src/postgresql_handler/__init__.py
Normal file
12
can_sniffer/src/postgresql_handler/__init__.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""
|
||||
Модуль для работы с PostgreSQL.
|
||||
|
||||
Предоставляет singleton класс для отправки CAN сообщений в PostgreSQL
|
||||
с поддержкой пакетной отправки, connection pooling, retry с backoff.
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
from .postgresql_client import PostgreSQLClient, get_postgresql_client
|
||||
|
||||
__all__ = ['PostgreSQLClient', 'get_postgresql_client']
|
||||
|
||||
533
can_sniffer/src/postgresql_handler/postgresql_client.py
Normal file
533
can_sniffer/src/postgresql_handler/postgresql_client.py
Normal file
@@ -0,0 +1,533 @@
|
||||
"""
|
||||
Модуль для работы с PostgreSQL.
|
||||
|
||||
Предоставляет singleton класс для отправки CAN сообщений в PostgreSQL
|
||||
с поддержкой пакетной отправки, connection pooling, retry с backoff.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
from queue import Queue, Empty
|
||||
from typing import Optional, List, Dict, Any
|
||||
from enum import Enum
|
||||
|
||||
from config import config
|
||||
from logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Импортируем PostgreSQL клиент
|
||||
try:
|
||||
import psycopg2
|
||||
from psycopg2 import pool, sql
|
||||
from psycopg2.extras import execute_batch
|
||||
POSTGRESQL_AVAILABLE = True
|
||||
except ImportError:
|
||||
POSTGRESQL_AVAILABLE = False
|
||||
psycopg2 = None
|
||||
pool = None
|
||||
sql = None
|
||||
execute_batch = None
|
||||
logger.warning("psycopg2 not installed. Install with: pip install psycopg2-binary")
|
||||
|
||||
|
||||
class ConnectionStatus(Enum):
|
||||
"""Статус соединения с PostgreSQL."""
|
||||
DISCONNECTED = "disconnected"
|
||||
CONNECTING = "connecting"
|
||||
CONNECTED = "connected"
|
||||
ERROR = "error"
|
||||
|
||||
|
||||
class PostgreSQLClient:
|
||||
"""Singleton класс для работы с PostgreSQL."""
|
||||
|
||||
_instance: Optional['PostgreSQLClient'] = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls):
|
||||
"""Singleton паттерн для единого экземпляра клиента."""
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
"""Инициализация клиента PostgreSQL."""
|
||||
# Проверяем, что инициализация выполняется только один раз
|
||||
if hasattr(self, '_initialized'):
|
||||
return
|
||||
|
||||
self.config = config.postgresql
|
||||
self.logger = logger
|
||||
|
||||
# Инициализируем атрибуты по умолчанию
|
||||
self.connection_pool: Optional[pool.ThreadedConnectionPool] = None
|
||||
self.message_queue: Queue[Dict[str, Any]] = Queue()
|
||||
self.running = False
|
||||
self.forwarder_thread: Optional[threading.Thread] = None
|
||||
self.connection_status = ConnectionStatus.DISCONNECTED
|
||||
|
||||
# Статистика
|
||||
self.sent_count = 0
|
||||
self.failed_count = 0
|
||||
self.retry_count = 0
|
||||
self.reconnect_count = 0
|
||||
self._initialized = False
|
||||
|
||||
if not POSTGRESQL_AVAILABLE:
|
||||
self.logger.error("PostgreSQL client library not available")
|
||||
return
|
||||
|
||||
# Инициализируем клиент
|
||||
self._init_client()
|
||||
self._initialized = True
|
||||
|
||||
def _init_client(self) -> None:
|
||||
"""Инициализация пула соединений PostgreSQL."""
|
||||
if not POSTGRESQL_AVAILABLE:
|
||||
return
|
||||
|
||||
try:
|
||||
# Создаем пул соединений
|
||||
self.connection_pool = pool.ThreadedConnectionPool(
|
||||
minconn=1,
|
||||
maxconn=self.config.connection_pool_size,
|
||||
host=self.config.host,
|
||||
port=self.config.port,
|
||||
database=self.config.database,
|
||||
user=self.config.user,
|
||||
password=self.config.password,
|
||||
connect_timeout=self.config.connection_timeout
|
||||
)
|
||||
|
||||
# Проверяем соединение
|
||||
conn = self.connection_pool.getconn()
|
||||
if conn:
|
||||
# Создаем таблицу если её нет
|
||||
self._create_table(conn)
|
||||
self.connection_pool.putconn(conn)
|
||||
self.connection_status = ConnectionStatus.CONNECTED
|
||||
self.logger.info("PostgreSQL connection pool initialized")
|
||||
else:
|
||||
self.connection_status = ConnectionStatus.ERROR
|
||||
self.logger.error("Failed to get connection from pool")
|
||||
|
||||
except Exception as e:
|
||||
self.connection_status = ConnectionStatus.ERROR
|
||||
self.logger.error(
|
||||
f"Failed to initialize PostgreSQL connection pool: {e}",
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
def _create_table(self, conn) -> None:
|
||||
"""Создание таблицы для CAN сообщений если её нет."""
|
||||
try:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS can_messages (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
timestamp TIMESTAMP NOT NULL,
|
||||
interface VARCHAR(50) NOT NULL,
|
||||
can_id INTEGER NOT NULL,
|
||||
can_id_hex VARCHAR(10) NOT NULL,
|
||||
is_extended BOOLEAN NOT NULL,
|
||||
dlc INTEGER NOT NULL,
|
||||
data BYTEA NOT NULL,
|
||||
data_hex VARCHAR(32) NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_can_messages_timestamp ON can_messages(timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_can_messages_can_id ON can_messages(can_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_can_messages_interface ON can_messages(interface);
|
||||
""")
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
self.logger.info("PostgreSQL table 'can_messages' created or verified")
|
||||
except Exception as e:
|
||||
conn.rollback()
|
||||
self.logger.error(f"Failed to create table: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def write_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float, block: bool = False) -> bool:
|
||||
"""
|
||||
Добавление CAN сообщения в очередь для отправки.
|
||||
|
||||
Args:
|
||||
interface: Имя интерфейса (например, 'can0')
|
||||
can_id: CAN ID сообщения
|
||||
dlc: Data Length Code
|
||||
data: Данные сообщения (bytes)
|
||||
timestamp: Временная метка сообщения (float в секундах)
|
||||
block: Блокировать ли при переполнении очереди
|
||||
|
||||
Returns:
|
||||
True если сообщение добавлено в очередь
|
||||
"""
|
||||
if not self.connection_pool:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Добавляем сообщение в очередь для пакетной отправки
|
||||
if block:
|
||||
self.message_queue.put({
|
||||
"interface": interface,
|
||||
"can_id": can_id,
|
||||
"can_id_hex": hex(can_id),
|
||||
"dlc": dlc,
|
||||
"data": data,
|
||||
"data_hex": data.hex().upper(),
|
||||
"timestamp": timestamp,
|
||||
"is_extended": can_id > 0x7FF
|
||||
})
|
||||
else:
|
||||
try:
|
||||
self.message_queue.put_nowait({
|
||||
"interface": interface,
|
||||
"can_id": can_id,
|
||||
"can_id_hex": hex(can_id),
|
||||
"dlc": dlc,
|
||||
"data": data,
|
||||
"data_hex": data.hex().upper(),
|
||||
"timestamp": timestamp,
|
||||
"is_extended": can_id > 0x7FF
|
||||
})
|
||||
except:
|
||||
# Очередь переполнена - пропускаем сообщение
|
||||
self.failed_count += 1
|
||||
return False
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
f"Failed to queue message for PostgreSQL: {e}",
|
||||
exc_info=True
|
||||
)
|
||||
self.failed_count += 1
|
||||
return False
|
||||
|
||||
def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int:
|
||||
"""
|
||||
Пакетная отправка сообщений в PostgreSQL.
|
||||
|
||||
Добавляет сообщения в очередь для асинхронной отправки через forwarder loop.
|
||||
|
||||
Args:
|
||||
messages: Список словарей с данными сообщений
|
||||
block: Блокировать ли при переполнении очереди
|
||||
|
||||
Returns:
|
||||
Количество успешно добавленных в очередь сообщений
|
||||
"""
|
||||
if not self.connection_pool or not messages:
|
||||
return 0
|
||||
|
||||
# Проверяем соединение перед добавлением в очередь
|
||||
if self.connection_status != ConnectionStatus.CONNECTED:
|
||||
if not self._health_check():
|
||||
# Соединение недоступно - пропускаем батч без ошибки
|
||||
self.failed_count += len(messages)
|
||||
return 0
|
||||
else:
|
||||
self.connection_status = ConnectionStatus.CONNECTED
|
||||
|
||||
# Проверяем заполненность очереди перед добавлением
|
||||
queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0
|
||||
if queue_usage > 0.9 and not block:
|
||||
# Очередь почти переполнена - пропускаем батч
|
||||
self.failed_count += len(messages)
|
||||
return 0
|
||||
|
||||
# Добавляем сообщения в очередь для асинхронной отправки
|
||||
added_count = 0
|
||||
for msg in messages:
|
||||
try:
|
||||
if block:
|
||||
self.message_queue.put(msg)
|
||||
else:
|
||||
try:
|
||||
self.message_queue.put_nowait(msg)
|
||||
except:
|
||||
# Очередь переполнена - пропускаем оставшиеся сообщения
|
||||
break
|
||||
added_count += 1
|
||||
except Exception as e:
|
||||
self.logger.debug(f"Failed to queue message: {e}")
|
||||
break
|
||||
|
||||
if added_count < len(messages):
|
||||
self.failed_count += (len(messages) - added_count)
|
||||
|
||||
return added_count
|
||||
|
||||
def _send_messages_batch(self, messages: List[Dict[str, Any]]) -> int:
|
||||
"""
|
||||
Непосредственная отправка батча сообщений в PostgreSQL.
|
||||
|
||||
Этот метод вызывается из forwarder loop для реальной отправки данных.
|
||||
|
||||
Args:
|
||||
messages: Список словарей с данными сообщений
|
||||
|
||||
Returns:
|
||||
Количество успешно отправленных сообщений
|
||||
"""
|
||||
if not self.connection_pool or not messages:
|
||||
return 0
|
||||
|
||||
# Проверяем соединение перед отправкой
|
||||
if self.connection_status != ConnectionStatus.CONNECTED:
|
||||
if not self._health_check():
|
||||
self.logger.warning("PostgreSQL connection not available, skipping batch")
|
||||
self.failed_count += len(messages)
|
||||
return 0
|
||||
else:
|
||||
self.connection_status = ConnectionStatus.CONNECTED
|
||||
|
||||
conn = None
|
||||
try:
|
||||
# Получаем соединение из пула
|
||||
conn = self.connection_pool.getconn()
|
||||
if not conn:
|
||||
self.failed_count += len(messages)
|
||||
return 0
|
||||
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Подготавливаем данные для batch insert
|
||||
insert_query = """
|
||||
INSERT INTO can_messages (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||||
"""
|
||||
|
||||
values = []
|
||||
for msg in messages:
|
||||
from datetime import datetime
|
||||
ts = datetime.fromtimestamp(msg["timestamp"])
|
||||
values.append((
|
||||
ts,
|
||||
msg["interface"],
|
||||
msg["can_id"],
|
||||
msg.get("can_id_hex", hex(msg["can_id"])),
|
||||
msg.get("is_extended", msg["can_id"] > 0x7FF),
|
||||
msg["dlc"],
|
||||
msg["data"],
|
||||
msg.get("data_hex", msg["data"].hex().upper() if isinstance(msg["data"], bytes) else "")
|
||||
))
|
||||
|
||||
# Выполняем batch insert
|
||||
execute_batch(cursor, insert_query, values)
|
||||
conn.commit()
|
||||
cursor.close()
|
||||
|
||||
sent = len(messages)
|
||||
self.sent_count += sent
|
||||
self.logger.debug(
|
||||
f"Sent {sent} messages to PostgreSQL",
|
||||
extra={"batch_size": sent}
|
||||
)
|
||||
return sent
|
||||
|
||||
except Exception as e:
|
||||
if conn:
|
||||
conn.rollback()
|
||||
self.failed_count += len(messages)
|
||||
self.logger.error(
|
||||
f"Failed to send messages batch to PostgreSQL: {e}",
|
||||
exc_info=True,
|
||||
extra={"batch_size": len(messages)}
|
||||
)
|
||||
return 0
|
||||
finally:
|
||||
if conn:
|
||||
self.connection_pool.putconn(conn)
|
||||
|
||||
def _health_check(self) -> bool:
|
||||
"""Проверка здоровья соединения с PostgreSQL."""
|
||||
if not self.connection_pool:
|
||||
return False
|
||||
|
||||
try:
|
||||
conn = self.connection_pool.getconn()
|
||||
if conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT 1")
|
||||
cursor.fetchone()
|
||||
cursor.close()
|
||||
self.connection_pool.putconn(conn)
|
||||
self.connection_status = ConnectionStatus.CONNECTED
|
||||
self.last_health_check = time.time()
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.debug(f"PostgreSQL health check failed: {e}")
|
||||
return False
|
||||
|
||||
def _reconnect(self) -> None:
|
||||
"""Переподключение к PostgreSQL."""
|
||||
if self.connection_status == ConnectionStatus.CONNECTING:
|
||||
return
|
||||
|
||||
self.connection_status = ConnectionStatus.CONNECTING
|
||||
self.reconnect_count += 1
|
||||
|
||||
try:
|
||||
# Закрываем старый пул
|
||||
if self.connection_pool:
|
||||
self.connection_pool.closeall()
|
||||
|
||||
# Создаем новый пул
|
||||
self._init_client()
|
||||
|
||||
except Exception as e:
|
||||
self.connection_status = ConnectionStatus.ERROR
|
||||
self.logger.error(
|
||||
f"Failed to reconnect to PostgreSQL: {e}",
|
||||
exc_info=True
|
||||
)
|
||||
|
||||
def _forwarder_loop(self) -> None:
|
||||
"""Основной цикл для отправки сообщений в PostgreSQL."""
|
||||
self.logger.info("PostgreSQL forwarder loop started")
|
||||
|
||||
batch = []
|
||||
last_flush_time = time.time()
|
||||
|
||||
while self.running or not self.message_queue.empty():
|
||||
try:
|
||||
# Собираем сообщения в батч
|
||||
try:
|
||||
message = self.message_queue.get(timeout=0.1)
|
||||
batch.append(message)
|
||||
except Empty:
|
||||
pass
|
||||
|
||||
# Отправляем батч если он заполнен или прошло достаточно времени
|
||||
current_time = time.time()
|
||||
should_flush = (
|
||||
len(batch) >= self.config.batch_size or
|
||||
(batch and (current_time - last_flush_time) >= self.config.flush_interval)
|
||||
)
|
||||
|
||||
if should_flush:
|
||||
if batch:
|
||||
# Отправляем батч напрямую в PostgreSQL
|
||||
self._send_messages_batch(batch)
|
||||
batch = []
|
||||
last_flush_time = current_time
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
f"Error in forwarder loop: {e}",
|
||||
exc_info=True
|
||||
)
|
||||
time.sleep(0.1)
|
||||
|
||||
# Отправляем оставшиеся сообщения
|
||||
if batch:
|
||||
self._send_messages_batch(batch)
|
||||
|
||||
self.logger.info(
|
||||
"PostgreSQL forwarder loop stopped",
|
||||
extra={
|
||||
"sent_count": self.sent_count,
|
||||
"failed_count": self.failed_count
|
||||
}
|
||||
)
|
||||
|
||||
def start(self) -> None:
|
||||
"""Запуск forwarder потока для отправки сообщений."""
|
||||
if not self.config.enabled or not self.connection_pool:
|
||||
return
|
||||
|
||||
if hasattr(self, 'running') and self.running:
|
||||
self.logger.warning("PostgreSQL forwarder is already running")
|
||||
return
|
||||
|
||||
self.running = True
|
||||
|
||||
# Запускаем forwarder поток
|
||||
# НЕ используем daemon=True для корректного завершения
|
||||
self.forwarder_thread = threading.Thread(
|
||||
target=self._forwarder_loop,
|
||||
name="PostgreSQL-Forwarder",
|
||||
daemon=False
|
||||
)
|
||||
self.forwarder_thread.start()
|
||||
|
||||
self.logger.info("PostgreSQL forwarder started")
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Остановка forwarder потока."""
|
||||
if not hasattr(self, 'running') or not self.running:
|
||||
return
|
||||
|
||||
self.logger.info("Stopping PostgreSQL forwarder...")
|
||||
self.running = False
|
||||
|
||||
# Даем время на обработку оставшихся сообщений в очереди
|
||||
max_wait_time = 5.0
|
||||
wait_start = time.time()
|
||||
while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time:
|
||||
time.sleep(0.1)
|
||||
|
||||
if not self.message_queue.empty():
|
||||
remaining = self.message_queue.qsize()
|
||||
self.logger.warning(
|
||||
f"PostgreSQL queue not empty after shutdown, {remaining} messages remaining"
|
||||
)
|
||||
|
||||
# Ждем завершения потока
|
||||
if self.forwarder_thread and self.forwarder_thread.is_alive():
|
||||
self.forwarder_thread.join(timeout=10.0)
|
||||
if self.forwarder_thread.is_alive():
|
||||
self.logger.warning("Forwarder thread did not stop gracefully")
|
||||
|
||||
# Закрываем пул соединений
|
||||
if self.connection_pool:
|
||||
try:
|
||||
self.connection_pool.closeall()
|
||||
self.connection_pool = None
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error closing PostgreSQL connection pool: {e}", exc_info=True)
|
||||
|
||||
self.connection_status = ConnectionStatus.DISCONNECTED
|
||||
self.logger.info("PostgreSQL forwarder stopped")
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""Получение статистики клиента."""
|
||||
return {
|
||||
"enabled": self.config.enabled,
|
||||
"initialized": self._initialized and self.connection_pool is not None,
|
||||
"running": getattr(self, 'running', False),
|
||||
"connection_status": self.connection_status.value,
|
||||
"sent_count": self.sent_count,
|
||||
"failed_count": self.failed_count,
|
||||
"retry_count": self.retry_count,
|
||||
"reconnect_count": self.reconnect_count,
|
||||
"queue_size": self.message_queue.qsize(),
|
||||
"host": self.config.host if self.config.enabled else None,
|
||||
"database": self.config.database if self.config.enabled else None
|
||||
}
|
||||
|
||||
def close(self) -> None:
|
||||
"""Закрытие соединения с PostgreSQL."""
|
||||
self.stop()
|
||||
|
||||
|
||||
# Глобальный экземпляр клиента
|
||||
_postgresql_instance: Optional[PostgreSQLClient] = None
|
||||
|
||||
|
||||
def get_postgresql_client() -> PostgreSQLClient:
|
||||
"""
|
||||
Получение глобального экземпляра PostgreSQL клиента (singleton).
|
||||
|
||||
Returns:
|
||||
Экземпляр PostgreSQLClient
|
||||
"""
|
||||
global _postgresql_instance
|
||||
if _postgresql_instance is None:
|
||||
_postgresql_instance = PostgreSQLClient()
|
||||
return _postgresql_instance
|
||||
|
||||
@@ -13,7 +13,7 @@ from typing import Optional, Dict, Any, List
|
||||
from logger import get_logger
|
||||
from config import config
|
||||
from can_frame import CANFrame
|
||||
from handlers import BaseHandler, StorageHandler, InfluxDBHandler
|
||||
from handlers import BaseHandler, StorageHandler, PostgreSQLHandler
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -70,8 +70,8 @@ class MessageProcessor:
|
||||
# Storage handler всегда включен
|
||||
handlers.append(StorageHandler(enabled=True))
|
||||
|
||||
# InfluxDB handler зависит от конфигурации
|
||||
handlers.append(InfluxDBHandler(enabled=None)) # None = из config
|
||||
# PostgreSQL handler зависит от конфигурации
|
||||
handlers.append(PostgreSQLHandler(enabled=None)) # None = из config
|
||||
|
||||
return handlers
|
||||
|
||||
@@ -317,9 +317,9 @@ class MessageProcessor:
|
||||
|
||||
self.running = True
|
||||
|
||||
# Запускаем специальные обработчики (например, InfluxDB forwarder)
|
||||
# Запускаем специальные обработчики (например, PostgreSQL forwarder)
|
||||
for handler in self.handlers:
|
||||
if isinstance(handler, InfluxDBHandler) and handler.is_initialized():
|
||||
if isinstance(handler, PostgreSQLHandler) and handler.is_initialized():
|
||||
try:
|
||||
handler.start()
|
||||
except Exception as e:
|
||||
|
||||
@@ -246,7 +246,7 @@ class Storage:
|
||||
|
||||
def get_unprocessed_messages(self, limit: int = 1000) -> list:
|
||||
"""
|
||||
Получение необработанных сообщений для отправки в InfluxDB.
|
||||
Получение необработанных сообщений для отправки в PostgreSQL.
|
||||
|
||||
Args:
|
||||
limit: Максимальное количество сообщений
|
||||
|
||||
Reference in New Issue
Block a user