Skip to content

Commit

Permalink
Fix DUP flag in re-sent telemetry over MQTT protocol (#2519)
Browse files Browse the repository at this point in the history
Unfortunately telemetry messages that are resent by the MQTT transport
are not marked with the DUP flag as TRUE (as should by spec MQTT-3.3.1-2).
Note that telemetry messages are always sent by the azure-iot-sdk-c MQTT
transport with QOS1, so no control is necessary for the DUP flag related to QOS0.
azure-iot-sdk-c's MQTT transport does not attempt ot resend TWIN messages
or direct methods responses, so this fix does not apply to these
messaging features.
Related issue: #2514
  • Loading branch information
ewertons committed Aug 15, 2023
1 parent 230b8f4 commit adcf1ea
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 4 deletions.
15 changes: 11 additions & 4 deletions iothub_client/src/iothubtransport_mqtt_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
#define SAS_TOKEN_DEFAULT_LEN 10
#define RESEND_TIMEOUT_VALUE_MIN 1*60
#define TELEMETRY_MSG_TIMEOUT_MIN 2*60
#define MQTT_MESSAGE_DUP_FLAG_TRUE true
#define MQTT_MESSAGE_DUP_FLAG_FALSE false
#define DEFAULT_CONNECTION_INTERVAL 30
#define FAILED_CONN_BACKOFF_VALUE 5
#define STATUS_CODE_FAILURE_VALUE 500
Expand Down Expand Up @@ -1025,7 +1027,7 @@ static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_me
//
// publishTelemetryMsg invokes the umqtt layer to send a PUBLISH message.
//
static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len, bool isDuplicate)
{
int result;
STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent), transport_data->auto_url_encode_decode);
Expand All @@ -1044,7 +1046,12 @@ static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_M
}
else
{
if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
if (mqttmessage_setIsDuplicateMsg(mqttMsg, isDuplicate) != 0)
{
LogError("Failed setting DUP flag");
result = MU_FAILURE;
}
else if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
{
LogError("Failed retrieving tickcounter info");
result = MU_FAILURE;
Expand Down Expand Up @@ -2501,7 +2508,7 @@ static void ProcessPendingTelemetryMessages(PMQTTTRANSPORT_HANDLE_DATA transport
}
else
{
if (publishTelemetryMsg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0)
if (publishTelemetryMsg(transport_data, msg_detail_entry, messagePayload, messageLength, MQTT_MESSAGE_DUP_FLAG_TRUE) != 0)
{
(void)DList_RemoveEntryList(current_entry);
notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
Expand Down Expand Up @@ -3161,7 +3168,7 @@ static void ProcessPublishStateDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data)
mqttMsgEntry->msgCreationTime = current_ms;
mqttMsgEntry->iotHubMessageEntry = iothubMsgList;
mqttMsgEntry->packet_id = getNextPacketId(transport_data);
if (publishTelemetryMsg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0)
if (publishTelemetryMsg(transport_data, mqttMsgEntry, messagePayload, messageLength, MQTT_MESSAGE_DUP_FLAG_FALSE) != 0)
{
(void)(DList_RemoveEntryList(currentListEntry));
notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,8 @@ TEST_SUITE_INITIALIZE(suite_init)
REGISTER_GLOBAL_MOCK_RETURN(mqtt_client_publish, 0);
REGISTER_GLOBAL_MOCK_FAIL_RETURN(mqtt_client_publish, MU_FAILURE);

REGISTER_GLOBAL_MOCK_RETURNS(mqttmessage_setIsDuplicateMsg, 0, MU_FAILURE);

REGISTER_GLOBAL_MOCK_RETURN(mqttmessage_create, TEST_MQTT_MESSAGE_HANDLE);
REGISTER_GLOBAL_MOCK_FAIL_RETURN(mqttmessage_create, NULL);

Expand Down Expand Up @@ -1470,6 +1472,7 @@ static void setup_IoTHubTransport_MQTT_Common_DoWork_events_mocks(
{
EXPECTED_CALL(STRING_c_str(IGNORED_PTR_ARG)).CallCannotFail();
EXPECTED_CALL(mqttmessage_create_in_place(IGNORED_NUM_ARG, IGNORED_PTR_ARG, DELIVER_AT_LEAST_ONCE, IGNORED_PTR_ARG, appMsgSize));
STRICT_EXPECTED_CALL(mqttmessage_setIsDuplicateMsg(IGNORED_PTR_ARG, resend));
STRICT_EXPECTED_CALL(tickcounter_get_current_ms(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(mqtt_client_publish(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(mqttmessage_destroy(TEST_MQTT_MESSAGE_HANDLE));
Expand Down

0 comments on commit adcf1ea

Please sign in to comment.