From d72ea56772a01a24167c6c5eb8a8056ae5987218 Mon Sep 17 00:00:00 2001 From: qsethuk Date: Tue, 6 Jan 2026 18:38:42 +0300 Subject: [PATCH] Add storage module for sql --- .../src/socket_can/message_processor.py | 127 +++++- can_sniffer/src/storage/__init__.py | 6 + can_sniffer/src/storage/storage.py | 392 ++++++++++++++++++ 3 files changed, 503 insertions(+), 22 deletions(-) create mode 100644 can_sniffer/src/storage/__init__.py create mode 100644 can_sniffer/src/storage/storage.py diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index 5a6262d..7bd4ac0 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -12,6 +12,7 @@ from queue import Queue, Empty from typing import Optional, Tuple from logger import get_logger from config import config +from storage import get_storage logger = get_logger(__name__) @@ -48,16 +49,22 @@ class MessageProcessor: def _init_storage(self) -> None: """Инициализация локального хранилища (SQLite).""" - # TODO: Инициализация SQLite хранилища - # from storage import Storage - # self.storage = Storage(config.storage) - self.logger.info( - "Storage initialization", - extra={ - "type": config.storage.type, - "path": config.storage.database_path - } - ) + try: + self.storage = get_storage() + self.logger.info( + "Storage initialized", + extra={ + "type": config.storage.type, + "path": config.storage.database_path, + "wal_mode": config.storage.wal_mode + } + ) + except Exception as e: + self.logger.error( + f"Failed to initialize storage: {e}", + exc_info=True + ) + self.storage = None def _init_influxdb(self) -> None: """Инициализация клиента InfluxDB.""" @@ -183,10 +190,59 @@ class MessageProcessor: return try: - # Обрабатываем каждое сообщение в батче + # Подготавливаем данные для пакетного сохранения в SQLite + messages_to_save = [] + for interface, message in batch: - self._process_single(interface, message) - self.processed_count += 1 + # Логируем сообщение в зависимости от настроек + if self.log_messages: + self.logger.info( + "CAN message", + extra={ + "interface": interface, + "can_id": hex(message.arbitration_id), + "dlc": message.dlc, + "data": message.data.hex() if message.data else "", + "timestamp": message.timestamp + } + ) + else: + self.logger.debug( + "CAN message received", + extra={ + "interface": interface, + "can_id": hex(message.arbitration_id), + "dlc": message.dlc, + "data": message.data.hex() if message.data else "", + "timestamp": message.timestamp + } + ) + + # Подготавливаем данные для сохранения + messages_to_save.append(( + message.timestamp, + interface, + message.arbitration_id, + message.dlc, + message.data if message.data else b'' + )) + + # Пакетное сохранение в SQLite + if self.storage and messages_to_save: + saved_count = self.storage.save_messages_batch(messages_to_save) + if saved_count != len(messages_to_save): + self.logger.warning( + f"Not all messages saved: {saved_count}/{len(messages_to_save)}", + extra={"batch_size": len(messages_to_save)} + ) + + # Отправляем в InfluxDB (если включено) + if self.influxdb_client: + for interface, message in batch: + self._send_to_influxdb(interface, message) + + # Обновляем счетчик обработанных сообщений + self.processed_count += len(batch) except Exception as e: self.logger.error( @@ -199,6 +255,8 @@ class MessageProcessor: """ Обработка одного CAN сообщения. + Используется как fallback, если батчинг не применяется. + Args: interface: Имя интерфейса message: CAN сообщение @@ -206,7 +264,6 @@ class MessageProcessor: try: # Логируем сообщение в зависимости от настроек if self.log_messages: - # Логирование на INFO уровне (если включено) self.logger.info( "CAN message", extra={ @@ -218,7 +275,6 @@ class MessageProcessor: } ) else: - # Логирование на DEBUG уровне (по умолчанию) self.logger.debug( "CAN message received", extra={ @@ -256,13 +312,25 @@ class MessageProcessor: message: CAN сообщение """ if not self.storage: - # TODO: Реализовать сохранение в SQLite - # self.storage.save_message(interface, message) return try: - # self.storage.save_message(interface, message) - pass + message_id = self.storage.save_message( + interface=interface, + can_id=message.arbitration_id, + dlc=message.dlc, + data=message.data if message.data else b'', + timestamp=message.timestamp + ) + + if message_id is None: + self.logger.warning( + "Failed to save message to storage", + extra={ + "interface": interface, + "can_id": hex(message.arbitration_id) + } + ) except Exception as e: self.logger.error( f"Failed to save message to storage: {e}", @@ -320,8 +388,13 @@ class MessageProcessor: # Закрываем соединения if self.storage: - # self.storage.close() - pass + try: + self.storage.close() + except Exception as e: + self.logger.error( + f"Error closing storage: {e}", + exc_info=True + ) if self.influxdb_client: # self.influxdb_client.close() @@ -337,9 +410,19 @@ class MessageProcessor: def get_stats(self) -> dict: """Получение статистики процессора.""" - return { + stats = { "processed_count": self.processed_count, "dropped_count": self.dropped_count, "queue_size": self.message_queue.qsize(), "running": self.running } + + # Добавляем статистику хранилища, если оно инициализировано + if self.storage: + try: + storage_stats = self.storage.get_stats() + stats["storage"] = storage_stats + except Exception as e: + self.logger.debug(f"Failed to get storage stats: {e}") + + return stats diff --git a/can_sniffer/src/storage/__init__.py b/can_sniffer/src/storage/__init__.py new file mode 100644 index 0000000..1f0a2fe --- /dev/null +++ b/can_sniffer/src/storage/__init__.py @@ -0,0 +1,6 @@ +"""Модуль для работы с локальным хранилищем SQLite.""" + +from .storage import Storage, get_storage + +__all__ = ['Storage', 'get_storage'] + diff --git a/can_sniffer/src/storage/storage.py b/can_sniffer/src/storage/storage.py new file mode 100644 index 0000000..63d3ad6 --- /dev/null +++ b/can_sniffer/src/storage/storage.py @@ -0,0 +1,392 @@ +""" +Модуль для работы с локальным хранилищем SQLite. + +Предоставляет singleton класс для инициализации и работы с SQLite базой данных +для временного хранения CAN сообщений. +""" + +import sqlite3 +import threading +from pathlib import Path +from typing import Optional, Dict, Any +from contextlib import contextmanager + +from config import config +from logger import get_logger + +logger = get_logger(__name__) + + +class Storage: + """Singleton класс для работы с SQLite базой данных.""" + + _instance: Optional['Storage'] = None + _lock = threading.Lock() + + def __new__(cls): + """Singleton паттерн для единого экземпляра хранилища.""" + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """Инициализация хранилища.""" + # Проверяем, что инициализация выполняется только один раз + if hasattr(self, '_initialized'): + return + + self.config = config.storage + self.logger = logger + self.connection: Optional[sqlite3.Connection] = None + self._initialized = False + + # Инициализируем базу данных + self._init_database() + + def _init_database(self) -> None: + """Инициализация базы данных SQLite.""" + try: + # Определяем путь к базе данных + db_path = Path(self.config.database_path) + db_path.parent.mkdir(parents=True, exist_ok=True) + + self.logger.info( + "Initializing SQLite database", + extra={ + "path": str(db_path), + "wal_mode": self.config.wal_mode, + "sync_mode": self.config.sync_mode + } + ) + + # Создаем соединение + self.connection = sqlite3.connect( + str(db_path), + check_same_thread=False, # Разрешаем использование из разных потоков + timeout=30.0 # Таймаут для блокировок + ) + + # Настраиваем режим синхронизации + sync_mode_map = { + "NORMAL": "NORMAL", + "FULL": "FULL", + "OFF": "OFF" + } + sync_mode = sync_mode_map.get(self.config.sync_mode.upper(), "NORMAL") + self.connection.execute(f"PRAGMA synchronous = {sync_mode}") + + # Включаем WAL режим, если указано + if self.config.wal_mode: + self.connection.execute("PRAGMA journal_mode = WAL") + self.logger.info("WAL mode enabled") + + # Оптимизация для производительности + self.connection.execute("PRAGMA busy_timeout = 30000") # 30 секунд + self.connection.execute("PRAGMA cache_size = -64000") # 64MB кэш + self.connection.execute("PRAGMA temp_store = MEMORY") + + # Создаем таблицу для CAN сообщений + self._create_tables() + + self._initialized = True + self.logger.info("SQLite database initialized successfully") + + except Exception as e: + self.logger.error( + f"Failed to initialize SQLite database: {e}", + exc_info=True + ) + raise + + def _create_tables(self) -> None: + """Создание таблиц в базе данных.""" + if not self.connection: + raise RuntimeError("Database connection not initialized") + + cursor = self.connection.cursor() + + # Таблица для CAN сообщений + cursor.execute(""" + CREATE TABLE IF NOT EXISTS can_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL NOT NULL, + interface TEXT NOT NULL, + can_id INTEGER NOT NULL, + dlc INTEGER NOT NULL, + data BLOB NOT NULL, + processed INTEGER DEFAULT 0, + created_at REAL DEFAULT (julianday('now')) + ) + """) + + # Индексы для быстрого поиска + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_timestamp + ON can_messages(timestamp) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_interface + ON can_messages(interface) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_can_id + ON can_messages(can_id) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_processed + ON can_messages(processed) + """) + + # Комбинированный индекс для запросов по времени и интерфейсу + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_timestamp_interface + ON can_messages(timestamp, interface) + """) + + self.connection.commit() + self.logger.debug("Database tables and indexes created") + + @contextmanager + def _get_cursor(self): + """Context manager для получения курсора с автоматическим commit.""" + if not self.connection: + raise RuntimeError("Database connection not initialized") + + cursor = self.connection.cursor() + try: + yield cursor + self.connection.commit() + except Exception as e: + self.connection.rollback() + raise + finally: + cursor.close() + + def save_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> Optional[int]: + """ + Сохранение CAN сообщения в базу данных. + + Args: + interface: Имя интерфейса (например, 'can0') + can_id: CAN ID сообщения + dlc: Data Length Code + data: Данные сообщения (bytes) + timestamp: Временная метка сообщения + + Returns: + ID сохраненного сообщения или None в случае ошибки + """ + if not self.connection: + self.logger.error("Database connection not initialized") + return None + + try: + with self._get_cursor() as cursor: + cursor.execute(""" + INSERT INTO can_messages (timestamp, interface, can_id, dlc, data) + VALUES (?, ?, ?, ?, ?) + """, (timestamp, interface, can_id, dlc, data)) + + return cursor.lastrowid + + except Exception as e: + self.logger.error( + f"Failed to save message to database: {e}", + exc_info=True, + extra={ + "interface": interface, + "can_id": hex(can_id) + } + ) + return None + + def save_messages_batch(self, messages: list) -> int: + """ + Пакетное сохранение CAN сообщений. + + Args: + messages: Список кортежей (interface, can_id, dlc, data, timestamp) + + Returns: + Количество успешно сохраненных сообщений + """ + if not self.connection: + self.logger.error("Database connection not initialized") + return 0 + + if not messages: + return 0 + + try: + with self._get_cursor() as cursor: + cursor.executemany(""" + INSERT INTO can_messages (timestamp, interface, can_id, dlc, data) + VALUES (?, ?, ?, ?, ?) + """, messages) + + saved_count = cursor.rowcount + self.logger.debug( + f"Saved {saved_count} messages in batch", + extra={"batch_size": len(messages)} + ) + return saved_count + + except Exception as e: + self.logger.error( + f"Failed to save messages batch: {e}", + exc_info=True, + extra={"batch_size": len(messages)} + ) + return 0 + + def get_unprocessed_messages(self, limit: int = 1000) -> list: + """ + Получение необработанных сообщений для отправки в InfluxDB. + + Args: + limit: Максимальное количество сообщений + + Returns: + Список кортежей (id, timestamp, interface, can_id, dlc, data) + """ + if not self.connection: + self.logger.error("Database connection not initialized") + return [] + + try: + with self._get_cursor() as cursor: + cursor.execute(""" + SELECT id, timestamp, interface, can_id, dlc, data + FROM can_messages + WHERE processed = 0 + ORDER BY timestamp ASC + LIMIT ? + """, (limit,)) + + return cursor.fetchall() + + except Exception as e: + self.logger.error( + f"Failed to get unprocessed messages: {e}", + exc_info=True + ) + return [] + + def mark_as_processed(self, message_ids: list) -> int: + """ + Отметить сообщения как обработанные. + + Args: + message_ids: Список ID сообщений + + Returns: + Количество обновленных сообщений + """ + if not self.connection: + self.logger.error("Database connection not initialized") + return 0 + + if not message_ids: + return 0 + + try: + with self._get_cursor() as cursor: + placeholders = ','.join('?' * len(message_ids)) + cursor.execute(f""" + UPDATE can_messages + SET processed = 1 + WHERE id IN ({placeholders}) + """, message_ids) + + return cursor.rowcount + + except Exception as e: + self.logger.error( + f"Failed to mark messages as processed: {e}", + exc_info=True + ) + return 0 + + def get_stats(self) -> Dict[str, Any]: + """ + Получение статистики базы данных. + + Returns: + Словарь со статистикой + """ + if not self.connection: + return { + "initialized": False, + "total_messages": 0, + "unprocessed_messages": 0, + "processed_messages": 0 + } + + try: + with self._get_cursor() as cursor: + # Общее количество сообщений + cursor.execute("SELECT COUNT(*) FROM can_messages") + total = cursor.fetchone()[0] + + # Необработанные сообщения + cursor.execute("SELECT COUNT(*) FROM can_messages WHERE processed = 0") + unprocessed = cursor.fetchone()[0] + + # Обработанные сообщения + cursor.execute("SELECT COUNT(*) FROM can_messages WHERE processed = 1") + processed = cursor.fetchone()[0] + + return { + "initialized": True, + "total_messages": total, + "unprocessed_messages": unprocessed, + "processed_messages": processed, + "database_path": self.config.database_path + } + + except Exception as e: + self.logger.error( + f"Failed to get database stats: {e}", + exc_info=True + ) + return { + "initialized": True, + "error": str(e) + } + + def close(self) -> None: + """Закрытие соединения с базой данных.""" + if self.connection: + try: + self.connection.close() + self.logger.info("Database connection closed") + except Exception as e: + self.logger.error( + f"Error closing database connection: {e}", + exc_info=True + ) + finally: + self.connection = None + + +# Глобальный экземпляр хранилища +_storage_instance: Optional[Storage] = None + + +def get_storage() -> Storage: + """ + Получение глобального экземпляра хранилища. + + Returns: + Экземпляр Storage + """ + global _storage_instance + if _storage_instance is None: + _storage_instance = Storage() + return _storage_instance +