diff --git a/can_sniffer/src/handlers/influxdb_handler.py b/can_sniffer/src/handlers/influxdb_handler.py index 1cb83e7..1553bcc 100644 --- a/can_sniffer/src/handlers/influxdb_handler.py +++ b/can_sniffer/src/handlers/influxdb_handler.py @@ -6,6 +6,7 @@ from typing import List, Dict, Any, Optional from can_frame import CANFrame from .base import BaseHandler from influxdb_handler import get_influxdb_client +from influxdb_handler.influxdb_client import ConnectionStatus from config import config @@ -62,11 +63,22 @@ class InfluxDBHandler(BaseHandler): return False def handle_batch(self, frames: List[CANFrame]) -> int: - """Обработка батча CAN фреймов.""" + """ + Обработка батча CAN фреймов. + + Неблокирующий метод - при ошибках или переполнении очереди InfluxDB + просто пропускает батч, не останавливая обработку других handlers. + """ if not self.enabled or not self._initialized or not self.influxdb_client or not frames: return 0 try: + # Проверяем состояние соединения перед обработкой + if hasattr(self.influxdb_client, 'connection_status'): + if self.influxdb_client.connection_status != ConnectionStatus.CONNECTED: + # Соединение недоступно - пропускаем батч без ошибки + return 0 + # Конвертируем CANFrame в формат для InfluxDB influx_messages = [] for frame in frames: @@ -79,9 +91,13 @@ class InfluxDBHandler(BaseHandler): }) if influx_messages: + # Пытаемся добавить в очередь InfluxDB (неблокирующий режим) + # Если очередь переполнена, пропускаем батч return self.influxdb_client.write_messages_batch(influx_messages) return 0 except Exception as e: + # Ошибка не должна останавливать обработку других handlers + # Логируем, но не пробрасываем исключение self.logger.error( f"Failed to send frames batch to InfluxDB: {e}", exc_info=True, diff --git a/can_sniffer/src/influxdb_handler/influxdb_client.py b/can_sniffer/src/influxdb_handler/influxdb_client.py index c4a6172..92207a1 100644 --- a/can_sniffer/src/influxdb_handler/influxdb_client.py +++ b/can_sniffer/src/influxdb_handler/influxdb_client.py @@ -67,7 +67,10 @@ class InfluxDBClient: # Инициализируем атрибуты по умолчанию self.client: Optional[InfluxClient] = None self.write_api = None - self.message_queue: Queue[Dict[str, Any]] = Queue() + # Очередь с ограничением размера для предотвращения переполнения памяти + # Размер = batch_size * 10 (для буферизации нескольких батчей) + queue_maxsize = self.config.batch_size * 10 + self.message_queue: Queue[Dict[str, Any]] = Queue(maxsize=queue_maxsize) self.running = False self.forwarder_thread: Optional[threading.Thread] = None self.health_check_thread: Optional[threading.Thread] = None @@ -288,19 +291,36 @@ class InfluxDBClient: try: # Добавляем сообщение в очередь для пакетной отправки - self.message_queue.put({ - "interface": interface, - "can_id": can_id, - "dlc": dlc, - "data": data, - "timestamp": timestamp - }) + # Используем неблокирующий режим, если block=False + if block: + self.message_queue.put({ + "interface": interface, + "can_id": can_id, + "dlc": dlc, + "data": data, + "timestamp": timestamp + }) + else: + # Неблокирующий режим - если очередь полна, пропускаем сообщение + try: + self.message_queue.put_nowait({ + "interface": interface, + "can_id": can_id, + "dlc": dlc, + "data": data, + "timestamp": timestamp + }) + except: + # Очередь переполнена - пропускаем сообщение + self.failed_count += 1 + return False return True except Exception as e: self.logger.error( f"Failed to queue message for InfluxDB: {e}", exc_info=True ) + self.failed_count += 1 return False def _create_point(self, msg: Dict[str, Any]) -> Optional[Point]: @@ -388,15 +408,16 @@ class InfluxDBClient: ) return False - def write_messages_batch(self, messages: List[Dict[str, Any]]) -> int: + def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int: """ Пакетная отправка сообщений в InfluxDB. Args: messages: Список словарей с данными сообщений + block: Блокировать ли при переполнении очереди Returns: - Количество успешно отправленных сообщений + Количество успешно добавленных в очередь сообщений """ if not self.write_api or not messages: return 0 @@ -404,44 +425,40 @@ class InfluxDBClient: # Проверяем соединение перед отправкой if self.connection_status != ConnectionStatus.CONNECTED: if not self._health_check(): - self.logger.warning("InfluxDB connection not available, skipping batch") + # Соединение недоступно - пропускаем батч без ошибки self.failed_count += len(messages) return 0 else: self.connection_status = ConnectionStatus.CONNECTED - try: - # Создаем точки для InfluxDB - points = [] - for msg in messages: - point = self._create_point(msg) - if point: - points.append(point) - - if not points: - self.logger.warning("No valid points created from messages") - return 0 - - # Отправляем с retry - if self._write_with_retry(points): - sent = len(points) - self.sent_count += sent - self.logger.debug( - f"Sent {sent} messages to InfluxDB", - extra={"batch_size": sent} - ) - return sent - else: - return 0 - - except Exception as e: + # Проверяем заполненность очереди перед добавлением + queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0 + if queue_usage > 0.9 and not block: + # Очередь почти переполнена - пропускаем батч self.failed_count += len(messages) - self.logger.error( - f"Failed to send messages batch to InfluxDB: {e}", - exc_info=True, - extra={"batch_size": len(messages)} - ) return 0 + + # Добавляем сообщения в очередь для асинхронной отправки + added_count = 0 + for msg in messages: + try: + if block: + self.message_queue.put(msg) + else: + try: + self.message_queue.put_nowait(msg) + except: + # Очередь переполнена - пропускаем оставшиеся сообщения + break + added_count += 1 + except Exception as e: + self.logger.debug(f"Failed to queue message: {e}") + break + + if added_count < len(messages): + self.failed_count += (len(messages) - added_count) + + return added_count def _forwarder_loop(self) -> None: """Основной цикл для отправки сообщений в InfluxDB.""" diff --git a/can_sniffer/src/logger.py b/can_sniffer/src/logger.py index d05b92f..5fff07c 100644 --- a/can_sniffer/src/logger.py +++ b/can_sniffer/src/logger.py @@ -77,16 +77,49 @@ class PrettyFormatter(logging.Formatter): if not data: return "" + # Фильтруем None значения (опционально показываем их) + filtered_data = {k: v for k, v in data.items() if v is not None} + if not filtered_data and not any(v is None for v in data.values()): + return "" + + # Если все None, показываем их + if not filtered_data: + filtered_data = data + lines = [] - # Определяем максимальную ширину ключа - max_key_width = max(len(str(k)) for k in data.keys()) if data else 0 - max_key_width = max(max_key_width, 20) # Минимум 20 символов + # Вычисляем ширину для каждого элемента + items_info = [] + for key, value in filtered_data.items(): + formatted_value = self._format_value(value) + value_lines = formatted_value.split('\n') + key_width = len(str(key)) + max_value_width = max(len(line) for line in value_lines) + items_info.append({ + 'key': key, + 'value': value, + 'formatted': formatted_value, + 'value_lines': value_lines, + 'key_width': key_width, + 'max_value_width': max_value_width + }) + + # Определяем максимальную ширину ключа и значения + max_key_width = max(item['key_width'] for item in items_info) if items_info else 0 + max_key_width = max(max_key_width, 12) # Минимум 12 символов + + # Вычисляем ширину блока + max_content_width = 0 + for item in items_info: + content_width = max_key_width + 2 + item['max_value_width'] # key + ": " + value + max_content_width = max(max_content_width, content_width) + + border_width = max(max_content_width + 4, 40) # +4 для отступов, минимум 40 # Заголовок if title: title_line = f" {title} " - border_width = max(len(title_line), max_key_width + 20) + border_width = max(border_width, len(title_line) + 2) lines.append( f"{self.BOX_CHARS['top_left']}" f"{self.BOX_CHARS['horizontal'] * (border_width - 2)}" @@ -103,7 +136,6 @@ class PrettyFormatter(logging.Formatter): f"{self.BOX_CHARS['top_right']}" ) else: - border_width = max_key_width + 20 lines.append( f"{self.BOX_CHARS['top_left']}" f"{self.BOX_CHARS['horizontal'] * (border_width - 2)}" @@ -111,39 +143,30 @@ class PrettyFormatter(logging.Formatter): ) # Содержимое - for i, (key, value) in enumerate(data.items()): - # Форматируем значение - formatted_value = self._format_value(value) + for i, item in enumerate(items_info): + key_str = str(item['key']).ljust(max_key_width) - # Разбиваем длинные значения на несколько строк - value_lines = formatted_value.split('\n') - - for j, value_line in enumerate(value_lines): + for j, value_line in enumerate(item['value_lines']): if j == 0: # Первая строка с ключом - key_str = str(key).ljust(max_key_width) + content = f"{key_str}: {value_line}" + padding = border_width - len(content) - 4 # -4 для " | " и пробелов lines.append( f"{self.BOX_CHARS['vertical']} " - f"{key_str}: {value_line}" - f"{' ' * (border_width - len(key_str) - len(value_line) - 4)}" + f"{content}" + f"{' ' * max(0, padding)}" f" {self.BOX_CHARS['vertical']}" ) else: # Продолжение значения + content = f"{' ' * (max_key_width + 2)}{value_line}" + padding = border_width - len(content) - 4 lines.append( f"{self.BOX_CHARS['vertical']} " - f"{' ' * (max_key_width + 2)}{value_line}" - f"{' ' * (border_width - max_key_width - len(value_line) - 4)}" + f"{content}" + f"{' ' * max(0, padding)}" f" {self.BOX_CHARS['vertical']}" ) - - # Разделитель между элементами (кроме последнего) - if i < len(data) - 1: - lines.append( - f"{self.BOX_CHARS['vertical']}" - f"{' ' * (border_width - 2)}" - f"{self.BOX_CHARS['vertical']}" - ) # Нижняя граница lines.append( @@ -164,7 +187,9 @@ class PrettyFormatter(logging.Formatter): Returns: Отформатированная строка """ - if isinstance(value, dict): + if value is None: + return "—" # Используем длинное тире для None + elif isinstance(value, dict): # Вложенные словари - форматируем как JSON с отступами return json.dumps(value, indent=2, ensure_ascii=False) elif isinstance(value, list): @@ -176,14 +201,25 @@ class PrettyFormatter(logging.Formatter): # Числа - добавляем форматирование для больших чисел if isinstance(value, float): return f"{value:.6f}" if abs(value) < 1000 else f"{value:.2f}" + # Форматируем большие числа с разделителями тысяч return f"{value:,}" if abs(value) >= 1000 else str(value) elif isinstance(value, bytes): # Байты - показываем hex - return value.hex().upper() if len(value) <= 16 else value.hex().upper()[:32] + "..." + if len(value) <= 16: + return value.hex().upper() + return value.hex().upper()[:32] + "..." elif isinstance(value, bool): return "✓" if value else "✗" + elif isinstance(value, str): + # Обрезаем длинные строки + if len(value) > 80: + return value[:77] + "..." + return value else: - return str(value) + str_value = str(value) + if len(str_value) > 80: + return str_value[:77] + "..." + return str_value def _format_can_frame(self, data: Dict[str, Any]) -> str: """ @@ -359,19 +395,26 @@ class PrettyFormatter(logging.Formatter): # Добавляем дополнительные поля if extra_fields: + # Фильтруем None значения (кроме специальных случаев) + filtered_fields = {k: v for k, v in extra_fields.items() if v is not None} + + # Если после фильтрации ничего не осталось, не добавляем блок + if not filtered_fields: + return formatted + # Проверяем, это CAN фрейм или обычный dict - if any(field in extra_fields for field in ['can_id', 'can_id_int', 'can_id_hex', 'interface', 'bus']): + if any(field in filtered_fields for field in ['can_id', 'can_id_int', 'can_id_hex', 'interface', 'bus']): # CAN фрейм - специальное форматирование - formatted += self._format_can_frame(extra_fields) - elif 'first_message' in extra_fields and isinstance(extra_fields['first_message'], dict): + formatted += self._format_can_frame(filtered_fields) + elif 'first_message' in filtered_fields and isinstance(filtered_fields['first_message'], dict): # Батч с первым сообщением - batch_info = {k: v for k, v in extra_fields.items() if k != 'first_message'} + batch_info = {k: v for k, v in filtered_fields.items() if k != 'first_message'} if batch_info: formatted += self._format_dict_block(batch_info, "Batch Info") - formatted += self._format_can_frame(extra_fields['first_message']) + formatted += self._format_can_frame(filtered_fields['first_message']) else: # Обычный dict - блочное форматирование - formatted += self._format_dict_block(extra_fields) + formatted += self._format_dict_block(filtered_fields) # Добавляем информацию об исключении, если есть if record.exc_info: diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index e0b180d..8264777 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -26,16 +26,20 @@ class MessageProcessor: Каждый обработчик реализует интерфейс BaseHandler. """ - def __init__(self, handlers: Optional[List[BaseHandler]] = None, queue_size: int = 10000): + def __init__(self, handlers: Optional[List[BaseHandler]] = None, queue_size: Optional[int] = None): """ Инициализация процессора сообщений. Args: handlers: Список обработчиков для pipeline. Если None, создаются по умолчанию. - queue_size: Максимальный размер очереди сообщений + queue_size: Максимальный размер очереди сообщений. Если None, берется из config.general.buffer_size """ self.logger = logger + # Используем размер очереди из конфига, если не указан явно + if queue_size is None: + queue_size = config.general.buffer_size + # Очередь для асинхронной обработки сообщений # Храним CANFrame объекты (неизменяемые, легковесные) self.message_queue: Queue[CANFrame] = Queue(maxsize=queue_size) @@ -103,7 +107,7 @@ class MessageProcessor: extra={"handlers": [h.name for h in self.handlers]} ) - def enqueue(self, frame: CANFrame) -> bool: + def enqueue(self, frame: CANFrame, block: bool = False, timeout: Optional[float] = None) -> bool: """ Добавление CAN фрейма в очередь для асинхронной обработки. @@ -111,14 +115,21 @@ class MessageProcessor: Args: frame: CANFrame объект + block: Блокировать ли при переполнении очереди (для backpressure) + timeout: Таймаут для блокирующего режима (секунды) Returns: True если сообщение добавлено, False если очередь переполнена """ try: - # Пытаемся добавить в очередь без блокировки (non-blocking) - self.message_queue.put_nowait(frame) - return True + if block: + # Блокирующий режим - используется для backpressure + self.message_queue.put(frame, timeout=timeout) + return True + else: + # Неблокирующий режим - быстрое добавление + self.message_queue.put_nowait(frame) + return True except: # Очередь переполнена - пропускаем сообщение self.dropped_count += 1 @@ -126,15 +137,29 @@ class MessageProcessor: # Логируем предупреждение периодически (не каждое сообщение) if self.queue_full_warnings % 1000 == 0: + queue_usage = (self.message_queue.qsize() / self.message_queue.maxsize) * 100 self.logger.warning( f"Message queue full, dropped {self.dropped_count} messages", extra={ "dropped_count": self.dropped_count, - "queue_size": self.message_queue.qsize() + "queue_size": self.message_queue.qsize(), + "queue_maxsize": self.message_queue.maxsize, + "queue_usage_percent": round(queue_usage, 1) } ) return False + def get_queue_usage(self) -> float: + """ + Получение процента заполнения очереди. + + Returns: + Процент заполнения (0.0 - 1.0) + """ + if self.message_queue.maxsize == 0: + return 0.0 + return self.message_queue.qsize() / self.message_queue.maxsize + def process(self, frame: CANFrame) -> None: """ Публичный метод для обработки CAN фрейма. diff --git a/can_sniffer/src/socket_can/src.py b/can_sniffer/src/socket_can/src.py index ed15c64..925e2f2 100644 --- a/can_sniffer/src/socket_can/src.py +++ b/can_sniffer/src/socket_can/src.py @@ -73,10 +73,17 @@ class CANBusHandler: """Основной цикл чтения сообщений с шины.""" self.logger.info(f"Starting read loop for {self.interface}") + # Переменные для backpressure механизма + consecutive_drops = 0 + backpressure_delay = 0.0 + max_backpressure_delay = 0.5 # Максимальная задержка 500ms + while self.running: try: # Читаем сообщение с таймаутом для возможности проверки running - message = self.bus.recv(timeout=0.1) + # Увеличиваем таймаут при backpressure + recv_timeout = 0.1 + backpressure_delay + message = self.bus.recv(timeout=recv_timeout) if message is not None: self.message_count += 1 @@ -95,8 +102,53 @@ class CANBusHandler: continue # Вызываем callback для обработки сообщения + # Используем backpressure: если очередь заполнена, замедляем чтение try: - self.message_callback(frame) + # Проверяем использование очереди (если callback - это enqueue) + if hasattr(self.message_callback, '__self__'): + processor = getattr(self.message_callback, '__self__', None) + if processor and hasattr(processor, 'get_queue_usage'): + queue_usage = processor.get_queue_usage() + + # Если очередь заполнена более чем на 80%, используем блокирующий режим + if queue_usage > 0.8: + # Блокируем добавление с небольшим таймаутом для backpressure + if hasattr(processor, 'enqueue'): + success = processor.enqueue(frame, block=True, timeout=0.01) + if not success: + consecutive_drops += 1 + # Увеличиваем задержку при последовательных потерях + backpressure_delay = min( + max_backpressure_delay, + 0.001 * consecutive_drops + ) + continue + else: + self.message_callback(frame) + else: + # Очередь не заполнена - быстрое добавление + if hasattr(processor, 'enqueue'): + success = processor.enqueue(frame, block=False) + if not success: + consecutive_drops += 1 + backpressure_delay = min( + max_backpressure_delay, + 0.001 * consecutive_drops + ) + continue + else: + self.message_callback(frame) + + # Сбрасываем счетчик при успешной отправке + if queue_usage < 0.5: + consecutive_drops = 0 + backpressure_delay = 0.0 + else: + # Обычный callback без backpressure + self.message_callback(frame) + else: + # Обычный callback без backpressure + self.message_callback(frame) except Exception as e: self.logger.error( f"Error in message callback for {self.interface}: {e}",