Add storage module for sql

This commit is contained in:
2026-01-06 18:38:42 +03:00
parent 62a71d298e
commit d72ea56772
3 changed files with 503 additions and 22 deletions

View File

@@ -12,6 +12,7 @@ from queue import Queue, Empty
from typing import Optional, Tuple
from logger import get_logger
from config import config
from storage import get_storage
logger = get_logger(__name__)
@@ -48,16 +49,22 @@ class MessageProcessor:
def _init_storage(self) -> None:
"""Инициализация локального хранилища (SQLite)."""
# TODO: Инициализация SQLite хранилища
# from storage import Storage
# self.storage = Storage(config.storage)
self.logger.info(
"Storage initialization",
extra={
"type": config.storage.type,
"path": config.storage.database_path
}
)
try:
self.storage = get_storage()
self.logger.info(
"Storage initialized",
extra={
"type": config.storage.type,
"path": config.storage.database_path,
"wal_mode": config.storage.wal_mode
}
)
except Exception as e:
self.logger.error(
f"Failed to initialize storage: {e}",
exc_info=True
)
self.storage = None
def _init_influxdb(self) -> None:
"""Инициализация клиента InfluxDB."""
@@ -183,10 +190,59 @@ class MessageProcessor:
return
try:
# Обрабатываем каждое сообщение в батче
# Подготавливаем данные для пакетного сохранения в SQLite
messages_to_save = []
for interface, message in batch:
self._process_single(interface, message)
self.processed_count += 1
# Логируем сообщение в зависимости от настроек
if self.log_messages:
self.logger.info(
"CAN message",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
else:
self.logger.debug(
"CAN message received",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id),
"dlc": message.dlc,
"data": message.data.hex() if message.data else "",
"timestamp": message.timestamp
}
)
# Подготавливаем данные для сохранения
messages_to_save.append((
message.timestamp,
interface,
message.arbitration_id,
message.dlc,
message.data if message.data else b''
))
# Пакетное сохранение в SQLite
if self.storage and messages_to_save:
saved_count = self.storage.save_messages_batch(messages_to_save)
if saved_count != len(messages_to_save):
self.logger.warning(
f"Not all messages saved: {saved_count}/{len(messages_to_save)}",
extra={"batch_size": len(messages_to_save)}
)
# Отправляем в InfluxDB (если включено)
if self.influxdb_client:
for interface, message in batch:
self._send_to_influxdb(interface, message)
# Обновляем счетчик обработанных сообщений
self.processed_count += len(batch)
except Exception as e:
self.logger.error(
@@ -199,6 +255,8 @@ class MessageProcessor:
"""
Обработка одного CAN сообщения.
Используется как fallback, если батчинг не применяется.
Args:
interface: Имя интерфейса
message: CAN сообщение
@@ -206,7 +264,6 @@ class MessageProcessor:
try:
# Логируем сообщение в зависимости от настроек
if self.log_messages:
# Логирование на INFO уровне (если включено)
self.logger.info(
"CAN message",
extra={
@@ -218,7 +275,6 @@ class MessageProcessor:
}
)
else:
# Логирование на DEBUG уровне (по умолчанию)
self.logger.debug(
"CAN message received",
extra={
@@ -256,13 +312,25 @@ class MessageProcessor:
message: CAN сообщение
"""
if not self.storage:
# TODO: Реализовать сохранение в SQLite
# self.storage.save_message(interface, message)
return
try:
# self.storage.save_message(interface, message)
pass
message_id = self.storage.save_message(
interface=interface,
can_id=message.arbitration_id,
dlc=message.dlc,
data=message.data if message.data else b'',
timestamp=message.timestamp
)
if message_id is None:
self.logger.warning(
"Failed to save message to storage",
extra={
"interface": interface,
"can_id": hex(message.arbitration_id)
}
)
except Exception as e:
self.logger.error(
f"Failed to save message to storage: {e}",
@@ -320,8 +388,13 @@ class MessageProcessor:
# Закрываем соединения
if self.storage:
# self.storage.close()
pass
try:
self.storage.close()
except Exception as e:
self.logger.error(
f"Error closing storage: {e}",
exc_info=True
)
if self.influxdb_client:
# self.influxdb_client.close()
@@ -337,9 +410,19 @@ class MessageProcessor:
def get_stats(self) -> dict:
"""Получение статистики процессора."""
return {
stats = {
"processed_count": self.processed_count,
"dropped_count": self.dropped_count,
"queue_size": self.message_queue.qsize(),
"running": self.running
}
# Добавляем статистику хранилища, если оно инициализировано
if self.storage:
try:
storage_stats = self.storage.get_stats()
stats["storage"] = storage_stats
except Exception as e:
self.logger.debug(f"Failed to get storage stats: {e}")
return stats

View File

@@ -0,0 +1,6 @@
"""Модуль для работы с локальным хранилищем SQLite."""
from .storage import Storage, get_storage
__all__ = ['Storage', 'get_storage']

View File

@@ -0,0 +1,392 @@
"""
Модуль для работы с локальным хранилищем SQLite.
Предоставляет singleton класс для инициализации и работы с SQLite базой данных
для временного хранения CAN сообщений.
"""
import sqlite3
import threading
from pathlib import Path
from typing import Optional, Dict, Any
from contextlib import contextmanager
from config import config
from logger import get_logger
logger = get_logger(__name__)
class Storage:
"""Singleton класс для работы с SQLite базой данных."""
_instance: Optional['Storage'] = None
_lock = threading.Lock()
def __new__(cls):
"""Singleton паттерн для единого экземпляра хранилища."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""Инициализация хранилища."""
# Проверяем, что инициализация выполняется только один раз
if hasattr(self, '_initialized'):
return
self.config = config.storage
self.logger = logger
self.connection: Optional[sqlite3.Connection] = None
self._initialized = False
# Инициализируем базу данных
self._init_database()
def _init_database(self) -> None:
"""Инициализация базы данных SQLite."""
try:
# Определяем путь к базе данных
db_path = Path(self.config.database_path)
db_path.parent.mkdir(parents=True, exist_ok=True)
self.logger.info(
"Initializing SQLite database",
extra={
"path": str(db_path),
"wal_mode": self.config.wal_mode,
"sync_mode": self.config.sync_mode
}
)
# Создаем соединение
self.connection = sqlite3.connect(
str(db_path),
check_same_thread=False, # Разрешаем использование из разных потоков
timeout=30.0 # Таймаут для блокировок
)
# Настраиваем режим синхронизации
sync_mode_map = {
"NORMAL": "NORMAL",
"FULL": "FULL",
"OFF": "OFF"
}
sync_mode = sync_mode_map.get(self.config.sync_mode.upper(), "NORMAL")
self.connection.execute(f"PRAGMA synchronous = {sync_mode}")
# Включаем WAL режим, если указано
if self.config.wal_mode:
self.connection.execute("PRAGMA journal_mode = WAL")
self.logger.info("WAL mode enabled")
# Оптимизация для производительности
self.connection.execute("PRAGMA busy_timeout = 30000") # 30 секунд
self.connection.execute("PRAGMA cache_size = -64000") # 64MB кэш
self.connection.execute("PRAGMA temp_store = MEMORY")
# Создаем таблицу для CAN сообщений
self._create_tables()
self._initialized = True
self.logger.info("SQLite database initialized successfully")
except Exception as e:
self.logger.error(
f"Failed to initialize SQLite database: {e}",
exc_info=True
)
raise
def _create_tables(self) -> None:
"""Создание таблиц в базе данных."""
if not self.connection:
raise RuntimeError("Database connection not initialized")
cursor = self.connection.cursor()
# Таблица для CAN сообщений
cursor.execute("""
CREATE TABLE IF NOT EXISTS can_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
interface TEXT NOT NULL,
can_id INTEGER NOT NULL,
dlc INTEGER NOT NULL,
data BLOB NOT NULL,
processed INTEGER DEFAULT 0,
created_at REAL DEFAULT (julianday('now'))
)
""")
# Индексы для быстрого поиска
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp
ON can_messages(timestamp)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_interface
ON can_messages(interface)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_can_id
ON can_messages(can_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_processed
ON can_messages(processed)
""")
# Комбинированный индекс для запросов по времени и интерфейсу
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_timestamp_interface
ON can_messages(timestamp, interface)
""")
self.connection.commit()
self.logger.debug("Database tables and indexes created")
@contextmanager
def _get_cursor(self):
"""Context manager для получения курсора с автоматическим commit."""
if not self.connection:
raise RuntimeError("Database connection not initialized")
cursor = self.connection.cursor()
try:
yield cursor
self.connection.commit()
except Exception as e:
self.connection.rollback()
raise
finally:
cursor.close()
def save_message(self, interface: str, can_id: int, dlc: int, data: bytes, timestamp: float) -> Optional[int]:
"""
Сохранение CAN сообщения в базу данных.
Args:
interface: Имя интерфейса (например, 'can0')
can_id: CAN ID сообщения
dlc: Data Length Code
data: Данные сообщения (bytes)
timestamp: Временная метка сообщения
Returns:
ID сохраненного сообщения или None в случае ошибки
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return None
try:
with self._get_cursor() as cursor:
cursor.execute("""
INSERT INTO can_messages (timestamp, interface, can_id, dlc, data)
VALUES (?, ?, ?, ?, ?)
""", (timestamp, interface, can_id, dlc, data))
return cursor.lastrowid
except Exception as e:
self.logger.error(
f"Failed to save message to database: {e}",
exc_info=True,
extra={
"interface": interface,
"can_id": hex(can_id)
}
)
return None
def save_messages_batch(self, messages: list) -> int:
"""
Пакетное сохранение CAN сообщений.
Args:
messages: Список кортежей (interface, can_id, dlc, data, timestamp)
Returns:
Количество успешно сохраненных сообщений
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return 0
if not messages:
return 0
try:
with self._get_cursor() as cursor:
cursor.executemany("""
INSERT INTO can_messages (timestamp, interface, can_id, dlc, data)
VALUES (?, ?, ?, ?, ?)
""", messages)
saved_count = cursor.rowcount
self.logger.debug(
f"Saved {saved_count} messages in batch",
extra={"batch_size": len(messages)}
)
return saved_count
except Exception as e:
self.logger.error(
f"Failed to save messages batch: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
)
return 0
def get_unprocessed_messages(self, limit: int = 1000) -> list:
"""
Получение необработанных сообщений для отправки в InfluxDB.
Args:
limit: Максимальное количество сообщений
Returns:
Список кортежей (id, timestamp, interface, can_id, dlc, data)
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return []
try:
with self._get_cursor() as cursor:
cursor.execute("""
SELECT id, timestamp, interface, can_id, dlc, data
FROM can_messages
WHERE processed = 0
ORDER BY timestamp ASC
LIMIT ?
""", (limit,))
return cursor.fetchall()
except Exception as e:
self.logger.error(
f"Failed to get unprocessed messages: {e}",
exc_info=True
)
return []
def mark_as_processed(self, message_ids: list) -> int:
"""
Отметить сообщения как обработанные.
Args:
message_ids: Список ID сообщений
Returns:
Количество обновленных сообщений
"""
if not self.connection:
self.logger.error("Database connection not initialized")
return 0
if not message_ids:
return 0
try:
with self._get_cursor() as cursor:
placeholders = ','.join('?' * len(message_ids))
cursor.execute(f"""
UPDATE can_messages
SET processed = 1
WHERE id IN ({placeholders})
""", message_ids)
return cursor.rowcount
except Exception as e:
self.logger.error(
f"Failed to mark messages as processed: {e}",
exc_info=True
)
return 0
def get_stats(self) -> Dict[str, Any]:
"""
Получение статистики базы данных.
Returns:
Словарь со статистикой
"""
if not self.connection:
return {
"initialized": False,
"total_messages": 0,
"unprocessed_messages": 0,
"processed_messages": 0
}
try:
with self._get_cursor() as cursor:
# Общее количество сообщений
cursor.execute("SELECT COUNT(*) FROM can_messages")
total = cursor.fetchone()[0]
# Необработанные сообщения
cursor.execute("SELECT COUNT(*) FROM can_messages WHERE processed = 0")
unprocessed = cursor.fetchone()[0]
# Обработанные сообщения
cursor.execute("SELECT COUNT(*) FROM can_messages WHERE processed = 1")
processed = cursor.fetchone()[0]
return {
"initialized": True,
"total_messages": total,
"unprocessed_messages": unprocessed,
"processed_messages": processed,
"database_path": self.config.database_path
}
except Exception as e:
self.logger.error(
f"Failed to get database stats: {e}",
exc_info=True
)
return {
"initialized": True,
"error": str(e)
}
def close(self) -> None:
"""Закрытие соединения с базой данных."""
if self.connection:
try:
self.connection.close()
self.logger.info("Database connection closed")
except Exception as e:
self.logger.error(
f"Error closing database connection: {e}",
exc_info=True
)
finally:
self.connection = None
# Глобальный экземпляр хранилища
_storage_instance: Optional[Storage] = None
def get_storage() -> Storage:
"""
Получение глобального экземпляра хранилища.
Returns:
Экземпляр Storage
"""
global _storage_instance
if _storage_instance is None:
_storage_instance = Storage()
return _storage_instance