Skip to content

Commit

Permalink
Disallow changing connection properties while connected
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreF committed Jan 20, 2024
1 parent a0b311b commit b065003
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 16 deletions.
59 changes: 43 additions & 16 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,11 @@ def host(self) -> str:
@host.setter
def host(self, value: str) -> None:
"""
Update host. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
Update host. This may not be called if the connection is already open.
"""
if not self._connection_closed():
raise RuntimeError("updating host on established connection is not supported")

if not value:
raise ValueError("Invalid host.")
self._host = value
Expand All @@ -763,9 +765,11 @@ def port(self) -> int:
@port.setter
def port(self, value: int) -> None:
"""
Update port. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
Update port. This may not be called if the connection is already open.
"""
if not self._connection_closed():
raise RuntimeError("updating port on established connection is not supported")

if value <= 0:
raise ValueError("Invalid port number.")
self._port = value
Expand All @@ -778,7 +782,7 @@ def keepalive(self) -> int:
@keepalive.setter
def keepalive(self, value: int) -> None:
"""Update the client keepalive interval. This may not be called if the connection is already open."""
if self._sock is not None:
if not self._connection_closed():
# The issue here is that the previous value of keepalive matter to possibly
# sent ping packet.
raise RuntimeError("updating keepalive on established connection is not supported")
Expand All @@ -797,9 +801,11 @@ def transport(self) -> Literal["tcp", "websockets"]:
def transport(self, value: Literal["tcp", "websockets"]) -> None:
"""
Update transport which should be "tcp" or "websockets".
This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
This may not be called if the connection is already open.
"""
if not self._connection_closed():
raise RuntimeError("updating transport on established connection is not supported")

self._transport = value

@property
Expand All @@ -814,7 +820,10 @@ def connect_timeout(self) -> float:

@connect_timeout.setter
def connect_timeout(self, value: float) -> None:
"Change connect_timeout for future (re)connection"
"Change connect_timeout. This may not be called if the connection is already open."
if not self._connection_closed():
raise RuntimeError("updating connect_timeout on established connection is not supported")

if value <= 0.0:
raise ValueError("timeout must be a positive number")

Expand All @@ -830,9 +839,11 @@ def username(self) -> str | None:
@username.setter
def username(self, value: str | None) -> None:
"""
Update username. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
Update username. This may not be called if the connection is already open.
"""
if not self._connection_closed():
raise RuntimeError("updating username on established connection is not supported")

if value is None:
self._username = None
else:
Expand All @@ -848,9 +859,11 @@ def password(self) -> str | None:
@password.setter
def password(self, value: str | None) -> None:
"""
Update password. This will only be used on future (re)connection. You should probably
use reconnect() to update the connection if established.
Update password. This may not be called if the connection is already open.
"""
if not self._connection_closed():
raise RuntimeError("updating password on established connection is not supported")

if value is None:
self._password = None
else:
Expand All @@ -863,8 +876,8 @@ def max_inflight_messages(self) -> int:

@max_inflight_messages.setter
def max_inflight_messages(self, value: int) -> None:
"Update max_inflight_messages. It's behavior is undefined if the connection is already open"
if self._sock is not None:
"Update max_inflight_messages. This may not be called if the connection is already open."
if not self._connection_closed():
# Not tested. Some doubt that everything is okay when max_inflight change between 0
# and > 0 value because _update_inflight is skipped when _max_inflight_messages == 0
raise RuntimeError("updating max_inflight_messages on established connection is not supported")
Expand All @@ -881,8 +894,8 @@ def max_queued_messages(self) -> int:

@max_queued_messages.setter
def max_queued_messages(self, value: int) -> None:
"Update max_queued_messages. It's behavior is undefined if the connection is already open"
if self._sock is not None:
"Update max_queued_messages. This may not be called if the connection is already open."
if not self._connection_closed():
# Not tested.
raise RuntimeError("updating max_queued_messages on established connection is not supported")

Expand Down Expand Up @@ -1296,6 +1309,8 @@ def connect_async(
connect call that can be used with loop_start() to provide very quick
start.
Any already established connection will be terminated immediately.
host is the hostname or IP address of the remote broker.
port is the network port of the server host to connect to. Defaults to
1883. Note that the default port for MQTT over SSL/TLS is 8883 so if you
Expand All @@ -1313,6 +1328,10 @@ def connect_async(
if bind_port < 0:
raise ValueError('Invalid bind port number.')

# Switch to state NEW to allow update of host, port & co.
self._sock_close()
self._state = ConnectionState.MQTT_CS_NEW

self.host = host
self.port = port
self.keepalive = keepalive
Expand Down Expand Up @@ -1643,6 +1662,14 @@ def enable_bridge_mode(self) -> None:
"""
self._client_mode = MQTT_BRIDGE

def _connection_closed(self) -> bool:
"""
Return true if the connection is closed (and not trying to be opened).
"""
return (
self._state == ConnectionState.MQTT_CS_NEW
or (self._state in (ConnectionState.MQTT_CS_DISCONNECTING, ConnectionState.MQTT_CS_DISCONNECTED) and self._sock is None))

def is_connected(self) -> bool:
"""Returns the current status of the connection
Expand Down
104 changes: 104 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,110 @@ def on_connect(mqttc, obj, flags, rc):
finally:
mqttc.loop_stop()

def test_connection_properties(self, proto_ver, fake_broker):
mqttc = client.Client("client-id", protocol=proto_ver)
mqttc.enable_logger()

is_connected = threading.Event()
is_disconnected = threading.Event()

def on_connect(mqttc, obj, flags, rc):
assert rc == 0
is_connected.set()

def on_disconnect(*args):
import logging
logging.info("disco")
is_disconnected.set()

mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect

mqttc.host = "localhost"
mqttc.connect_timeout = 7
mqttc.port = fake_broker.port
mqttc.keepalive = 7
mqttc.max_inflight_messages = 7
mqttc.max_queued_messages = 7
mqttc.transport = "tcp"
mqttc.username = "username"
mqttc.password = "password"

mqttc.reconnect()

# As soon as connection try to be established, no longer accept updates
with pytest.raises(RuntimeError):
mqttc.host = "localhost"

mqttc.loop_start()

try:
fake_broker.start()

connect_packet = paho_test.gen_connect(
"client-id",
keepalive=7,
username="username",
password="password",
proto_ver=proto_ver,
)
packet_in = fake_broker.receive_packet(1000)
assert packet_in # Check connection was not closed
assert packet_in == connect_packet

connack_packet = paho_test.gen_connack(rc=0)
count = fake_broker.send_packet(connack_packet)
assert count # Check connection was not closed
assert count == len(connack_packet)

is_connected.wait()

# Check that all connections related properties can't be updated
with pytest.raises(RuntimeError):
mqttc.host = "localhost"

with pytest.raises(RuntimeError):
mqttc.connect_timeout = 7

with pytest.raises(RuntimeError):
mqttc.port = fake_broker.port

with pytest.raises(RuntimeError):
mqttc.keepalive = 7

with pytest.raises(RuntimeError):
mqttc.max_inflight_messages = 7

with pytest.raises(RuntimeError):
mqttc.max_queued_messages = 7

with pytest.raises(RuntimeError):
mqttc.transport = "tcp"

with pytest.raises(RuntimeError):
mqttc.username = "username"

with pytest.raises(RuntimeError):
mqttc.password = "password"

# close the connection, but from broker
fake_broker.finish()

is_disconnected.wait()
assert not mqttc.is_connected()

# still not allowed to update, because client try to reconnect in background
with pytest.raises(RuntimeError):
mqttc.host = "localhost"

mqttc.disconnect()

# Now it's allowed, connection is closing AND not trying to reconnect
mqttc.host = "localhost"

finally:
mqttc.loop_stop()


class Test_connect_v5:
"""
Expand Down

0 comments on commit b065003

Please sign in to comment.