From adcf1ea36bb2a27048ed57f0257bb7bf6a016dc1 Mon Sep 17 00:00:00 2001 From: Ewerton Scaboro da Silva Date: Tue, 15 Aug 2023 13:54:01 -0700 Subject: [PATCH] Fix DUP flag in re-sent telemetry over MQTT protocol (#2519) Unfortunately telemetry messages that are resent by the MQTT transport are not marked with the DUP flag as TRUE (as should by spec MQTT-3.3.1-2). Note that telemetry messages are always sent by the azure-iot-sdk-c MQTT transport with QOS1, so no control is necessary for the DUP flag related to QOS0. azure-iot-sdk-c's MQTT transport does not attempt ot resend TWIN messages or direct methods responses, so this fix does not apply to these messaging features. Related issue: https://github.com/azure/azure-iot-sdk-c/issues/2514 --- iothub_client/src/iothubtransport_mqtt_common.c | 15 +++++++++++---- .../iothubtransport_mqtt_common_ut.c | 3 +++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/iothub_client/src/iothubtransport_mqtt_common.c b/iothub_client/src/iothubtransport_mqtt_common.c index 6e6698e13b..886f59e431 100644 --- a/iothub_client/src/iothubtransport_mqtt_common.c +++ b/iothub_client/src/iothubtransport_mqtt_common.c @@ -48,6 +48,8 @@ #define SAS_TOKEN_DEFAULT_LEN 10 #define RESEND_TIMEOUT_VALUE_MIN 1*60 #define TELEMETRY_MSG_TIMEOUT_MIN 2*60 +#define MQTT_MESSAGE_DUP_FLAG_TRUE true +#define MQTT_MESSAGE_DUP_FLAG_FALSE false #define DEFAULT_CONNECTION_INTERVAL 30 #define FAILED_CONN_BACKOFF_VALUE 5 #define STATUS_CODE_FAILURE_VALUE 500 @@ -1025,7 +1027,7 @@ static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_me // // publishTelemetryMsg invokes the umqtt layer to send a PUBLISH message. // -static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len) +static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len, bool isDuplicate) { int result; STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent), transport_data->auto_url_encode_decode); @@ -1044,7 +1046,12 @@ static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_M } else { - if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0) + if (mqttmessage_setIsDuplicateMsg(mqttMsg, isDuplicate) != 0) + { + LogError("Failed setting DUP flag"); + result = MU_FAILURE; + } + else if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0) { LogError("Failed retrieving tickcounter info"); result = MU_FAILURE; @@ -2501,7 +2508,7 @@ static void ProcessPendingTelemetryMessages(PMQTTTRANSPORT_HANDLE_DATA transport } else { - if (publishTelemetryMsg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0) + if (publishTelemetryMsg(transport_data, msg_detail_entry, messagePayload, messageLength, MQTT_MESSAGE_DUP_FLAG_TRUE) != 0) { (void)DList_RemoveEntryList(current_entry); notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); @@ -3161,7 +3168,7 @@ static void ProcessPublishStateDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data) mqttMsgEntry->msgCreationTime = current_ms; mqttMsgEntry->iotHubMessageEntry = iothubMsgList; mqttMsgEntry->packet_id = getNextPacketId(transport_data); - if (publishTelemetryMsg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0) + if (publishTelemetryMsg(transport_data, mqttMsgEntry, messagePayload, messageLength, MQTT_MESSAGE_DUP_FLAG_FALSE) != 0) { (void)(DList_RemoveEntryList(currentListEntry)); notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); diff --git a/iothub_client/tests/iothubtransport_mqtt_common_ut/iothubtransport_mqtt_common_ut.c b/iothub_client/tests/iothubtransport_mqtt_common_ut/iothubtransport_mqtt_common_ut.c index 347fe07105..401907ca0c 100644 --- a/iothub_client/tests/iothubtransport_mqtt_common_ut/iothubtransport_mqtt_common_ut.c +++ b/iothub_client/tests/iothubtransport_mqtt_common_ut/iothubtransport_mqtt_common_ut.c @@ -840,6 +840,8 @@ TEST_SUITE_INITIALIZE(suite_init) REGISTER_GLOBAL_MOCK_RETURN(mqtt_client_publish, 0); REGISTER_GLOBAL_MOCK_FAIL_RETURN(mqtt_client_publish, MU_FAILURE); + REGISTER_GLOBAL_MOCK_RETURNS(mqttmessage_setIsDuplicateMsg, 0, MU_FAILURE); + REGISTER_GLOBAL_MOCK_RETURN(mqttmessage_create, TEST_MQTT_MESSAGE_HANDLE); REGISTER_GLOBAL_MOCK_FAIL_RETURN(mqttmessage_create, NULL); @@ -1470,6 +1472,7 @@ static void setup_IoTHubTransport_MQTT_Common_DoWork_events_mocks( { EXPECTED_CALL(STRING_c_str(IGNORED_PTR_ARG)).CallCannotFail(); EXPECTED_CALL(mqttmessage_create_in_place(IGNORED_NUM_ARG, IGNORED_PTR_ARG, DELIVER_AT_LEAST_ONCE, IGNORED_PTR_ARG, appMsgSize)); + STRICT_EXPECTED_CALL(mqttmessage_setIsDuplicateMsg(IGNORED_PTR_ARG, resend)); STRICT_EXPECTED_CALL(tickcounter_get_current_ms(IGNORED_PTR_ARG, IGNORED_PTR_ARG)); STRICT_EXPECTED_CALL(mqtt_client_publish(IGNORED_PTR_ARG, IGNORED_PTR_ARG)); STRICT_EXPECTED_CALL(mqttmessage_destroy(TEST_MQTT_MESSAGE_HANDLE));