Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon committed Mar 19, 2024
2 parents d7fd7b8 + 00018ac commit a3d49b3
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions tb_device_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def __put(self, block, timeout):
self.__not_full.wait(remaining)

def get(self, block=True, timeout=None):
item = None
with self.__not_empty:
if not block:
if not self.__queue:
Expand All @@ -255,9 +256,9 @@ def get(self, block=True, timeout=None):
if remaining <= 0.0:
raise TimeoutError("Timeout while trying to get item from queue")
self.__not_empty.wait(remaining)
item = self.__queue.pop()
item = self.__queue.popleft()
self.__not_full.notify()
return item
return item

def put_nowait(self, item):
return self.put(item, False)
Expand All @@ -269,7 +270,7 @@ def full(self):
return len(self.__queue) == self.__maxsize

def empty(self):
return not self.__queue
return not self.__queue or self.qsize() == 0

def qsize(self):
return len(self.__queue)
Expand Down Expand Up @@ -622,19 +623,20 @@ def __sending_thread_main(self):
if not self.is_connected():
continue
if not self.__rate_limit.check_limit_reached():
if not self.__sending_queue.empty():
if (not self.__sending_queue.empty()
and self.__rate_limit.get_minimal_limit() > len(self._client._out_packet)):
item = self.__sending_queue.get(False)
if item is not None:
info = self._client.publish(item["topic"], item["data"], qos=item["qos"])
if TBPublishInfo.TB_ERR_QUEUE_SIZE == info.rc:
self.__sending_queue.put_left(item, True)
continue
self.__responses[item['id']] = {"info": info, "timeout_ts": int(time()) + DEFAULT_TIMEOUT}
self.__responses[item['id']] = {"info": info, "timeout_ts": int(time.time()) + DEFAULT_TIMEOUT}
self.__rate_limit.add_counter()
else:
sleep(0.1)
time.sleep(0.1)
except Exception as e:
log.exception(e)
log.exception("Error during data sending:", exc_info=e)
sleep(1)

def __housekeeping_thread_main(self):
Expand Down

0 comments on commit a3d49b3

Please sign in to comment.