diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index 5b7206f5..50e74dec 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -455,7 +455,7 @@ def timed_out() -> bool: with self._condition: while not self._published and not timed_out(): self._condition.wait(timeout_tenth) - + if self.rc > 0: raise RuntimeError(f'Message publish failed: {error_string(self.rc)}') @@ -1196,25 +1196,24 @@ def reconnect(self) -> MQTTErrorCode: "pos": 0, } - # Before dropping all out_packet, ensure any QoS == 0 message info get - # marked as MQTT_ERR_CONN_LOST or the wait_for_publish() could hang forever - old_queue = self._out_packet - self._out_packet = collections.deque() + self._ping_t = 0.0 + self._state = mqtt_cs_new + + self._sock_close() - for pkt in old_queue: - if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0: + # Mark all currently outgoing QoS = 0 packets as lost, + # or `wait_for_publish()` could hang forever + for pkt in self._out_packet: + if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0 and pkt["info"] is not None: pkt["info"].rc = MQTT_ERR_CONN_LOST pkt["info"]._set_as_published() + self._out_packet.clear() + with self._msgtime_mutex: self._last_msg_in = time_func() self._last_msg_out = time_func() - self._ping_t = 0.0 - self._state = mqtt_cs_new - - self._sock_close() - # Put messages in progress in a valid state. self._messages_reconnect_reset() @@ -1274,11 +1273,9 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode: if timeout < 0.0: raise ValueError('Invalid timeout.') - try: - packet = self._out_packet.popleft() - self._out_packet.appendleft(packet) + if self.want_write(): wlist = [self._sock] - except IndexError: + else: wlist = [] # used to check if there are any bytes left in the (SSL) socket @@ -1762,12 +1759,7 @@ def want_write(self) -> bool: """Call to determine if there is network data waiting to be written. Useful if you are calling select() yourself rather than using loop(). """ - try: - packet = self._out_packet.popleft() - self._out_packet.appendleft(packet) - return True - except IndexError: - return False + return len(self._out_packet) > 0 def loop_misc(self) -> MQTTErrorCode: """Process miscellaneous network events. Use in place of calling loop() if you