fix segment fault on influxdb handler on end
This commit is contained in:
@@ -520,14 +520,16 @@ class InfluxDBClient:
|
|||||||
self.running = True
|
self.running = True
|
||||||
|
|
||||||
# Запускаем forwarder поток
|
# Запускаем forwarder поток
|
||||||
|
# НЕ используем daemon=True для корректного завершения
|
||||||
self.forwarder_thread = threading.Thread(
|
self.forwarder_thread = threading.Thread(
|
||||||
target=self._forwarder_loop,
|
target=self._forwarder_loop,
|
||||||
name="InfluxDB-Forwarder",
|
name="InfluxDB-Forwarder",
|
||||||
daemon=True
|
daemon=False
|
||||||
)
|
)
|
||||||
self.forwarder_thread.start()
|
self.forwarder_thread.start()
|
||||||
|
|
||||||
# Запускаем health check поток
|
# Запускаем health check поток
|
||||||
|
# Health check может быть daemon, так как он не критичен при shutdown
|
||||||
self.health_check_thread = threading.Thread(
|
self.health_check_thread = threading.Thread(
|
||||||
target=self._health_check_loop,
|
target=self._health_check_loop,
|
||||||
name="InfluxDB-HealthCheck",
|
name="InfluxDB-HealthCheck",
|
||||||
@@ -545,6 +547,18 @@ class InfluxDBClient:
|
|||||||
self.logger.info("Stopping InfluxDB forwarder...")
|
self.logger.info("Stopping InfluxDB forwarder...")
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
|
# Даем время на обработку оставшихся сообщений в очереди
|
||||||
|
max_wait_time = 5.0
|
||||||
|
wait_start = time.time()
|
||||||
|
while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time:
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
if not self.message_queue.empty():
|
||||||
|
remaining = self.message_queue.qsize()
|
||||||
|
self.logger.warning(
|
||||||
|
f"InfluxDB queue not empty after shutdown, {remaining} messages remaining"
|
||||||
|
)
|
||||||
|
|
||||||
# Ждем завершения потоков
|
# Ждем завершения потоков
|
||||||
if self.forwarder_thread and self.forwarder_thread.is_alive():
|
if self.forwarder_thread and self.forwarder_thread.is_alive():
|
||||||
self.forwarder_thread.join(timeout=10.0)
|
self.forwarder_thread.join(timeout=10.0)
|
||||||
@@ -552,20 +566,23 @@ class InfluxDBClient:
|
|||||||
self.logger.warning("Forwarder thread did not stop gracefully")
|
self.logger.warning("Forwarder thread did not stop gracefully")
|
||||||
|
|
||||||
if self.health_check_thread and self.health_check_thread.is_alive():
|
if self.health_check_thread and self.health_check_thread.is_alive():
|
||||||
self.health_check_thread.join(timeout=5.0)
|
self.health_check_thread.join(timeout=2.0)
|
||||||
if self.health_check_thread.is_alive():
|
if self.health_check_thread.is_alive():
|
||||||
self.logger.warning("Health check thread did not stop gracefully")
|
self.logger.warning("Health check thread did not stop gracefully")
|
||||||
|
|
||||||
# Закрываем write API и клиент
|
# Закрываем write API и клиент
|
||||||
|
# Важно: закрываем в правильном порядке
|
||||||
if self.write_api:
|
if self.write_api:
|
||||||
try:
|
try:
|
||||||
self.write_api.close()
|
self.write_api.close()
|
||||||
|
self.write_api = None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Error closing write API: {e}", exc_info=True)
|
self.logger.error(f"Error closing write API: {e}", exc_info=True)
|
||||||
|
|
||||||
if self.client:
|
if self.client:
|
||||||
try:
|
try:
|
||||||
self.client.close()
|
self.client.close()
|
||||||
|
self.client = None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True)
|
self.logger.error(f"Error closing InfluxDB client: {e}", exc_info=True)
|
||||||
|
|
||||||
|
|||||||
@@ -23,7 +23,13 @@ def signal_handler(sig, frame):
|
|||||||
"""Обработчик сигналов для graceful shutdown."""
|
"""Обработчик сигналов для graceful shutdown."""
|
||||||
logger.info("Received shutdown signal, stopping gracefully...")
|
logger.info("Received shutdown signal, stopping gracefully...")
|
||||||
if sniffer:
|
if sniffer:
|
||||||
sniffer.stop()
|
try:
|
||||||
|
sniffer.stop()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error during shutdown: {e}", exc_info=True)
|
||||||
|
# Даем время на завершение потоков перед выходом
|
||||||
|
import time
|
||||||
|
time.sleep(0.5)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -184,11 +184,14 @@ class MessageProcessor:
|
|||||||
last_flush_time = time.time()
|
last_flush_time = time.time()
|
||||||
flush_interval = 5.0 # Периодический flush обработчиков
|
flush_interval = 5.0 # Периодический flush обработчиков
|
||||||
|
|
||||||
|
# Обрабатываем сообщения пока очередь не пуста или пока running=True
|
||||||
while self.running or not self.message_queue.empty():
|
while self.running or not self.message_queue.empty():
|
||||||
try:
|
try:
|
||||||
# Получаем сообщение из очереди с таймаутом
|
# Получаем сообщение из очереди с таймаутом
|
||||||
|
# Используем меньший таймаут при shutdown для быстрого завершения
|
||||||
|
timeout = batch_interval if self.running else 0.1
|
||||||
try:
|
try:
|
||||||
frame = self.message_queue.get(timeout=batch_interval)
|
frame = self.message_queue.get(timeout=timeout)
|
||||||
batch.append(frame)
|
batch.append(frame)
|
||||||
except Empty:
|
except Empty:
|
||||||
# Если очередь пуста, обрабатываем накопленный батч
|
# Если очередь пуста, обрабатываем накопленный батч
|
||||||
@@ -196,6 +199,9 @@ class MessageProcessor:
|
|||||||
self._process_batch(batch)
|
self._process_batch(batch)
|
||||||
batch = []
|
batch = []
|
||||||
last_batch_time = time.time()
|
last_batch_time = time.time()
|
||||||
|
# Если shutdown и очередь пуста - выходим
|
||||||
|
if not self.running and self.message_queue.empty():
|
||||||
|
break
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Обрабатываем батч если он заполнен или прошло достаточно времени
|
# Обрабатываем батч если он заполнен или прошло достаточно времени
|
||||||
@@ -323,10 +329,11 @@ class MessageProcessor:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# Запускаем поток обработки сообщений
|
# Запускаем поток обработки сообщений
|
||||||
|
# НЕ используем daemon=True, чтобы поток мог корректно завершиться
|
||||||
self.processing_thread = threading.Thread(
|
self.processing_thread = threading.Thread(
|
||||||
target=self._processing_loop,
|
target=self._processing_loop,
|
||||||
name="MessageProcessor",
|
name="MessageProcessor",
|
||||||
daemon=True
|
daemon=False
|
||||||
)
|
)
|
||||||
self.processing_thread.start()
|
self.processing_thread.start()
|
||||||
self.logger.info("Message processor started")
|
self.logger.info("Message processor started")
|
||||||
@@ -336,6 +343,19 @@ class MessageProcessor:
|
|||||||
self.logger.info("Shutting down message processor...")
|
self.logger.info("Shutting down message processor...")
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
|
# Даем время на обработку оставшихся сообщений
|
||||||
|
# Ждем пока очередь не опустеет или не пройдет таймаут
|
||||||
|
max_wait_time = 10.0
|
||||||
|
wait_start = time.time()
|
||||||
|
while not self.message_queue.empty() and (time.time() - wait_start) < max_wait_time:
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
if not self.message_queue.empty():
|
||||||
|
remaining = self.message_queue.qsize()
|
||||||
|
self.logger.warning(
|
||||||
|
f"Queue not empty after shutdown signal, {remaining} messages remaining"
|
||||||
|
)
|
||||||
|
|
||||||
# Ждем завершения потока обработки
|
# Ждем завершения потока обработки
|
||||||
if self.processing_thread and self.processing_thread.is_alive():
|
if self.processing_thread and self.processing_thread.is_alive():
|
||||||
self.processing_thread.join(timeout=5.0)
|
self.processing_thread.join(timeout=5.0)
|
||||||
|
|||||||
@@ -189,10 +189,11 @@ class CANBusHandler:
|
|||||||
return
|
return
|
||||||
|
|
||||||
self.running = True
|
self.running = True
|
||||||
|
# НЕ используем daemon=True для корректного завершения
|
||||||
self.thread = threading.Thread(
|
self.thread = threading.Thread(
|
||||||
target=self._read_loop,
|
target=self._read_loop,
|
||||||
name=f"CAN-{self.interface}",
|
name=f"CAN-{self.interface}",
|
||||||
daemon=True
|
daemon=False
|
||||||
)
|
)
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
self.logger.info(f"Started reading from {self.interface}")
|
self.logger.info(f"Started reading from {self.interface}")
|
||||||
|
|||||||
Reference in New Issue
Block a user