Skip to content

Commit

Permalink
Rate limits processing changed
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Mar 20, 2024
1 parent f8fd507 commit 36e7289
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 49 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
with open(path.join(this_directory, 'README.md')) as f:
long_description = f.read()

VERSION = "1.8.8"
VERSION = "1.8.9"

setup(
version=VERSION,
Expand Down
139 changes: 103 additions & 36 deletions tb_device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import uuid
from collections import deque
from inspect import signature
from time import sleep

import paho.mqtt.client as paho
from math import ceil
import logging
from time import sleep

try:
from time import monotonic as time
except ImportError:
from time import time
import queue
import ssl
from threading import Lock, RLock, Thread, Condition
from enum import Enum

from paho.mqtt.reasoncodes import ReasonCodes
from simplejson import loads, dumps, JSONDecodeError
Expand Down Expand Up @@ -118,6 +120,11 @@ def get_credentials(self):
return self.__credentials


class TBSendMethod(Enum):
SUBSCRIBE = 0
PUBLISH = 1


class TBPublishInfo:
TB_ERR_AGAIN = -1
TB_ERR_SUCCESS = 0
Expand Down Expand Up @@ -178,6 +185,7 @@ def __init__(self, rate_limit):
self.__start_time = time()
self.__config = rate_limit
self.__rate_limit_dict = {}
self.__lock = RLock()
rate_configs = rate_limit.split(";")
if "," in rate_limit:
rate_configs = rate_limit.split(",")
Expand All @@ -192,21 +200,25 @@ def __init__(self, rate_limit):
self.__rate_limit_dict[rate_limit_time]["queue"].maxsize)

def add_counter(self):
for rate_limit_time in self.__rate_limit_dict:
self.__rate_limit_dict[rate_limit_time]["queue"].put(1)
with self.__lock:
for rate_limit_time in self.__rate_limit_dict:
self.__rate_limit_dict[rate_limit_time]["queue"].put(1)

def check_limit_reached(self):
for rate_limit_time in self.__rate_limit_dict:
rate_limit_point_queue = self.__rate_limit_dict[rate_limit_time]["queue"]
if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time < time():
self.__rate_limit_dict[rate_limit_time]["start"] = time()
rate_limit_point_queue = queue.Queue(rate_limit_point_queue.maxsize)
self.__rate_limit_dict[rate_limit_time]["queue"] = rate_limit_point_queue
if rate_limit_point_queue.full():
log.debug("Rate limit exceeded for %s second", rate_limit_time)
log.debug("Queue size: %s", rate_limit_point_queue.qsize())
return True
return False
with self.__lock:
if self.__config == "0:0":
return False
for rate_limit_time in self.__rate_limit_dict:
rate_limit_point_queue = self.__rate_limit_dict[rate_limit_time]["queue"]
if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time < time():
self.__rate_limit_dict[rate_limit_time]["start"] = time()
rate_limit_point_queue = queue.Queue(rate_limit_point_queue.maxsize)
self.__rate_limit_dict[rate_limit_time]["queue"] = rate_limit_point_queue
if rate_limit_point_queue.full():
log.debug("Rate limit exceeded for %s second", rate_limit_time)
log.debug("Queue size: %s", rate_limit_point_queue.qsize())
return True
return False

def get_minimal_limit(self):
minimal_limit = 1000000000
Expand Down Expand Up @@ -288,7 +300,7 @@ def maxsize(self):
class TBDeviceMqttClient:
"""ThingsBoard MQTT client. This class provides interface to send data to ThingsBoard and receive data from"""
def __init__(self, host, port=1883, username=None, password=None, quality_of_service=None, client_id="",
chunk_size=0, rate_limit="8:1;2000:60;30000:3600;"):
chunk_size=0, rate_limit="DEFAULT_RATE_LIMIT"):
self._client = paho.Client(protocol=5, client_id=client_id)
self.quality_of_service = quality_of_service if quality_of_service is not None else 1
self.__host = host
Expand All @@ -309,6 +321,18 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser
self.__device_sub_dict = {}
self.__device_client_rpc_dict = {}
self.__attr_request_number = 0
# rate_limit = rate_limit if rate_limit != "DEFAULT_RATE_LIMIT" else "8:1;2000:60;30000:3600;"
if rate_limit == "DEFAULT_RATE_LIMIT":
if "thingsboard.cloud" in self.__host:
rate_limit = "8:1,450:60,30000:3600,"
elif "tb" in self.__host and "cloud" in self.__host:
rate_limit = "8:1,450:60,30000:3600,"
elif "demo.thingsboard.io" in self.__host:
rate_limit = "8:1,450:60,30000:3600,"
else:
rate_limit = "0:0,"
else:
rate_limit = rate_limit
self.__rate_limit = RateLimit(rate_limit)
self.max_inflight_messages_set(self.__rate_limit.get_minimal_limit())
self.__sending_queue = TBQueue()
Expand Down Expand Up @@ -365,10 +389,10 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params):
if result_code == 0:
self.__is_connected = True
log.info("MQTT client %r - Connected!", client)
self._client.subscribe(ATTRIBUTES_TOPIC, qos=self.quality_of_service)
self._client.subscribe(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service)
self._client.subscribe(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service)
self._client.subscribe(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service)
self._subscribe_to_topic(ATTRIBUTES_TOPIC, qos=self.quality_of_service)
self._subscribe_to_topic(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service)
self._subscribe_to_topic(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service)
self._subscribe_to_topic(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service)
else:
if isinstance(result_code, int):
if result_code in RESULT_CODES:
Expand Down Expand Up @@ -501,11 +525,11 @@ def _on_decoded_message(self, content, message):
if message.topic.startswith("v1/devices/me/attributes"):
self.firmware_info = loads(message.payload)
if "/response/" in message.topic:
self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info,dict) else {}
if (self.firmware_info.get(FW_VERSION_ATTR) is not None and self.firmware_info.get(
FW_VERSION_ATTR) != self.current_firmware_info.get("current_" + FW_VERSION_ATTR)) or \
(self.firmware_info.get(FW_TITLE_ATTR) is not None and self.firmware_info.get(
FW_TITLE_ATTR) != self.current_firmware_info.get("current_" + FW_TITLE_ATTR)):
self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info, dict) else {}
if ((self.firmware_info.get(FW_VERSION_ATTR) is not None
and self.firmware_info.get(FW_VERSION_ATTR) != self.current_firmware_info.get("current_" + FW_VERSION_ATTR))
or (self.firmware_info.get(FW_TITLE_ATTR) is not None
and self.firmware_info.get(FW_TITLE_ATTR) != self.current_firmware_info.get("current_" + FW_TITLE_ATTR))):
log.debug('Firmware is not the same')
self.firmware_data = b''
self.__current_chunk = 0
Expand Down Expand Up @@ -633,16 +657,22 @@ def __sending_thread_main(self):
try:
if not self.is_connected():
continue
if not self.__rate_limit.check_limit_reached():
if (not self.__rate_limit.check_limit_reached()
and self.__rate_limit.get_minimal_limit() > len(self._client._out_packet)):
if not self.__sending_queue.empty():
item = self.__sending_queue.get(False)
if item is not None:
info = self._client.publish(item["topic"], item["data"], qos=item["qos"])
if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc:
self.__sending_queue.put_left(item, True)
continue
self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT}
self.__rate_limit.add_counter()
if item["method"] == TBSendMethod.PUBLISH:
info = self._client.publish(item["topic"], item["data"], qos=item["qos"])
if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc:
self.__sending_queue.put_left(item, True)
continue
self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT}
self.__rate_limit.add_counter()
elif item["method"] == TBSendMethod.SUBSCRIBE:
result = self._client.subscribe(item["topic"], qos=item["qos"])
self.__responses[item['id']] = {"info": result, "timeout_ts": int(time()) + DEFAULT_TIMEOUT}
self.__rate_limit.add_counter()
else:
sleep(0.1)
except Exception as e:
Expand All @@ -657,15 +687,50 @@ def __housekeeping_thread_main(self):
for req_id in list(self.__responses.keys()):
if int(time()) > self.__responses[req_id]["timeout_ts"]:
try:
if req_id in self.__responses and self.__responses[req_id]["info"].is_published():
if (req_id in self.__responses
and ((self.__responses[req_id]["method"] == TBSendMethod.PUBLISH
and self.__responses[req_id]["info"].is_published())
or (self.__responses[req_id]["method"] == TBSendMethod.SUBSCRIBE))):
self.__responses.pop(req_id)
except (KeyError, AttributeError):
pass
except (Exception, RuntimeError, ValueError) as e:
log.exception("Error during housekeeping sent messages:", exc_info=e)
pass
# log.debug("Error during housekeeping sent messages:", exc_info=e)
# log.debug("Timeout occurred while waiting for a reply from ThingsBoard!")
sleep(0.1)

def _subscribe_to_topic(self, topic, callback=None, qos=None, wait_for_result=False, timeout=DEFAULT_TIMEOUT):
if qos is None:
qos = self.quality_of_service
req_id = uuid.uuid4()
self.__sending_queue.put_left({"topic": topic, "qos": qos, "callback": callback, "id": req_id,
"method": TBSendMethod.SUBSCRIBE}, True)
sending_queue_size = self.__sending_queue.qsize()
if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5:
self.__sending_queue_warning_published = int(time())
log.warning("Sending queue is bigger than 1000000 messages (%r), consider increasing the rate limit, "
"or decreasing the amount of messages sent!", sending_queue_size)

waiting_for_connection_message_time = 0
while not self.is_connected():
if self.stopped:
return -1, 128
if time() - waiting_for_connection_message_time > 10.0:
log.warning("Waiting for connection to be established before subscribing for data on ThingsBoard!")
waiting_for_connection_message_time = time()
sleep(0.1)

start_time = int(time())
if wait_for_result:
while req_id not in list(self.__responses.keys()):
if 0 < timeout < int(time()) - start_time:
log.error("Timeout while waiting for a subscribe to ThingsBoard!")
return -1, 128
sleep(0.1)

return self.__responses.pop(req_id)["info"]

def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=False, timeout=DEFAULT_TIMEOUT):
data = dumps(data)
if qos is None:
Expand All @@ -675,9 +740,11 @@ def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=F
raise TBQoSException("Quality of service (qos) value must be 0 or 1")
req_id = uuid.uuid4()
if high_priority:
self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": req_id}, False)
self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": req_id,
"method": TBSendMethod.PUBLISH}, False)
else:
self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": req_id}, False)
self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": req_id,
"method": TBSendMethod.PUBLISH}, False)
sending_queue_size = self.__sending_queue.qsize()
if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5:
self.__sending_queue_warning_published = int(time())
Expand Down
39 changes: 27 additions & 12 deletions tb_gateway_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
#

import logging
try:
from time import monotonic as time
except ImportError:
from time import time
import time
from tb_device_mqtt import TBDeviceMqttClient

GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes"
Expand All @@ -37,7 +34,7 @@ class TBGatewayAPI:

class TBGatewayMqttClient(TBDeviceMqttClient):
def __init__(self, host, port=1883, username=None, password=None, gateway=None, quality_of_service=1, client_id="",
rate_limit="8:1;400:60;24000:3600;"):
rate_limit="DEFAULT_RATE_LIMIT"):
super().__init__(host, port, username, password, quality_of_service, client_id, rate_limit)
self.quality_of_service = quality_of_service
self.__max_sub_id = 0
Expand All @@ -54,12 +51,30 @@ def __init__(self, host, port=1883, username=None, password=None, gateway=None,
def _on_connect(self, client, userdata, flags, result_code, *extra_params):
super()._on_connect(client, userdata, flags, result_code, *extra_params)
if result_code == 0:
gateway_attributes_topic_sub_id = int(self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1])
self._gw_subscriptions[gateway_attributes_topic_sub_id] = GATEWAY_ATTRIBUTES_TOPIC
gateway_attributes_resp_sub_id = int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, qos=1)[1])
self._gw_subscriptions[gateway_attributes_resp_sub_id] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC
gateway_rpc_topic_sub_id = int(self._client.subscribe(GATEWAY_RPC_TOPIC, qos=1)[1])
self._gw_subscriptions[gateway_rpc_topic_sub_id] = GATEWAY_RPC_TOPIC
gateway_attributes_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_TOPIC, qos=1,
wait_for_result=True)[1])
if gateway_attributes_topic_sub_id == 128:
log.error("Service subscription to topic %s - failed.", GATEWAY_ATTRIBUTES_TOPIC)
if gateway_attributes_topic_sub_id in self._gw_subscriptions:
del self._gw_subscriptions[gateway_attributes_topic_sub_id]
else:
self._gw_subscriptions[gateway_attributes_topic_sub_id] = GATEWAY_ATTRIBUTES_TOPIC
gateway_attributes_resp_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, qos=1,
wait_for_result=True)[1])
if gateway_attributes_resp_sub_id == 128:
log.error("Service subscription to topic %s - failed.", GATEWAY_ATTRIBUTES_RESPONSE_TOPIC)
if gateway_attributes_resp_sub_id in self._gw_subscriptions:
del self._gw_subscriptions[gateway_attributes_resp_sub_id]
else:
self._gw_subscriptions[gateway_attributes_resp_sub_id] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC
gateway_rpc_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_RPC_TOPIC, qos=1,
wait_for_result=True)[1])
if gateway_rpc_topic_sub_id == 128:
log.error("Service subscription to topic %s - failed.", GATEWAY_RPC_TOPIC)
if gateway_rpc_topic_sub_id in self._gw_subscriptions:
del self._gw_subscriptions[gateway_rpc_topic_sub_id]
else:
self._gw_subscriptions[gateway_rpc_topic_sub_id] = GATEWAY_RPC_TOPIC
# gateway_rpc_topic_response_sub_id = int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC)[1])
# self._gw_subscriptions[gateway_rpc_topic_response_sub_id] = GATEWAY_RPC_RESPONSE_TOPIC

Expand Down Expand Up @@ -125,7 +140,7 @@ def __request_attributes(self, device, keys, callback, type_is_client=False):
log.error("There are no keys to request")
return False

ts_in_millis = int(round(time() * 1000))
ts_in_millis = int(round(time.time() * 1000))
attr_request_number = self._add_attr_request_callback(callback)
msg = {"keys": keys,
"device": device,
Expand Down

0 comments on commit 36e7289

Please sign in to comment.