diff --git a/org.eclipse.paho.android.service/src/main/java/org/eclipse/paho/android/service/MqttAndroidClient.java b/org.eclipse.paho.android.service/src/main/java/org/eclipse/paho/android/service/MqttAndroidClient.java index 2d8698ae..84f0b8f3 100755 --- a/org.eclipse.paho.android.service/src/main/java/org/eclipse/paho/android/service/MqttAndroidClient.java +++ b/org.eclipse.paho.android.service/src/main/java/org/eclipse/paho/android/service/MqttAndroidClient.java @@ -51,6 +51,7 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; +import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -99,8 +100,8 @@ public class MqttAndroidClient extends BroadcastReceiver implements IMqttAsyncCl private MqttClientPersistence persistence = null; private MqttConnectOptions connectOptions; private IMqttToken connectToken; - // The MqttCallback provided by the application - private MqttCallback callback; + // The MqttCallback list provided by the application + private ArrayList callbacksList = new ArrayList<>(); private MqttTraceHandler traceCallback; private boolean traceEnabled = false; private volatile boolean receiverRegistered = false; @@ -1127,8 +1128,34 @@ public IMqttDeliveryToken[] getPendingDeliveryTokens() { */ @Override public void setCallback(MqttCallback callback) { - this.callback = callback; + if (callbacksList == null) callbacksList = new ArrayList<>(); + callbacksList.add(callback); + } + /** + * Adds a callback listener to use for events that happen asynchronously. + *

+ * There are a number of events that the listener will be notified about. + * These include: + *

+ * + *

+ * Other events that track the progress of an individual operation such as + * connect and subscribe can be tracked using the {@link MqttToken} returned + * from each non-blocking method or using setting a + * {@link IMqttActionListener} on the non-blocking method. + *

+ * + * @param callback which will be invoked for certain asynchronous events + * @see MqttCallback + */ + public void addCallback(MqttCallback callback) { + if (callbacksList == null) callbacksList = new ArrayList<>(); + callbacksList.add(callback); } /** @@ -1263,8 +1290,10 @@ private void disconnected(Bundle data) { if (token != null) { ((MqttTokenAndroid) token).notifyComplete(); } - if (callback != null) { - callback.connectionLost(null); + if (callbacksList != null) { + for (MqttCallback callback : callbacksList) { + callback.connectionLost(null); + } } } @@ -1274,21 +1303,27 @@ private void disconnected(Bundle data) { * @param data */ private void connectionLostAction(Bundle data) { - if (callback != null) { - Exception reason = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION); - callback.connectionLost(reason); + Exception reason = (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION); + + if (callbacksList != null) { + for (MqttCallback callback : callbacksList) { + callback.connectionLost(reason); + } } } private void connectExtendedAction(Bundle data) { // This is called differently from a normal connect + boolean reconnect = data.getBoolean(MqttServiceConstants.CALLBACK_RECONNECT, false); + String serverURI = data.getString(MqttServiceConstants.CALLBACK_SERVER_URI); - if (callback instanceof MqttCallbackExtended) { - boolean reconnect = data.getBoolean(MqttServiceConstants.CALLBACK_RECONNECT, false); - String serverURI = data.getString(MqttServiceConstants.CALLBACK_SERVER_URI); - ((MqttCallbackExtended) callback).connectComplete(reconnect, serverURI); + if (callbacksList != null) { + for (MqttCallback callback : callbacksList) { + if (callback instanceof MqttCallbackExtended) { + ((MqttCallbackExtended) callback).connectComplete(reconnect, serverURI); + } + } } - } /** @@ -1349,11 +1384,13 @@ private void unSubscribeAction(Bundle data) { */ private void messageDeliveredAction(Bundle data) { IMqttToken token = removeMqttToken(data); + Status status = (Status) data.getSerializable(MqttServiceConstants.CALLBACK_STATUS); if (token != null) { - if (callback != null) { - Status status = (Status) data.getSerializable(MqttServiceConstants.CALLBACK_STATUS); + if (callbacksList != null) { if (status == Status.OK && token instanceof IMqttDeliveryToken) { - callback.deliveryComplete((IMqttDeliveryToken) token); + for (MqttCallback callback : callbacksList) { + callback.deliveryComplete((IMqttDeliveryToken) token); + } } } } @@ -1365,21 +1402,24 @@ private void messageDeliveredAction(Bundle data) { * @param data */ private void messageArrivedAction(Bundle data) { - if (callback != null) { - String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID); - String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME); + String messageId = data.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID); + String destinationName = data.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME); - ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL); + ParcelableMqttMessage message = data.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL); + + if (callbacksList != null) { try { if (messageAck == Ack.AUTO_ACK) { - callback.messageArrived(destinationName, message); + for (MqttCallback callback : callbacksList) { + callback.messageArrived(destinationName, message); + } mqttService.acknowledgeMessageArrival(clientHandle, messageId); } else { message.messageId = messageId; - callback.messageArrived(destinationName, message); + for (MqttCallback callback : callbacksList) { + callback.messageArrived(destinationName, message); + } } - - // let the service discard the saved message details } catch (Exception e) { mqttService.traceError(MqttService.TAG, "messageArrivedAction failed: " + e); }