Skip to content

Commit

Permalink
Merge pull request #753 from petersilva/manual_acks_1_6_1
Browse files Browse the repository at this point in the history
application controlled acknowledgements to match Java behaviour
  • Loading branch information
PierreF authored Dec 23, 2023
2 parents 912322a + 733d7f3 commit d9ade2e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ transport
set to "websockets" to send MQTT over WebSockets. Leave at the default of
"tcp" to use raw TCP.

manual_ack
defaults to False, allowing the library to acknowledge messages automatically after on_message callback return
passing them to on_message callback. When set to True, every message
must be manually acknowledged by application call to
client.ack( *message.mid* , *message.qos* )


Constructor Example
...................
Expand Down
47 changes: 44 additions & 3 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ def on_connect(client, userdata, flags, rc):
"""

def __init__(self, client_id="", clean_session=None, userdata=None,
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True,
manual_ack=False ):
"""client_id is the unique client id string used when connecting to the
broker. If client_id is zero length or None, then the behaviour is
defined by which protocol version is in use. If using MQTT v3.1.1, then
Expand Down Expand Up @@ -511,11 +512,20 @@ def __init__(self, client_id="", clean_session=None, userdata=None,
Set transport to "websockets" to use WebSockets as the transport
mechanism. Set to "tcp" to use raw TCP, which is the default.
Normally, when a message is received, the library automatically
acknowledges after on_message callback returns. manual_ack=True allows the application to
acknowledge receipt after it has completed processing of a message
using a the ack() method. This addresses vulnerabilty to message loss
if applications fails while processing a message, or while it pending
locally.
"""

if transport.lower() not in ('websockets', 'tcp'):
raise ValueError(
'transport must be "websockets" or "tcp", not %s' % transport)
self._manual_ack = manual_ack
self._transport = transport.lower()
self._protocol = protocol
self._userdata = userdata
Expand Down Expand Up @@ -3351,16 +3361,44 @@ def _handle_publish(self):
return MQTT_ERR_SUCCESS
elif message.qos == 1:
self._handle_on_message(message)
return self._send_puback(message.mid)
if self._manual_ack:
return MQTT_ERR_SUCCESS
else:
return self._send_puback(message.mid)
elif message.qos == 2:

rc = self._send_pubrec(message.mid)

message.state = mqtt_ms_wait_for_pubrel
with self._in_message_mutex:
self._in_messages[message.mid] = message

return rc
else:
return MQTT_ERR_PROTOCOL

def ack(self, mid: int, qos: int) -> int:
"""
send an acknowledgement for a given message id. (stored in message.mid )
only useful in QoS=1 and auto_ack=False
"""
if self._manual_ack :
if qos == 1:
return self._send_puback(mid)
elif qos == 2:
return self._send_pubcomp(mid)

return MQTT_ERR_SUCCESS

def manual_ack_set(self, on):
"""
The paho library normally acknowledges messages as soon as they are delivered to the caller.
If manual_ack is turned on, then the caller MUST manually acknowledge every message once
application processing is complete.
"""
self._manual_ack = on


def _handle_pubrel(self):
if self._protocol == MQTTv5:
if self._in_packet['remaining_length'] < 2:
Expand Down Expand Up @@ -3390,7 +3428,10 @@ def _handle_pubrel(self):
# is possible that we must known about this message.
# Choose to acknwoledge this messsage (and thus losing a message) but
# avoid hanging. See #284.
return self._send_pubcomp(mid)
if self._manual_ack:
return MQTT_ERR_SUCCESS
else:
return self._send_pubcomp(mid)

def _update_inflight(self):
# Dont lock message_mutex here
Expand Down

0 comments on commit d9ade2e

Please sign in to comment.