Skip to content

Commit

Permalink
Improved processing with configured rate limits
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Oct 11, 2024
1 parent c184c05 commit bd80281
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions tb_device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,12 @@ def _on_message(self, client, userdata, message):

def _on_decoded_message(self, content, message):
if message.topic.startswith(RPC_REQUEST_TOPIC):
self._messages_rate_limit.increase_rate_limit_counter()
request_id = message.topic[len(RPC_REQUEST_TOPIC):len(message.topic)]
if self.__device_on_server_side_rpc_response:
self.__device_on_server_side_rpc_response(request_id, content)
elif message.topic.startswith(RPC_RESPONSE_TOPIC):
self._messages_rate_limit.increase_rate_limit_counter()
with self._lock:
request_id = int(message.topic[len(RPC_RESPONSE_TOPIC):len(message.topic)])
if self.__device_client_rpc_dict.get(request_id):
Expand All @@ -504,6 +506,7 @@ def _on_decoded_message(self, content, message):
if callback is not None:
callback(request_id, content, None)
elif message.topic == ATTRIBUTES_TOPIC:
self._messages_rate_limit.increase_rate_limit_counter()
dict_results = []
with self._lock:
# callbacks for everything
Expand All @@ -524,6 +527,7 @@ def _on_decoded_message(self, content, message):
for res in dict_results:
res(content, None)
elif message.topic.startswith(ATTRIBUTES_TOPIC_RESPONSE):
self._messages_rate_limit.increase_rate_limit_counter()
with self._lock:
req_id = int(message.topic[len(ATTRIBUTES_TOPIC + "/response/"):])
# pop callback and use it
Expand All @@ -537,6 +541,7 @@ def _on_decoded_message(self, content, message):
callback(content, None)

if message.topic.startswith("v1/devices/me/attributes"):
self._messages_rate_limit.increase_rate_limit_counter()
self.firmware_info = loads(message.payload)
if "/response/" in message.topic:
self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info, dict) else {}
Expand Down Expand Up @@ -734,7 +739,8 @@ def _wait_for_rate_limit_released(self, timeout, message_rate_limit, dp_rate_lim
else:
log.debug("Waiting for rate limit to be released...")
log_posted = True
sleep(.01)
if limit_reached_check:
sleep(.005)
if waited:
log.debug("Rate limit released, sending data to ThingsBoard...")

Expand All @@ -749,7 +755,8 @@ def wait_until_current_queued_messages_processed(self):
log.debug("Waiting for messages to be processed by paho client, current queue size - %r, max inflight messages: %r",
current_out_messages, inflight_messages)
previous_notification_time = int(monotonic())
sleep(.001)
if current_out_messages >= inflight_messages:
sleep(.001)

def _send_request(self, _type, kwargs, timeout=DEFAULT_TIMEOUT, device=None,
msg_rate_limit=None, dp_rate_limit=None):
Expand Down

0 comments on commit bd80281

Please sign in to comment.