Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wait_for_publish that could hang for QoS=0 message #796

Merged
merged 3 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ v2.0.0 - 2023-xx-xx
- Improve tests & linters. Modernize build (drop setup.py, use pyproject.toml)
- Fix is_connected property to correctly return False when connection is lost
and loop_start/loop_forever isn't used. Closes #525.

- Fix wait_for_publish that could hang with QoS == 0 message on reconnection
or publish during connection. Closes #549.


v1.6.1 - 2021-10-21
Expand Down
35 changes: 19 additions & 16 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ def timed_out() -> bool:
while not self._published and not timed_out():
self._condition.wait(timeout_tenth)

if self.rc > 0:
raise RuntimeError(f'Message publish failed: {error_string(self.rc)}')

def is_published(self) -> bool:
"""Returns True if the message associated with this object has been
published, else returns False."""
Expand Down Expand Up @@ -1193,17 +1196,24 @@ def reconnect(self) -> MQTTErrorCode:
"pos": 0,
}

self._out_packet = collections.deque()

with self._msgtime_mutex:
self._last_msg_in = time_func()
self._last_msg_out = time_func()

self._ping_t = 0.0
self._state = mqtt_cs_new

self._sock_close()

# Mark all currently outgoing QoS = 0 packets as lost,
# or `wait_for_publish()` could hang forever
for pkt in self._out_packet:
if pkt["command"] & 0xF0 == PUBLISH and pkt["qos"] == 0 and pkt["info"] is not None:
pkt["info"].rc = MQTT_ERR_CONN_LOST
pkt["info"]._set_as_published()

self._out_packet.clear()

with self._msgtime_mutex:
self._last_msg_in = time_func()
self._last_msg_out = time_func()

# Put messages in progress in a valid state.
self._messages_reconnect_reset()

Expand Down Expand Up @@ -1263,11 +1273,9 @@ def _loop(self, timeout: float = 1.0) -> MQTTErrorCode:
if timeout < 0.0:
raise ValueError('Invalid timeout.')

try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
if self.want_write():
wlist = [self._sock]
except IndexError:
else:
wlist = []

# used to check if there are any bytes left in the (SSL) socket
Expand Down Expand Up @@ -1751,12 +1759,7 @@ def want_write(self) -> bool:
"""Call to determine if there is network data waiting to be written.
Useful if you are calling select() yourself rather than using loop().
"""
try:
packet = self._out_packet.popleft()
self._out_packet.appendleft(packet)
return True
except IndexError:
return False
return len(self._out_packet) > 0

def loop_misc(self) -> MQTTErrorCode:
"""Process miscellaneous network events. Use in place of calling loop() if you
Expand Down
55 changes: 55 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,61 @@ def on_disconnect(*args):
assert not mqttc.is_connected()


class TestPublish:
def test_publish_before_connect(self, fake_broker: FakeBroker) -> None:
mqttc = client.Client(
"test_publish_before_connect",
)

def on_connect(mqttc, obj, flags, rc):
assert rc == 0

mqttc.on_connect = on_connect

mqttc.loop_start()
mqttc.connect("localhost", fake_broker.port)
mqttc.enable_logger()

try:
mi = mqttc.publish("test", "testing")

fake_broker.start()

packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed
# re-call fake_broker.start() to take the 2nd connection done by client
# ... this is probably a bug, when using loop_start/loop_forever
# and doing a connect() before, the TCP connection is opened twice.
fake_broker.start()

connect_packet = paho_test.gen_connect(
"test_publish_before_connect", keepalive=60,
proto_ver=client.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

with pytest.raises(RuntimeError):
mi.wait_for_publish(1)

mqttc.disconnect()

disconnect_packet = paho_test.gen_disconnect()
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == disconnect_packet

finally:
mqttc.loop_stop()

packet_in = fake_broker.receive_packet(1)
assert not packet_in # Check connection is closed

class TestPublishBroker2Client:

def test_invalid_utf8_topic(self, fake_broker):
Expand Down
Loading