Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Kind of) fix mqttv5 tests #781

Merged
merged 3 commits into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/tox.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v4
with:
repository: eclipse/paho.mqtt.testing
ref: a4dc694010217b291ee78ee13a6d1db812f9babd
path: paho.mqtt.testing
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python }}
Expand Down
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Once you have the code, it can be installed from your repository as well:
To perform all test (including MQTT v5 test), you also need to clone paho.mqtt.testing in paho.mqtt.python folder::

git clone https://github.com/eclipse/paho.mqtt.testing.git
cd paho.mqtt.testing
git checkout a4dc694010217b291ee78ee13a6d1db812f9babd

Known limitations
-----------------
Expand Down
3 changes: 0 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,3 @@ line-length = 167
"S105",
"S106",
]
"tests/test_mqttv5.py" = [
"F841", # TODO: fix when fixing this test file
]
95 changes: 35 additions & 60 deletions tests/test_mqttv5.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import threading
import time
import unittest
import unittest.mock

import paho.mqtt
import paho.mqtt.client
Expand Down Expand Up @@ -120,9 +121,9 @@ def cleanRetained(port):
curclient.loop_start()
callback.register(curclient)
curclient.connect(host="localhost", port=port)
response = callback.wait_connected()
callback.wait_connected()
curclient.subscribe("#", options=SubscribeOptions(qos=0))
response = callback.wait_subscribed() # wait for retained messages to arrive
callback.wait_subscribed() # wait for retained messages to arrive
time.sleep(1)
for message in callback.messages:
logging.info("deleting retained message for topic", message["message"])
Expand Down Expand Up @@ -164,18 +165,22 @@ def setUpClass(cls):
except ImportError as ie:
raise unittest.SkipTest("paho.mqtt.testing not present.") from ie

cls._test_broker = threading.Thread(
target=mqtt.brokers.run,
kwargs={
"config": ["listener 0"],
},
)
cls._test_broker.daemon = True
cls._test_broker.start()
# Wait a bit for TCP server to bind to an address
time.sleep(0.5)
# Hack to find the port used by the test broker...
cls._test_broker_port = mqtt.brokers.listeners.TCPListeners.server.socket.getsockname()[1]
# Hack: we need to patch `signal.signal()` because `mqtt.brokers.run()`
# calls it to set up a signal handler; however, that won't work
# from a thread...
with unittest.mock.patch("signal.signal", unittest.mock.MagicMock()):
cls._test_broker = threading.Thread(
target=mqtt.brokers.run,
kwargs={
"config": ["listener 0"],
},
)
cls._test_broker.daemon = True
cls._test_broker.start()
# Wait a bit for TCP server to bind to an address
time.sleep(0.5)
# Hack to find the port used by the test broker...
cls._test_broker_port = mqtt.brokers.listeners.TCPListeners.server.socket.getsockname()[1]
setData()
cleanup(cls._test_broker_port)

Expand Down Expand Up @@ -239,10 +244,6 @@ def test_connect_fail(self):
fclient.loop_stop()

def test_retained_message(self):
qos0topic = "fromb/qos 0"
qos1topic = "fromb/qos 1"
qos2topic = "fromb/qos2"
wildcardtopic = "fromb/+"

publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.UserProperty = ("a", "2")
Expand Down Expand Up @@ -372,15 +373,15 @@ def test_offline_message_queueing(self):
connect_properties.SessionExpiryInterval = 99999
oclient.loop_start()
oclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
response = ocallback.wait_connected()
ocallback.wait_connected()
oclient.subscribe(wildtopics[5], qos=2)
response = ocallback.wait_subscribed()
ocallback.wait_subscribed()
oclient.disconnect()
oclient.loop_stop()

bclient.loop_start()
bclient.connect(host="localhost", port=self._test_broker_port)
response = callback2.wait_connected()
callback2.wait_connected()
bclient.publish(topics[1], b"qos 0", 0)
bclient.publish(topics[2], b"qos 1", 1)
bclient.publish(topics[3], b"qos 2", 2)
Expand All @@ -393,7 +394,7 @@ def test_offline_message_queueing(self):
ocallback.register(oclient)
oclient.loop_start()
oclient.connect(host="localhost", port=self._test_broker_port, clean_start=False)
response = ocallback.wait_connected()
ocallback.wait_connected()
time.sleep(2)
oclient.disconnect()
oclient.loop_stop()
Expand Down Expand Up @@ -653,10 +654,10 @@ def test_payload_format(self):
pclient, pcallback = self.new_client(clientid)
pclient.loop_start()
pclient.connect_async(host="localhost", port=self._test_broker_port)
response = pcallback.wait_connected()
pcallback.wait_connected()

pclient.subscribe(topics[0], qos=2)
response = pcallback.wait_subscribed()
pcallback.wait_subscribed()
publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.PayloadFormatIndicator = 1
publish_properties.ContentType = "My name"
Expand Down Expand Up @@ -703,9 +704,9 @@ def test_message_expiry(self):
lbclient, lbcallback = self.new_client(clientid+" b")
lbclient.loop_start()
lbclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
response = lbcallback.wait_connected()
lbcallback.wait_connected()
lbclient.subscribe(topics[0], qos=2)
response = lbcallback.wait_subscribed()
lbcallback.wait_subscribed()
disconnect_properties = Properties(PacketTypes.DISCONNECT)
disconnect_properties.SessionExpiryInterval = 999999999
lbclient.disconnect(properties=disconnect_properties)
Expand Down Expand Up @@ -835,7 +836,7 @@ def test_subscribe_options(self):
properties.UserProperty = ("a", "2")
properties.UserProperty = ("c", "3")
laclient.unsubscribe(wildtopics[5], properties)
response = lacallback.wait_unsubscribed()
lacallback.wait_unsubscribed()

# check that we really did remove that subscription
laclient.subscribe(
Expand All @@ -856,7 +857,7 @@ def test_subscribe_options(self):
properties.UserProperty = ("a", "2")
properties.UserProperty = ("c", "3")
laclient.unsubscribe(wildtopics[5], properties)
response = lacallback.wait_unsubscribed()
lacallback.wait_unsubscribed()

lacallback.clear()
laclient.subscribe(
Expand All @@ -870,7 +871,7 @@ def test_subscribe_options(self):

# remove that subscription
laclient.unsubscribe(wildtopics[5])
response = lacallback.wait_unsubscribed()
lacallback.wait_unsubscribed()

laclient.subscribe(
wildtopics[5], options=SubscribeOptions(2, retainHandling=0))
Expand Down Expand Up @@ -990,21 +991,6 @@ def test_request_response(self):
def test_client_topic_alias(self):
clientid = 'client topic alias'

# no server side topic aliases allowed
laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port)
connack = lacallback.wait_connected()
laclient.loop_start()

publish_properties = Properties(PacketTypes.PUBLISH)
publish_properties.TopicAlias = 0 # topic alias 0 not allowed
laclient.publish(topics[0], "topic alias 0", 1,
properties=publish_properties)

# should get back a disconnect with Topic alias invalid
lacallback.wait_disconnected()
laclient.loop_stop()

connect_properties = Properties(PacketTypes.CONNECT)
connect_properties.TopicAliasMaximum = 0 # server topic aliases not allowed
connect_properties.SessionExpiryInterval = 99999
Expand Down Expand Up @@ -1067,11 +1053,8 @@ def test_server_topic_alias(self):

laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
connack = lacallback.wait_connected()
lacallback.wait_connected()
laclient.loop_start()
clientTopicAliasMaximum = 0
if hasattr(connack["properties"], "TopicAliasMaximum"):
clientTopicAliasMaximum = connack["properties"].TopicAliasMaximum

laclient.subscribe(topics[0], qos=2)
lacallback.wait_subscribed()
Expand Down Expand Up @@ -1106,13 +1089,9 @@ def test_server_topic_alias(self):

laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
connack = lacallback.wait_connected()
lacallback.wait_connected()
laclient.loop_start()

clientTopicAliasMaximum = 0
if hasattr(connack["properties"], "TopicAliasMaximum"):
clientTopicAliasMaximum = connack["properties"].TopicAliasMaximum

laclient.subscribe(topics[0], qos=2)
lacallback.wait_subscribed()

Expand All @@ -1138,13 +1117,9 @@ def test_server_topic_alias(self):

laclient, lacallback = self.new_client(clientid+" a")
laclient.connect(host="localhost", port=self._test_broker_port, properties=connect_properties)
connack = lacallback.wait_connected()
lacallback.wait_connected()
laclient.loop_start()

clientTopicAliasMaximum = 0
if hasattr(connack["properties"], "TopicAliasMaximum"):
clientTopicAliasMaximum = connack["properties"].TopicAliasMaximum

laclient.subscribe(topics[0], qos=2)
lacallback.wait_subscribed()

Expand Down Expand Up @@ -1304,7 +1279,7 @@ def test_shared_subscriptions(self):

laclient.subscribe(
[(shared_sub_topic, SubscribeOptions(2)), (topics[0], SubscribeOptions(2))])
response = lacallback.wait_subscribed()
lacallback.wait_subscribed()

lbclient, lbcallback = self.new_client(clientid+" b")
lbclient.connect(host="localhost", port=self._test_broker_port)
Expand All @@ -1316,7 +1291,7 @@ def test_shared_subscriptions(self):

lbclient.subscribe(
[(shared_sub_topic, SubscribeOptions(2)), (topics[0], 2)])
response = lbcallback.wait_subscribed()
lbcallback.wait_subscribed()

lacallback.clear()
lbcallback.clear()
Expand Down
Loading