diff --git a/README.rst b/README.rst index add1eb52..b5f51424 100644 --- a/README.rst +++ b/README.rst @@ -197,6 +197,12 @@ transport set to "websockets" to send MQTT over WebSockets. Leave at the default of "tcp" to use raw TCP. +manual_ack + defaults to False, allowing the library to acknowledge messages automatically after on_message callback return + passing them to on_message callback. When set to True, every message + must be manually acknowledged by application call to + client.ack( *message.mid* , *message.qos* ) + Constructor Example ................... diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index cb1489d0..c2b01f3a 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -479,7 +479,8 @@ def on_connect(client, userdata, flags, rc): """ def __init__(self, client_id="", clean_session=None, userdata=None, - protocol=MQTTv311, transport="tcp", reconnect_on_failure=True): + protocol=MQTTv311, transport="tcp", reconnect_on_failure=True, + manual_ack=False ): """client_id is the unique client id string used when connecting to the broker. If client_id is zero length or None, then the behaviour is defined by which protocol version is in use. If using MQTT v3.1.1, then @@ -511,11 +512,20 @@ def __init__(self, client_id="", clean_session=None, userdata=None, Set transport to "websockets" to use WebSockets as the transport mechanism. Set to "tcp" to use raw TCP, which is the default. + + Normally, when a message is received, the library automatically + acknowledges after on_message callback returns. manual_ack=True allows the application to + acknowledge receipt after it has completed processing of a message + using a the ack() method. This addresses vulnerabilty to message loss + if applications fails while processing a message, or while it pending + locally. + """ if transport.lower() not in ('websockets', 'tcp'): raise ValueError( 'transport must be "websockets" or "tcp", not %s' % transport) + self._manual_ack = manual_ack self._transport = transport.lower() self._protocol = protocol self._userdata = userdata @@ -3351,16 +3361,44 @@ def _handle_publish(self): return MQTT_ERR_SUCCESS elif message.qos == 1: self._handle_on_message(message) - return self._send_puback(message.mid) + if self._manual_ack: + return MQTT_ERR_SUCCESS + else: + return self._send_puback(message.mid) elif message.qos == 2: + rc = self._send_pubrec(message.mid) + message.state = mqtt_ms_wait_for_pubrel with self._in_message_mutex: self._in_messages[message.mid] = message + return rc else: return MQTT_ERR_PROTOCOL + def ack(self, mid: int, qos: int) -> int: + """ + send an acknowledgement for a given message id. (stored in message.mid ) + only useful in QoS=1 and auto_ack=False + """ + if self._manual_ack : + if qos == 1: + return self._send_puback(mid) + elif qos == 2: + return self._send_pubcomp(mid) + + return MQTT_ERR_SUCCESS + + def manual_ack_set(self, on): + """ + The paho library normally acknowledges messages as soon as they are delivered to the caller. + If manual_ack is turned on, then the caller MUST manually acknowledge every message once + application processing is complete. + """ + self._manual_ack = on + + def _handle_pubrel(self): if self._protocol == MQTTv5: if self._in_packet['remaining_length'] < 2: @@ -3390,7 +3428,10 @@ def _handle_pubrel(self): # is possible that we must known about this message. # Choose to acknwoledge this messsage (and thus losing a message) but # avoid hanging. See #284. - return self._send_pubcomp(mid) + if self._manual_ack: + return MQTT_ERR_SUCCESS + else: + return self._send_pubcomp(mid) def _update_inflight(self): # Dont lock message_mutex here