From a32e4dd40ee3f234e5a73973af1719250b5042da Mon Sep 17 00:00:00 2001
From: imbeacon
Date: Mon, 14 Oct 2024 07:26:36 +0300
Subject: [PATCH] Updated splitted messages processing to avoid queue size error

---
 tb_device_mqtt.py | 91 ++++++++++++++++++++++++++++++++---------------
 1 file changed, 62 insertions(+), 29 deletions(-)

diff --git a/tb_device_mqtt.py b/tb_device_mqtt.py
index 5000710..cf5716b 100644
--- a/tb_device_mqtt.py
+++ b/tb_device_mqtt.py
@@ -29,6 +29,7 @@ from enum import Enum
 
 from paho.mqtt.reasoncodes import ReasonCodes
+from paho.mqtt.client import MQTT_ERR_QUEUE_SIZE
 
 from orjson import dumps, loads, JSONDecodeError
 
 
@@ -148,8 +149,13 @@ def mid(self):
 
     def get(self):
         if isinstance(self.message_info, list):
-            for info in self.message_info:
-                info.wait_for_publish(timeout=1)
+            try:
+                for info in self.message_info:
+                    info.wait_for_publish(timeout=1)
+            except Exception as e:
+                global log
+                log = logging.getLogger('tb_connection')
+                log.error("Error while waiting for publish: %s", e)
         else:
             self.message_info.wait_for_publish(timeout=1)
         return self.rc()
@@ -345,6 +351,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
         self.__device_sub_dict = {}
         self.__device_client_rpc_dict = {}
         self.__attr_request_number = 0
+        self.__error_logged = 0
         self.max_payload_size = max_payload_size
         self.service_configuration_callback = self.on_service_configuration
         telemetry_rate_limit, telemetry_dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host,
@@ -357,7 +364,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
         self._telemetry_dp_rate_limit = RateLimit(telemetry_dp_rate_limit, "Rate limit for telemetry data points")
         self.max_inflight_messages_set(self._telemetry_rate_limit.get_minimal_limit())
         self.__attrs_request_timeout = {}
-        self.__timeout_thread = Thread(target=self.__timeout_check)
+        self.__timeout_thread = Thread(target=self.__timeout_check, name="Timeout check thread")
         self.__timeout_thread.daemon = True
         self.__timeout_thread.start()
         self._client.on_connect = self._on_connect
@@ -671,6 +678,8 @@ def request_service_configuration(self, callback):
         self.send_rpc_call("getSessionLimits", {"timeout": 5000}, callback)
 
     def on_service_configuration(self, _, response, *args, **kwargs):
+        global log
+        log = logging.getLogger('tb_connection')
         if "error" in response:
             log.warning("Timeout while waiting for service configuration!, session will use default configuration.")
             self.rate_limits_received = True
@@ -694,6 +703,7 @@ def on_service_configuration(self, _, response, *args, **kwargs):
                                            self._telemetry_rate_limit.get_minimal_limit(),
                                            service_config.get('maxInflightMessages', 100)) * 80 / 100)
         self.max_inflight_messages_set(max_inflight_messages)
+        self.max_queued_messages_set(max_inflight_messages)
         if service_config.get('maxPayloadSize'):
             self.max_payload_size = int(int(service_config.get('maxPayloadSize')) * 80 / 100)
         log.info("Service configuration was successfully retrieved and applied.")
@@ -744,19 +754,30 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
         if waited:
             log.debug("Rate limit released, sending data to ThingsBoard...")
 
-    def wait_until_current_queued_messages_processed(self):
+    def _wait_until_current_queued_messages_processed(self):
         previous_notification_time = 0
         current_out_messages = len(self._client._out_messages) * 2
         inflight_messages = self._client._max_inflight_messages or 5
+        logger = None
+        waiting_started = int(monotonic())
+        connection_was_lost = False
+        timeout_for_break = 600
+
         if current_out_messages > 0:
             while current_out_messages >= inflight_messages and not self.stopped:
                 current_out_messages = len(self._client._out_messages)
                 if int(monotonic()) - previous_notification_time > 5 and current_out_messages > inflight_messages:
-                    log.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
+                    if logger is None:
+                        logger = logging.getLogger('tb_connection')
+                    logger.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
                               current_out_messages, inflight_messages)
                     previous_notification_time = int(monotonic())
+                    if not self.is_connected():
+                        connection_was_lost = True
                 if current_out_messages >= inflight_messages:
                     sleep(.001)
+                if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost:
+                    break
 
     def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
                       msg_rate_limit=None, dp_rate_limit=None):
@@ -834,32 +855,44 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
             for part in split_messages:
                 if not part:
                     continue
-                dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
-                rate_limited = self._wait_for_rate_limit_released(timeout,
-                                                                  message_rate_limit=msg_rate_limit,
-                                                                  dp_rate_limit=dp_rate_limit,
-                                                                  amount=part['datapoints'])
-                if rate_limited:
-                    return rate_limited
-                msg_rate_limit.increase_rate_limit_counter()
-                kwargs["payload"] = dumps(part['message'])
-                self.wait_until_current_queued_messages_processed()
-                if not self.stopped:
-                    if device is not None:
-                        log.debug("Device: %s, Sending message to topic: %s ", device, topic)
-                    if part['datapoints'] > 0:
-                        log.debug("Sending message with %i datapoints", part['datapoints'])
-                        log.debug("Message payload: %r", kwargs["payload"])
-                        log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
-                        log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
-                    else:
-                        log.debug("Sending message with %r", kwargs["payload"])
-                        log.debug("Message payload: %r", kwargs["payload"])
-                        log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
-                        log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
-                results.append(self._client.publish(**kwargs))
+                self.__send_split_message(results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit, topic)
         return TBPublishInfo(results)
 
+    def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit,
+                             topic):
+        dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
+        rate_limited = self._wait_for_rate_limit_released(timeout,
+                                                          message_rate_limit=msg_rate_limit,
+                                                          dp_rate_limit=dp_rate_limit,
+                                                          amount=part['datapoints'])
+        if rate_limited:
+            return rate_limited
+        msg_rate_limit.increase_rate_limit_counter()
+        kwargs["payload"] = dumps(part['message'])
+        self._wait_until_current_queued_messages_processed()
+        if not self.stopped:
+            if device is not None:
+                log.debug("Device: %s, Sending message to topic: %s ", device, topic)
+            if part['datapoints'] > 0:
+                log.debug("Sending message with %i datapoints", part['datapoints'])
+                log.debug("Message payload: %r", kwargs["payload"])
+                log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
+                log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
+            else:
+                log.debug("Sending message with %r", kwargs["payload"])
+                log.debug("Message payload: %r", kwargs["payload"])
+                log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
+                log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
+            result = self._client.publish(**kwargs)
+            if result.rc == MQTT_ERR_QUEUE_SIZE:
+                while not self.stopped and result.rc == MQTT_ERR_QUEUE_SIZE:
+                    if int(monotonic()) - self.__error_logged > 10:
+                        log.warning("Queue size exceeded, waiting for messages to be processed by paho client.")
+                        self.__error_logged = int(monotonic())
+                    sleep(.01)  # Give some time for paho to process messages
+                    result = self._client.publish(**kwargs)
+            results.append(result)
+
     def _subscribe_to_topic(self, topic, qos=None, timeout=DEFAULT_TIMEOUT):
         if qos is None:
             qos = self.quality_of_service