diff --git a/can_sniffer/src/influxdb_handler/influxdb_client.py b/can_sniffer/src/influxdb_handler/influxdb_client.py index 1290e6c..a1a35fd 100644 --- a/can_sniffer/src/influxdb_handler/influxdb_client.py +++ b/can_sniffer/src/influxdb_handler/influxdb_client.py @@ -412,6 +412,8 @@ class InfluxDBClient: """ Пакетная отправка сообщений в InfluxDB. + Добавляет сообщения в очередь для асинхронной отправки через forwarder loop. + Args: messages: Список словарей с данными сообщений block: Блокировать ли при переполнении очереди @@ -422,7 +424,7 @@ class InfluxDBClient: if not self.write_api or not messages: return 0 - # Проверяем соединение перед отправкой + # Проверяем соединение перед добавлением в очередь if self.connection_status != ConnectionStatus.CONNECTED: if not self._health_check(): # Соединение недоступно - пропускаем батч без ошибки @@ -460,6 +462,63 @@ class InfluxDBClient: return added_count + def _send_messages_batch(self, messages: List[Dict[str, Any]]) -> int: + """ + Непосредственная отправка батча сообщений в InfluxDB. + + Этот метод вызывается из forwarder loop для реальной отправки данных. + + Args: + messages: Список словарей с данными сообщений + + Returns: + Количество успешно отправленных сообщений + """ + if not self.write_api or not messages: + return 0 + + # Проверяем соединение перед отправкой + if self.connection_status != ConnectionStatus.CONNECTED: + if not self._health_check(): + self.logger.warning("InfluxDB connection not available, skipping batch") + self.failed_count += len(messages) + return 0 + else: + self.connection_status = ConnectionStatus.CONNECTED + + try: + # Создаем точки для InfluxDB + points = [] + for msg in messages: + point = self._create_point(msg) + if point: + points.append(point) + + if not points: + self.logger.warning("No valid points created from messages") + return 0 + + # Отправляем с retry + if self._write_with_retry(points): + sent = len(points) + self.sent_count += sent + self.logger.debug( + f"Sent {sent} messages to InfluxDB", + extra={"batch_size": sent} + ) + return sent + else: + return 0 + + except Exception as e: + self.failed_count += len(messages) + self.logger.error( + f"Failed to send messages batch to InfluxDB: {e}", + exc_info=True, + extra={"batch_size": len(messages)} + ) + return 0 + def _forwarder_loop(self) -> None: """Основной цикл для отправки сообщений в InfluxDB.""" self.logger.info("InfluxDB forwarder loop started") @@ -485,7 +544,8 @@ class InfluxDBClient: if should_flush: if batch: - self.write_messages_batch(batch) + # Отправляем батч напрямую в InfluxDB (не добавляем в очередь!) + self._send_messages_batch(batch) batch = [] last_flush_time = current_time @@ -498,7 +558,7 @@ class InfluxDBClient: # Отправляем оставшиеся сообщения if batch: - self.write_messages_batch(batch) + self._send_messages_batch(batch) self.logger.info( "InfluxDB forwarder loop stopped",