diff --git a/can_sniffer/src/config.py b/can_sniffer/src/config.py index fd8d1bb..90d4583 100644 --- a/can_sniffer/src/config.py +++ b/can_sniffer/src/config.py @@ -45,9 +45,9 @@ class CanConfig(BaseModel): class StorageConfig(BaseModel): """Конфигурация локального хранилища (SQLite).""" - + model_config = {"extra": "ignore"} - + type: str = Field( default="sqlite", description="Тип хранилища" @@ -64,6 +64,10 @@ class StorageConfig(BaseModel): default="NORMAL", description="Режим синхронизации: NORMAL, FULL, OFF" ) + retention_days: int = Field( + default=7, + description="Дней хранения обработанных записей (для автоочистки)" + ) class PostgreSQLConfig(BaseModel): diff --git a/can_sniffer/src/handlers/storage_handler.py b/can_sniffer/src/handlers/storage_handler.py index 08fe573..8bd0442 100644 --- a/can_sniffer/src/handlers/storage_handler.py +++ b/can_sniffer/src/handlers/storage_handler.py @@ -91,14 +91,14 @@ class StorageHandler(BaseHandler): pass def shutdown(self) -> None: - """Корректное завершение работы обработчика.""" - if self.storage: - try: - self.storage.close() - self.logger.info("Storage handler closed") - except Exception as e: - self.logger.error(f"Error closing storage: {e}", exc_info=True) + """Корректное завершение работы обработчика. + + Примечание: НЕ закрываем Storage singleton здесь, так как он может + использоваться другими компонентами (например, для синхронизации с PostgreSQL). + Storage закрывается отдельно при полном завершении приложения. + """ self._initialized = False + self.logger.info("Storage handler shutdown complete") def get_stats(self) -> Dict[str, Any]: """Получение статистики обработчика.""" diff --git a/can_sniffer/src/postgresql_handler/postgresql_client.py b/can_sniffer/src/postgresql_handler/postgresql_client.py index 9948eb7..2ca009f 100644 --- a/can_sniffer/src/postgresql_handler/postgresql_client.py +++ b/can_sniffer/src/postgresql_handler/postgresql_client.py @@ -5,10 +5,12 @@ с поддержкой пакетной отправки, connection pooling, retry с backoff. """ +import queue import threading import time +from datetime import datetime, timezone from queue import Queue, Empty -from typing import Optional, List, Dict, Any +from typing import Optional, List, Dict, Any, Tuple from enum import Enum from config import config @@ -55,35 +57,90 @@ class PostgreSQLClient: def __init__(self): """Инициализация клиента PostgreSQL.""" - # Проверяем, что инициализация выполняется только один раз - if hasattr(self, '_initialized'): - return - - self.config = config.postgresql - self.logger = logger - - # Инициализируем атрибуты по умолчанию - self.connection_pool: Optional[pool.ThreadedConnectionPool] = None - self.message_queue: Queue[Dict[str, Any]] = Queue() - self.running = False - self.forwarder_thread: Optional[threading.Thread] = None - self.connection_status = ConnectionStatus.DISCONNECTED - - # Статистика - self.sent_count = 0 - self.failed_count = 0 - self.retry_count = 0 - self.reconnect_count = 0 - self._initialized = False - - if not POSTGRESQL_AVAILABLE: - self.logger.error("PostgreSQL client library not available") - return - - # Инициализируем клиент - self._init_client() - self._initialized = True - + # Защита от race condition при инициализации singleton + with self._lock: + # Проверяем, что инициализация выполняется только один раз + if hasattr(self, '_initialized') and self._initialized: + return + + self.config = config.postgresql + self.logger = logger + + # Инициализируем атрибуты по умолчанию + self.connection_pool: Optional[pool.ThreadedConnectionPool] = None + self.message_queue: Queue[Dict[str, Any]] = Queue(maxsize=config.general.buffer_size) + self.running = False + self.forwarder_thread: Optional[threading.Thread] = None + self.connection_status = ConnectionStatus.DISCONNECTED + + # Статистика с блокировкой для потокобезопасности + self._stats_lock = threading.Lock() + self._sent_count = 0 + self._failed_count = 0 + self._retry_count = 0 + self._reconnect_count = 0 + self._synced_count = 0 # Количество синхронизированных из SQLite + + # Флаг для запуска синхронизации после восстановления соединения + self._needs_sync = True + self._last_sync_time = 0.0 + self._sync_interval = 30.0 # Интервал синхронизации в секундах + + if not POSTGRESQL_AVAILABLE: + self.logger.error("PostgreSQL client library not available") + self._initialized = True # Отмечаем как инициализированный, чтобы не повторять + return + + # Инициализируем клиент + self._init_client() + self._initialized = True + + # Потокобезопасные свойства для статистики + @property + def sent_count(self) -> int: + with self._stats_lock: + return self._sent_count + + @property + def failed_count(self) -> int: + with self._stats_lock: + return self._failed_count + + @property + def retry_count(self) -> int: + with self._stats_lock: + return self._retry_count + + @property + def reconnect_count(self) -> int: + with self._stats_lock: + return self._reconnect_count + + @property + def synced_count(self) -> int: + with self._stats_lock: + return self._synced_count + + def _increment_sent(self, count: int = 1) -> None: + with self._stats_lock: + self._sent_count += count + + def _increment_failed(self, count: int = 1) -> None: + with self._stats_lock: + self._failed_count += count + + def _increment_retry(self) -> None: + with self._stats_lock: + self._retry_count += 1 + + def _increment_reconnect(self) -> None: + with self._stats_lock: + self._reconnect_count += 1 + + def _increment_synced(self, count: int = 1) -> None: + with self._stats_lock: + self._synced_count += count + def _init_client(self) -> None: """Инициализация пула соединений PostgreSQL.""" if not POSTGRESQL_AVAILABLE: @@ -194,9 +251,9 @@ class PostgreSQLClient: "timestamp": timestamp, "is_extended": can_id > 0x7FF }) - except: + except queue.Full: # Очередь переполнена - пропускаем сообщение - self.failed_count += 1 + self._increment_failed() return False return True except Exception as e: @@ -204,7 +261,7 @@ class PostgreSQLClient: f"Failed to queue message for PostgreSQL: {e}", exc_info=True ) - self.failed_count += 1 + self._increment_failed() return False def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int: @@ -227,7 +284,7 @@ class PostgreSQLClient: if self.connection_status != ConnectionStatus.CONNECTED: if not self._health_check(): # Соединение недоступно - пропускаем батч без ошибки - self.failed_count += len(messages) + self._increment_failed(len(messages)) return 0 else: self.connection_status = ConnectionStatus.CONNECTED @@ -236,7 +293,7 @@ class PostgreSQLClient: queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0 if queue_usage > 0.9 and not block: # Очередь почти переполнена - пропускаем батч - self.failed_count += len(messages) + self._increment_failed(len(messages)) return 0 # Добавляем сообщения в очередь для асинхронной отправки @@ -248,7 +305,7 @@ class PostgreSQLClient: else: try: self.message_queue.put_nowait(msg) - except: + except queue.Full: # Очередь переполнена - пропускаем оставшиеся сообщения break added_count += 1 @@ -257,7 +314,7 @@ class PostgreSQLClient: break if added_count < len(messages): - self.failed_count += (len(messages) - added_count) + self._increment_failed(len(messages) - added_count) return added_count @@ -280,7 +337,7 @@ class PostgreSQLClient: if self.connection_status != ConnectionStatus.CONNECTED: if not self._health_check(): self.logger.warning("PostgreSQL connection not available, skipping batch") - self.failed_count += len(messages) + self._increment_failed(len(messages)) return 0 else: self.connection_status = ConnectionStatus.CONNECTED @@ -290,7 +347,7 @@ class PostgreSQLClient: # Получаем соединение из пула conn = self.connection_pool.getconn() if not conn: - self.failed_count += len(messages) + self._increment_failed(len(messages)) return 0 cursor = conn.cursor() @@ -303,8 +360,8 @@ class PostgreSQLClient: values = [] for msg in messages: - from datetime import datetime - ts = datetime.fromtimestamp(msg["timestamp"]) + # Используем UTC для согласованности времени + ts = datetime.fromtimestamp(msg["timestamp"], tz=timezone.utc) values.append(( ts, msg["interface"], @@ -322,7 +379,7 @@ class PostgreSQLClient: cursor.close() sent = len(messages) - self.sent_count += sent + self._increment_sent(sent) self.logger.debug( f"Sent {sent} messages to PostgreSQL", extra={"batch_size": sent} @@ -332,22 +389,67 @@ class PostgreSQLClient: except Exception as e: if conn: conn.rollback() - self.failed_count += len(messages) self.logger.error( f"Failed to send messages batch to PostgreSQL: {e}", exc_info=True, extra={"batch_size": len(messages)} ) - return 0 + # Не увеличиваем failed_count здесь - это делает _send_messages_batch_with_retry + raise # Пробрасываем исключение для retry механизма finally: if conn: self.connection_pool.putconn(conn) - + + def _send_messages_batch_with_retry(self, messages: List[Dict[str, Any]]) -> int: + """ + Отправка батча сообщений с retry и exponential backoff. + + Args: + messages: Список словарей с данными сообщений + + Returns: + Количество успешно отправленных сообщений + """ + if not messages: + return 0 + + max_retries = self.config.max_retries + base_backoff = self.config.retry_backoff + + for attempt in range(max_retries): + try: + return self._send_messages_batch(messages) + except Exception as e: + self._increment_retry() + + if attempt < max_retries - 1: + # Exponential backoff: 1s, 2s, 4s... + delay = base_backoff * (2 ** attempt) + self.logger.warning( + f"PostgreSQL send failed (attempt {attempt + 1}/{max_retries}), " + f"retrying in {delay}s: {e}" + ) + time.sleep(delay) + + # Проверяем соединение перед повторной попыткой + if not self._health_check(): + self.logger.warning("PostgreSQL connection lost, attempting reconnect") + self._reconnect() + else: + # Все попытки исчерпаны + self.logger.error( + f"All {max_retries} retries failed for batch of {len(messages)} messages" + ) + self._increment_failed(len(messages)) + return 0 + + return 0 + def _health_check(self) -> bool: """Проверка здоровья соединения с PostgreSQL.""" if not self.connection_pool: return False - + try: conn = self.connection_pool.getconn() if conn: @@ -362,6 +464,73 @@ class PostgreSQLClient: except Exception as e: self.logger.debug(f"PostgreSQL health check failed: {e}") return False + + def _sync_from_sqlite(self) -> int: + """ + Синхронизация необработанных записей из SQLite в PostgreSQL. + + Читает записи с processed=0 из SQLite и отправляет их в PostgreSQL. + После успешной отправки помечает записи как обработанные. + + Returns: + Количество синхронизированных сообщений + """ + if self.connection_status != ConnectionStatus.CONNECTED: + return 0 + + try: + # Импортируем storage здесь, чтобы избежать циклических импортов + from storage import get_storage + storage = get_storage() + + # Получаем необработанные сообщения из SQLite + unprocessed = storage.get_unprocessed_messages(limit=1000) + if not unprocessed: + return 0 + + self.logger.info( + f"Syncing {len(unprocessed)} unprocessed messages from SQLite to PostgreSQL" + ) + + # Конвертируем записи SQLite в формат для PostgreSQL + # Формат из SQLite: (id, timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex) + messages = [] + sqlite_ids = [] + for row in unprocessed: + sqlite_id, ts, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex = row + sqlite_ids.append(sqlite_id) + messages.append({ + "interface": interface, + "can_id": can_id, + "can_id_hex": can_id_hex or hex(can_id), + "dlc": dlc, + "data": data, + "data_hex": data_hex or (data.hex().upper() if isinstance(data, bytes) else ""), + "timestamp": ts, + "is_extended": bool(is_extended) if is_extended is not None else (can_id > 0x7FF) + }) + + # Отправляем в PostgreSQL напрямую (не через очередь) + sent_count = self._send_messages_batch(messages) + + if sent_count > 0: + # Помечаем успешно отправленные как обработанные + # Помечаем все, так как _send_messages_batch либо отправляет всё, либо ничего + marked = storage.mark_as_processed(sqlite_ids) + self._increment_synced(marked) + self.logger.info( + f"Synced {sent_count} messages from SQLite, marked {marked} as processed" + ) + return sent_count + + return 0 + + except Exception as e: + self.logger.error( + f"Error syncing from SQLite: {e}", + exc_info=True + ) + return 0 def _reconnect(self) -> None: """Переподключение к PostgreSQL.""" @@ -369,7 +538,7 @@ class PostgreSQLClient: return self.connection_status = ConnectionStatus.CONNECTING - self.reconnect_count += 1 + self._increment_reconnect() try: # Закрываем старый пул @@ -389,49 +558,73 @@ class PostgreSQLClient: def _forwarder_loop(self) -> None: """Основной цикл для отправки сообщений в PostgreSQL.""" self.logger.info("PostgreSQL forwarder loop started") - + batch = [] last_flush_time = time.time() - + was_connected = self.connection_status == ConnectionStatus.CONNECTED + while self.running or not self.message_queue.empty(): try: + current_time = time.time() + + # Проверяем восстановление соединения и запускаем синхронизацию + is_connected = self.connection_status == ConnectionStatus.CONNECTED + if is_connected: + # Синхронизация при восстановлении соединения или по интервалу + should_sync = ( + (not was_connected and is_connected) or # Соединение восстановлено + (self._needs_sync) or # Первая синхронизация + (current_time - self._last_sync_time >= self._sync_interval) # По интервалу + ) + if should_sync: + synced = self._sync_from_sqlite() + self._last_sync_time = current_time + self._needs_sync = False + if synced > 0: + self.logger.debug(f"Synced {synced} messages from SQLite") + + was_connected = is_connected + # Собираем сообщения в батч try: message = self.message_queue.get(timeout=0.1) batch.append(message) except Empty: pass - + # Отправляем батч если он заполнен или прошло достаточно времени - current_time = time.time() should_flush = ( len(batch) >= self.config.batch_size or (batch and (current_time - last_flush_time) >= self.config.flush_interval) ) - + if should_flush: if batch: - # Отправляем батч напрямую в PostgreSQL - self._send_messages_batch(batch) + # Отправляем батч с retry механизмом + self._send_messages_batch_with_retry(batch) batch = [] last_flush_time = current_time - + except Exception as e: self.logger.error( f"Error in forwarder loop: {e}", exc_info=True ) time.sleep(0.1) - - # Отправляем оставшиеся сообщения + + # Отправляем оставшиеся сообщения с retry if batch: - self._send_messages_batch(batch) - + self._send_messages_batch_with_retry(batch) + + # Финальная синхронизация перед остановкой + self._sync_from_sqlite() + self.logger.info( "PostgreSQL forwarder loop stopped", extra={ "sent_count": self.sent_count, - "failed_count": self.failed_count + "failed_count": self.failed_count, + "synced_count": self.synced_count } ) @@ -505,6 +698,7 @@ class PostgreSQLClient: "failed_count": self.failed_count, "retry_count": self.retry_count, "reconnect_count": self.reconnect_count, + "synced_count": self.synced_count, "queue_size": self.message_queue.qsize(), "host": self.config.host if self.config.enabled else None, "database": self.config.database if self.config.enabled else None diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index 6aac103..c904855 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -5,6 +5,7 @@ Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений. """ +import queue import threading import time from queue import Queue, Empty @@ -45,12 +46,37 @@ class MessageProcessor: self.message_queue: Queue[CANFrame] = Queue(maxsize=queue_size) self.running = False self.processing_thread: Optional[threading.Thread] = None - - # Статистика - self.processed_count = 0 - self.dropped_count = 0 - self.queue_full_warnings = 0 - + + # Статистика с блокировкой для потокобезопасности + self._stats_lock = threading.Lock() + self._processed_count = 0 + self._dropped_count = 0 + self._queue_full_warnings = 0 + + @property + def processed_count(self) -> int: + with self._stats_lock: + return self._processed_count + + @property + def dropped_count(self) -> int: + with self._stats_lock: + return self._dropped_count + + @property + def queue_full_warnings(self) -> int: + with self._stats_lock: + return self._queue_full_warnings + + def _increment_processed(self, count: int = 1) -> None: + with self._stats_lock: + self._processed_count += count + + def _increment_dropped(self, count: int = 1) -> None: + with self._stats_lock: + self._dropped_count += count + self._queue_full_warnings += count + # Инициализируем обработчики if handlers is None: handlers = self._create_default_handlers() @@ -130,11 +156,10 @@ class MessageProcessor: # Неблокирующий режим - быстрое добавление self.message_queue.put_nowait(frame) return True - except: + except queue.Full: # Очередь переполнена - пропускаем сообщение - self.dropped_count += 1 - self.queue_full_warnings += 1 - + self._increment_dropped() + # Логируем предупреждение периодически (не каждое сообщение) if self.queue_full_warnings % 1000 == 0: queue_usage = (self.message_queue.qsize() / self.message_queue.maxsize) * 100 @@ -148,6 +173,11 @@ class MessageProcessor: } ) return False + except Exception as e: + # Неожиданная ошибка + self.logger.debug(f"Unexpected error in enqueue: {e}") + self._increment_dropped() + return False def get_queue_usage(self) -> float: """ @@ -287,8 +317,8 @@ class MessageProcessor: extra={"batch_size": len(batch)} ) - # Обновляем счетчик обработанных сообщений - self.processed_count += len(batch) + # Обновляем счетчик обработанных сообщений (атомарно) + self._increment_processed(len(batch)) except Exception as e: self.logger.error( diff --git a/can_sniffer/src/socket_can/src.py b/can_sniffer/src/socket_can/src.py index 230ff0f..08b1be9 100644 --- a/can_sniffer/src/socket_can/src.py +++ b/can_sniffer/src/socket_can/src.py @@ -48,7 +48,21 @@ class CANBusHandler: self.message_count = 0 self.error_count = 0 self.last_message_time: Optional[float] = None - + + # Кэшируем ссылки для быстрого доступа (избегаем рефлексии в hot path) + self._processor = None + self._has_backpressure = False + self._enqueue_method = None + self._get_queue_usage_method = None + + if hasattr(message_callback, '__self__'): + processor = getattr(message_callback, '__self__', None) + if processor and hasattr(processor, 'get_queue_usage') and hasattr(processor, 'enqueue'): + self._processor = processor + self._has_backpressure = True + self._enqueue_method = processor.enqueue + self._get_queue_usage_method = processor.get_queue_usage + # Применяем фильтры, если они есть if self.filters: self._apply_filters() @@ -104,48 +118,37 @@ class CANBusHandler: # Вызываем callback для обработки сообщения # Используем backpressure: если очередь заполнена, замедляем чтение try: - # Проверяем использование очереди (если callback - это enqueue) - if hasattr(self.message_callback, '__self__'): - processor = getattr(self.message_callback, '__self__', None) - if processor and hasattr(processor, 'get_queue_usage'): - queue_usage = processor.get_queue_usage() - - # Если очередь заполнена более чем на 80%, используем блокирующий режим - if queue_usage > 0.8: - # Блокируем добавление с небольшим таймаутом для backpressure - if hasattr(processor, 'enqueue'): - success = processor.enqueue(frame, block=True, timeout=0.01) - if not success: - consecutive_drops += 1 - # Увеличиваем задержку при последовательных потерях - backpressure_delay = min( - max_backpressure_delay, - 0.001 * consecutive_drops - ) - continue - else: - self.message_callback(frame) - else: - # Очередь не заполнена - быстрое добавление - if hasattr(processor, 'enqueue'): - success = processor.enqueue(frame, block=False) - if not success: - consecutive_drops += 1 - backpressure_delay = min( - max_backpressure_delay, - 0.001 * consecutive_drops - ) - continue - else: - self.message_callback(frame) - - # Сбрасываем счетчик при успешной отправке - if queue_usage < 0.5: - consecutive_drops = 0 - backpressure_delay = 0.0 + # Используем закэшированные ссылки для избежания рефлексии в hot path + if self._has_backpressure: + queue_usage = self._get_queue_usage_method() + + # Если очередь заполнена более чем на 80%, используем блокирующий режим + if queue_usage > 0.8: + # Блокируем добавление с небольшим таймаутом для backpressure + success = self._enqueue_method(frame, block=True, timeout=0.01) + if not success: + consecutive_drops += 1 + # Увеличиваем задержку при последовательных потерях + backpressure_delay = min( + max_backpressure_delay, + 0.001 * consecutive_drops + ) + continue else: - # Обычный callback без backpressure - self.message_callback(frame) + # Очередь не заполнена - быстрое добавление + success = self._enqueue_method(frame, block=False) + if not success: + consecutive_drops += 1 + backpressure_delay = min( + max_backpressure_delay, + 0.001 * consecutive_drops + ) + continue + + # Сбрасываем счетчик при успешной отправке + if queue_usage < 0.5: + consecutive_drops = 0 + backpressure_delay = 0.0 else: # Обычный callback без backpressure self.message_callback(frame) diff --git a/can_sniffer/src/storage/storage.py b/can_sniffer/src/storage/storage.py index 609e84f..f7dfd53 100644 --- a/can_sniffer/src/storage/storage.py +++ b/can_sniffer/src/storage/storage.py @@ -19,9 +19,10 @@ logger = get_logger(__name__) class Storage: """Singleton класс для работы с SQLite базой данных.""" - + _instance: Optional['Storage'] = None _lock = threading.Lock() + _write_lock = threading.Lock() # Мьютекс для потокобезопасной записи в SQLite def __new__(cls): """Singleton паттерн для единого экземпляра хранилища.""" @@ -33,17 +34,19 @@ class Storage: def __init__(self): """Инициализация хранилища.""" - # Проверяем, что инициализация выполняется только один раз - if hasattr(self, '_initialized'): - return - - self.config = config.storage - self.logger = logger - self.connection: Optional[sqlite3.Connection] = None - self._initialized = False - - # Инициализируем базу данных - self._init_database() + # Защита от race condition при инициализации singleton + with self._lock: + # Проверяем, что инициализация выполняется только один раз + if hasattr(self, '_initialized') and self._initialized: + return + + self.config = config.storage + self.logger = logger + self.connection: Optional[sqlite3.Connection] = None + self._initialized = False + + # Инициализируем базу данных + self._init_database() def _init_database(self) -> None: """Инициализация базы данных SQLite.""" @@ -99,27 +102,51 @@ class Storage: exc_info=True ) raise - + + def _migrate_add_column(self, cursor, table: str, column: str, column_def: str) -> None: + """Добавление колонки в таблицу, если она не существует. + + Args: + cursor: Курсор базы данных + table: Имя таблицы + column: Имя колонки + column_def: Определение колонки (тип и ограничения) + """ + try: + cursor.execute(f"SELECT {column} FROM {table} LIMIT 1") + except sqlite3.OperationalError: + # Колонка не существует, добавляем + cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {column_def}") + self.logger.info(f"Added column {column} to table {table}") + def _create_tables(self) -> None: """Создание таблиц в базе данных.""" if not self.connection: raise RuntimeError("Database connection not initialized") - + cursor = self.connection.cursor() - - # Таблица для CAN сообщений + + # Таблица для CAN сообщений (согласована с PostgreSQL схемой) cursor.execute(""" CREATE TABLE IF NOT EXISTS can_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp REAL NOT NULL, interface TEXT NOT NULL, can_id INTEGER NOT NULL, + can_id_hex TEXT NOT NULL DEFAULT '', + is_extended INTEGER NOT NULL DEFAULT 0, dlc INTEGER NOT NULL, data BLOB NOT NULL, + data_hex TEXT NOT NULL DEFAULT '', processed INTEGER DEFAULT 0, created_at REAL DEFAULT (julianday('now')) ) """) + + # Добавляем новые колонки для существующих таблиц (миграция) + self._migrate_add_column(cursor, "can_messages", "can_id_hex", "TEXT NOT NULL DEFAULT ''") + self._migrate_add_column(cursor, "can_messages", "is_extended", "INTEGER NOT NULL DEFAULT 0") + self._migrate_add_column(cursor, "can_messages", "data_hex", "TEXT NOT NULL DEFAULT ''") # Индексы для быстрого поиска cursor.execute(""" @@ -170,30 +197,36 @@ class Storage: def save_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> Optional[int]: """ Сохранение CAN сообщения в базу данных. - + Args: interface: Имя интерфейса (например, 'can0') can_id: CAN ID сообщения dlc: Data Length Code data: Данные сообщения (bytes) timestamp: Временная метка сообщения - + Returns: ID сохраненного сообщения или None в случае ошибки """ if not self.connection: self.logger.error("Database connection not initialized") return None - + try: - with self._get_cursor() as cursor: - cursor.execute(""" - INSERT INTO can_messages (timestamp, interface, can_id, dlc, data) - VALUES (?, ?, ?, ?, ?) - """, (timestamp, interface, can_id, dlc, data)) - - return cursor.lastrowid - + # Вычисляем дополнительные поля для совместимости с PostgreSQL + can_id_hex = hex(can_id) + is_extended = 1 if can_id > 0x7FF else 0 + data_hex = data.hex().upper() if isinstance(data, bytes) else "" + + with self._write_lock: + with self._get_cursor() as cursor: + cursor.execute(""" + INSERT INTO can_messages (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex)) + + return cursor.lastrowid + except Exception as e: self.logger.error( f"Failed to save message to database: {e}", @@ -208,34 +241,46 @@ class Storage: def save_messages_batch(self, messages: list) -> int: """ Пакетное сохранение CAN сообщений. - + Args: - messages: Список кортежей (interface, can_id, dlc, data, timestamp) - + messages: Список кортежей (timestamp, interface, can_id, dlc, data) + Returns: Количество успешно сохраненных сообщений """ if not self.connection: self.logger.error("Database connection not initialized") return 0 - + if not messages: return 0 - + try: - with self._get_cursor() as cursor: - cursor.executemany(""" - INSERT INTO can_messages (timestamp, interface, can_id, dlc, data) - VALUES (?, ?, ?, ?, ?) - """, messages) - - saved_count = cursor.rowcount - self.logger.debug( - f"Saved {saved_count} messages in batch", - extra={"batch_size": len(messages)} - ) - return saved_count - + # Преобразуем сообщения в расширенный формат с дополнительными полями + extended_messages = [] + for msg in messages: + timestamp, interface, can_id, dlc, data = msg + can_id_hex = hex(can_id) + is_extended = 1 if can_id > 0x7FF else 0 + data_hex = data.hex().upper() if isinstance(data, bytes) else "" + extended_messages.append(( + timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex + )) + + with self._write_lock: + with self._get_cursor() as cursor: + cursor.executemany(""" + INSERT INTO can_messages (timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, extended_messages) + + saved_count = cursor.rowcount + self.logger.debug( + f"Saved {saved_count} messages in batch", + extra={"batch_size": len(messages)} + ) + return saved_count + except Exception as e: self.logger.error( f"Failed to save messages batch: {e}", @@ -247,29 +292,29 @@ class Storage: def get_unprocessed_messages(self, limit: int = 1000) -> list: """ Получение необработанных сообщений для отправки в PostgreSQL. - + Args: limit: Максимальное количество сообщений - + Returns: - Список кортежей (id, timestamp, interface, can_id, dlc, data) + Список кортежей (id, timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex) """ if not self.connection: self.logger.error("Database connection not initialized") return [] - + try: with self._get_cursor() as cursor: cursor.execute(""" - SELECT id, timestamp, interface, can_id, dlc, data + SELECT id, timestamp, interface, can_id, can_id_hex, is_extended, dlc, data, data_hex FROM can_messages WHERE processed = 0 ORDER BY timestamp ASC LIMIT ? """, (limit,)) - + return cursor.fetchall() - + except Exception as e: self.logger.error( f"Failed to get unprocessed messages: {e}", @@ -280,31 +325,32 @@ class Storage: def mark_as_processed(self, message_ids: list) -> int: """ Отметить сообщения как обработанные. - + Args: message_ids: Список ID сообщений - + Returns: Количество обновленных сообщений """ if not self.connection: self.logger.error("Database connection not initialized") return 0 - + if not message_ids: return 0 - + try: - with self._get_cursor() as cursor: - placeholders = ','.join('?' * len(message_ids)) - cursor.execute(f""" - UPDATE can_messages - SET processed = 1 - WHERE id IN ({placeholders}) - """, message_ids) - - return cursor.rowcount - + with self._write_lock: + with self._get_cursor() as cursor: + placeholders = ','.join('?' * len(message_ids)) + cursor.execute(f""" + UPDATE can_messages + SET processed = 1 + WHERE id IN ({placeholders}) + """, message_ids) + + return cursor.rowcount + except Exception as e: self.logger.error( f"Failed to mark messages as processed: {e}", @@ -358,7 +404,50 @@ class Storage: "initialized": True, "error": str(e) } - + + def cleanup_old_messages(self, days: Optional[int] = None) -> int: + """ + Удаление обработанных записей старше указанного количества дней. + + Удаляет только записи с processed=1 для сохранения необработанных данных. + + Args: + days: Количество дней хранения. Если None, берется из config.storage.retention_days + + Returns: + Количество удаленных записей + """ + if not self.connection: + self.logger.error("Database connection not initialized") + return 0 + + if days is None: + days = self.config.retention_days + + try: + with self._write_lock: + with self._get_cursor() as cursor: + # julianday('now') - days дает дату N дней назад + cursor.execute(""" + DELETE FROM can_messages + WHERE processed = 1 + AND created_at < julianday('now') - ? + """, (days,)) + + deleted = cursor.rowcount + if deleted > 0: + self.logger.info( + f"Cleaned up {deleted} processed messages older than {days} days" + ) + return deleted + + except Exception as e: + self.logger.error( + f"Failed to cleanup old messages: {e}", + exc_info=True + ) + return 0 + def close(self) -> None: """Закрытие соединения с базой данных.""" if self.connection: