Skip to content

Commit

Permalink
Close socket then clear out_packet queue
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreF committed Jan 20, 2024
1 parent df0ff4e commit 2063ffb
Showing 1 changed file with 14 additions and 22 deletions.
36 changes: 14 additions & 22 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def timed_out() -> bool:
with self._condition:
while not self._published and not timed_out():
self._condition.wait(timeout_tenth)

if self.rc > 0:
raise RuntimeError(f'Message publish failed: {error_string(self.rc)}')

Expand Down Expand Up @@ -1196,25 +1196,24 @@ def reconnect(self) -> MQTTErrorCode:
"pos": 0,
}

# Before dropping all out_packet, ensure any QoS == 0 message info get
# marked as MQTT_ERR_CONN_LOST or the wait_for_publish() could hang forever
old_queue = self._out_packet
self._out_packet = collections.deque()
self._ping_t = 0.0
self._state = mqtt_cs_new

self._sock_close()

for pkt in old_queue:
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0:
# Mark all currently outgoing QoS = 0 packets as lost,
# or `wait_for_publish()` could hang forever
for pkt in self._out_packet:
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0 and pkt["info"] is not None:
pkt["info"].rc = MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()

self._out_packet.clear()

with self._msgtime_mutex:
self._last_msg_in = time_func()
self._last_msg_out = time_func()

self._ping_t = 0.0
self._state = mqtt_cs_new

self._sock_close()

# Put messages in progress in a valid state.
self._messages_reconnect_reset()

Expand Down Expand Up @@ -1274,11 +1273,9 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode:
if timeout < 0.0:
raise ValueError('Invalid timeout.')

try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
if self.want_write():
wlist = [self._sock]
except IndexError:
else:
wlist = []

# used to check if there are any bytes left in the (SSL) socket
Expand Down Expand Up @@ -1762,12 +1759,7 @@ def want_write(self) -> bool:
"""Call to determine if there is network data waiting to be written.
Useful if you are calling select() yourself rather than using loop().
"""
try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
return True
except IndexError:
return False
return len(self._out_packet) > 0

def loop_misc(self) -> MQTTErrorCode:
"""Process miscellaneous network events. Use in place of calling loop() if you
Expand Down

0 comments on commit 2063ffb

Please sign in to comment.