Update critical code

This commit is contained in:
2026-01-25 18:29:49 +03:00
parent 8ba620aa6c
commit 536698c9cc
6 changed files with 510 additions and 190 deletions

View File

@@ -45,9 +45,9 @@ class CanConfig(BaseModel):
class StorageConfig(BaseModel):
"""Конфигурация локального хранилища (SQLite)."""
model_config = {"extra": "ignore"}
type: str = Field(
default="sqlite",
description="Тип хранилища"
@@ -64,6 +64,10 @@ class StorageConfig(BaseModel):
default="NORMAL",
description="Режим синхронизации: NORMAL, FULL, OFF"
)
retention_days: int = Field(
default=7,
description="Дней хранения обработанных записей (для автоочистки)"
)
class PostgreSQLConfig(BaseModel):

View File

@@ -91,14 +91,14 @@ class StorageHandler(BaseHandler):
pass
def shutdown(self) -> None:
"""Корректное завершение работы обработчика."""
if self.storage:
try:
self.storage.close()
self.logger.info("Storage handler closed")
except Exception as e:
self.logger.error(f"Error closing storage: {e}", exc_info=True)
"""Корректное завершение работы обработчика.
Примечание: НЕ закрываем Storage singleton здесь, так как он может
использоваться другими компонентами (например, для синхронизации с PostgreSQL).
Storage закрывается отдельно при полном завершении приложения.
"""
self._initialized = False
self.logger.info("Storage handler shutdown complete")
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики обработчика."""

View File

@@ -5,10 +5,12 @@
с поддержкой пакетной отправки, connection pooling, retry с backoff.
"""
import queue
import threading
import time
from datetime import datetime, timezone
from queue import Queue, Empty
from typing import Optional, List, Dict, Any
from typing import Optional, List, Dict, Any, Tuple
from enum import Enum
from config import config
@@ -55,35 +57,90 @@ class PostgreSQLClient:
def __init__(self):
"""Инициализация клиента PostgreSQL."""
# Проверяем, что инициализация выполняется только один раз
if hasattr(self, '_initialized'):
return
self.config = config.postgresql
self.logger = logger
# Инициализируем атрибуты по умолчанию
self.connection_pool: Optional[pool.ThreadedConnectionPool] = None
self.message_queue: Queue[Dict[str, Any]] = Queue()
self.running = False
self.forwarder_thread: Optional[threading.Thread] = None
self.connection_status = ConnectionStatus.DISCONNECTED
# Статистика
self.sent_count = 0
self.failed_count = 0
self.retry_count = 0
self.reconnect_count = 0
self._initialized = False
if not POSTGRESQL_AVAILABLE:
self.logger.error("PostgreSQL client library not available")
return
# Инициализируем клиент
self._init_client()
self._initialized = True
# Защита от race condition при инициализации singleton
with self._lock:
# Проверяем, что инициализация выполняется только один раз
if hasattr(self, '_initialized') and self._initialized:
return
self.config = config.postgresql
self.logger = logger
# Инициализируем атрибуты по умолчанию
self.connection_pool: Optional[pool.ThreadedConnectionPool] = None
self.message_queue: Queue[Dict[str, Any]] = Queue(maxsize=config.general.buffer_size)
self.running = False
self.forwarder_thread: Optional[threading.Thread] = None
self.connection_status = ConnectionStatus.DISCONNECTED
# Статистика с блокировкой для потокобезопасности
self._stats_lock = threading.Lock()
self._sent_count = 0
self._failed_count = 0
self._retry_count = 0
self._reconnect_count = 0
self._synced_count = 0 # Количество синхронизированных из SQLite
# Флаг для запуска синхронизации после восстановления соединения
self._needs_sync = True
self._last_sync_time = 0.0
self._sync_interval = 30.0 # Интервал синхронизации в секундах
if not POSTGRESQL_AVAILABLE:
self.logger.error("PostgreSQL client library not available")
self._initialized = True # Отмечаем как инициализированный, чтобы не повторять
return
# Инициализируем клиент
self._init_client()
self._initialized = True
# Потокобезопасные свойства для статистики
@property
def sent_count(self) -> int:
with self._stats_lock:
return self._sent_count
@property
def failed_count(self) -> int:
with self._stats_lock:
return self._failed_count
@property
def retry_count(self) -> int:
with self._stats_lock:
return self._retry_count
@property
def reconnect_count(self) -> int:
with self._stats_lock:
return self._reconnect_count
@property
def synced_count(self) -> int:
with self._stats_lock:
return self._synced_count
def _increment_sent(self, count: int = 1) -> None:
with self._stats_lock:
self._sent_count += count
def _increment_failed(self, count: int = 1) -> None:
with self._stats_lock:
self._failed_count += count
def _increment_retry(self) -> None:
with self._stats_lock:
self._retry_count += 1
def _increment_reconnect(self) -> None:
with self._stats_lock:
self._reconnect_count += 1
def _increment_synced(self, count: int = 1) -> None:
with self._stats_lock:
self._synced_count += count
def _init_client(self) -> None:
"""Инициализация пула соединений PostgreSQL."""
if not POSTGRESQL_AVAILABLE:
@@ -194,9 +251,9 @@ class PostgreSQLClient:
"timestamp": timestamp,
"is_extended": can_id > 0x7FF
})
except:
except queue.Full:
# Очередь переполнена - пропускаем сообщение
self.failed_count += 1
self._increment_failed()
return False
return True
except Exception as e:
@@ -204,7 +261,7 @@ class PostgreSQLClient:
f"Failed to queue message for PostgreSQL: {e}",
exc_info=True
)
self.failed_count += 1
self._increment_failed()
return False
def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int:
@@ -227,7 +284,7 @@ class PostgreSQLClient:
if self.connection_status != ConnectionStatus.CONNECTED:
if not self._health_check():
# Соединение недоступно - пропускаем батч без ошибки
self.failed_count += len(messages)
self._increment_failed(len(messages))
return 0
else:
self.connection_status = ConnectionStatus.CONNECTED
@@ -236,7 +293,7 @@ class PostgreSQLClient:
queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0
if queue_usage > 0.9 and not block:
# Очередь почти переполнена - пропускаем батч
self.failed_count += len(messages)
self._increment_failed(len(messages))
return 0
# Добавляем сообщения в очередь для асинхронной отправки
@@ -248,7 +305,7 @@ class PostgreSQLClient:
else:
try:
self.message_queue.put_nowait(msg)
except:
except queue.Full:
# Очередь переполнена - пропускаем оставшиеся сообщения
break
added_count += 1
@@ -257,7 +314,7 @@ class PostgreSQLClient:
break
if added_count < len(messages):
self.failed_count += (len(messages) - added_count)
self._increment_failed(len(messages) - added_count)
return added_count
@@ -280,7 +337,7 @@ class PostgreSQLClient:
if self.connection_status != ConnectionStatus.CONNECTED:
if not self._health_check():
self.logger.warning("PostgreSQL connection not available, skipping batch")
self.failed_count += len(messages)
self._increment_failed(len(messages))
return 0
else:
self.connection_status = ConnectionStatus.CONNECTED
@@ -290,7 +347,7 @@ class PostgreSQLClient:
# Получаем соединение из пула
conn = self.connection_pool.getconn()
if not conn:
self.failed_count += len(messages)
self._increment_failed(len(messages))
return 0
cursor = conn.cursor()
@@ -303,8 +360,8 @@ class PostgreSQLClient:
values = []
for msg in messages:
from datetime import datetime
ts = datetime.fromtimestamp(msg["timestamp"])
# Используем UTC для согласованности времени
ts = datetime.fromtimestamp(msg["timestamp"], tz=timezone.utc)
values.append((
ts,
msg["interface"],
@@ -322,7 +379,7 @@ class PostgreSQLClient:
cursor.close()
sent = len(messages)
self.sent_count += sent
self._increment_sent(sent)
self.logger.debug(
f"Sent {sent} messages to PostgreSQL",
extra={"batch_size": sent}
@@ -332,22 +389,67 @@ class PostgreSQLClient:
except Exception as e:
if conn:
conn.rollback()
self.failed_count += len(messages)
self.logger.error(
f"Failed to send messages batch to PostgreSQL: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
)
return 0
# Не увеличиваем failed_count здесь - это делает _send_messages_batch_with_retry
raise # Пробрасываем исключение для retry механизма
finally:
if conn:
self.connection_pool.putconn(conn)
def _send_messages_batch_with_retry(self, messages: List[Dict[str, Any]]) -> int:
"""
Отправка батча сообщений с retry и exponential backoff.
Args:
messages: Список словарей с данными сообщений
Returns:
Количество успешно отправленных сообщений
"""
if not messages:
return 0
max_retries = self.config.max_retries
base_backoff = self.config.retry_backoff
for attempt in range(max_retries):
try:
return self._send_messages_batch(messages)
except Exception as e:
self._increment_retry()
if attempt < max_retries - 1:
# Exponential backoff: 1s, 2s, 4s...
delay = base_backoff * (2 ** attempt)
self.logger.warning(
f"PostgreSQL send failed (attempt {attempt + 1}/{max_retries}), "
f"retrying in {delay}s: {e}"
)
time.sleep(delay)
# Проверяем соединение перед повторной попыткой
if not self._health_check():
self.logger.warning("PostgreSQL connection lost, attempting reconnect")
self._reconnect()
else:
# Все попытки исчерпаны
self.logger.error(
f"All {max_retries} retries failed for batch of {len(messages)} messages"
)
self._increment_failed(len(messages))
return 0
return 0
def _health_check(self) -> bool:
"""Проверка здоровья соединения с PostgreSQL."""
if not self.connection_pool:
return False
try:
conn = self.connection_pool.getconn()
if conn:
@@ -362,6 +464,73 @@ class PostgreSQLClient:
except Exception as e:
self.logger.debug(f"PostgreSQL health check failed: {e}")
return False
def _sync_from_sqlite(self) -> int:
"""
Синхронизация необработанных записей из SQLite в PostgreSQL.
Читает записи с processed=0 из SQLite и отправляет их в PostgreSQL.
После успешной отправки помечает записи как обработанные.
Returns:
Количество синхронизированных сообщений
"""
if self.connection_status != ConnectionStatus.CONNECTED:
return 0
try:
# Импортируем storage здесь, чтобы избежать циклических импортов
from storage import get_storage
storage = get_storage()
# Получаем необработанные сообщения из SQLite
unprocessed = storage.get_unprocessed_messages(limit=1000)
if not unprocessed:
return 0
self.logger.info(
f"Syncing {len(unprocessed)} unprocessed messages from SQLite to PostgreSQL"
)
# Конвертируем записи SQLite в формат для PostgreSQL
# Формат из SQLite: (id, timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex)
messages = []
sqlite_ids = []
for row in unprocessed:
sqlite_id, ts, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex = row
sqlite_ids.append(sqlite_id)
messages.append({
"interface": interface,
"can_id": can_id,
"can_id_hex": can_id_hex or hex(can_id),
"dlc": dlc,
"data": data,
"data_hex": data_hex or (data.hex().upper() if isinstance(data, bytes) else ""),
"timestamp": ts,
"is_extended": bool(is_extended) if is_extended is not None else (can_id > 0x7FF)
})
# Отправляем в PostgreSQL напрямую (не через очередь)
sent_count = self._send_messages_batch(messages)
if sent_count > 0:
# Помечаем успешно отправленные как обработанные
# Помечаем все, так как _send_messages_batch либо отправляет всё, либо ничего
marked = storage.mark_as_processed(sqlite_ids)
self._increment_synced(marked)
self.logger.info(
f"Synced {sent_count} messages from SQLite, marked {marked} as processed"
)
return sent_count
return 0
except Exception as e:
self.logger.error(
f"Error syncing from SQLite: {e}",
exc_info=True
)
return 0
def _reconnect(self) -> None:
"""Переподключение к PostgreSQL."""
@@ -369,7 +538,7 @@ class PostgreSQLClient:
return
self.connection_status = ConnectionStatus.CONNECTING
self.reconnect_count += 1
self._increment_reconnect()
try:
# Закрываем старый пул
@@ -389,49 +558,73 @@ class PostgreSQLClient:
def _forwarder_loop(self) -> None:
"""Основной цикл для отправки сообщений в PostgreSQL."""
self.logger.info("PostgreSQL forwarder loop started")
batch = []
last_flush_time = time.time()
was_connected = self.connection_status == ConnectionStatus.CONNECTED
while self.running or not self.message_queue.empty():
try:
current_time = time.time()
# Проверяем восстановление соединения и запускаем синхронизацию
is_connected = self.connection_status == ConnectionStatus.CONNECTED
if is_connected:
# Синхронизация при восстановлении соединения или по интервалу
should_sync = (
(not was_connected and is_connected) or # Соединение восстановлено
(self._needs_sync) or # Первая синхронизация
(current_time - self._last_sync_time >= self._sync_interval) # По интервалу
)
if should_sync:
synced = self._sync_from_sqlite()
self._last_sync_time = current_time
self._needs_sync = False
if synced > 0:
self.logger.debug(f"Synced {synced} messages from SQLite")
was_connected = is_connected
# Собираем сообщения в батч
try:
message = self.message_queue.get(timeout=0.1)
batch.append(message)
except Empty:
pass
# Отправляем батч если он заполнен или прошло достаточно времени
current_time = time.time()
should_flush = (
len(batch) >= self.config.batch_size or
(batch and (current_time - last_flush_time) >= self.config.flush_interval)
)
if should_flush:
if batch:
# Отправляем батч напрямую в PostgreSQL
self._send_messages_batch(batch)
# Отправляем батч с retry механизмом
self._send_messages_batch_with_retry(batch)
batch = []
last_flush_time = current_time
except Exception as e:
self.logger.error(
f"Error in forwarder loop: {e}",
exc_info=True
)
time.sleep(0.1)
# Отправляем оставшиеся сообщения
# Отправляем оставшиеся сообщения с retry
if batch:
self._send_messages_batch(batch)
self._send_messages_batch_with_retry(batch)
# Финальная синхронизация перед остановкой
self._sync_from_sqlite()
self.logger.info(
"PostgreSQL forwarder loop stopped",
extra={
"sent_count": self.sent_count,
"failed_count": self.failed_count
"failed_count": self.failed_count,
"synced_count": self.synced_count
}
)
@@ -505,6 +698,7 @@ class PostgreSQLClient:
"failed_count": self.failed_count,
"retry_count": self.retry_count,
"reconnect_count": self.reconnect_count,
"synced_count": self.synced_count,
"queue_size": self.message_queue.qsize(),
"host": self.config.host if self.config.enabled else None,
"database": self.config.database if self.config.enabled else None

View File

@@ -5,6 +5,7 @@
Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений.
"""
import queue
import threading
import time
from queue import Queue, Empty
@@ -45,12 +46,37 @@ class MessageProcessor:
self.message_queue: Queue[CANFrame] = Queue(maxsize=queue_size)
self.running = False
self.processing_thread: Optional[threading.Thread] = None
# Статистика
self.processed_count = 0
self.dropped_count = 0
self.queue_full_warnings = 0
# Статистика с блокировкой для потокобезопасности
self._stats_lock = threading.Lock()
self._processed_count = 0
self._dropped_count = 0
self._queue_full_warnings = 0
@property
def processed_count(self) -> int:
with self._stats_lock:
return self._processed_count
@property
def dropped_count(self) -> int:
with self._stats_lock:
return self._dropped_count
@property
def queue_full_warnings(self) -> int:
with self._stats_lock:
return self._queue_full_warnings
def _increment_processed(self, count: int = 1) -> None:
with self._stats_lock:
self._processed_count += count
def _increment_dropped(self, count: int = 1) -> None:
with self._stats_lock:
self._dropped_count += count
self._queue_full_warnings += count
# Инициализируем обработчики
if handlers is None:
handlers = self._create_default_handlers()
@@ -130,11 +156,10 @@ class MessageProcessor:
# Неблокирующий режим - быстрое добавление
self.message_queue.put_nowait(frame)
return True
except:
except queue.Full:
# Очередь переполнена - пропускаем сообщение
self.dropped_count += 1
self.queue_full_warnings += 1
self._increment_dropped()
# Логируем предупреждение периодически (не каждое сообщение)
if self.queue_full_warnings % 1000 == 0:
queue_usage = (self.message_queue.qsize() / self.message_queue.maxsize) * 100
@@ -148,6 +173,11 @@ class MessageProcessor:
}
)
return False
except Exception as e:
# Неожиданная ошибка
self.logger.debug(f"Unexpected error in enqueue: {e}")
self._increment_dropped()
return False
def get_queue_usage(self) -> float:
"""
@@ -287,8 +317,8 @@ class MessageProcessor:
extra={"batch_size": len(batch)}
)
# Обновляем счетчик обработанных сообщений
self.processed_count += len(batch)
# Обновляем счетчик обработанных сообщений (атомарно)
self._increment_processed(len(batch))
except Exception as e:
self.logger.error(

View File

@@ -48,7 +48,21 @@ class CANBusHandler:
self.message_count = 0
self.error_count = 0
self.last_message_time: Optional[float] = None
# Кэшируем ссылки для быстрого доступа (избегаем рефлексии в hot path)
self._processor = None
self._has_backpressure = False
self._enqueue_method = None
self._get_queue_usage_method = None
if hasattr(message_callback, '__self__'):
processor = getattr(message_callback, '__self__', None)
if processor and hasattr(processor, 'get_queue_usage') and hasattr(processor, 'enqueue'):
self._processor = processor
self._has_backpressure = True
self._enqueue_method = processor.enqueue
self._get_queue_usage_method = processor.get_queue_usage
# Применяем фильтры, если они есть
if self.filters:
self._apply_filters()
@@ -104,48 +118,37 @@ class CANBusHandler:
# Вызываем callback для обработки сообщения
# Используем backpressure: если очередь заполнена, замедляем чтение
try:
# Проверяем использование очереди (если callback - это enqueue)
if hasattr(self.message_callback, '__self__'):
processor = getattr(self.message_callback, '__self__', None)
if processor and hasattr(processor, 'get_queue_usage'):
queue_usage = processor.get_queue_usage()
# Если очередь заполнена более чем на 80%, используем блокирующий режим
if queue_usage > 0.8:
# Блокируем добавление с небольшим таймаутом для backpressure
if hasattr(processor, 'enqueue'):
success = processor.enqueue(frame, block=True, timeout=0.01)
if not success:
consecutive_drops += 1
# Увеличиваем задержку при последовательных потерях
backpressure_delay = min(
max_backpressure_delay,
0.001 * consecutive_drops
)
continue
else:
self.message_callback(frame)
else:
# Очередь не заполнена - быстрое добавление
if hasattr(processor, 'enqueue'):
success = processor.enqueue(frame, block=False)
if not success:
consecutive_drops += 1
backpressure_delay = min(
max_backpressure_delay,
0.001 * consecutive_drops
)
continue
else:
self.message_callback(frame)
# Сбрасываем счетчик при успешной отправке
if queue_usage < 0.5:
consecutive_drops = 0
backpressure_delay = 0.0
# Используем закэшированные ссылки для избежания рефлексии в hot path
if self._has_backpressure:
queue_usage = self._get_queue_usage_method()
# Если очередь заполнена более чем на 80%, используем блокирующий режим
if queue_usage > 0.8:
# Блокируем добавление с небольшим таймаутом для backpressure
success = self._enqueue_method(frame, block=True, timeout=0.01)
if not success:
consecutive_drops += 1
# Увеличиваем задержку при последовательных потерях
backpressure_delay = min(
max_backpressure_delay,
0.001 * consecutive_drops
)
continue
else:
# Обычный callback без backpressure
self.message_callback(frame)
# Очередь не заполнена - быстрое добавление
success = self._enqueue_method(frame, block=False)
if not success:
consecutive_drops += 1
backpressure_delay = min(
max_backpressure_delay,
0.001 * consecutive_drops
)
continue
# Сбрасываем счетчик при успешной отправке
if queue_usage < 0.5:
consecutive_drops = 0
backpressure_delay = 0.0
else:
# Обычный callback без backpressure
self.message_callback(frame)

View File

@@ -19,9 +19,10 @@ logger = get_logger(__name__)
class Storage:
"""Singleton класс для работы с SQLite базой данных."""
_instance: Optional['Storage'] = None
_lock = threading.Lock()
_write_lock = threading.Lock() # Мьютекс для потокобезопасной записи в SQLite
def __new__(cls):
"""Singleton паттерн для единого экземпляра хранилища."""
@@ -33,17 +34,19 @@ class Storage:
def __init__(self):
"""Инициализация хранилища."""
# Проверяем, что инициализация выполняется только один раз
if hasattr(self, '_initialized'):
return
self.config = config.storage
self.logger = logger
self.connection: Optional[sqlite3.Connection] = None
self._initialized = False
# Инициализируем базу данных
self._init_database()
# Защита от race condition при инициализации singleton
with self._lock:
# Проверяем, что инициализация выполняется только один раз
if hasattr(self, '_initialized') and self._initialized:
return
self.config = config.storage
self.logger = logger
self.connection: Optional[sqlite3.Connection] = None
self._initialized = False
# Инициализируем базу данных
self._init_database()
def _init_database(self) -> None:
"""Инициализация базы данных SQLite."""
@@ -99,27 +102,51 @@ class Storage:
exc_info=True
)
raise
def _migrate_add_column(self, cursor, table: str, column: str, column_def: str) -> None:
"""Добавление колонки в таблицу, если она не существует.
Args:
cursor: Курсор базы данных
table: Имя таблицы
column: Имя колонки
column_def: Определение колонки (тип и ограничения)
"""
try:
cursor.execute(f"SELECT {column} FROM {table} LIMIT 1")
except sqlite3.OperationalError:
# Колонка не существует, добавляем
cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {column_def}")
self.logger.info(f"Added column {column} to table {table}")
def _create_tables(self) -> None:
"""Создание таблиц в базе данных."""
if not self.connection:
raise RuntimeError("Database connection not initialized")
cursor = self.connection.cursor()
# Таблица для CAN сообщений
# Таблица для CAN сообщений (согласована с PostgreSQL схемой)
cursor.execute("""
CREATE TABLE IF NOT EXISTS can_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
interface TEXT NOT NULL,
can_id INTEGER NOT NULL,
can_id_hex TEXT NOT NULL DEFAULT '',
is_extended INTEGER NOT NULL DEFAULT 0,
dlc INTEGER NOT NULL,
data BLOB NOT NULL,
data_hex TEXT NOT NULL DEFAULT '',
processed INTEGER DEFAULT 0,
created_at REAL DEFAULT (julianday('now'))
)
""")
# Добавляем новые колонки для существующих таблиц (миграция)
self._migrate_add_column(cursor, "can_messages", "can_id_hex", "TEXT NOT NULL DEFAULT ''")
self._migrate_add_column(cursor, "can_messages", "is_extended", "INTEGER NOT NULL DEFAULT 0")
self._migrate_add_column(cursor, "can_messages", "data_hex", "TEXT NOT NULL DEFAULT ''")
# Индексы для быстрого поиска
cursor.execute("""
@@ -170,30 +197,36 @@ class Storage:
def save_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> Optional[int]:
"""
Сохранение CAN сообщения в базу данных.
Args:
interface: Имя интерфейса (например, 'can0')
can_id: CAN ID сообщения
dlc: Data Length Code
data: Данные сообщения (bytes)
timestamp: Временная метка сообщения
Returns:
ID сохраненного сообщения или None в случае ошибки
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return None
try:
with self._get_cursor() as cursor:
cursor.execute("""
INSERT INTO can_messages (timestamp, interface, can_id, dlc, data)
VALUES (?, ?, ?, ?, ?)
""", (timestamp, interface, can_id, dlc, data))
return cursor.lastrowid
# Вычисляем дополнительные поля для совместимости с PostgreSQL
can_id_hex = hex(can_id)
is_extended = 1 if can_id > 0x7FF else 0
data_hex = data.hex().upper() if isinstance(data, bytes) else ""
with self._write_lock:
with self._get_cursor() as cursor:
cursor.execute("""
INSERT INTO can_messages (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex))
return cursor.lastrowid
except Exception as e:
self.logger.error(
f"Failed to save message to database: {e}",
@@ -208,34 +241,46 @@ class Storage:
def save_messages_batch(self, messages: list) -> int:
"""
Пакетное сохранение CAN сообщений.
Args:
messages: Список кортежей (interface, can_id, dlc, data, timestamp)
messages: Список кортежей (timestamp, interface, can_id, dlc, data)
Returns:
Количество успешно сохраненных сообщений
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return 0
if not messages:
return 0
try:
with self._get_cursor() as cursor:
cursor.executemany("""
INSERT INTO can_messages (timestamp, interface, can_id, dlc, data)
VALUES (?, ?, ?, ?, ?)
""", messages)
saved_count = cursor.rowcount
self.logger.debug(
f"Saved {saved_count} messages in batch",
extra={"batch_size": len(messages)}
)
return saved_count
# Преобразуем сообщения в расширенный формат с дополнительными полями
extended_messages = []
for msg in messages:
timestamp, interface, can_id, dlc, data = msg
can_id_hex = hex(can_id)
is_extended = 1 if can_id > 0x7FF else 0
data_hex = data.hex().upper() if isinstance(data, bytes) else ""
extended_messages.append((
timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex
))
with self._write_lock:
with self._get_cursor() as cursor:
cursor.executemany("""
INSERT INTO can_messages (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", extended_messages)
saved_count = cursor.rowcount
self.logger.debug(
f"Saved {saved_count} messages in batch",
extra={"batch_size": len(messages)}
)
return saved_count
except Exception as e:
self.logger.error(
f"Failed to save messages batch: {e}",
@@ -247,29 +292,29 @@ class Storage:
def get_unprocessed_messages(self, limit: int = 1000) -> list:
"""
Получение необработанных сообщений для отправки в PostgreSQL.
Args:
limit: Максимальное количество сообщений
Returns:
Список кортежей (id, timestamp, interface, can_id, dlc, data)
Список кортежей (id, timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex)
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return []
try:
with self._get_cursor() as cursor:
cursor.execute("""
SELECT id, timestamp, interface, can_id, dlc, data
SELECT id, timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex
FROM can_messages
WHERE processed = 0
ORDER BY timestamp ASC
LIMIT ?
""", (limit,))
return cursor.fetchall()
except Exception as e:
self.logger.error(
f"Failed to get unprocessed messages: {e}",
@@ -280,31 +325,32 @@ class Storage:
def mark_as_processed(self, message_ids: list) -> int:
"""
Отметить сообщения как обработанные.
Args:
message_ids: Список ID сообщений
Returns:
Количество обновленных сообщений
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return 0
if not message_ids:
return 0
try:
with self._get_cursor() as cursor:
placeholders = ','.join('?' * len(message_ids))
cursor.execute(f"""
UPDATE can_messages
SET processed = 1
WHERE id IN ({placeholders})
""", message_ids)
return cursor.rowcount
with self._write_lock:
with self._get_cursor() as cursor:
placeholders = ','.join('?' * len(message_ids))
cursor.execute(f"""
UPDATE can_messages
SET processed = 1
WHERE id IN ({placeholders})
""", message_ids)
return cursor.rowcount
except Exception as e:
self.logger.error(
f"Failed to mark messages as processed: {e}",
@@ -358,7 +404,50 @@ class Storage:
"initialized": True,
"error": str(e)
}
def cleanup_old_messages(self, days: Optional[int] = None) -> int:
"""
Удаление обработанных записей старше указанного количества дней.
Удаляет только записи с processed=1 для сохранения необработанных данных.
Args:
days: Количество дней хранения. Если None, берется из config.storage.retention_days
Returns:
Количество удаленных записей
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return 0
if days is None:
days = self.config.retention_days
try:
with self._write_lock:
with self._get_cursor() as cursor:
# julianday('now') - days дает дату N дней назад
cursor.execute("""
DELETE FROM can_messages
WHERE processed = 1
AND created_at < julianday('now') - ?
""", (days,))
deleted = cursor.rowcount
if deleted > 0:
self.logger.info(
f"Cleaned up {deleted} processed messages older than {days} days"
)
return deleted
except Exception as e:
self.logger.error(
f"Failed to cleanup old messages: {e}",
exc_info=True
)
return 0
def close(self) -> None:
"""Закрытие соединения с базой данных."""
if self.connection: