Skip to content

Commit

Permalink
Updated splitted messages processing to avoid queue size error
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Oct 14, 2024
1 parent bd80281 commit a32e4dd
Showing 1 changed file with 62 additions and 29 deletions.
91 changes: 62 additions & 29 deletions tb_device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from enum import Enum

from paho.mqtt.reasoncodes import ReasonCodes
from paho.mqtt.client import MQTT_ERR_QUEUE_SIZE

from orjson import dumps, loads, JSONDecodeError

Expand Down Expand Up @@ -148,8 +149,13 @@ def mid(self):

def get(self):
if isinstance(self.message_info, list):
for info in self.message_info:
info.wait_for_publish(timeout=1)
try:
for info in self.message_info:
info.wait_for_publish(timeout=1)
except Exception as e:
global log
log = logging.getLogger('tb_connection')
log.error("Error while waiting for publish: %s", e)
else:
self.message_info.wait_for_publish(timeout=1)
return self.rc()
Expand Down Expand Up @@ -345,6 +351,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
self.__device_sub_dict = {}
self.__device_client_rpc_dict = {}
self.__attr_request_number = 0
self.__error_logged = 0
self.max_payload_size = max_payload_size
self.service_configuration_callback = self.on_service_configuration
telemetry_rate_limit, telemetry_dp_rate_limit = RateLimit.get_rate_limits_by_host(self.__host,
Expand All @@ -357,7 +364,7 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
self._telemetry_dp_rate_limit = RateLimit(telemetry_dp_rate_limit, "Rate limit for telemetry data points")
self.max_inflight_messages_set(self._telemetry_rate_limit.get_minimal_limit())
self.__attrs_request_timeout = {}
self.__timeout_thread = Thread(target=self.__timeout_check)
self.__timeout_thread = Thread(target=self.__timeout_check, name="Timeout check thread")
self.__timeout_thread.daemon = True
self.__timeout_thread.start()
self._client.on_connect = self._on_connect
Expand Down Expand Up @@ -671,6 +678,8 @@ def request_service_configuration(self, callback):
self.send_rpc_call("getSessionLimits", {"timeout": 5000}, callback)

def on_service_configuration(self, _, response, *args, **kwargs):
global log
log = logging.getLogger('tb_connection')
if "error" in response:
log.warning("Timeout while waiting for service configuration!, session will use default configuration.")
self.rate_limits_received = True
Expand All @@ -694,6 +703,7 @@ def on_service_configuration(self, _, response, *args, **kwargs):
self._telemetry_rate_limit.get_minimal_limit(),
service_config.get('maxInflightMessages', 100)) * 80 / 100)
self.max_inflight_messages_set(max_inflight_messages)
self.max_queued_messages_set(max_inflight_messages)
if service_config.get('maxPayloadSize'):
self.max_payload_size = int(int(service_config.get('maxPayloadSize')) * 80 / 100)
log.info("Service configuration was successfully retrieved and applied.")
Expand Down Expand Up @@ -744,19 +754,30 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
if waited:
log.debug("Rate limit released, sending data to ThingsBoard...")

def wait_until_current_queued_messages_processed(self):
def _wait_until_current_queued_messages_processed(self):
previous_notification_time = 0
current_out_messages = len(self._client._out_messages) * 2
inflight_messages = self._client._max_inflight_messages or 5
logger = None
waiting_started = int(monotonic())
connection_was_lost = False
timeout_for_break = 600

if current_out_messages > 0:
while current_out_messages >= inflight_messages and not self.stopped:
current_out_messages = len(self._client._out_messages)
if int(monotonic()) - previous_notification_time > 5 and current_out_messages > inflight_messages:
log.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
if logger is None:
logger = logging.getLogger('tb_connection')
logger.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
current_out_messages, inflight_messages)
previous_notification_time = int(monotonic())
if not self.is_connected():
connection_was_lost = True
if current_out_messages >= inflight_messages:
sleep(.001)
if int(monotonic()) - waiting_started > timeout_for_break and not connection_was_lost:
break

def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
msg_rate_limit=None, dp_rate_limit=None):
Expand Down Expand Up @@ -834,32 +855,44 @@ def __send_publish_with_limitations(self, kwargs, timeout, device=None, msg_rate
for part in split_messages:
if not part:
continue
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
rate_limited = self._wait_for_rate_limit_released(timeout,
message_rate_limit=msg_rate_limit,
dp_rate_limit=dp_rate_limit,
amount=part['datapoints'])
if rate_limited:
return rate_limited
msg_rate_limit.increase_rate_limit_counter()
kwargs["payload"] = dumps(part['message'])
self.wait_until_current_queued_messages_processed()
if not self.stopped:
if device is not None:
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
if part['datapoints'] > 0:
log.debug("Sending message with %i datapoints", part['datapoints'])
log.debug("Message payload: %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
else:
log.debug("Sending message with %r", kwargs["payload"])
log.debug("Message payload: %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
results.append(self._client.publish(**kwargs))
self.__send_split_message(results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit, topic)
return TBPublishInfo(results)

def __send_split_message(self, results, part, kwargs, timeout, device, msg_rate_limit, dp_rate_limit,
topic):
dp_rate_limit.increase_rate_limit_counter(part['datapoints'])
rate_limited = self._wait_for_rate_limit_released(timeout,
message_rate_limit=msg_rate_limit,
dp_rate_limit=dp_rate_limit,
amount=part['datapoints'])
if rate_limited:
return rate_limited
msg_rate_limit.increase_rate_limit_counter()
kwargs["payload"] = dumps(part['message'])
self._wait_until_current_queued_messages_processed()
if not self.stopped:
if device is not None:
log.debug("Device: %s, Sending message to topic: %s ", device, topic)
if part['datapoints'] > 0:
log.debug("Sending message with %i datapoints", part['datapoints'])
log.debug("Message payload: %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
else:
log.debug("Sending message with %r", kwargs["payload"])
log.debug("Message payload: %r", kwargs["payload"])
log.debug("Rate limits after sending message: %r", msg_rate_limit.__dict__)
log.debug("Data points rate limits after sending message: %r", dp_rate_limit.__dict__)
result = self._client.publish(**kwargs)
if result.rc == MQTT_ERR_QUEUE_SIZE:
while not self.stopped and result.rc == MQTT_ERR_QUEUE_SIZE:
if int(monotonic()) - self.__error_logged > 10:
log.warning("Queue size exceeded, waiting for messages to be processed by paho client.")
self.__error_logged = int(monotonic())
sleep(.01) # Give some time for paho to process messages
result = self._client.publish(**kwargs)
results.append(result)

def _subscribe_to_topic(self, topic, qos=None, timeout=DEFAULT_TIMEOUT):
if qos is None:
qos = self.quality_of_service
Expand Down

0 comments on commit a32e4dd

Please sign in to comment.