Skip to content

Commit

Permalink
Supporting multiple MqttCallback in a single MqttAndroidClient
Browse files Browse the repository at this point in the history
Instead of using a single MqttCallback, we are using a MqttCallback list,
in order to allow to a single client to notify multiple consumers of the
MQTT client actions.
  • Loading branch information
anaktas authored and hannesa2 committed Nov 15, 2020
1 parent 768fa00 commit 3ef5741
Showing 1 changed file with 64 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -99,8 +100,8 @@ public class MqttAndroidClient extends BroadcastReceiver implements IMqttAsyncCl
private MqttClientPersistence persistence = null;
private MqttConnectOptions connectOptions;
private IMqttToken connectToken;
// The MqttCallback provided by the application
private MqttCallback callback;
// The MqttCallback list provided by the application
private ArrayList<MqttCallback> callbacksList = new ArrayList<>();
private MqttTraceHandler traceCallback;
private boolean traceEnabled = false;
private volatile boolean receiverRegistered = false;
Expand Down Expand Up @@ -1127,8 +1128,34 @@ public IMqttDeliveryToken[] getPendingDeliveryTokens() {
*/
@Override
public void setCallback(MqttCallback callback) {
this.callback = callback;
if (callbacksList == null) callbacksList = new ArrayList<>();
callbacksList.add(callback);
}

/**
* Adds a callback listener to use for events that happen asynchronously.
* <p>
* There are a number of events that the listener will be notified about.
* These include:
* </p>
* <ul>
* <li>A new message has arrived and is ready to be processed</li>
* <li>The connection to the server has been lost</li>
* <li>Delivery of a message to the server has completed</li>
* </ul>
* <p>
* Other events that track the progress of an individual operation such as
* connect and subscribe can be tracked using the {@link MqttToken} returned
* from each non-blocking method or using setting a
* {@link IMqttActionListener} on the non-blocking method.
* <p>
*
* @param callback which will be invoked for certain asynchronous events
* @see MqttCallback
*/
public void addCallback(MqttCallback callback) {
if (callbacksList == null) callbacksList = new ArrayList<>();
callbacksList.add(callback);
}

/**
Expand Down Expand Up @@ -1263,8 +1290,10 @@ private void disconnected(Bundle data) {
if (token != null) {
((MqttTokenAndroid) token).notifyComplete();
}
if (callback != null) {
callback.connectionLost(null);
if (callbacksList != null) {
for (MqttCallback callback : callbacksList) {
callback.connectionLost(null);
}
}
}

Expand All @@ -1274,21 +1303,27 @@ private void disconnected(Bundle data) {
* @param data
*/
private void connectionLostAction(Bundle data) {
if (callback != null) {
Exception reason = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
callback.connectionLost(reason);
Exception reason = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);

if (callbacksList != null) {
for (MqttCallback callback : callbacksList) {
callback.connectionLost(reason);
}
}
}

private void connectExtendedAction(Bundle data) {
// This is called differently from a normal connect
boolean reconnect = data.getBoolean(MqttServiceConstants.CALLBACK_RECONNECT, false);
String serverURI = data.getString(MqttServiceConstants.CALLBACK_SERVER_URI);

if (callback instanceof MqttCallbackExtended) {
boolean reconnect = data.getBoolean(MqttServiceConstants.CALLBACK_RECONNECT, false);
String serverURI = data.getString(MqttServiceConstants.CALLBACK_SERVER_URI);
((MqttCallbackExtended) callback).connectComplete(reconnect, serverURI);
if (callbacksList != null) {
for (MqttCallback callback : callbacksList) {
if (callback instanceof MqttCallbackExtended) {
((MqttCallbackExtended) callback).connectComplete(reconnect, serverURI);
}
}
}

}

/**
Expand Down Expand Up @@ -1349,11 +1384,13 @@ private void unSubscribeAction(Bundle data) {
*/
private void messageDeliveredAction(Bundle data) {
IMqttToken token = removeMqttToken(data);
Status status = (Status) data.getSerializable(MqttServiceConstants.CALLBACK_STATUS);
if (token != null) {
if (callback != null) {
Status status = (Status) data.getSerializable(MqttServiceConstants.CALLBACK_STATUS);
if (callbacksList != null) {
if (status == Status.OK && token instanceof IMqttDeliveryToken) {
callback.deliveryComplete((IMqttDeliveryToken) token);
for (MqttCallback callback : callbacksList) {
callback.deliveryComplete((IMqttDeliveryToken) token);
}
}
}
}
Expand All @@ -1365,21 +1402,24 @@ private void messageDeliveredAction(Bundle data) {
* @param data
*/
private void messageArrivedAction(Bundle data) {
if (callback != null) {
String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);
String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);

ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);

if (callbacksList != null) {
try {
if (messageAck == Ack.AUTO_ACK) {
callback.messageArrived(destinationName, message);
for (MqttCallback callback : callbacksList) {
callback.messageArrived(destinationName, message);
}
mqttService.acknowledgeMessageArrival(clientHandle, messageId);
} else {
message.messageId = messageId;
callback.messageArrived(destinationName, message);
for (MqttCallback callback : callbacksList) {
callback.messageArrived(destinationName, message);
}
}

// let the service discard the saved message details
} catch (Exception e) {
mqttService.traceError(MqttService.TAG, "messageArrivedAction failed: " + e);
}
Expand Down

0 comments on commit 3ef5741

Please sign in to comment.