Skip to content

Commit

Permalink
Add test for is_connected using loop and loop_start
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreF committed Jan 10, 2024
1 parent 44eae87 commit c2072bb
Showing 1 changed file with 116 additions and 4 deletions.
120 changes: 116 additions & 4 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import threading
import time
import unicodedata

import paho.mqtt.client as client
import pytest
from paho.mqtt.enums import MQTTErrorCode, MQTTProtocolVersion
from paho.mqtt.reasoncodes import ReasonCodes

import tests.paho_test as paho_test

# Import test fixture
from tests.testsupport.broker import fake_broker # noqa: F401
from tests.testsupport.broker import FakeBroker, fake_broker # noqa: F401


@pytest.mark.parametrize("proto_ver", [
(client.MQTTv31),
(client.MQTTv311),
(MQTTProtocolVersion.MQTTv31),
(MQTTProtocolVersion.MQTTv311),
])
class Test_connect:
"""
Expand Down Expand Up @@ -100,7 +102,7 @@ class Test_connect_v5:

def test_01_broker_no_support(self, fake_broker):
mqttc = client.Client(
"01-broker-no-support", protocol=client.MQTTv5)
"01-broker-no-support", protocol=MQTTProtocolVersion.MQTTv5)

def on_connect(mqttc, obj, flags, reason, properties):
assert reason == 132
Expand Down Expand Up @@ -137,6 +139,116 @@ def on_connect(mqttc, obj, flags, reason, properties):
mqttc.loop_stop()


class TestConnectionLost:
def test_with_loop_start(self, fake_broker: FakeBroker):
mqttc = client.Client(
"test_with_loop_start",
protocol=MQTTProtocolVersion.MQTTv311,
reconnect_on_failure=False,
)

on_connect_reached = threading.Event()
on_disconnect_reached = threading.Event()


def on_connect(mqttc, obj, flags, rc):
assert rc == 0
on_connect_reached.set()

def on_disconnect(*args):
on_disconnect_reached.set()

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect

mqttc.connect_async("localhost", fake_broker.port)
mqttc.loop_start()

try:
fake_broker.start()

connect_packet = paho_test.gen_connect(
"test_with_loop_start", keepalive=60,
proto_ver=MQTTProtocolVersion.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

assert on_connect_reached.wait(1)
assert mqttc.is_connected()

fake_broker.finish()

assert on_disconnect_reached.wait(1)
assert not mqttc.is_connected()

finally:
mqttc.loop_stop()

def test_with_loop(self, fake_broker: FakeBroker):
mqttc = client.Client(
"test_with_loop",
clean_session=True,
)

on_connect_reached = threading.Event()
on_disconnect_reached = threading.Event()


def on_connect(mqttc, obj, flags, rc):
assert rc == 0
on_connect_reached.set()

def on_disconnect(*args):
on_disconnect_reached.set()

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect


try:
mqttc.connect("localhost", fake_broker.port)

fake_broker.start()

# not yet connected, packet are not yet processed by loop()
assert not mqttc.is_connected()

# connect packet is sent during connect() call
connect_packet = paho_test.gen_connect(
"test_with_loop", keepalive=60,
proto_ver=MQTTProtocolVersion.MQTTv311)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

# call loop() to process the connack packet
assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_SUCCESS

assert on_connect_reached.wait(1)
assert mqttc.is_connected()

fake_broker.finish()

# call loop() to detect the connection lost
assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_CONN_LOST

assert on_disconnect_reached.wait(1)
assert not mqttc.is_connected()

finally:
mqttc.loop_stop()

class TestPublishBroker2Client:

def test_invalid_utf8_topic(self, fake_broker):
Expand Down

0 comments on commit c2072bb

Please sign in to comment.