diff --git a/setup.py b/setup.py index 4643f6c..60b2419 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ with open(path.join(this_directory, 'README.md')) as f: long_description = f.read() -VERSION = "1.8.8" +VERSION = "1.8.9" setup( version=VERSION, diff --git a/tb_device_mqtt.py b/tb_device_mqtt.py index 9a08acd..f7ce6a0 100644 --- a/tb_device_mqtt.py +++ b/tb_device_mqtt.py @@ -11,14 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging import uuid from collections import deque from inspect import signature +from time import sleep import paho.mqtt.client as paho from math import ceil -import logging -from time import sleep + try: from time import monotonic as time except ImportError: @@ -26,6 +27,7 @@ import queue import ssl from threading import Lock, RLock, Thread, Condition +from enum import Enum from paho.mqtt.reasoncodes import ReasonCodes from simplejson import loads, dumps, JSONDecodeError @@ -118,6 +120,11 @@ def get_credentials(self): return self.__credentials +class TBSendMethod(Enum): + SUBSCRIBE = 0 + PUBLISH = 1 + + class TBPublishInfo: TB_ERR_AGAIN = -1 TB_ERR_SUCCESS = 0 @@ -178,6 +185,7 @@ def __init__(self, rate_limit): self.__start_time = time() self.__config = rate_limit self.__rate_limit_dict = {} + self.__lock = RLock() rate_configs = rate_limit.split(";") if "," in rate_limit: rate_configs = rate_limit.split(",") @@ -192,21 +200,25 @@ def __init__(self, rate_limit): self.__rate_limit_dict[rate_limit_time]["queue"].maxsize) def add_counter(self): - for rate_limit_time in self.__rate_limit_dict: - self.__rate_limit_dict[rate_limit_time]["queue"].put(1) + with self.__lock: + for rate_limit_time in self.__rate_limit_dict: + self.__rate_limit_dict[rate_limit_time]["queue"].put(1) def check_limit_reached(self): - for rate_limit_time in self.__rate_limit_dict: - rate_limit_point_queue = self.__rate_limit_dict[rate_limit_time]["queue"] - if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time < time(): - self.__rate_limit_dict[rate_limit_time]["start"] = time() - rate_limit_point_queue = queue.Queue(rate_limit_point_queue.maxsize) - self.__rate_limit_dict[rate_limit_time]["queue"] = rate_limit_point_queue - if rate_limit_point_queue.full(): - log.debug("Rate limit exceeded for %s second", rate_limit_time) - log.debug("Queue size: %s", rate_limit_point_queue.qsize()) - return True - return False + with self.__lock: + if self.__config == "0:0": + return False + for rate_limit_time in self.__rate_limit_dict: + rate_limit_point_queue = self.__rate_limit_dict[rate_limit_time]["queue"] + if self.__rate_limit_dict[rate_limit_time]["start"] + rate_limit_time < time(): + self.__rate_limit_dict[rate_limit_time]["start"] = time() + rate_limit_point_queue = queue.Queue(rate_limit_point_queue.maxsize) + self.__rate_limit_dict[rate_limit_time]["queue"] = rate_limit_point_queue + if rate_limit_point_queue.full(): + log.debug("Rate limit exceeded for %s second", rate_limit_time) + log.debug("Queue size: %s", rate_limit_point_queue.qsize()) + return True + return False def get_minimal_limit(self): minimal_limit = 1000000000 @@ -288,7 +300,7 @@ def maxsize(self): class TBDeviceMqttClient: """ThingsBoard MQTT client. This class provides interface to send data to ThingsBoard and receive data from""" def __init__(self, host, port=1883, username=None, password=None, quality_of_service=None, client_id="", - chunk_size=0, rate_limit="8:1;2000:60;30000:3600;"): + chunk_size=0, rate_limit="DEFAULT_RATE_LIMIT"): self._client = paho.Client(protocol=5, client_id=client_id) self.quality_of_service = quality_of_service if quality_of_service is not None else 1 self.__host = host @@ -309,6 +321,18 @@ def __init__(self, host, port=1883, username=None, password=None, quality_of_ser self.__device_sub_dict = {} self.__device_client_rpc_dict = {} self.__attr_request_number = 0 + # rate_limit = rate_limit if rate_limit != "DEFAULT_RATE_LIMIT" else "8:1;2000:60;30000:3600;" + if rate_limit == "DEFAULT_RATE_LIMIT": + if "thingsboard.cloud" in self.__host: + rate_limit = "8:1,450:60,30000:3600," + elif "tb" in self.__host and "cloud" in self.__host: + rate_limit = "8:1,450:60,30000:3600," + elif "demo.thingsboard.io" in self.__host: + rate_limit = "8:1,450:60,30000:3600," + else: + rate_limit = "0:0," + else: + rate_limit = rate_limit self.__rate_limit = RateLimit(rate_limit) self.max_inflight_messages_set(self.__rate_limit.get_minimal_limit()) self.__sending_queue = TBQueue() @@ -365,10 +389,10 @@ def _on_connect(self, client, userdata, flags, result_code, *extra_params): if result_code == 0: self.__is_connected = True log.info("MQTT client %r - Connected!", client) - self._client.subscribe(ATTRIBUTES_TOPIC, qos=self.quality_of_service) - self._client.subscribe(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service) - self._client.subscribe(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service) - self._client.subscribe(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service) + self._subscribe_to_topic(ATTRIBUTES_TOPIC, qos=self.quality_of_service) + self._subscribe_to_topic(ATTRIBUTES_TOPIC + "/response/+", qos=self.quality_of_service) + self._subscribe_to_topic(RPC_REQUEST_TOPIC + '+', qos=self.quality_of_service) + self._subscribe_to_topic(RPC_RESPONSE_TOPIC + '+', qos=self.quality_of_service) else: if isinstance(result_code, int): if result_code in RESULT_CODES: @@ -501,11 +525,11 @@ def _on_decoded_message(self, content, message): if message.topic.startswith("v1/devices/me/attributes"): self.firmware_info = loads(message.payload) if "/response/" in message.topic: - self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info,dict) else {} - if (self.firmware_info.get(FW_VERSION_ATTR) is not None and self.firmware_info.get( - FW_VERSION_ATTR) != self.current_firmware_info.get("current_" + FW_VERSION_ATTR)) or \ - (self.firmware_info.get(FW_TITLE_ATTR) is not None and self.firmware_info.get( - FW_TITLE_ATTR) != self.current_firmware_info.get("current_" + FW_TITLE_ATTR)): + self.firmware_info = self.firmware_info.get("shared", {}) if isinstance(self.firmware_info, dict) else {} + if ((self.firmware_info.get(FW_VERSION_ATTR) is not None + and self.firmware_info.get(FW_VERSION_ATTR) != self.current_firmware_info.get("current_" + FW_VERSION_ATTR)) + or (self.firmware_info.get(FW_TITLE_ATTR) is not None + and self.firmware_info.get(FW_TITLE_ATTR) != self.current_firmware_info.get("current_" + FW_TITLE_ATTR))): log.debug('Firmware is not the same') self.firmware_data = b'' self.__current_chunk = 0 @@ -633,16 +657,22 @@ def __sending_thread_main(self): try: if not self.is_connected(): continue - if not self.__rate_limit.check_limit_reached(): + if (not self.__rate_limit.check_limit_reached() + and self.__rate_limit.get_minimal_limit() > len(self._client._out_packet)): if not self.__sending_queue.empty(): item = self.__sending_queue.get(False) if item is not None: - info = self._client.publish(item["topic"], item["data"], qos=item["qos"]) - if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc: - self.__sending_queue.put_left(item, True) - continue - self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT} - self.__rate_limit.add_counter() + if item["method"] == TBSendMethod.PUBLISH: + info = self._client.publish(item["topic"], item["data"], qos=item["qos"]) + if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc: + self.__sending_queue.put_left(item, True) + continue + self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT} + self.__rate_limit.add_counter() + elif item["method"] == TBSendMethod.SUBSCRIBE: + result = self._client.subscribe(item["topic"], qos=item["qos"]) + self.__responses[item['id']] = {"info": result, "timeout_ts": int(time()) + DEFAULT_TIMEOUT} + self.__rate_limit.add_counter() else: sleep(0.1) except Exception as e: @@ -657,15 +687,50 @@ def __housekeeping_thread_main(self): for req_id in list(self.__responses.keys()): if int(time()) > self.__responses[req_id]["timeout_ts"]: try: - if req_id in self.__responses and self.__responses[req_id]["info"].is_published(): + if (req_id in self.__responses + and ((self.__responses[req_id]["method"] == TBSendMethod.PUBLISH + and self.__responses[req_id]["info"].is_published()) + or (self.__responses[req_id]["method"] == TBSendMethod.SUBSCRIBE))): self.__responses.pop(req_id) except (KeyError, AttributeError): pass except (Exception, RuntimeError, ValueError) as e: - log.exception("Error during housekeeping sent messages:", exc_info=e) + pass + # log.debug("Error during housekeeping sent messages:", exc_info=e) # log.debug("Timeout occurred while waiting for a reply from ThingsBoard!") sleep(0.1) + def _subscribe_to_topic(self, topic, callback=None, qos=None, wait_for_result=False, timeout=DEFAULT_TIMEOUT): + if qos is None: + qos = self.quality_of_service + req_id = uuid.uuid4() + self.__sending_queue.put_left({"topic": topic, "qos": qos, "callback": callback, "id": req_id, + "method": TBSendMethod.SUBSCRIBE}, True) + sending_queue_size = self.__sending_queue.qsize() + if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5: + self.__sending_queue_warning_published = int(time()) + log.warning("Sending queue is bigger than 1000000 messages (%r), consider increasing the rate limit, " + "or decreasing the amount of messages sent!", sending_queue_size) + + waiting_for_connection_message_time = 0 + while not self.is_connected(): + if self.stopped: + return -1, 128 + if time() - waiting_for_connection_message_time > 10.0: + log.warning("Waiting for connection to be established before subscribing for data on ThingsBoard!") + waiting_for_connection_message_time = time() + sleep(0.1) + + start_time = int(time()) + if wait_for_result: + while req_id not in list(self.__responses.keys()): + if 0 < timeout < int(time()) - start_time: + log.error("Timeout while waiting for a subscribe to ThingsBoard!") + return -1, 128 + sleep(0.1) + + return self.__responses.pop(req_id)["info"] + def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=False, timeout=DEFAULT_TIMEOUT): data = dumps(data) if qos is None: @@ -675,9 +740,11 @@ def _publish_data(self, data, topic, qos, wait_for_publish=True, high_priority=F raise TBQoSException("Quality of service (qos) value must be 0 or 1") req_id = uuid.uuid4() if high_priority: - self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": req_id}, False) + self.__sending_queue.put_left({"topic": topic, "data": data, "qos": qos, "id": req_id, + "method": TBSendMethod.PUBLISH}, False) else: - self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": req_id}, False) + self.__sending_queue.put({"topic": topic, "data": data, "qos": qos, "id": req_id, + "method": TBSendMethod.PUBLISH}, False) sending_queue_size = self.__sending_queue.qsize() if sending_queue_size > 1000000 and int(time()) - self.__sending_queue_warning_published > 5: self.__sending_queue_warning_published = int(time()) diff --git a/tb_gateway_mqtt.py b/tb_gateway_mqtt.py index 1281bb3..84407f7 100644 --- a/tb_gateway_mqtt.py +++ b/tb_gateway_mqtt.py @@ -14,10 +14,7 @@ # import logging -try: - from time import monotonic as time -except ImportError: - from time import time +import time from tb_device_mqtt import TBDeviceMqttClient GATEWAY_ATTRIBUTES_TOPIC = "v1/gateway/attributes" @@ -37,7 +34,7 @@ class TBGatewayAPI: class TBGatewayMqttClient(TBDeviceMqttClient): def __init__(self, host, port=1883, username=None, password=None, gateway=None, quality_of_service=1, client_id="", - rate_limit="8:1;400:60;24000:3600;"): + rate_limit="DEFAULT_RATE_LIMIT"): super().__init__(host, port, username, password, quality_of_service, client_id, rate_limit) self.quality_of_service = quality_of_service self.__max_sub_id = 0 @@ -54,12 +51,30 @@ def __init__(self, host, port=1883, username=None, password=None, gateway=None, def _on_connect(self, client, userdata, flags, result_code, *extra_params): super()._on_connect(client, userdata, flags, result_code, *extra_params) if result_code == 0: - gateway_attributes_topic_sub_id = int(self._client.subscribe(GATEWAY_ATTRIBUTES_TOPIC, qos=1)[1]) - self._gw_subscriptions[gateway_attributes_topic_sub_id] = GATEWAY_ATTRIBUTES_TOPIC - gateway_attributes_resp_sub_id = int(self._client.subscribe(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, qos=1)[1]) - self._gw_subscriptions[gateway_attributes_resp_sub_id] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC - gateway_rpc_topic_sub_id = int(self._client.subscribe(GATEWAY_RPC_TOPIC, qos=1)[1]) - self._gw_subscriptions[gateway_rpc_topic_sub_id] = GATEWAY_RPC_TOPIC + gateway_attributes_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_TOPIC, qos=1, + wait_for_result=True)[1]) + if gateway_attributes_topic_sub_id == 128: + log.error("Service subscription to topic %s - failed.", GATEWAY_ATTRIBUTES_TOPIC) + if gateway_attributes_topic_sub_id in self._gw_subscriptions: + del self._gw_subscriptions[gateway_attributes_topic_sub_id] + else: + self._gw_subscriptions[gateway_attributes_topic_sub_id] = GATEWAY_ATTRIBUTES_TOPIC + gateway_attributes_resp_sub_id = int(self._subscribe_to_topic(GATEWAY_ATTRIBUTES_RESPONSE_TOPIC, qos=1, + wait_for_result=True)[1]) + if gateway_attributes_resp_sub_id == 128: + log.error("Service subscription to topic %s - failed.", GATEWAY_ATTRIBUTES_RESPONSE_TOPIC) + if gateway_attributes_resp_sub_id in self._gw_subscriptions: + del self._gw_subscriptions[gateway_attributes_resp_sub_id] + else: + self._gw_subscriptions[gateway_attributes_resp_sub_id] = GATEWAY_ATTRIBUTES_RESPONSE_TOPIC + gateway_rpc_topic_sub_id = int(self._subscribe_to_topic(GATEWAY_RPC_TOPIC, qos=1, + wait_for_result=True)[1]) + if gateway_rpc_topic_sub_id == 128: + log.error("Service subscription to topic %s - failed.", GATEWAY_RPC_TOPIC) + if gateway_rpc_topic_sub_id in self._gw_subscriptions: + del self._gw_subscriptions[gateway_rpc_topic_sub_id] + else: + self._gw_subscriptions[gateway_rpc_topic_sub_id] = GATEWAY_RPC_TOPIC # gateway_rpc_topic_response_sub_id = int(self._client.subscribe(GATEWAY_RPC_RESPONSE_TOPIC)[1]) # self._gw_subscriptions[gateway_rpc_topic_response_sub_id] = GATEWAY_RPC_RESPONSE_TOPIC @@ -125,7 +140,7 @@ def __request_attributes(self, device, keys, callback, type_is_client=False): log.error("There are no keys to request") return False - ts_in_millis = int(round(time() * 1000)) + ts_in_millis = int(round(time.time() * 1000)) attr_request_number = self._add_attr_request_callback(callback) msg = {"keys": keys, "device": device,