Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

application controlled acknowledgements to match Java behaviour #753

Merged
merged 8 commits into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ transport
set to "websockets" to send MQTT over WebSockets. Leave at the default of
"tcp" to use raw TCP.

manual_ack
defaults to False, allowing the library to acknowledge messages automatically after on_message callback return
passing them to on_message callback. When set to True, every message
must be manually acknowledged by application call to
client.ack( *message.mid* , *message.qos* )


Constructor Example
...................
Expand Down
47 changes: 44 additions & 3 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,8 @@ def on_connect(client, userdata, flags, rc):
"""

def __init__(self, client_id="", clean_session=None, userdata=None,
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True,
manual_ack=False ):
"""client_id is the unique client id string used when connecting to the
broker. If client_id is zero length or None, then the behaviour is
defined by which protocol version is in use. If using MQTT v3.1.1, then
Expand Down Expand Up @@ -523,11 +524,20 @@ def __init__(self, client_id="", clean_session=None, userdata=None,

Set transport to "websockets" to use WebSockets as the transport
mechanism. Set to "tcp" to use raw TCP, which is the default.

Normally, when a message is received, the library automatically
acknowledges after on_message callback returns. manual_ack=True allows the application to
acknowledge receipt after it has completed processing of a message
using a the ack() method. This addresses vulnerabilty to message loss
if applications fails while processing a message, or while it pending
locally.

"""

if transport.lower() not in ('websockets', 'tcp'):
raise ValueError(
'transport must be "websockets" or "tcp", not %s' % transport)
self._manual_ack = manual_ack
self._transport = transport.lower()
self._protocol = protocol
self._userdata = userdata
Expand Down Expand Up @@ -3328,16 +3338,44 @@ def _handle_publish(self):
return MQTT_ERR_SUCCESS
elif message.qos == 1:
self._handle_on_message(message)
return self._send_puback(message.mid)
if self._manual_ack:
return MQTT_ERR_SUCCESS
else:
return self._send_puback(message.mid)
elif message.qos == 2:

rc = self._send_pubrec(message.mid)

message.state = mqtt_ms_wait_for_pubrel
with self._in_message_mutex:
self._in_messages[message.mid] = message

return rc
else:
return MQTT_ERR_PROTOCOL

def ack(self, mid: int, qos: int) -> int:
"""
send an acknowledgement for a given message id. (stored in message.mid )
only useful in QoS=1 and auto_ack=False
"""
if self._manual_ack :
if qos == 1:
return self._send_puback(mid)
elif qos == 2:
return self._send_pubcomp(mid)

return MQTT_ERR_SUCCESS

def manual_ack_set(self, on):
"""
The paho library normally acknowledges messages as soon as they are delivered to the caller.
If manual_ack is turned on, then the caller MUST manually acknowledge every message once
application processing is complete.
"""
self._manual_ack = on


def _handle_pubrel(self):
if self._protocol == MQTTv5:
if self._in_packet['remaining_length'] < 2:
Expand Down Expand Up @@ -3367,7 +3405,10 @@ def _handle_pubrel(self):
# is possible that we must known about this message.
# Choose to acknwoledge this messsage (and thus losing a message) but
# avoid hanging. See #284.
return self._send_pubcomp(mid)
if self._manual_ack:
return MQTT_ERR_SUCCESS
else:
return self._send_pubcomp(mid)

def _update_inflight(self):
# Dont lock message_mutex here
Expand Down
Loading