diff --git a/iothub_client/samples/iothub_ll_client_shared_sample/iothub_ll_client_shared_sample.c b/iothub_client/samples/iothub_ll_client_shared_sample/iothub_ll_client_shared_sample.c index 1380d0bc40..5fbafdcc2e 100644 --- a/iothub_client/samples/iothub_ll_client_shared_sample/iothub_ll_client_shared_sample.c +++ b/iothub_client/samples/iothub_ll_client_shared_sample/iothub_ll_client_shared_sample.c @@ -303,7 +303,7 @@ int main(void) ThreadAPI_Sleep((unsigned int)sleep_ms); counter += sleep_ms; - g_continueRunning = !(messages_sent > MESSAGE_COUNT); + g_continueRunning &= !(messages_sent > MESSAGE_COUNT); } while (g_continueRunning); (void)printf("client_amqp_shared_sample has received a quit message, call DoWork %d more time to complete final sending...\r\n", DOWORK_LOOP_NUM); diff --git a/iothub_client/src/iothubtransport_amqp_messenger.c b/iothub_client/src/iothubtransport_amqp_messenger.c index 369ef331da..3d48d5c43d 100644 --- a/iothub_client/src/iothubtransport_amqp_messenger.c +++ b/iothub_client/src/iothubtransport_amqp_messenger.c @@ -14,6 +14,7 @@ #include "azure_uamqp_c/messaging.h" #include "azure_uamqp_c/message_sender.h" #include "azure_uamqp_c/message_receiver.h" +#include "azure_uamqp_c/async_operation.h" #include "internal/message_queue.h" #include "internal/iothub_client_retry_control.h" #include "internal/iothubtransport_amqp_messenger.h" @@ -86,9 +87,24 @@ typedef struct MESSAGE_SEND_CONTEXT_TAG void* user_context; PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback; + + // Handle to the async operation instance returned + // by messagesender_send_async(). + ASYNC_OPERATION_HANDLE async_operation; } MESSAGE_SEND_CONTEXT; +static void destroy_message_send_context(MESSAGE_SEND_CONTEXT* context) +{ + if (context->async_operation != NULL) + { + (void)async_operation_cancel(context->async_operation); + } + + free(context); +} +#define mark_message_send_context_completed(context) \ + context->async_operation = NULL static MESSAGE_SEND_CONTEXT* create_message_send_context(void) { @@ -272,11 +288,6 @@ static AMQP_MESSENGER_CONFIG* clone_configuration(const AMQP_MESSENGER_CONFIG* c return result; } -static void destroy_message_send_context(MESSAGE_SEND_CONTEXT* context) -{ - free(context); -} - static STRING_HANDLE create_link_address(const char* host_fqdn, const char* device_id, const char* module_id, const char* address_suffix) { STRING_HANDLE link_address; @@ -850,6 +861,8 @@ static void on_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_re MESSAGE_QUEUE_RESULT mq_result; MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)context; + mark_message_send_context_completed(msg_ctx); + if (send_result == MESSAGE_SEND_OK) { mq_result = MESSAGE_QUEUE_SUCCESS; @@ -874,8 +887,9 @@ static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_M MESSAGE_SEND_CONTEXT* message_context = (MESSAGE_SEND_CONTEXT*)context; message_context->mq_message_id = message_id; message_context->on_process_message_completed_callback = on_process_message_completed_callback; + message_context->async_operation = messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0); - if (messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0) == NULL) + if (message_context->async_operation == NULL) { LogError("Failed sending AMQP message"); on_process_message_completed_callback(message_queue, message_id, MESSAGE_QUEUE_ERROR, NULL); diff --git a/iothub_client/src/message_queue.c b/iothub_client/src/message_queue.c index 3ebb4ad69f..ce824b0ffb 100644 --- a/iothub_client/src/message_queue.c +++ b/iothub_client/src/message_queue.c @@ -258,10 +258,7 @@ static void process_pending_messages(MESSAGE_QUEUE_HANDLE message_queue) { LogError("failed moving message out of pending list (%p)", mq_item->message); - if (mq_item->on_message_processing_completed_callback != NULL) - { - mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context); - } + fire_message_callback(mq_item, MESSAGE_QUEUE_ERROR, NULL); // Not freeing since this would cause a memory A/V on the next call. @@ -271,22 +268,14 @@ static void process_pending_messages(MESSAGE_QUEUE_HANDLE message_queue) { LogError("failed setting message processing_start_time (%p)", mq_item->message); - if (mq_item->on_message_processing_completed_callback != NULL) - { - mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context); - } - + fire_message_callback(mq_item, MESSAGE_QUEUE_ERROR, NULL); free(mq_item); } else if (singlylinkedlist_add(message_queue->in_progress, (const void*)mq_item) == NULL) { LogError("failed moving message to in-progress list (%p)", mq_item->message); - if (mq_item->on_message_processing_completed_callback != NULL) - { - mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context); - } - + fire_message_callback(mq_item, MESSAGE_QUEUE_ERROR, NULL); free(mq_item); } else diff --git a/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c b/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c index 67c5c19f4e..427553c35c 100644 --- a/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c +++ b/iothub_client/tests/iothubtr_amqp_msgr_ut/iothubtr_amqp_msgr_ut.c @@ -89,12 +89,14 @@ static void real_free(void* ptr) #define please_mock_messaging_delivery_accepted MOCK_ENABLED #define please_mock_messaging_delivery_rejected MOCK_ENABLED #define please_mock_messaging_delivery_released MOCK_ENABLED +#define please_mock_async_operation_cancel MOCK_ENABLED #include "azure_uamqp_c/session.h" #include "azure_uamqp_c/link.h" #include "azure_uamqp_c/messaging.h" #include "azure_uamqp_c/message_sender.h" #include "azure_uamqp_c/message_receiver.h" +#include "azure_uamqp_c/async_operation.h" #include "internal/message_queue.h" #undef ENABLE_MOCK_FILTERING_SWITCH