-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix wait_for_publish that could hang for QoS=0 message #796
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be a test for this?
src/paho/mqtt/client.py
Outdated
if self.rc > 0: | ||
raise RuntimeError(f'Message publish failed: {error_string(self.rc)}') | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
src/paho/mqtt/client.py
Outdated
# Before dropping all out_packet, ensure any QoS == 0 message info get | ||
# marked as MQTT_ERR_CONN_LOST or the wait_for_publish() could hang forever | ||
old_queue = self._out_packet | ||
self._out_packet = collections.deque() | ||
|
||
for pkt in old_queue: | ||
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0: | ||
pkt["info"].rc = MQTT_ERR_CONN_LOST | ||
pkt["info"]._set_as_published() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there some subtlety here as to why old_queue
is grabbed first and this is only done exactly after self._out_packet
has been reallocated? If not, it feels like
# Before dropping all out_packet, ensure any QoS == 0 message info get | |
# marked as MQTT_ERR_CONN_LOST or the wait_for_publish() could hang forever | |
old_queue = self._out_packet | |
self._out_packet = collections.deque() | |
for pkt in old_queue: | |
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0: | |
pkt["info"].rc = MQTT_ERR_CONN_LOST | |
pkt["info"]._set_as_published() | |
# Mark all currently outgoing QoS = 0 packets as lost, | |
# or `wait_for_publish()` could hang forever | |
for pkt in self._out_packet: | |
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0: | |
pkt["info"].rc = MQTT_ERR_CONN_LOST | |
pkt["info"]._set_as_published() | |
self._out_packet = collections.deque() |
would be the more obvious way to write this.
(self._out_packet.clear()
would also save some allocations.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason way for what happen if a publish()
if call during reconnect()
. It would end in the new _out_packet
or be marked at published... but well it's not very nice.
I'll check, but I think it better in all aspect to close the socket, then process the _set_as_published + clear the out_packet queue, because once closed we don't add message to out_packet.
4223c51
to
81be49f
Compare
81be49f
to
532ba6f
Compare
If reconnect() is called (which happen on auto-reconnection or on first connection done by loop_forever), any pending QoS = 0 message were dropped without unblocking wait_for_publish().
We now ensure wait_for_publish() is unblocked and raise RuntimeError with MQTT_ERR_CONN_LOST
Fix #549