Skip to content

Commit

Permalink
Change message_queue indexing from pointer address to uint32_t (#2497)
Browse files Browse the repository at this point in the history
This change fixes a memory corruption caused by message queue when the
following scenario occurs:
MESSAGE1 is enqueued (address #1)
MESSAGE1 is put in progress by mq
MESSAGE1 is destroyed by the app
MESSAGE2 is enqueued (re-using address #1, which was made available
by the previous step)
MESSAGE2 is put in progress by the mq
MESSAGE2 processing is marked as completed by the app
> Here the message queue searches the list of in-progress messages by
the message address in memory. Since MESSAGE2 now has the same address
of MESSAGE1 (which was freed), message_queue picks the entry for
MESSAGE1 for removing from the in-progress list, leaving MESSAGE2
behind.
  • Loading branch information
ewertons committed Jun 19, 2023
1 parent b2b8ff1 commit 62321f2
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 24 deletions.
4 changes: 2 additions & 2 deletions iothub_client/inc/internal/message_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ typedef void(*MESSAGE_PROCESSING_COMPLETED_CALLBACK)(MQ_MESSAGE_HANDLE message,
* @brief Callback that MUST be invoked by PROCESS_MESSAGE_CALLBACK (user provided) to signal to MESSAGE_QUEUE that a message has been processed.
* @remarks Besides causing MESSAGE_QUEUE to dequeue the message from its internal lists, causes MESSAGE_PROCESSING_COMPLETED_CALLBACK to be triggered.
*/
typedef void(*PROCESS_MESSAGE_COMPLETED_CALLBACK)(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason);
typedef void(*PROCESS_MESSAGE_COMPLETED_CALLBACK)(MESSAGE_QUEUE_HANDLE message_queue, uint32_t message_id, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason);

/**
* @brief User-provided callback invoked by MESSAGE_QUEUE when a messages is ready to be processed, getting internally moved from "pending" to "in-progress".
*/
typedef void(*PROCESS_MESSAGE_CALLBACK)(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* user_context);
typedef void(*PROCESS_MESSAGE_CALLBACK)(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, uint32_t message_id, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* user_context);

typedef struct MESSAGE_QUEUE_CONFIG_TAG
{
Expand Down
8 changes: 5 additions & 3 deletions iothub_client/src/iothubtransport_amqp_messenger.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ typedef struct AMQP_MESSENGER_INSTANCE_TAG

typedef struct MESSAGE_SEND_CONTEXT_TAG
{
uint32_t mq_message_id;
MESSAGE_HANDLE message;
bool is_destroyed;

Expand Down Expand Up @@ -858,11 +859,11 @@ static void on_send_complete_callback(void* context, MESSAGE_SEND_RESULT send_re
mq_result = MESSAGE_QUEUE_ERROR;
}

msg_ctx->on_process_message_completed_callback(msg_ctx->messenger->send_queue, (MQ_MESSAGE_HANDLE)msg_ctx->message, mq_result, NULL);
msg_ctx->on_process_message_completed_callback(msg_ctx->messenger->send_queue, msg_ctx->mq_message_id, mq_result, NULL);
}
}

static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* context)
static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, uint32_t message_id, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* context)
{
if (message_queue == NULL || message == NULL || on_process_message_completed_callback == NULL || context == NULL)
{
Expand All @@ -871,12 +872,13 @@ static void on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_M
else
{
MESSAGE_SEND_CONTEXT* message_context = (MESSAGE_SEND_CONTEXT*)context;
message_context->mq_message_id = message_id;
message_context->on_process_message_completed_callback = on_process_message_completed_callback;

if (messagesender_send_async(message_context->messenger->message_sender, (MESSAGE_HANDLE)message, on_send_complete_callback, context, 0) == NULL)
{
LogError("Failed sending AMQP message");
on_process_message_completed_callback(message_queue, message, MESSAGE_QUEUE_ERROR, NULL);
on_process_message_completed_callback(message_queue, message_id, MESSAGE_QUEUE_ERROR, NULL);
}

message_destroy((MESSAGE_HANDLE)message);
Expand Down
22 changes: 13 additions & 9 deletions iothub_client/src/message_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ static const char* SAVED_OPTION_MAX_PROCESSING_TIME_SECS = "SAVED_OPTION_MAX_PRO

struct MESSAGE_QUEUE_TAG
{
uint32_t id_counter;
size_t max_message_enqueued_time_secs;
size_t max_message_processing_time_secs;
size_t max_retry_count;
Expand All @@ -37,6 +38,7 @@ struct MESSAGE_QUEUE_TAG

typedef struct MESSAGE_QUEUE_ITEM_TAG
{
uint32_t id;
MQ_MESSAGE_HANDLE message;
MESSAGE_PROCESSING_COMPLETED_CALLBACK on_message_processing_completed_callback;
void* user_context;
Expand All @@ -49,11 +51,11 @@ typedef struct MESSAGE_QUEUE_ITEM_TAG

// ---------- Helper Functions ---------- //

static bool find_item_by_message_ptr(LIST_ITEM_HANDLE list_item, const void* match_context)
static bool find_item_by_message_id(LIST_ITEM_HANDLE list_item, const void* match_context)
{
bool result;
MESSAGE_QUEUE_ITEM* current_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item);
MQ_MESSAGE_HANDLE* target_item = (MQ_MESSAGE_HANDLE*)match_context;
uint32_t message_id = *(uint32_t*)match_context;

if (current_item == NULL)
{
Expand All @@ -62,7 +64,7 @@ static bool find_item_by_message_ptr(LIST_ITEM_HANDLE list_item, const void* mat
}
else
{
result = (current_item->message == target_item);
result = (current_item->id == message_id);
}
return result;
}
Expand Down Expand Up @@ -128,19 +130,19 @@ static void dequeue_message_and_fire_callback(SINGLYLINKEDLIST_HANDLE list, LIST
free(mq_item);
}

static void on_process_message_completed_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason)
static void on_process_message_completed_callback(MESSAGE_QUEUE_HANDLE message_queue, uint32_t message_id, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason)
{
if (message == NULL || message_queue == NULL)
if (message_queue == NULL)
{
LogError("on_process_message_completed_callback invoked with NULL arguments (message=%p, message_queue=%p)", message, message_queue);
LogError("on_process_message_completed_callback invoked with NULL arguments (message_id=%u, message_queue=%p)", message_id, message_queue);
}
else
{
LIST_ITEM_HANDLE list_item;

if ((list_item = singlylinkedlist_find(message_queue->in_progress, find_item_by_message_ptr, message)) == NULL)
if ((list_item = singlylinkedlist_find(message_queue->in_progress, find_item_by_message_id, &message_id)) == NULL)
{
LogError("on_process_message_completed_callback invoked for a message not in the in-progress list (%p)", message);
LogError("on_process_message_completed_callback invoked for a message not in the in-progress list (%u)", message_id);
}
else
{
Expand Down Expand Up @@ -291,7 +293,7 @@ static void process_pending_messages(MESSAGE_QUEUE_HANDLE message_queue)
{
mq_item->number_of_attempts++;

message_queue->on_process_message_callback(message_queue, mq_item->message, on_process_message_completed_callback, mq_item->user_context);
message_queue->on_process_message_callback(message_queue, mq_item->message, mq_item->id, on_process_message_completed_callback, mq_item->user_context);
}
}
}
Expand Down Expand Up @@ -479,6 +481,7 @@ MESSAGE_QUEUE_HANDLE message_queue_create(MESSAGE_QUEUE_CONFIG* config)
}
else
{
// Here it already sets the id_counter to zero.
memset(result, 0, sizeof(MESSAGE_QUEUE));

if ((result->pending = singlylinkedlist_create()) == NULL)
Expand Down Expand Up @@ -542,6 +545,7 @@ int message_queue_add(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE mess
}
else
{
mq_item->id = message_queue->id_counter++;
mq_item->message = message;
mq_item->on_message_processing_completed_callback = on_message_processing_completed_callback;
mq_item->user_context = user_context;
Expand Down
24 changes: 14 additions & 10 deletions iothub_client/tests/message_queue_ut/message_queue_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ static void on_umock_c_error(UMOCK_C_ERROR_CODE error_code)
#define TEST_PROCESS_COMPLETE_CONTEXT (void*)0x7773
#define TEST_USER_CONTEXT (void*)0x7773
#define USE_DEFAULT_CONFIG NULL
#define TEST_SOME_OTHER_MESSAGE (MQ_MESSAGE_HANDLE)0x7777
#define TEST_SOME_OTHER_MESSAGE_ID 17777
#define TEST_MQ_MESSAGE_HANDLE_2 (MQ_MESSAGE_HANDLE)0x7778
#define TEST_LIST_ITEM_HANDLE (LIST_ITEM_HANDLE)0x7779
#define TEST_LIST_ITEM_VALUE (void*)0x7780
Expand Down Expand Up @@ -167,12 +167,14 @@ static time_t add_seconds(time_t base_time, int seconds)

static MESSAGE_QUEUE_HANDLE TEST_on_process_message_callback_message_queue;
static MQ_MESSAGE_HANDLE TEST_on_process_message_callback_message;
static uint32_t TEST_on_process_message_callback_message_id;
static PROCESS_MESSAGE_COMPLETED_CALLBACK TEST_on_process_message_callback_on_process_message_completed_callback;
static void* TEST_on_process_message_callback_context;
static void TEST_on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* user_context)
static void TEST_on_process_message_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, uint32_t message_id, PROCESS_MESSAGE_COMPLETED_CALLBACK on_process_message_completed_callback, void* user_context)
{
TEST_on_process_message_callback_message_queue = message_queue;
TEST_on_process_message_callback_message = message;
TEST_on_process_message_callback_message_id = message_id;
TEST_on_process_message_callback_on_process_message_completed_callback = on_process_message_completed_callback;
TEST_on_process_message_callback_context = user_context;
}
Expand Down Expand Up @@ -1264,7 +1266,7 @@ TEST_FUNCTION(message_queue_retrieve_options_failure_checks)
message_queue_destroy(mq);
}

TEST_FUNCTION(on_message_processing_completed_callback_NULL_message)
TEST_FUNCTION(on_message_processing_completed_callback_INVALID_message)
{
// arrange
MESSAGE_QUEUE_HANDLE mq = create_message_queue(USE_DEFAULT_CONFIG);
Expand All @@ -1273,9 +1275,11 @@ TEST_FUNCTION(on_message_processing_completed_callback_NULL_message)
crank_message_queue(mq, TEST_current_time, 1, 0, NULL);

umock_c_reset_all_calls();
STRICT_EXPECTED_CALL(singlylinkedlist_find(IGNORED_PTR_ARG, IGNORED_PTR_ARG, IGNORED_PTR_ARG));
STRICT_EXPECTED_CALL(singlylinkedlist_item_get_value(IGNORED_PTR_ARG));

// act
TEST_on_process_message_callback_on_process_message_completed_callback(mq, NULL, MESSAGE_QUEUE_SUCCESS, TEST_REASON);
TEST_on_process_message_callback_on_process_message_completed_callback(mq, 12345, MESSAGE_QUEUE_SUCCESS, TEST_REASON);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
Expand All @@ -1300,7 +1304,7 @@ TEST_FUNCTION(on_message_processing_completed_callback_NULL_message_queue)
umock_c_reset_all_calls();

// act
TEST_on_process_message_callback_on_process_message_completed_callback(NULL, TEST_on_process_message_callback_message, MESSAGE_QUEUE_SUCCESS, TEST_REASON);
TEST_on_process_message_callback_on_process_message_completed_callback(NULL, TEST_on_process_message_callback_message_id, MESSAGE_QUEUE_SUCCESS, TEST_REASON);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
Expand All @@ -1326,7 +1330,7 @@ TEST_FUNCTION(on_message_processing_completed_callback_MESSAGE_not_present)
set_on_message_processing_completed_callback_expected_calls(1, -1, false);

// act
TEST_on_process_message_callback_on_process_message_completed_callback(mq, TEST_SOME_OTHER_MESSAGE, MESSAGE_QUEUE_SUCCESS, TEST_REASON);
TEST_on_process_message_callback_on_process_message_completed_callback(mq, TEST_SOME_OTHER_MESSAGE_ID, MESSAGE_QUEUE_SUCCESS, TEST_REASON);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
Expand Down Expand Up @@ -1355,7 +1359,7 @@ TEST_FUNCTION(on_message_processing_completed_callback_success)
set_on_message_processing_completed_callback_expected_calls(1, 0, false);

// act
TEST_on_process_message_callback_on_process_message_completed_callback(mq, TEST_on_process_message_callback_message, MESSAGE_QUEUE_SUCCESS, TEST_REASON);
TEST_on_process_message_callback_on_process_message_completed_callback(mq, TEST_on_process_message_callback_message_id, MESSAGE_QUEUE_SUCCESS, TEST_REASON);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
Expand Down Expand Up @@ -1387,13 +1391,13 @@ TEST_FUNCTION(on_message_processing_completed_callback_RETRYABLE_ERROR)

// act
TEST_on_process_message_callback_on_process_message_completed_callback(mq,
TEST_on_process_message_callback_message, MESSAGE_QUEUE_RETRYABLE_ERROR, NULL);
TEST_on_process_message_callback_message_id, MESSAGE_QUEUE_RETRYABLE_ERROR, NULL);
message_queue_do_work(mq);
TEST_on_process_message_callback_on_process_message_completed_callback(mq,
TEST_on_process_message_callback_message, MESSAGE_QUEUE_RETRYABLE_ERROR, NULL);
TEST_on_process_message_callback_message_id, MESSAGE_QUEUE_RETRYABLE_ERROR, NULL);
message_queue_do_work(mq);
TEST_on_process_message_callback_on_process_message_completed_callback(mq,
TEST_on_process_message_callback_message, MESSAGE_QUEUE_RETRYABLE_ERROR, NULL);
TEST_on_process_message_callback_message_id, MESSAGE_QUEUE_RETRYABLE_ERROR, NULL);

// assert
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
Expand Down

0 comments on commit 62321f2

Please sign in to comment.