try to fix queue limit

This commit is contained in:
2026-01-07 12:15:50 +03:00
parent 6ca186903e
commit 5a94ec2ec4
5 changed files with 238 additions and 85 deletions

View File

@@ -6,6 +6,7 @@ from typing import List, Dict, Any, Optional
from can_frame import CANFrame
from .base import BaseHandler
from influxdb_handler import get_influxdb_client
from influxdb_handler.influxdb_client import ConnectionStatus
from config import config
@@ -62,11 +63,22 @@ class InfluxDBHandler(BaseHandler):
return False
def handle_batch(self, frames: List[CANFrame]) -> int:
"""Обработка батча CAN фреймов."""
"""
Обработка батча CAN фреймов.
Неблокирующий метод - при ошибках или переполнении очереди InfluxDB
просто пропускает батч, не останавливая обработку других handlers.
"""
if not self.enabled or not self._initialized or not self.influxdb_client or not frames:
return 0
try:
# Проверяем состояние соединения перед обработкой
if hasattr(self.influxdb_client, 'connection_status'):
if self.influxdb_client.connection_status != ConnectionStatus.CONNECTED:
# Соединение недоступно - пропускаем батч без ошибки
return 0
# Конвертируем CANFrame в формат для InfluxDB
influx_messages = []
for frame in frames:
@@ -79,9 +91,13 @@ class InfluxDBHandler(BaseHandler):
})
if influx_messages:
# Пытаемся добавить в очередь InfluxDB (неблокирующий режим)
# Если очередь переполнена, пропускаем батч
return self.influxdb_client.write_messages_batch(influx_messages)
return 0
except Exception as e:
# Ошибка не должна останавливать обработку других handlers
# Логируем, но не пробрасываем исключение
self.logger.error(
f"Failed to send frames batch to InfluxDB: {e}",
exc_info=True,

View File

@@ -67,7 +67,10 @@ class InfluxDBClient:
# Инициализируем атрибуты по умолчанию
self.client: Optional[InfluxClient] = None
self.write_api = None
self.message_queue: Queue[Dict[str, Any]] = Queue()
# Очередь с ограничением размера для предотвращения переполнения памяти
# Размер = batch_size * 10 (для буферизации нескольких батчей)
queue_maxsize = self.config.batch_size * 10
self.message_queue: Queue[Dict[str, Any]] = Queue(maxsize=queue_maxsize)
self.running = False
self.forwarder_thread: Optional[threading.Thread] = None
self.health_check_thread: Optional[threading.Thread] = None
@@ -288,19 +291,36 @@ class InfluxDBClient:
try:
# Добавляем сообщение в очередь для пакетной отправки
self.message_queue.put({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
})
# Используем неблокирующий режим, если block=False
if block:
self.message_queue.put({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
})
else:
# Неблокирующий режим - если очередь полна, пропускаем сообщение
try:
self.message_queue.put_nowait({
"interface": interface,
"can_id": can_id,
"dlc": dlc,
"data": data,
"timestamp": timestamp
})
except:
# Очередь переполнена - пропускаем сообщение
self.failed_count += 1
return False
return True
except Exception as e:
self.logger.error(
f"Failed to queue message for InfluxDB: {e}",
exc_info=True
)
self.failed_count += 1
return False
def _create_point(self, msg: Dict[str, Any]) -> Optional[Point]:
@@ -388,15 +408,16 @@ class InfluxDBClient:
)
return False
def write_messages_batch(self, messages: List[Dict[str, Any]]) -> int:
def write_messages_batch(self, messages: List[Dict[str, Any]], block: bool = False) -> int:
"""
Пакетная отправка сообщений в InfluxDB.
Args:
messages: Список словарей с данными сообщений
block: Блокировать ли при переполнении очереди
Returns:
Количество успешно отправленных сообщений
Количество успешно добавленных в очередь сообщений
"""
if not self.write_api or not messages:
return 0
@@ -404,44 +425,40 @@ class InfluxDBClient:
# Проверяем соединение перед отправкой
if self.connection_status != ConnectionStatus.CONNECTED:
if not self._health_check():
self.logger.warning("InfluxDB connection not available, skipping batch")
# Соединение недоступно - пропускаем батч без ошибки
self.failed_count += len(messages)
return 0
else:
self.connection_status = ConnectionStatus.CONNECTED
try:
# Создаем точки для InfluxDB
points = []
for msg in messages:
point = self._create_point(msg)
if point:
points.append(point)
if not points:
self.logger.warning("No valid points created from messages")
return 0
# Отправляем с retry
if self._write_with_retry(points):
sent = len(points)
self.sent_count += sent
self.logger.debug(
f"Sent {sent} messages to InfluxDB",
extra={"batch_size": sent}
)
return sent
else:
return 0
except Exception as e:
# Проверяем заполненность очереди перед добавлением
queue_usage = self.message_queue.qsize() / self.message_queue.maxsize if self.message_queue.maxsize > 0 else 0
if queue_usage > 0.9 and not block:
# Очередь почти переполнена - пропускаем батч
self.failed_count += len(messages)
self.logger.error(
f"Failed to send messages batch to InfluxDB: {e}",
exc_info=True,
extra={"batch_size": len(messages)}
)
return 0
# Добавляем сообщения в очередь для асинхронной отправки
added_count = 0
for msg in messages:
try:
if block:
self.message_queue.put(msg)
else:
try:
self.message_queue.put_nowait(msg)
except:
# Очередь переполнена - пропускаем оставшиеся сообщения
break
added_count += 1
except Exception as e:
self.logger.debug(f"Failed to queue message: {e}")
break
if added_count < len(messages):
self.failed_count += (len(messages) - added_count)
return added_count
def _forwarder_loop(self) -> None:
"""Основной цикл для отправки сообщений в InfluxDB."""

View File

@@ -77,16 +77,49 @@ class PrettyFormatter(logging.Formatter):
if not data:
return ""
# Фильтруем None значения (опционально показываем их)
filtered_data = {k: v for k, v in data.items() if v is not None}
if not filtered_data and not any(v is None for v in data.values()):
return ""
# Если все None, показываем их
if not filtered_data:
filtered_data = data
lines = []
# Определяем максимальную ширину ключа
max_key_width = max(len(str(k)) for k in data.keys()) if data else 0
max_key_width = max(max_key_width, 20) # Минимум 20 символов
# Вычисляем ширину для каждого элемента
items_info = []
for key, value in filtered_data.items():
formatted_value = self._format_value(value)
value_lines = formatted_value.split('\n')
key_width = len(str(key))
max_value_width = max(len(line) for line in value_lines)
items_info.append({
'key': key,
'value': value,
'formatted': formatted_value,
'value_lines': value_lines,
'key_width': key_width,
'max_value_width': max_value_width
})
# Определяем максимальную ширину ключа и значения
max_key_width = max(item['key_width'] for item in items_info) if items_info else 0
max_key_width = max(max_key_width, 12) # Минимум 12 символов
# Вычисляем ширину блока
max_content_width = 0
for item in items_info:
content_width = max_key_width + 2 + item['max_value_width'] # key + ": " + value
max_content_width = max(max_content_width, content_width)
border_width = max(max_content_width + 4, 40) # +4 для отступов, минимум 40
# Заголовок
if title:
title_line = f" {title} "
border_width = max(len(title_line), max_key_width + 20)
border_width = max(border_width, len(title_line) + 2)
lines.append(
f"{self.BOX_CHARS['top_left']}"
f"{self.BOX_CHARS['horizontal'] * (border_width - 2)}"
@@ -103,7 +136,6 @@ class PrettyFormatter(logging.Formatter):
f"{self.BOX_CHARS['top_right']}"
)
else:
border_width = max_key_width + 20
lines.append(
f"{self.BOX_CHARS['top_left']}"
f"{self.BOX_CHARS['horizontal'] * (border_width - 2)}"
@@ -111,39 +143,30 @@ class PrettyFormatter(logging.Formatter):
)
# Содержимое
for i, (key, value) in enumerate(data.items()):
# Форматируем значение
formatted_value = self._format_value(value)
for i, item in enumerate(items_info):
key_str = str(item['key']).ljust(max_key_width)
# Разбиваем длинные значения на несколько строк
value_lines = formatted_value.split('\n')
for j, value_line in enumerate(value_lines):
for j, value_line in enumerate(item['value_lines']):
if j == 0:
# Первая строка с ключом
key_str = str(key).ljust(max_key_width)
content = f"{key_str}: {value_line}"
padding = border_width - len(content) - 4 # -4 для " | " и пробелов
lines.append(
f"{self.BOX_CHARS['vertical']} "
f"{key_str}: {value_line}"
f"{' ' * (border_width - len(key_str) - len(value_line) - 4)}"
f"{content}"
f"{' ' * max(0, padding)}"
f" {self.BOX_CHARS['vertical']}"
)
else:
# Продолжение значения
content = f"{' ' * (max_key_width + 2)}{value_line}"
padding = border_width - len(content) - 4
lines.append(
f"{self.BOX_CHARS['vertical']} "
f"{' ' * (max_key_width + 2)}{value_line}"
f"{' ' * (border_width - max_key_width - len(value_line) - 4)}"
f"{content}"
f"{' ' * max(0, padding)}"
f" {self.BOX_CHARS['vertical']}"
)
# Разделитель между элементами (кроме последнего)
if i < len(data) - 1:
lines.append(
f"{self.BOX_CHARS['vertical']}"
f"{' ' * (border_width - 2)}"
f"{self.BOX_CHARS['vertical']}"
)
# Нижняя граница
lines.append(
@@ -164,7 +187,9 @@ class PrettyFormatter(logging.Formatter):
Returns:
Отформатированная строка
"""
if isinstance(value, dict):
if value is None:
return "" # Используем длинное тире для None
elif isinstance(value, dict):
# Вложенные словари - форматируем как JSON с отступами
return json.dumps(value, indent=2, ensure_ascii=False)
elif isinstance(value, list):
@@ -176,14 +201,25 @@ class PrettyFormatter(logging.Formatter):
# Числа - добавляем форматирование для больших чисел
if isinstance(value, float):
return f"{value:.6f}" if abs(value) < 1000 else f"{value:.2f}"
# Форматируем большие числа с разделителями тысяч
return f"{value:,}" if abs(value) >= 1000 else str(value)
elif isinstance(value, bytes):
# Байты - показываем hex
return value.hex().upper() if len(value) <= 16 else value.hex().upper()[:32] + "..."
if len(value) <= 16:
return value.hex().upper()
return value.hex().upper()[:32] + "..."
elif isinstance(value, bool):
return "" if value else ""
elif isinstance(value, str):
# Обрезаем длинные строки
if len(value) > 80:
return value[:77] + "..."
return value
else:
return str(value)
str_value = str(value)
if len(str_value) > 80:
return str_value[:77] + "..."
return str_value
def _format_can_frame(self, data: Dict[str, Any]) -> str:
"""
@@ -359,19 +395,26 @@ class PrettyFormatter(logging.Formatter):
# Добавляем дополнительные поля
if extra_fields:
# Фильтруем None значения (кроме специальных случаев)
filtered_fields = {k: v for k, v in extra_fields.items() if v is not None}
# Если после фильтрации ничего не осталось, не добавляем блок
if not filtered_fields:
return formatted
# Проверяем, это CAN фрейм или обычный dict
if any(field in extra_fields for field in ['can_id', 'can_id_int', 'can_id_hex', 'interface', 'bus']):
if any(field in filtered_fields for field in ['can_id', 'can_id_int', 'can_id_hex', 'interface', 'bus']):
# CAN фрейм - специальное форматирование
formatted += self._format_can_frame(extra_fields)
elif 'first_message' in extra_fields and isinstance(extra_fields['first_message'], dict):
formatted += self._format_can_frame(filtered_fields)
elif 'first_message' in filtered_fields and isinstance(filtered_fields['first_message'], dict):
# Батч с первым сообщением
batch_info = {k: v for k, v in extra_fields.items() if k != 'first_message'}
batch_info = {k: v for k, v in filtered_fields.items() if k != 'first_message'}
if batch_info:
formatted += self._format_dict_block(batch_info, "Batch Info")
formatted += self._format_can_frame(extra_fields['first_message'])
formatted += self._format_can_frame(filtered_fields['first_message'])
else:
# Обычный dict - блочное форматирование
formatted += self._format_dict_block(extra_fields)
formatted += self._format_dict_block(filtered_fields)
# Добавляем информацию об исключении, если есть
if record.exc_info:

View File

@@ -26,16 +26,20 @@ class MessageProcessor:
Каждый обработчик реализует интерфейс BaseHandler.
"""
def __init__(self, handlers: Optional[List[BaseHandler]] = None, queue_size: int = 10000):
def __init__(self, handlers: Optional[List[BaseHandler]] = None, queue_size: Optional[int] = None):
"""
Инициализация процессора сообщений.
Args:
handlers: Список обработчиков для pipeline. Если None, создаются по умолчанию.
queue_size: Максимальный размер очереди сообщений
queue_size: Максимальный размер очереди сообщений. Если None, берется из config.general.buffer_size
"""
self.logger = logger
# Используем размер очереди из конфига, если не указан явно
if queue_size is None:
queue_size = config.general.buffer_size
# Очередь для асинхронной обработки сообщений
# Храним CANFrame объекты (неизменяемые, легковесные)
self.message_queue: Queue[CANFrame] = Queue(maxsize=queue_size)
@@ -103,7 +107,7 @@ class MessageProcessor:
extra={"handlers": [h.name for h in self.handlers]}
)
def enqueue(self, frame: CANFrame) -> bool:
def enqueue(self, frame: CANFrame, block: bool = False, timeout: Optional[float] = None) -> bool:
"""
Добавление CAN фрейма в очередь для асинхронной обработки.
@@ -111,14 +115,21 @@ class MessageProcessor:
Args:
frame: CANFrame объект
block: Блокировать ли при переполнении очереди (для backpressure)
timeout: Таймаут для блокирующего режима (секунды)
Returns:
True если сообщение добавлено, False если очередь переполнена
"""
try:
# Пытаемся добавить в очередь без блокировки (non-blocking)
self.message_queue.put_nowait(frame)
return True
if block:
# Блокирующий режим - используется для backpressure
self.message_queue.put(frame, timeout=timeout)
return True
else:
# Неблокирующий режим - быстрое добавление
self.message_queue.put_nowait(frame)
return True
except:
# Очередь переполнена - пропускаем сообщение
self.dropped_count += 1
@@ -126,15 +137,29 @@ class MessageProcessor:
# Логируем предупреждение периодически (не каждое сообщение)
if self.queue_full_warnings % 1000 == 0:
queue_usage = (self.message_queue.qsize() / self.message_queue.maxsize) * 100
self.logger.warning(
f"Message queue full, dropped {self.dropped_count} messages",
extra={
"dropped_count": self.dropped_count,
"queue_size": self.message_queue.qsize()
"queue_size": self.message_queue.qsize(),
"queue_maxsize": self.message_queue.maxsize,
"queue_usage_percent": round(queue_usage, 1)
}
)
return False
def get_queue_usage(self) -> float:
"""
Получение процента заполнения очереди.
Returns:
Процент заполнения (0.0 - 1.0)
"""
if self.message_queue.maxsize == 0:
return 0.0
return self.message_queue.qsize() / self.message_queue.maxsize
def process(self, frame: CANFrame) -> None:
"""
Публичный метод для обработки CAN фрейма.

View File

@@ -73,10 +73,17 @@ class CANBusHandler:
"""Основной цикл чтения сообщений с шины."""
self.logger.info(f"Starting read loop for {self.interface}")
# Переменные для backpressure механизма
consecutive_drops = 0
backpressure_delay = 0.0
max_backpressure_delay = 0.5 # Максимальная задержка 500ms
while self.running:
try:
# Читаем сообщение с таймаутом для возможности проверки running
message = self.bus.recv(timeout=0.1)
# Увеличиваем таймаут при backpressure
recv_timeout = 0.1 + backpressure_delay
message = self.bus.recv(timeout=recv_timeout)
if message is not None:
self.message_count += 1
@@ -95,8 +102,53 @@ class CANBusHandler:
continue
# Вызываем callback для обработки сообщения
# Используем backpressure: если очередь заполнена, замедляем чтение
try:
self.message_callback(frame)
# Проверяем использование очереди (если callback - это enqueue)
if hasattr(self.message_callback, '__self__'):
processor = getattr(self.message_callback, '__self__', None)
if processor and hasattr(processor, 'get_queue_usage'):
queue_usage = processor.get_queue_usage()
# Если очередь заполнена более чем на 80%, используем блокирующий режим
if queue_usage > 0.8:
# Блокируем добавление с небольшим таймаутом для backpressure
if hasattr(processor, 'enqueue'):
success = processor.enqueue(frame, block=True, timeout=0.01)
if not success:
consecutive_drops += 1
# Увеличиваем задержку при последовательных потерях
backpressure_delay = min(
max_backpressure_delay,
0.001 * consecutive_drops
)
continue
else:
self.message_callback(frame)
else:
# Очередь не заполнена - быстрое добавление
if hasattr(processor, 'enqueue'):
success = processor.enqueue(frame, block=False)
if not success:
consecutive_drops += 1
backpressure_delay = min(
max_backpressure_delay,
0.001 * consecutive_drops
)
continue
else:
self.message_callback(frame)
# Сбрасываем счетчик при успешной отправке
if queue_usage < 0.5:
consecutive_drops = 0
backpressure_delay = 0.0
else:
# Обычный callback без backpressure
self.message_callback(frame)
else:
# Обычный callback без backpressure
self.message_callback(frame)
except Exception as e:
self.logger.error(
f"Error in message callback for {self.interface}: {e}",