fix influxdb handler not send to db
This commit is contained in:
@@ -412,6 +412,8 @@ class InfluxDBClient:
|
||||
"""
|
||||
Пакетная отправка сообщений в InfluxDB.
|
||||
|
||||
Добавляет сообщения в очередь для асинхронной отправки через forwarder loop.
|
||||
|
||||
Args:
|
||||
messages: Список словарей с данными сообщений
|
||||
block: Блокировать ли при переполнении очереди
|
||||
@@ -422,7 +424,7 @@ class InfluxDBClient:
|
||||
if not self.write_api or not messages:
|
||||
return 0
|
||||
|
||||
# Проверяем соединение перед отправкой
|
||||
# Проверяем соединение перед добавлением в очередь
|
||||
if self.connection_status != ConnectionStatus.CONNECTED:
|
||||
if not self._health_check():
|
||||
# Соединение недоступно - пропускаем батч без ошибки
|
||||
@@ -460,6 +462,63 @@ class InfluxDBClient:
|
||||
|
||||
return added_count
|
||||
|
||||
def _send_messages_batch(self, messages: List[Dict[str, Any]]) -> int:
|
||||
"""
|
||||
Непосредственная отправка батча сообщений в InfluxDB.
|
||||
|
||||
Этот метод вызывается из forwarder loop для реальной отправки данных.
|
||||
|
||||
Args:
|
||||
messages: Список словарей с данными сообщений
|
||||
|
||||
Returns:
|
||||
Количество успешно отправленных сообщений
|
||||
"""
|
||||
if not self.write_api or not messages:
|
||||
return 0
|
||||
|
||||
# Проверяем соединение перед отправкой
|
||||
if self.connection_status != ConnectionStatus.CONNECTED:
|
||||
if not self._health_check():
|
||||
self.logger.warning("InfluxDB connection not available, skipping batch")
|
||||
self.failed_count += len(messages)
|
||||
return 0
|
||||
else:
|
||||
self.connection_status = ConnectionStatus.CONNECTED
|
||||
|
||||
try:
|
||||
# Создаем точки для InfluxDB
|
||||
points = []
|
||||
for msg in messages:
|
||||
point = self._create_point(msg)
|
||||
if point:
|
||||
points.append(point)
|
||||
|
||||
if not points:
|
||||
self.logger.warning("No valid points created from messages")
|
||||
return 0
|
||||
|
||||
# Отправляем с retry
|
||||
if self._write_with_retry(points):
|
||||
sent = len(points)
|
||||
self.sent_count += sent
|
||||
self.logger.debug(
|
||||
f"Sent {sent} messages to InfluxDB",
|
||||
extra={"batch_size": sent}
|
||||
)
|
||||
return sent
|
||||
else:
|
||||
return 0
|
||||
|
||||
except Exception as e:
|
||||
self.failed_count += len(messages)
|
||||
self.logger.error(
|
||||
f"Failed to send messages batch to InfluxDB: {e}",
|
||||
exc_info=True,
|
||||
extra={"batch_size": len(messages)}
|
||||
)
|
||||
return 0
|
||||
|
||||
def _forwarder_loop(self) -> None:
|
||||
"""Основной цикл для отправки сообщений в InfluxDB."""
|
||||
self.logger.info("InfluxDB forwarder loop started")
|
||||
@@ -485,7 +544,8 @@ class InfluxDBClient:
|
||||
|
||||
if should_flush:
|
||||
if batch:
|
||||
self.write_messages_batch(batch)
|
||||
# Отправляем батч напрямую в InfluxDB (не добавляем в очередь!)
|
||||
self._send_messages_batch(batch)
|
||||
batch = []
|
||||
last_flush_time = current_time
|
||||
|
||||
@@ -498,7 +558,7 @@ class InfluxDBClient:
|
||||
|
||||
# Отправляем оставшиеся сообщения
|
||||
if batch:
|
||||
self.write_messages_batch(batch)
|
||||
self._send_messages_batch(batch)
|
||||
|
||||
self.logger.info(
|
||||
"InfluxDB forwarder loop stopped",
|
||||
|
||||
Reference in New Issue
Block a user