Skip to content

Commit

Permalink
Add guard against crash in amqp messenger/mq (#2608)
Browse files Browse the repository at this point in the history
There are instances where a message will be destroyed at the iothubtransport amqp messeger layer,
but be still in flight at the messagesender (uamqp) layer, running at the risk of messagesender
invoking the send complete callback and resulting in a memory read error over an address previously
freed. Using the async operation feature of uamqp adds an extra layer of safety to cancel the
existing send operation, preventing the callback to be fired improperly.
  • Loading branch information
ewertons authored Apr 11, 2024
1 parent 15171e9 commit fce8040
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ int main(void)
ThreadAPI_Sleep((unsigned int)sleep_ms);
counter += sleep_ms;

g_continueRunning = !(messages_sent > MESSAGE_COUNT);
g_continueRunning &= !(messages_sent > MESSAGE_COUNT);
} while (g_continueRunning);

(void)printf("client_amqp_shared_sample has received a quit message, call DoWork %d more time to complete final sending...\r\n", DOWORK_LOOP_NUM);
Expand Down
26 changes: 20 additions & 6 deletions iothub_client/src/iothubtransport_amqp_messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "azure_uamqp_c/messaging.h"
#include "azure_uamqp_c/message_sender.h"
#include "azure_uamqp_c/message_receiver.h"
#include "azure_uamqp_c/async_operation.h"
#include "internal/message_queue.h"
#include "internal/iothub_client_retry_control.h"
#include "internal/iothubtransport_amqp_messenger.h"
Expand Down Expand Up @@ -86,9 +87,24 @@ typedef struct MESSAGE_SEND_CONTEXT_TAG
void* user_context;

PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback;

// Handle to the async operation instance returned
// by messagesender_send_async().
ASYNC_OPERATION_HANDLE async_operation;
} MESSAGE_SEND_CONTEXT;

static void destroy_message_send_context(MESSAGE_SEND_CONTEXT* context)
{
if (context->async_operation != NULL)
{
(void)async_operation_cancel(context->async_operation);
}

free(context);
}

#define mark_message_send_context_completed(context) \
context->async_operation = NULL

static MESSAGE_SEND_CONTEXT* create_message_send_context(void)
{
Expand Down Expand Up @@ -272,11 +288,6 @@ static AMQP_MESSENGER_CONFIG* clone_configuration(const AMQP_MESSENGER_CONFIG* c
return result;
}

static void destroy_message_send_context(MESSAGE_SEND_CONTEXT* context)
{
free(context);
}

static STRING_HANDLE create_link_address(const char* host_fqdn, const char* device_id, const char* module_id, const char* address_suffix)
{
STRING_HANDLE link_address;
Expand Down Expand Up @@ -850,6 +861,8 @@ static void on_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_re
MESSAGE_QUEUE_RESULT mq_result;
MESSAGE_SEND_CONTEXT* msg_ctx = (MESSAGE_SEND_CONTEXT*)context;

mark_message_send_context_completed(msg_ctx);

if (send_result == MESSAGE_SEND_OK)
{
mq_result = MESSAGE_QUEUE_SUCCESS;
Expand All @@ -874,8 +887,9 @@ static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_M
MESSAGE_SEND_CONTEXT* message_context = (MESSAGE_SEND_CONTEXT*)context;
message_context->mq_message_id = message_id;
message_context->on_process_message_completed_callback = on_process_message_completed_callback;
message_context->async_operation = messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0);

if (messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0) == NULL)
if (message_context->async_operation == NULL)
{
LogError("Failed sending AMQP message");
on_process_message_completed_callback(message_queue, message_id, MESSAGE_QUEUE_ERROR, NULL);
Expand Down
17 changes: 3 additions & 14 deletions iothub_client/src/message_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,7 @@ static void process_pending_messages(MESSAGE_QUEUE_HANDLE message_queue)
{
LogError("failed moving message out of pending list (%p)", mq_item->message);

if (mq_item->on_message_processing_completed_callback != NULL)
{
mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context);
}
fire_message_callback(mq_item, MESSAGE_QUEUE_ERROR, NULL);

// Not freeing since this would cause a memory A/V on the next call.

Expand All @@ -271,22 +268,14 @@ static void process_pending_messages(MESSAGE_QUEUE_HANDLE message_queue)
{
LogError("failed setting message processing_start_time (%p)", mq_item->message);

if (mq_item->on_message_processing_completed_callback != NULL)
{
mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context);
}

fire_message_callback(mq_item, MESSAGE_QUEUE_ERROR, NULL);
free(mq_item);
}
else if (singlylinkedlist_add(message_queue->in_progress, (const void*)mq_item) == NULL)
{
LogError("failed moving message to in-progress list (%p)", mq_item->message);

if (mq_item->on_message_processing_completed_callback != NULL)
{
mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context);
}

fire_message_callback(mq_item, MESSAGE_QUEUE_ERROR, NULL);
free(mq_item);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,14 @@ static void real_free(void* ptr)
#define please_mock_messaging_delivery_accepted MOCK_ENABLED
#define please_mock_messaging_delivery_rejected MOCK_ENABLED
#define please_mock_messaging_delivery_released MOCK_ENABLED
#define please_mock_async_operation_cancel MOCK_ENABLED

#include "azure_uamqp_c/session.h"
#include "azure_uamqp_c/link.h"
#include "azure_uamqp_c/messaging.h"
#include "azure_uamqp_c/message_sender.h"
#include "azure_uamqp_c/message_receiver.h"
#include "azure_uamqp_c/async_operation.h"
#include "internal/message_queue.h"

#undef ENABLE_MOCK_FILTERING_SWITCH
Expand Down

0 comments on commit fce8040

Please sign in to comment.