diff --git a/can_sniffer/src/influxdb_handler/influxdb_client.py b/can_sniffer/src/influxdb_handler/influxdb_client.py index 92207a1..1290e6c 100644 --- a/can_sniffer/src/influxdb_handler/influxdb_client.py +++ b/can_sniffer/src/influxdb_handler/influxdb_client.py @@ -520,14 +520,16 @@ class InfluxDBClient: self.running = True # Запускаем forwarder поток + # НЕ используем daemon=True для корректного завершения self.forwarder_thread = threading.Thread( target=self._forwarder_loop, name="InfluxDB-Forwarder", - daemon=True + daemon=False ) self.forwarder_thread.start() # Запускаем health check поток + # Health check может быть daemon, так как он не критичен при shutdown self.health_check_thread = threading.Thread( target=self._health_check_loop, name="InfluxDB-HealthCheck", @@ -545,6 +547,18 @@ class InfluxDBClient: self.logger.info("Stopping InfluxDB forwarder...") self.running = False + # Даем время на обработку оставшихся сообщений в очереди + max_wait_time = 5.0 + wait_start = time.time() + while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time: + time.sleep(0.1) + + if not self.message_queue.empty(): + remaining = self.message_queue.qsize() + self.logger.warning( + f"InfluxDB queue not empty after shutdown, {remaining} messages remaining" + ) + # Ждем завершения потоков if self.forwarder_thread and self.forwarder_thread.is_alive(): self.forwarder_thread.join(timeout=10.0) @@ -552,20 +566,23 @@ class InfluxDBClient: self.logger.warning("Forwarder thread did not stop gracefully") if self.health_check_thread and self.health_check_thread.is_alive(): - self.health_check_thread.join(timeout=5.0) + self.health_check_thread.join(timeout=2.0) if self.health_check_thread.is_alive(): self.logger.warning("Health check thread did not stop gracefully") # Закрываем write API и клиент + # Важно: закрываем в правильном порядке if self.write_api: try: self.write_api.close() + self.write_api = None except Exception as e: self.logger.error(f"Error closing write API: {e}", exc_info=True) if self.client: try: self.client.close() + self.client = None except Exception as e: self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True) diff --git a/can_sniffer/src/main.py b/can_sniffer/src/main.py index 691b7be..52e0de7 100644 --- a/can_sniffer/src/main.py +++ b/can_sniffer/src/main.py @@ -23,7 +23,13 @@ def signal_handler(sig, frame): """Обработчик сигналов для graceful shutdown.""" logger.info("Received shutdown signal, stopping gracefully...") if sniffer: - sniffer.stop() + try: + sniffer.stop() + except Exception as e: + logger.error(f"Error during shutdown: {e}", exc_info=True) + # Даем время на завершение потоков перед выходом + import time + time.sleep(0.5) sys.exit(0) diff --git a/can_sniffer/src/socket_can/message_processor.py b/can_sniffer/src/socket_can/message_processor.py index 8264777..f15c186 100644 --- a/can_sniffer/src/socket_can/message_processor.py +++ b/can_sniffer/src/socket_can/message_processor.py @@ -184,11 +184,14 @@ class MessageProcessor: last_flush_time = time.time() flush_interval = 5.0 # Периодический flush обработчиков + # Обрабатываем сообщения пока очередь не пуста или пока running=True while self.running or not self.message_queue.empty(): try: # Получаем сообщение из очереди с таймаутом + # Используем меньший таймаут при shutdown для быстрого завершения + timeout = batch_interval if self.running else 0.1 try: - frame = self.message_queue.get(timeout=batch_interval) + frame = self.message_queue.get(timeout=timeout) batch.append(frame) except Empty: # Если очередь пуста, обрабатываем накопленный батч @@ -196,6 +199,9 @@ class MessageProcessor: self._process_batch(batch) batch = [] last_batch_time = time.time() + # Если shutdown и очередь пуста - выходим + if not self.running and self.message_queue.empty(): + break continue # Обрабатываем батч если он заполнен или прошло достаточно времени @@ -323,10 +329,11 @@ class MessageProcessor: ) # Запускаем поток обработки сообщений + # НЕ используем daemon=True, чтобы поток мог корректно завершиться self.processing_thread = threading.Thread( target=self._processing_loop, name="MessageProcessor", - daemon=True + daemon=False ) self.processing_thread.start() self.logger.info("Message processor started") @@ -336,6 +343,19 @@ class MessageProcessor: self.logger.info("Shutting down message processor...") self.running = False + # Даем время на обработку оставшихся сообщений + # Ждем пока очередь не опустеет или не пройдет таймаут + max_wait_time = 10.0 + wait_start = time.time() + while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time: + time.sleep(0.1) + + if not self.message_queue.empty(): + remaining = self.message_queue.qsize() + self.logger.warning( + f"Queue not empty after shutdown signal, {remaining} messages remaining" + ) + # Ждем завершения потока обработки if self.processing_thread and self.processing_thread.is_alive(): self.processing_thread.join(timeout=5.0) diff --git a/can_sniffer/src/socket_can/src.py b/can_sniffer/src/socket_can/src.py index 925e2f2..f048f6e 100644 --- a/can_sniffer/src/socket_can/src.py +++ b/can_sniffer/src/socket_can/src.py @@ -189,10 +189,11 @@ class CANBusHandler: return self.running = True + # НЕ используем daemon=True для корректного завершения self.thread = threading.Thread( target=self._read_loop, name=f"CAN-{self.interface}", - daemon=True + daemon=False ) self.thread.start() self.logger.info(f"Started reading from {self.interface}")