Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 committed Sep 12, 2024
1 parent 9d2cf08 commit 462245b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 23 deletions.
6 changes: 4 additions & 2 deletions core/pipeline/queue/SenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ bool SenderQueue::Remove(SenderQueueItem* item) {
if (item == nullptr) {
return false;
}
auto size = item->mData.size();
auto enQueuTime = item->mEnqueTime;
size_t size = 0;
chrono::system_clock::time_point enQueuTime;
auto index = mRead;
for (; index < mWrite; ++index) {
if (mQueue[index % mCapacity].get() == item) {
size = item->mData.size();
enQueuTime = item->mEnqueTime;
mQueue[index % mCapacity].reset();
break;
}
Expand Down
6 changes: 2 additions & 4 deletions core/unittest/queue/ExactlyOnceQueueManagerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ class ExactlyOnceQueueManagerUnittest : public testing::Test {
protected:
static void SetUpTestCase() {
InputFeedbackInterfaceRegistry::GetInstance()->LoadFeedbackInterfaces();
sEventGroup.reset(new PipelineEventGroup(make_shared<SourceBuffer>()));
for (size_t i = 0; i < 5; ++i) {
auto cpt = make_shared<RangeCheckpoint>();
cpt->index = i;
Expand All @@ -68,7 +67,6 @@ class ExactlyOnceQueueManagerUnittest : public testing::Test {
private:
static const size_t sDataSize = 10;

static unique_ptr<PipelineEventGroup> sEventGroup;
static ExactlyOnceQueueManager* sManager;
static vector<RangeCheckpointPtr> sCheckpoints;
static PipelineContext sCtx;
Expand All @@ -81,7 +79,6 @@ class ExactlyOnceQueueManagerUnittest : public testing::Test {
};

const size_t ExactlyOnceQueueManagerUnittest::sDataSize;
unique_ptr<PipelineEventGroup> ExactlyOnceQueueManagerUnittest::sEventGroup;
ExactlyOnceQueueManager* ExactlyOnceQueueManagerUnittest::sManager;
vector<RangeCheckpointPtr> ExactlyOnceQueueManagerUnittest::sCheckpoints;
PipelineContext ExactlyOnceQueueManagerUnittest::sCtx;
Expand Down Expand Up @@ -294,7 +291,8 @@ void ExactlyOnceQueueManagerUnittest::OnPipelineUpdate() {
}

unique_ptr<ProcessQueueItem> ExactlyOnceQueueManagerUnittest::GenerateProcessItem() {
return make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0);
PipelineEventGroup g(make_shared<SourceBuffer>());
return make_unique<ProcessQueueItem>(std::move(g), 0);
}

unique_ptr<SenderQueueItem> ExactlyOnceQueueManagerUnittest::GenerateSenderItem() {
Expand Down
34 changes: 17 additions & 17 deletions core/unittest/queue/ProcessQueueManagerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ class ProcessQueueManagerUnittest : public testing::Test {
void OnPipelineUpdate();

protected:
static void SetUpTestCase() {
sEventGroup.reset(new PipelineEventGroup(make_shared<SourceBuffer>()));
sProcessQueueManager = ProcessQueueManager::GetInstance();
}
static void SetUpTestCase() { sProcessQueueManager = ProcessQueueManager::GetInstance(); }

void TearDown() override {
QueueKeyManager::GetInstance()->Clear();
Expand All @@ -49,12 +46,15 @@ class ProcessQueueManagerUnittest : public testing::Test {
}

private:
static unique_ptr<PipelineEventGroup> sEventGroup;
static ProcessQueueManager* sProcessQueueManager;
static PipelineContext sCtx;

unique_ptr<ProcessQueueItem> GenerateItem() {
PipelineEventGroup g(make_shared<SourceBuffer>());
return make_unique<ProcessQueueItem>(std::move(g), 0);
}
};

unique_ptr<PipelineEventGroup> ProcessQueueManagerUnittest::sEventGroup;
ProcessQueueManager* ProcessQueueManagerUnittest::sProcessQueueManager;
PipelineContext ProcessQueueManagerUnittest::sCtx;

Expand Down Expand Up @@ -239,24 +239,24 @@ void ProcessQueueManagerUnittest::TestPushQueue() {

// queue belongs to normal process queue
APSARA_TEST_TRUE(sProcessQueueManager->IsValidToPush(0));
APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(0, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0)));
APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(0, GenerateItem()));

// queue belongs to exactly once process queue
APSARA_TEST_TRUE(sProcessQueueManager->IsValidToPush(1));
APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(1, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0)));
APSARA_TEST_EQUAL(0, sProcessQueueManager->PushQueue(1, GenerateItem()));

// no queue exists
APSARA_TEST_FALSE(sProcessQueueManager->IsValidToPush(2));
APSARA_TEST_EQUAL(2, sProcessQueueManager->PushQueue(2, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0)));
APSARA_TEST_EQUAL(2, sProcessQueueManager->PushQueue(2, GenerateItem()));

// invalid to push
static_cast<BoundedProcessQueue*>(sProcessQueueManager->mQueues[0].first->get())->mValidToPush = false;
APSARA_TEST_FALSE(sProcessQueueManager->IsValidToPush(0));
APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(0, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0)));
APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(0, GenerateItem()));

ExactlyOnceQueueManager::GetInstance()->mProcessQueues[1]->mValidToPush = false;
APSARA_TEST_FALSE(sProcessQueueManager->IsValidToPush(1));
APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(1, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0)));
APSARA_TEST_EQUAL(1, sProcessQueueManager->PushQueue(1, GenerateItem()));
}

void ProcessQueueManagerUnittest::TestPopItem() {
Expand All @@ -279,8 +279,8 @@ void ProcessQueueManagerUnittest::TestPopItem() {
ctx.SetConfigName("test_config_5");
ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue(5, 0, ctx, vector<RangeCheckpointPtr>(5));

sProcessQueueManager->PushQueue(key2, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0));
sProcessQueueManager->PushQueue(key3, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0));
sProcessQueueManager->PushQueue(key2, GenerateItem());
sProcessQueueManager->PushQueue(key3, GenerateItem());
sProcessQueueManager->mCurrentQueueIndex = {1, prev(prev(sProcessQueueManager->mPriorityQueue[1].end()))};

// the item comes from the queue between current index and queue list end
Expand All @@ -295,15 +295,15 @@ void ProcessQueueManagerUnittest::TestPopItem() {
APSARA_TEST_EQUAL(1U, sProcessQueueManager->mCurrentQueueIndex.first);
APSARA_TEST_TRUE(sProcessQueueManager->mCurrentQueueIndex.second == sProcessQueueManager->mQueues[key3].first);

sProcessQueueManager->PushQueue(key1, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0));
sProcessQueueManager->PushQueue(key1, GenerateItem());
// the item comes from queue list other than the one pointed by current index
APSARA_TEST_TRUE(sProcessQueueManager->PopItem(0, item, configName));
APSARA_TEST_EQUAL("test_config_1", configName);
APSARA_TEST_EQUAL(0U, sProcessQueueManager->mCurrentQueueIndex.first);
APSARA_TEST_TRUE(sProcessQueueManager->mCurrentQueueIndex.second == sProcessQueueManager->mQueues[key1].first);

sProcessQueueManager->mCurrentQueueIndex = {1, prev(sProcessQueueManager->mPriorityQueue[1].end())};
sProcessQueueManager->PushQueue(5, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0));
sProcessQueueManager->PushQueue(5, GenerateItem());
// the item comes from exactly once queue
APSARA_TEST_TRUE(sProcessQueueManager->PopItem(0, item, configName));
APSARA_TEST_EQUAL("test_config_5", configName);
Expand All @@ -325,7 +325,7 @@ void ProcessQueueManagerUnittest::TestIsAllQueueEmpty() {
APSARA_TEST_TRUE(sProcessQueueManager->IsAllQueueEmpty());

// non empty normal process queue
sProcessQueueManager->PushQueue(0, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0));
sProcessQueueManager->PushQueue(0, GenerateItem());
APSARA_TEST_FALSE(sProcessQueueManager->IsAllQueueEmpty());

unique_ptr<ProcessQueueItem> item;
Expand All @@ -334,7 +334,7 @@ void ProcessQueueManagerUnittest::TestIsAllQueueEmpty() {
APSARA_TEST_TRUE(sProcessQueueManager->IsAllQueueEmpty());

// non empty exactly once process queue
sProcessQueueManager->PushQueue(2, make_unique<ProcessQueueItem>(std::move(*sEventGroup), 0));
sProcessQueueManager->PushQueue(2, GenerateItem());
APSARA_TEST_FALSE(sProcessQueueManager->IsAllQueueEmpty());

sProcessQueueManager->PopItem(0, item, configName);
Expand Down

0 comments on commit 462245b

Please sign in to comment.