add handlers and pipeline

This commit is contained in:
2026-01-07 03:17:48 +03:00
parent adb2cf89cd
commit 0d2be70032
6 changed files with 629 additions and 177 deletions

View File

@@ -0,0 +1,16 @@
"""
Модуль обработчиков CAN сообщений.
Предоставляет плагинную архитектуру для обработки CAN фреймов.
"""
from .base import BaseHandler
from .storage_handler import StorageHandler
from .influxdb_handler import InfluxDBHandler
__all__ = [
'BaseHandler',
'StorageHandler',
'InfluxDBHandler',
]

View File

@@ -0,0 +1,113 @@
"""
Базовый класс для обработчиков CAN сообщений.
Предоставляет интерфейс для плагинной архитектуры обработки CAN фреймов.
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from can_frame import CANFrame
from logger import get_logger
class BaseHandler(ABC):
"""
Базовый класс для всех обработчиков CAN сообщений.
Каждый обработчик реализует pipeline для обработки CAN фреймов.
Обработчики могут быть отключены, включены и конфигурированы независимо.
"""
def __init__(self, name: str, enabled: bool = True):
"""
Инициализация обработчика.
Args:
name: Имя обработчика (для логирования и идентификации)
enabled: Включен ли обработчик (по умолчанию True)
"""
self.name = name
self.enabled = enabled
self.logger = get_logger(f"{__name__}.{name}")
self._initialized = False
@abstractmethod
def initialize(self) -> bool:
"""
Инициализация обработчика.
Вызывается один раз при создании обработчика.
Здесь должна быть логика подключения к внешним сервисам и т.д.
Returns:
True если инициализация успешна, False иначе
"""
pass
@abstractmethod
def handle(self, frame: CANFrame) -> bool:
"""
Обработка одного CAN фрейма.
Args:
frame: CANFrame для обработки
Returns:
True если обработка успешна, False иначе
"""
pass
@abstractmethod
def handle_batch(self, frames: List[CANFrame]) -> int:
"""
Обработка батча CAN фреймов.
Может быть более эффективной, чем обработка по одному.
Если обработчик не поддерживает батчинг, может просто вызывать handle() для каждого.
Args:
frames: Список CANFrame для обработки
Returns:
Количество успешно обработанных фреймов
"""
pass
@abstractmethod
def flush(self) -> None:
"""
Принудительная отправка накопленных данных.
Вызывается периодически или при завершении работы.
Обработчики могут накапливать данные для батчевой отправки.
"""
pass
@abstractmethod
def shutdown(self) -> None:
"""
Корректное завершение работы обработчика.
Вызывается при остановке приложения.
Здесь должна быть логика закрытия соединений, сохранения данных и т.д.
"""
pass
@abstractmethod
def get_stats(self) -> Dict[str, Any]:
"""
Получение статистики обработчика.
Returns:
Словарь со статистикой (обработано, ошибок, очередь и т.д.)
"""
pass
def is_enabled(self) -> bool:
"""Проверка, включен ли обработчик."""
return self.enabled
def is_initialized(self) -> bool:
"""Проверка, инициализирован ли обработчик."""
return self._initialized

View File

@@ -0,0 +1,144 @@
"""
Пример обработчика для демонстрации плагинной архитектуры.
Этот файл показывает, как легко добавить новый обработчик (например, Kafka, MQTT, WebSocket).
"""
from typing import List, Dict, Any
from can_frame import CANFrame
from .base import BaseHandler
class ExampleHandler(BaseHandler):
"""
Пример обработчика для демонстрации.
Этот обработчик можно использовать как шаблон для создания новых:
- KafkaHandler
- MQTTHandler
- WebSocketHandler
- FileHandler
и т.д.
"""
def __init__(self, enabled: bool = True):
"""Инициализация примера обработчика."""
super().__init__(name="example", enabled=enabled)
self.processed_count = 0
self.failed_count = 0
def initialize(self) -> bool:
"""Инициализация обработчика."""
if not self.enabled:
return False
try:
# Здесь должна быть логика подключения к внешнему сервису
# Например: self.kafka_producer = KafkaProducer(...)
# Или: self.mqtt_client = mqtt.Client(...)
self._initialized = True
self.logger.info("Example handler initialized")
return True
except Exception as e:
self.logger.error(f"Failed to initialize example handler: {e}", exc_info=True)
return False
def handle(self, frame: CANFrame) -> bool:
"""Обработка одного CAN фрейма."""
if not self.enabled or not self._initialized:
return False
try:
# Здесь должна быть логика отправки одного фрейма
# Например: self.kafka_producer.send('can-topic', frame.to_dict())
# Или: self.mqtt_client.publish('can/data', frame.to_dict())
self.processed_count += 1
return True
except Exception as e:
self.logger.error(
f"Failed to handle frame: {e}",
exc_info=True,
extra={"can_id": frame.can_id_hex}
)
self.failed_count += 1
return False
def handle_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча CAN фреймов."""
if not self.enabled or not self._initialized or not frames:
return 0
try:
# Здесь должна быть логика пакетной отправки
# Например: self.kafka_producer.send_batch([...])
# Или: self.mqtt_client.publish_batch([...])
processed = 0
for frame in frames:
if self.handle(frame):
processed += 1
return processed
except Exception as e:
self.logger.error(
f"Failed to handle batch: {e}",
exc_info=True,
extra={"batch_size": len(frames)}
)
self.failed_count += len(frames)
return 0
def flush(self) -> None:
"""Принудительная отправка накопленных данных."""
if not self.enabled or not self._initialized:
return
try:
# Здесь должна быть логика принудительной отправки
# Например: self.kafka_producer.flush()
# Или: self.mqtt_client.flush()
pass
except Exception as e:
self.logger.error(f"Failed to flush: {e}", exc_info=True)
def shutdown(self) -> None:
"""Корректное завершение работы обработчика."""
if self._initialized:
try:
# Здесь должна быть логика закрытия соединений
# Например: self.kafka_producer.close()
# Или: self.mqtt_client.disconnect()
self.logger.info("Example handler closed")
except Exception as e:
self.logger.error(f"Error closing example handler: {e}", exc_info=True)
self._initialized = False
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики обработчика."""
return {
"handler": self.name,
"enabled": self.enabled,
"initialized": self._initialized,
"processed_count": self.processed_count,
"failed_count": self.failed_count
}
# Пример использования:
#
# from handlers import BaseHandler, StorageHandler, InfluxDBHandler
# from handlers.example_handler import ExampleHandler
# from socket_can.message_processor import MessageProcessor
#
# # Создаем кастомный pipeline
# handlers = [
# StorageHandler(enabled=True),
# InfluxDBHandler(enabled=True),
# ExampleHandler(enabled=True), # Новый обработчик!
# ]
#
# # Используем в MessageProcessor
# processor = MessageProcessor(handlers=handlers)
# processor.start()

View File

@@ -0,0 +1,132 @@
"""
Обработчик для отправки CAN сообщений в InfluxDB.
"""
from typing import List, Dict, Any, Optional
from can_frame import CANFrame
from .base import BaseHandler
from influxdb_handler import get_influxdb_client
from config import config
class InfluxDBHandler(BaseHandler):
"""Обработчик для отправки в InfluxDB."""
def __init__(self, enabled: Optional[bool] = None):
"""
Инициализация обработчика InfluxDB.
Args:
enabled: Включен ли обработчик. Если None, берется из config.influxdb.enabled
"""
super().__init__(
name="influxdb",
enabled=enabled if enabled is not None else config.influxdb.enabled
)
self.influxdb_client = None
def initialize(self) -> bool:
"""Инициализация InfluxDB клиента."""
if not self.enabled:
return False
try:
self.influxdb_client = get_influxdb_client()
self._initialized = True
self.logger.info("InfluxDB handler initialized")
return True
except Exception as e:
self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True)
self.influxdb_client = None
return False
def handle(self, frame: CANFrame) -> bool:
"""Обработка одного CAN фрейма."""
if not self.enabled or not self._initialized or not self.influxdb_client:
return False
try:
return self.influxdb_client.write_message(
interface=frame.bus,
can_id=frame.can_id,
dlc=frame.dlc,
data=frame.data,
timestamp=frame.timestamp
)
except Exception as e:
self.logger.error(
f"Failed to send frame to InfluxDB: {e}",
exc_info=True,
extra={"can_id": frame.can_id_hex}
)
return False
def handle_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча CAN фреймов."""
if not self.enabled or not self._initialized or not self.influxdb_client or not frames:
return 0
try:
# Конвертируем CANFrame в формат для InfluxDB
influx_messages = []
for frame in frames:
influx_messages.append({
"interface": frame.bus,
"can_id": frame.can_id,
"dlc": frame.dlc,
"data": frame.data,
"timestamp": frame.timestamp # float timestamp в секундах
})
if influx_messages:
return self.influxdb_client.write_messages_batch(influx_messages)
return 0
except Exception as e:
self.logger.error(
f"Failed to send frames batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(frames)}
)
return 0
def flush(self) -> None:
"""Принудительная отправка накопленных данных."""
# InfluxDB forwarder сам управляет flush через свой цикл
# Но можно вызвать явный flush если нужно
pass
def shutdown(self) -> None:
"""Корректное завершение работы обработчика."""
if self.influxdb_client:
try:
self.influxdb_client.close()
self.logger.info("InfluxDB handler closed")
except Exception as e:
self.logger.error(f"Error closing InfluxDB: {e}", exc_info=True)
self._initialized = False
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики обработчика."""
if self.influxdb_client:
try:
stats = self.influxdb_client.get_stats()
stats["handler"] = self.name
stats["enabled"] = self.enabled
stats["initialized"] = self._initialized
return stats
except Exception:
pass
return {
"handler": self.name,
"enabled": self.enabled,
"initialized": self._initialized
}
def start(self) -> None:
"""Запуск InfluxDB forwarder (если используется)."""
if self.influxdb_client:
try:
self.influxdb_client.start()
except Exception as e:
self.logger.error(f"Failed to start InfluxDB forwarder: {e}", exc_info=True)

View File

@@ -0,0 +1,119 @@
"""
Обработчик для сохранения CAN сообщений в SQLite.
"""
from typing import List, Dict, Any
from can_frame import CANFrame
from .base import BaseHandler
from storage import get_storage
class StorageHandler(BaseHandler):
"""Обработчик для сохранения в SQLite."""
def __init__(self, enabled: bool = True):
"""Инициализация обработчика storage."""
super().__init__(name="storage", enabled=enabled)
self.storage = None
def initialize(self) -> bool:
"""Инициализация storage."""
if not self.enabled:
return False
try:
self.storage = get_storage()
self._initialized = True
self.logger.info("Storage handler initialized")
return True
except Exception as e:
self.logger.error(f"Failed to initialize storage: {e}", exc_info=True)
self.storage = None
return False
def handle(self, frame: CANFrame) -> bool:
"""Обработка одного CAN фрейма."""
if not self.enabled or not self._initialized or not self.storage:
return False
try:
message_id = self.storage.save_message(
interface=frame.bus,
can_id=frame.can_id,
dlc=frame.dlc,
data=frame.data,
timestamp=frame.timestamp
)
return message_id is not None
except Exception as e:
self.logger.error(
f"Failed to save frame: {e}",
exc_info=True,
extra={"can_id": frame.can_id_hex}
)
return False
def handle_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча CAN фреймов."""
if not self.enabled or not self._initialized or not self.storage or not frames:
return 0
try:
# Конвертируем CANFrame в формат для storage
messages = []
for frame in frames:
messages.append((
frame.timestamp, # float timestamp в секундах
frame.bus,
frame.can_id,
frame.dlc,
frame.data
))
saved_count = self.storage.save_messages_batch(messages)
if saved_count != len(frames):
self.logger.warning(
f"Not all frames saved: {saved_count}/{len(frames)}",
extra={"batch_size": len(frames)}
)
return saved_count
except Exception as e:
self.logger.error(
f"Failed to save frames batch: {e}",
exc_info=True,
extra={"batch_size": len(frames)}
)
return 0
def flush(self) -> None:
"""Принудительная отправка накопленных данных."""
# SQLite не требует явного flush, данные сохраняются сразу
pass
def shutdown(self) -> None:
"""Корректное завершение работы обработчика."""
if self.storage:
try:
self.storage.close()
self.logger.info("Storage handler closed")
except Exception as e:
self.logger.error(f"Error closing storage: {e}", exc_info=True)
self._initialized = False
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики обработчика."""
if self.storage:
try:
stats = self.storage.get_stats()
stats["handler"] = self.name
stats["enabled"] = self.enabled
stats["initialized"] = self._initialized
return stats
except Exception:
pass
return {
"handler": self.name,
"enabled": self.enabled,
"initialized": self._initialized
}

View File

@@ -1,7 +1,7 @@
"""
Модуль для обработки CAN сообщений.
Обрабатывает входящие CAN сообщения и сохраняет их в SQLite и InfluxDB.
Обрабатывает входящие CAN сообщения через pipeline обработчиков.
Использует очередь для асинхронной обработки, чтобы не блокировать чтение CAN сообщений.
"""
@@ -9,167 +9,29 @@ import threading
import time
from queue import Queue, Empty
from typing import Optional, Dict, Any, List
from abc import ABC, abstractmethod
from logger import get_logger
from config import config
from storage import get_storage
from influxdb_handler import get_influxdb_client
from can_frame import CANFrame
from handlers import BaseHandler, StorageHandler, InfluxDBHandler
logger = get_logger(__name__)
class MessageHandler(ABC):
"""Базовый класс для обработчиков сообщений."""
@abstractmethod
def process_batch(self, frames: List[CANFrame]) -> int:
"""
Обработка батча CAN фреймов.
Args:
frames: Список CANFrame объектов
Returns:
Количество успешно обработанных сообщений
"""
pass
@abstractmethod
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики обработчика."""
pass
class StorageHandler(MessageHandler):
"""Обработчик для сохранения в SQLite."""
def __init__(self):
"""Инициализация обработчика storage."""
self.logger = logger.getChild("storage_handler")
self.storage = None
self._init_storage()
def _init_storage(self) -> None:
"""Инициализация storage."""
try:
self.storage = get_storage()
self.logger.info("Storage handler initialized")
except Exception as e:
self.logger.error(f"Failed to initialize storage: {e}", exc_info=True)
self.storage = None
def process_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча сообщений для сохранения в SQLite."""
if not self.storage or not frames:
return 0
try:
# Конвертируем CANFrame в формат для storage
messages = []
for frame in frames:
messages.append((
frame.timestamp, # float timestamp в секундах
frame.bus,
frame.can_id,
frame.dlc,
frame.data
))
saved_count = self.storage.save_messages_batch(messages)
if saved_count != len(frames):
self.logger.warning(
f"Not all messages saved: {saved_count}/{len(frames)}",
extra={"batch_size": len(frames)}
)
return saved_count
except Exception as e:
self.logger.error(
f"Failed to save messages batch: {e}",
exc_info=True,
extra={"batch_size": len(frames)}
)
return 0
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики storage."""
if self.storage:
try:
return self.storage.get_stats()
except Exception:
pass
return {"initialized": False}
class InfluxDBHandler(MessageHandler):
"""Обработчик для отправки в InfluxDB."""
def __init__(self):
"""Инициализация обработчика InfluxDB."""
self.logger = logger.getChild("influxdb_handler")
self.influxdb_client = None
self._init_influxdb()
def _init_influxdb(self) -> None:
"""Инициализация InfluxDB клиента."""
if not config.influxdb.enabled:
self.logger.info("InfluxDB is disabled in configuration")
return
try:
self.influxdb_client = get_influxdb_client()
self.logger.info("InfluxDB handler initialized")
except Exception as e:
self.logger.error(f"Failed to initialize InfluxDB: {e}", exc_info=True)
self.influxdb_client = None
def process_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча сообщений для отправки в InfluxDB."""
if not self.influxdb_client or not frames:
return 0
try:
# Конвертируем CANFrame в формат для InfluxDB
influx_messages = []
for frame in frames:
influx_messages.append({
"interface": frame.bus,
"can_id": frame.can_id,
"dlc": frame.dlc,
"data": frame.data,
"timestamp": frame.timestamp # float timestamp в секундах
})
if influx_messages:
return self.influxdb_client.write_messages_batch(influx_messages)
return 0
except Exception as e:
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(frames)}
)
return 0
def get_stats(self) -> Dict[str, Any]:
"""Получение статистики InfluxDB."""
if self.influxdb_client:
try:
return self.influxdb_client.get_stats()
except Exception:
pass
return {"enabled": False, "initialized": False}
class MessageProcessor:
"""Класс для обработки и сохранения CAN сообщений с асинхронной обработкой."""
"""
Класс для обработки и сохранения CAN сообщений с асинхронной обработкой.
def __init__(self, queue_size: int = 10000):
Использует плагинную архитектуру обработчиков (pipeline).
Каждый обработчик реализует интерфейс BaseHandler.
"""
def __init__(self, handlers: Optional[List[BaseHandler]] = None, queue_size: int = 10000):
"""
Инициализация процессора сообщений.
Args:
handlers: Список обработчиков для pipeline. Если None, создаются по умолчанию.
queue_size: Максимальный размер очереди сообщений
"""
self.logger = logger
@@ -186,20 +48,59 @@ class MessageProcessor:
self.queue_full_warnings = 0
# Инициализируем обработчики
self.handlers: List[MessageHandler] = []
self._init_handlers()
def _init_handlers(self) -> None:
"""Инициализация обработчиков сообщений."""
# Добавляем обработчики в порядке приоритета
self.handlers.append(StorageHandler())
if handlers is None:
handlers = self._create_default_handlers()
if config.influxdb.enabled:
self.handlers.append(InfluxDBHandler())
self.handlers: List[BaseHandler] = []
self._init_handlers(handlers)
def _create_default_handlers(self) -> List[BaseHandler]:
"""
Создание обработчиков по умолчанию из конфигурации.
Returns:
Список обработчиков
"""
handlers = []
# Storage handler всегда включен
handlers.append(StorageHandler(enabled=True))
# InfluxDB handler зависит от конфигурации
handlers.append(InfluxDBHandler(enabled=None)) # None = из config
return handlers
def _init_handlers(self, handlers: List[BaseHandler]) -> None:
"""
Инициализация обработчиков.
Args:
handlers: Список обработчиков для инициализации
"""
for handler in handlers:
if handler.is_enabled():
try:
if handler.initialize():
self.handlers.append(handler)
self.logger.info(
f"Handler '{handler.name}' initialized successfully"
)
else:
self.logger.warning(
f"Handler '{handler.name}' initialization failed"
)
except Exception as e:
self.logger.error(
f"Error initializing handler '{handler.name}': {e}",
exc_info=True
)
else:
self.logger.debug(f"Handler '{handler.name}' is disabled")
self.logger.info(
f"Initialized {len(self.handlers)} message handlers",
extra={"handlers": [type(h).__name__ for h in self.handlers]}
f"Initialized {len(self.handlers)}/{len(handlers)} handlers",
extra={"handlers": [h.name for h in self.handlers]}
)
def enqueue(self, frame: CANFrame) -> bool:
@@ -255,6 +156,8 @@ class MessageProcessor:
batch_size = config.general.batch_size
batch_interval = config.general.batch_interval
last_batch_time = time.time()
last_flush_time = time.time()
flush_interval = 5.0 # Периодический flush обработчиков
while self.running or not self.message_queue.empty():
try:
@@ -282,6 +185,11 @@ class MessageProcessor:
batch = []
last_batch_time = current_time
# Периодический flush обработчиков
if (current_time - last_flush_time) >= flush_interval:
self._flush_handlers()
last_flush_time = current_time
except Exception as e:
self.logger.error(
f"Error in processing loop: {e}",
@@ -292,6 +200,9 @@ class MessageProcessor:
if batch:
self._process_batch(batch)
# Финальный flush всех обработчиков
self._flush_handlers()
self.logger.info(
"Message processing loop stopped",
extra={
@@ -302,7 +213,7 @@ class MessageProcessor:
def _process_batch(self, batch: List[CANFrame]) -> None:
"""
Обработка батча CAN фреймов.
Обработка батча CAN фреймов через pipeline обработчиков.
Args:
batch: Список CANFrame объектов
@@ -312,7 +223,6 @@ class MessageProcessor:
try:
# Логируем батч на уровне DEBUG (если уровень DEBUG включен)
# Это позволяет контролировать вывод через уровни логирования
if batch:
first_frame = batch[0]
self.logger.debug(
@@ -329,13 +239,16 @@ class MessageProcessor:
}
)
# Обрабатываем батч через все обработчики
# Обрабатываем батч через все обработчики (pipeline)
for handler in self.handlers:
if not handler.is_enabled() or not handler.is_initialized():
continue
try:
handler.process_batch(batch)
handler.handle_batch(batch)
except Exception as e:
self.logger.error(
f"Error in handler {type(handler).__name__}: {e}",
f"Error in handler '{handler.name}': {e}",
exc_info=True,
extra={"batch_size": len(batch)}
)
@@ -350,6 +263,18 @@ class MessageProcessor:
extra={"batch_size": len(batch)}
)
def _flush_handlers(self) -> None:
"""Принудительный flush всех обработчиков."""
for handler in self.handlers:
if handler.is_enabled() and handler.is_initialized():
try:
handler.flush()
except Exception as e:
self.logger.error(
f"Error flushing handler '{handler.name}': {e}",
exc_info=True
)
def start(self) -> None:
"""Запуск обработки сообщений в отдельном потоке."""
if self.running:
@@ -358,10 +283,16 @@ class MessageProcessor:
self.running = True
# Запускаем InfluxDB forwarder, если он есть
# Запускаем специальные обработчики (например, InfluxDB forwarder)
for handler in self.handlers:
if isinstance(handler, InfluxDBHandler) and handler.influxdb_client:
handler.influxdb_client.start()
if isinstance(handler, InfluxDBHandler) and handler.is_initialized():
try:
handler.start()
except Exception as e:
self.logger.error(
f"Failed to start handler '{handler.name}': {e}",
exc_info=True
)
# Запускаем поток обработки сообщений
self.processing_thread = threading.Thread(
@@ -383,16 +314,13 @@ class MessageProcessor:
if self.processing_thread.is_alive():
self.logger.warning("Processing thread did not stop gracefully")
# Закрываем обработчики
# Закрываем все обработчики
for handler in self.handlers:
try:
if isinstance(handler, StorageHandler) and handler.storage:
handler.storage.close()
elif isinstance(handler, InfluxDBHandler) and handler.influxdb_client:
handler.influxdb_client.close()
handler.shutdown()
except Exception as e:
self.logger.error(
f"Error closing handler {type(handler).__name__}: {e}",
f"Error shutting down handler '{handler.name}': {e}",
exc_info=True
)
@@ -410,16 +338,16 @@ class MessageProcessor:
"processed_count": self.processed_count,
"dropped_count": self.dropped_count,
"queue_size": self.message_queue.qsize(),
"running": self.running
"running": self.running,
"handlers_count": len(self.handlers)
}
# Добавляем статистику всех обработчиков
for handler in self.handlers:
try:
handler_stats = handler.get_stats()
handler_name = type(handler).__name__.replace("Handler", "").lower()
stats[handler_name] = handler_stats
stats[handler.name] = handler_stats
except Exception as e:
self.logger.debug(f"Failed to get stats from {type(handler).__name__}: {e}")
self.logger.debug(f"Failed to get stats from handler '{handler.name}': {e}")
return stats