Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add setpipelineforitems for process queues #1769

Merged
merged 5 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ void Pipeline::Start() {
// TODO: 加载该Go流水线
}

// TODO: 启用Process中改流水线对应的输入队列
ProcessQueueManager::GetInstance()->EnablePop(mName);

if (!mGoPipelineWithInput.isNull()) {
// TODO: 加载该Go流水线
Expand Down
4 changes: 4 additions & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class PipelineManager {
#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineManagerMock;
friend class PipelineManagerUnittest;
friend class ProcessQueueManagerUnittest;
friend class ExactlyOnceQueueManagerUnittest;
friend class BoundedProcessQueueUnittest;
friend class CircularProcessQueueUnittest;
friend class CommonConfigProviderUnittest;
#endif
};
Expand Down
15 changes: 13 additions & 2 deletions core/pipeline/queue/BoundedProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "pipeline/queue/BoundedProcessQueue.h"

#include "pipeline/PipelineManager.h"

using namespace std;

namespace logtail {
Expand All @@ -35,7 +37,7 @@ bool BoundedProcessQueue::Push(unique_ptr<ProcessQueueItem>&& item) {
}
item->mEnqueTime = chrono::system_clock::now();
auto size = item->mEventGroup.DataSize();
mQueue.push(std::move(item));
mQueue.push_back(std::move(item));
ChangeStateIfNeededAfterPush();

mInItemsCnt->Add(1);
Expand All @@ -51,7 +53,7 @@ bool BoundedProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
return false;
}
item = std::move(mQueue.front());
mQueue.pop();
mQueue.pop_front();
if (ChangeStateIfNeededAfterPop()) {
GiveFeedback();
}
Expand All @@ -65,6 +67,15 @@ bool BoundedProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
return true;
}

void BoundedProcessQueue::SetPipelineForItems(const std::string& name) const {
auto p = PipelineManager::GetInstance()->FindConfigByName(name);
for (auto& item : mQueue) {
if (!item->mPipeline) {
item->mPipeline = p;
}
}
}

void BoundedProcessQueue::SetUpStreamFeedbacks(vector<FeedbackInterface*>&& feedbacks) {
mUpStreamFeedbacks.clear();
for (auto& item : feedbacks) {
Expand Down
3 changes: 2 additions & 1 deletion core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
void SetPipelineForItems(const std::string& name) const override;

void SetUpStreamFeedbacks(std::vector<FeedbackInterface*>&& feedbacks);

Expand All @@ -43,7 +44,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process

void GiveFeedback() const override;

std::queue<std::unique_ptr<ProcessQueueItem>> mQueue;
std::deque<std::unique_ptr<ProcessQueueItem>> mQueue;
std::vector<FeedbackInterface*> mUpStreamFeedbacks;

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down
10 changes: 10 additions & 0 deletions core/pipeline/queue/CircularProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "pipeline/queue/CircularProcessQueue.h"

#include "logger/Logger.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/QueueKeyManager.h"

using namespace std;
Expand Down Expand Up @@ -71,6 +72,15 @@ bool CircularProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
return true;
}

void CircularProcessQueue::SetPipelineForItems(const std::string& name) const {
auto p = PipelineManager::GetInstance()->FindConfigByName(name);
for (auto& item : mQueue) {
if (!item->mPipeline) {
item->mPipeline = p;
}
}
}

void CircularProcessQueue::Reset(size_t cap) {
// it seems more reasonable to retain extra items and process them immediately, however this contray to current
// framework design so we simply discard extra items, considering that it is a rare case to change capacity
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/CircularProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class CircularProcessQueue : virtual public QueueInterface<std::unique_ptr<Proce

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
void SetPipelineForItems(const std::string& name) const override;

void Reset(size_t cap);

Expand Down
11 changes: 7 additions & 4 deletions core/pipeline/queue/ExactlyOnceQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,23 @@ bool ExactlyOnceQueueManager::IsAllProcessQueueEmpty() const {
return true;
}

void ExactlyOnceQueueManager::InvalidatePopProcessQueue(const string& configName) {
void ExactlyOnceQueueManager::DisablePopProcessQueue(const string& configName, bool isPipelineRemoving) {
lock_guard<mutex> lock(mProcessQueueMux);
for (auto& iter : mProcessQueues) {
if (iter.second->GetConfigName() == configName) {
iter.second->InvalidatePop();
iter.second->DisablePop();
if (!isPipelineRemoving) {
iter.second->SetPipelineForItems(configName);
}
}
}
}

void ExactlyOnceQueueManager::ValidatePopProcessQueue(const string& configName) {
void ExactlyOnceQueueManager::EnablePopProcessQueue(const string& configName) {
lock_guard<mutex> lock(mProcessQueueMux);
for (auto& iter : mProcessQueues) {
if (iter.second->GetConfigName() == configName) {
iter.second->ValidatePop();
iter.second->EnablePop();
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/pipeline/queue/ExactlyOnceQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class ExactlyOnceQueueManager {
// 0: success, 1: queue is full, 2: queue not found
int PushProcessQueue(QueueKey key, std::unique_ptr<ProcessQueueItem>&& item);
bool IsAllProcessQueueEmpty() const;
void InvalidatePopProcessQueue(const std::string& configName);
void ValidatePopProcessQueue(const std::string& configName);
void DisablePopProcessQueue(const std::string& configName, bool isPipelineRemoving);
void EnablePopProcessQueue(const std::string& configName);

// 0: success, 1: queue is full, 2: queue not found
int PushSenderQueue(QueueKey key, std::unique_ptr<SenderQueueItem>&& item);
Expand Down
8 changes: 5 additions & 3 deletions core/pipeline/queue/ProcessQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ class ProcessQueueInterface : virtual public QueueInterface<std::unique_ptr<Proc

void SetDownStreamQueues(std::vector<BoundedSenderQueueInterface*>&& ques);

void InvalidatePop() { mValidToPop = false; }
void ValidatePop() { mValidToPop = true; }
void DisablePop() { mValidToPop = false; }
void EnablePop() { mValidToPop = true; }

virtual void SetPipelineForItems(const std::string& name) const = 0;

void Reset() { mDownStreamQueues.clear(); }

Expand All @@ -57,7 +59,7 @@ class ProcessQueueInterface : virtual public QueueInterface<std::unique_ptr<Proc
std::string mConfigName;

std::vector<BoundedSenderQueueInterface*> mDownStreamQueues;
bool mValidToPop = true;
bool mValidToPop = false;

#ifdef APSARA_UNIT_TEST_MAIN
friend class BoundedProcessQueueUnittest;
Expand Down
15 changes: 9 additions & 6 deletions core/pipeline/queue/ProcessQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,29 +229,32 @@ bool ProcessQueueManager::SetFeedbackInterface(QueueKey key, vector<FeedbackInte
return true;
}

void ProcessQueueManager::InvalidatePop(const string& configName) {
void ProcessQueueManager::DisablePop(const string& configName, bool isPipelineRemoving) {
if (QueueKeyManager::GetInstance()->HasKey(configName)) {
auto key = QueueKeyManager::GetInstance()->GetKey(configName);
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
(*iter->second.first)->InvalidatePop();
(*iter->second.first)->DisablePop();
if (!isPipelineRemoving) {
(*iter->second.first)->SetPipelineForItems(configName);
}
}
} else {
ExactlyOnceQueueManager::GetInstance()->InvalidatePopProcessQueue(configName);
ExactlyOnceQueueManager::GetInstance()->DisablePopProcessQueue(configName, isPipelineRemoving);
}
}

void ProcessQueueManager::ValidatePop(const string& configName) {
void ProcessQueueManager::EnablePop(const string& configName) {
if (QueueKeyManager::GetInstance()->HasKey(configName)) {
auto key = QueueKeyManager::GetInstance()->GetKey(configName);
lock_guard<mutex> lock(mQueueMux);
auto iter = mQueues.find(key);
if (iter != mQueues.end()) {
(*iter->second.first)->ValidatePop();
(*iter->second.first)->EnablePop();
}
} else {
ExactlyOnceQueueManager::GetInstance()->ValidatePopProcessQueue(configName);
ExactlyOnceQueueManager::GetInstance()->EnablePopProcessQueue(configName);
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ class ProcessQueueManager : public FeedbackInterface {
bool IsAllQueueEmpty() const;
bool SetDownStreamQueues(QueueKey key, std::vector<BoundedSenderQueueInterface*>&& ques);
bool SetFeedbackInterface(QueueKey key, std::vector<FeedbackInterface*>&& feedback);
void InvalidatePop(const std::string& configName);
void ValidatePop(const std::string& configName);
void DisablePop(const std::string& configName, bool isPipelineRemoving);
void EnablePop(const std::string& configName);

bool Wait(uint64_t ms);
void Trigger();
Expand Down
28 changes: 26 additions & 2 deletions core/unittest/queue/BoundedProcessQueueUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "common/FeedbackInterface.h"
#include "models/PipelineEventGroup.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/BoundedProcessQueue.h"
#include "pipeline/queue/SenderQueue.h"
#include "unittest/Unittest.h"
Expand All @@ -30,6 +31,7 @@ class BoundedProcessQueueUnittest : public testing::Test {
void TestPush();
void TestPop();
void TestMetric();
void TestSetPipeline();

protected:
static void SetUpTestCase() { sCtx.SetConfigName("test_config"); }
Expand All @@ -44,6 +46,7 @@ class BoundedProcessQueueUnittest : public testing::Test {
mFeedback1.reset(new FeedbackInterfaceMock);
mFeedback2.reset(new FeedbackInterfaceMock);
mQueue->SetUpStreamFeedbacks(vector<FeedbackInterface*>{mFeedback1.get(), mFeedback2.get()});
mQueue->EnablePop();
}

private:
Expand Down Expand Up @@ -91,9 +94,9 @@ void BoundedProcessQueueUnittest::TestPop() {

mQueue->Push(GenerateItem());
// invalidate pop
mQueue->InvalidatePop();
mQueue->DisablePop();
APSARA_TEST_EQUAL(0, mQueue->Pop(item));
mQueue->ValidatePop();
mQueue->EnablePop();

// downstream queues are not valid to push
mSenderQueue1->mValidToPush = false;
Expand Down Expand Up @@ -140,9 +143,30 @@ void BoundedProcessQueueUnittest::TestMetric() {
APSARA_TEST_EQUAL(1U, mQueue->mValidToPushFlag->GetValue());
}

void BoundedProcessQueueUnittest::TestSetPipeline() {
auto pipeline = make_shared<Pipeline>();
PipelineManager::GetInstance()->mPipelineNameEntityMap["test_config"] = pipeline;

auto item1 = GenerateItem();
auto p1 = item1.get();
auto pipelineTmp = make_shared<Pipeline>();
item1->mPipeline = pipelineTmp;

auto item2 = GenerateItem();
auto p2 = item2.get();

mQueue->Push(std::move(item1));
mQueue->Push(std::move(item2));
mQueue->SetPipelineForItems("test_config");

APSARA_TEST_EQUAL(pipelineTmp, p1->mPipeline);
APSARA_TEST_EQUAL(pipeline, p2->mPipeline);
}

UNIT_TEST_CASE(BoundedProcessQueueUnittest, TestPush)
UNIT_TEST_CASE(BoundedProcessQueueUnittest, TestPop)
UNIT_TEST_CASE(BoundedProcessQueueUnittest, TestMetric)
UNIT_TEST_CASE(BoundedProcessQueueUnittest, TestSetPipeline)

} // namespace logtail

Expand Down
28 changes: 26 additions & 2 deletions core/unittest/queue/CircularProcessQueueUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <memory>

#include "models/PipelineEventGroup.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/CircularProcessQueue.h"
#include "pipeline/queue/SenderQueue.h"
#include "unittest/Unittest.h"
Expand All @@ -28,6 +29,7 @@ class CircularProcessQueueUnittest : public testing::Test {
void TestPop();
void TestReset();
void TestMetric();
void TestSetPipeline();

protected:
static void SetUpTestCase() { sCtx.SetConfigName("test_config"); }
Expand All @@ -38,6 +40,7 @@ class CircularProcessQueueUnittest : public testing::Test {
mSenderQueue1.reset(new SenderQueue(10, 0, 10, 0, "", sCtx));
mSenderQueue2.reset(new SenderQueue(10, 0, 10, 0, "", sCtx));
mQueue->SetDownStreamQueues(vector<BoundedSenderQueueInterface*>{mSenderQueue1.get(), mSenderQueue2.get()});
mQueue->EnablePop();
}

private:
Expand Down Expand Up @@ -97,9 +100,9 @@ void CircularProcessQueueUnittest::TestPop() {

mQueue->Push(GenerateItem(1));
// invalidate pop
mQueue->InvalidatePop();
mQueue->DisablePop();
APSARA_TEST_FALSE(mQueue->Pop(item));
mQueue->ValidatePop();
mQueue->EnablePop();

// downstream queues are not valid to push
mSenderQueue1->mValidToPush = false;
Expand Down Expand Up @@ -177,10 +180,31 @@ void CircularProcessQueueUnittest::TestMetric() {
APSARA_TEST_EQUAL(0U, mQueue->mQueueDataSizeByte->GetValue());
}

void CircularProcessQueueUnittest::TestSetPipeline() {
auto pipeline = make_shared<Pipeline>();
PipelineManager::GetInstance()->mPipelineNameEntityMap["test_config"] = pipeline;

auto item1 = GenerateItem(1);
auto p1 = item1.get();
auto pipelineTmp = make_shared<Pipeline>();
item1->mPipeline = pipelineTmp;

auto item2 = GenerateItem(1);
auto p2 = item2.get();

mQueue->Push(std::move(item1));
mQueue->Push(std::move(item2));
mQueue->SetPipelineForItems("test_config");

APSARA_TEST_EQUAL(pipelineTmp, p1->mPipeline);
APSARA_TEST_EQUAL(pipeline, p2->mPipeline);
}

UNIT_TEST_CASE(CircularProcessQueueUnittest, TestPush)
UNIT_TEST_CASE(CircularProcessQueueUnittest, TestPop)
UNIT_TEST_CASE(CircularProcessQueueUnittest, TestReset)
UNIT_TEST_CASE(CircularProcessQueueUnittest, TestMetric)
UNIT_TEST_CASE(CircularProcessQueueUnittest, TestSetPipeline)

} // namespace logtail

Expand Down
Loading
Loading