From 462245b19e8a762c15ddd0b0bce2d06a393cf809 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 12 Sep 2024 13:22:51 +0000 Subject: [PATCH] polish --- core/pipeline/queue/SenderQueue.cpp | 6 ++-- .../queue/ExactlyOnceQueueManagerUnittest.cpp | 6 ++-- .../queue/ProcessQueueManagerUnittest.cpp | 34 +++++++++---------- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/core/pipeline/queue/SenderQueue.cpp b/core/pipeline/queue/SenderQueue.cpp index d52bd18092..67e22ecf94 100644 --- a/core/pipeline/queue/SenderQueue.cpp +++ b/core/pipeline/queue/SenderQueue.cpp @@ -65,11 +65,13 @@ bool SenderQueue::Remove(SenderQueueItem* item) { if (item == nullptr) { return false; } - auto size = item->mData.size(); - auto enQueuTime = item->mEnqueTime; + size_t size = 0; + chrono::system_clock::time_point enQueuTime; auto index = mRead; for (; index < mWrite; ++index) { if (mQueue[index % mCapacity].get() == item) { + size = item->mData.size(); + enQueuTime = item->mEnqueTime; mQueue[index % mCapacity].reset(); break; } diff --git a/core/unittest/queue/ExactlyOnceQueueManagerUnittest.cpp b/core/unittest/queue/ExactlyOnceQueueManagerUnittest.cpp index f077296ae2..5e929eb477 100644 --- a/core/unittest/queue/ExactlyOnceQueueManagerUnittest.cpp +++ b/core/unittest/queue/ExactlyOnceQueueManagerUnittest.cpp @@ -43,7 +43,6 @@ class ExactlyOnceQueueManagerUnittest : public testing::Test { protected: static void SetUpTestCase() { InputFeedbackInterfaceRegistry::GetInstance()->LoadFeedbackInterfaces(); - sEventGroup.reset(new PipelineEventGroup(make_shared())); for (size_t i = 0; i < 5; ++i) { auto cpt = make_shared(); cpt->index = i; @@ -68,7 +67,6 @@ class ExactlyOnceQueueManagerUnittest : public testing::Test { private: static const size_t sDataSize = 10; - static unique_ptr sEventGroup; static ExactlyOnceQueueManager* sManager; static vector sCheckpoints; static PipelineContext sCtx; @@ -81,7 +79,6 @@ class ExactlyOnceQueueManagerUnittest : public testing::Test { }; const size_t ExactlyOnceQueueManagerUnittest::sDataSize; -unique_ptr ExactlyOnceQueueManagerUnittest::sEventGroup; ExactlyOnceQueueManager* ExactlyOnceQueueManagerUnittest::sManager; vector ExactlyOnceQueueManagerUnittest::sCheckpoints; PipelineContext ExactlyOnceQueueManagerUnittest::sCtx; @@ -294,7 +291,8 @@ void ExactlyOnceQueueManagerUnittest::OnPipelineUpdate() { } unique_ptr ExactlyOnceQueueManagerUnittest::GenerateProcessItem() { - return make_unique(std::move(*sEventGroup), 0); + PipelineEventGroup g(make_shared()); + return make_unique(std::move(g), 0); } unique_ptr ExactlyOnceQueueManagerUnittest::GenerateSenderItem() { diff --git a/core/unittest/queue/ProcessQueueManagerUnittest.cpp b/core/unittest/queue/ProcessQueueManagerUnittest.cpp index 3de2699965..d87bf88e96 100644 --- a/core/unittest/queue/ProcessQueueManagerUnittest.cpp +++ b/core/unittest/queue/ProcessQueueManagerUnittest.cpp @@ -37,10 +37,7 @@ class ProcessQueueManagerUnittest : public testing::Test { void OnPipelineUpdate(); protected: - static void SetUpTestCase() { - sEventGroup.reset(new PipelineEventGroup(make_shared())); - sProcessQueueManager = ProcessQueueManager::GetInstance(); - } + static void SetUpTestCase() { sProcessQueueManager = ProcessQueueManager::GetInstance(); } void TearDown() override { QueueKeyManager::GetInstance()->Clear(); @@ -49,12 +46,15 @@ class ProcessQueueManagerUnittest : public testing::Test { } private: - static unique_ptr sEventGroup; static ProcessQueueManager* sProcessQueueManager; static PipelineContext sCtx; + + unique_ptr GenerateItem() { + PipelineEventGroup g(make_shared()); + return make_unique(std::move(g), 0); + } }; -unique_ptr ProcessQueueManagerUnittest::sEventGroup; ProcessQueueManager* ProcessQueueManagerUnittest::sProcessQueueManager; PipelineContext ProcessQueueManagerUnittest::sCtx; @@ -239,24 +239,24 @@ void ProcessQueueManagerUnittest::TestPushQueue() { // queue belongs to normal process queue APSARA_TEST_TRUE(sProcessQueueManager->IsValidToPush(0)); - APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(0, make_unique(std::move(*sEventGroup), 0))); + APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(0, GenerateItem())); // queue belongs to exactly once process queue APSARA_TEST_TRUE(sProcessQueueManager->IsValidToPush(1)); - APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(1, make_unique(std::move(*sEventGroup), 0))); + APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(1, GenerateItem())); // no queue exists APSARA_TEST_FALSE(sProcessQueueManager->IsValidToPush(2)); - APSARA_TEST_EQUAL(2, sProcessQueueManager->PushQueue(2, make_unique(std::move(*sEventGroup), 0))); + APSARA_TEST_EQUAL(2, sProcessQueueManager->PushQueue(2, GenerateItem())); // invalid to push static_cast(sProcessQueueManager->mQueues[0].first->get())->mValidToPush = false; APSARA_TEST_FALSE(sProcessQueueManager->IsValidToPush(0)); - APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(0, make_unique(std::move(*sEventGroup), 0))); + APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(0, GenerateItem())); ExactlyOnceQueueManager::GetInstance()->mProcessQueues[1]->mValidToPush = false; APSARA_TEST_FALSE(sProcessQueueManager->IsValidToPush(1)); - APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(1, make_unique(std::move(*sEventGroup), 0))); + APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(1, GenerateItem())); } void ProcessQueueManagerUnittest::TestPopItem() { @@ -279,8 +279,8 @@ void ProcessQueueManagerUnittest::TestPopItem() { ctx.SetConfigName("test_config_5"); ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue(5, 0, ctx, vector(5)); - sProcessQueueManager->PushQueue(key2, make_unique(std::move(*sEventGroup), 0)); - sProcessQueueManager->PushQueue(key3, make_unique(std::move(*sEventGroup), 0)); + sProcessQueueManager->PushQueue(key2, GenerateItem()); + sProcessQueueManager->PushQueue(key3, GenerateItem()); sProcessQueueManager->mCurrentQueueIndex = {1, prev(prev(sProcessQueueManager->mPriorityQueue[1].end()))}; // the item comes from the queue between current index and queue list end @@ -295,7 +295,7 @@ void ProcessQueueManagerUnittest::TestPopItem() { APSARA_TEST_EQUAL(1U, sProcessQueueManager->mCurrentQueueIndex.first); APSARA_TEST_TRUE(sProcessQueueManager->mCurrentQueueIndex.second == sProcessQueueManager->mQueues[key3].first); - sProcessQueueManager->PushQueue(key1, make_unique(std::move(*sEventGroup), 0)); + sProcessQueueManager->PushQueue(key1, GenerateItem()); // the item comes from queue list other than the one pointed by current index APSARA_TEST_TRUE(sProcessQueueManager->PopItem(0, item, configName)); APSARA_TEST_EQUAL("test_config_1", configName); @@ -303,7 +303,7 @@ void ProcessQueueManagerUnittest::TestPopItem() { APSARA_TEST_TRUE(sProcessQueueManager->mCurrentQueueIndex.second == sProcessQueueManager->mQueues[key1].first); sProcessQueueManager->mCurrentQueueIndex = {1, prev(sProcessQueueManager->mPriorityQueue[1].end())}; - sProcessQueueManager->PushQueue(5, make_unique(std::move(*sEventGroup), 0)); + sProcessQueueManager->PushQueue(5, GenerateItem()); // the item comes from exactly once queue APSARA_TEST_TRUE(sProcessQueueManager->PopItem(0, item, configName)); APSARA_TEST_EQUAL("test_config_5", configName); @@ -325,7 +325,7 @@ void ProcessQueueManagerUnittest::TestIsAllQueueEmpty() { APSARA_TEST_TRUE(sProcessQueueManager->IsAllQueueEmpty()); // non empty normal process queue - sProcessQueueManager->PushQueue(0, make_unique(std::move(*sEventGroup), 0)); + sProcessQueueManager->PushQueue(0, GenerateItem()); APSARA_TEST_FALSE(sProcessQueueManager->IsAllQueueEmpty()); unique_ptr item; @@ -334,7 +334,7 @@ void ProcessQueueManagerUnittest::TestIsAllQueueEmpty() { APSARA_TEST_TRUE(sProcessQueueManager->IsAllQueueEmpty()); // non empty exactly once process queue - sProcessQueueManager->PushQueue(2, make_unique(std::move(*sEventGroup), 0)); + sProcessQueueManager->PushQueue(2, GenerateItem()); APSARA_TEST_FALSE(sProcessQueueManager->IsAllQueueEmpty()); sProcessQueueManager->PopItem(0, item, configName);