Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reload pipeline config independently #1713

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9f501d7
feat: reload Go pipeline config independently
Abingcbc Aug 22, 2024
f8d7a0a
fix unittest
Abingcbc Aug 23, 2024
750f732
fix unittest
Abingcbc Aug 23, 2024
8f73fa7
fix
Abingcbc Aug 23, 2024
3496d95
fix
Abingcbc Aug 25, 2024
c5206af
fix
Abingcbc Sep 12, 2024
a6556f3
fix
Abingcbc Sep 12, 2024
5a3c10f
fix
Abingcbc Sep 13, 2024
d8d4c5b
unittest
Abingcbc Sep 15, 2024
96955d9
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 16, 2024
62f3a35
fix
Abingcbc Sep 16, 2024
d602845
fix
Abingcbc Sep 16, 2024
8c1ad10
fix
Abingcbc Sep 16, 2024
fd478b7
fix
Abingcbc Sep 16, 2024
9ddb08d
fix
Abingcbc Sep 17, 2024
409f575
self telemetry
Abingcbc Sep 18, 2024
9e76b8a
fix
Abingcbc Sep 18, 2024
d307d95
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 19, 2024
82baa18
fix
Abingcbc Sep 20, 2024
d8c28cd
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 20, 2024
6442438
fix
Abingcbc Sep 20, 2024
fad1bfe
fix
Abingcbc Sep 20, 2024
0d9a2a7
fix
Abingcbc Sep 20, 2024
9f0424e
fix
Abingcbc Sep 23, 2024
4e022c5
fix
Abingcbc Sep 23, 2024
5155278
fix
Abingcbc Sep 23, 2024
0a05f02
fix
Abingcbc Sep 23, 2024
b29b57f
fix
Abingcbc Sep 24, 2024
e770c59
fix
Abingcbc Sep 24, 2024
5d46f61
fix
Abingcbc Sep 24, 2024
7714655
fix
Abingcbc Sep 24, 2024
f5c4bf4
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 24, 2024
bea03ba
test: calculate incremental unittest coverage
Abingcbc Sep 25, 2024
2bf161e
fix
Abingcbc Sep 25, 2024
1cda480
Merge remote-tracking branch 'origin/coverage' into go-config
Abingcbc Sep 25, 2024
98e14ec
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 25, 2024
7634196
fix
Abingcbc Sep 26, 2024
25b10d2
fix
Abingcbc Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,19 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
return false;
}

bool LogtailPlugin::UnloadPipeline(const std::string& project,
const std::string& logstore,
const std::string& pipelineName) {
bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
if (!mPluginValid) {
LOG_ERROR(sLogger, ("UnloadPipeline", "plugin not valid"));
return false;
}

if (mPluginValid && mUnloadPipelineFun != NULL) {
GoString goProject;
GoString goLogstore;
GoString goConfigName;

goProject.n = project.size();
goProject.p = project.c_str();
goLogstore.n = logstore.size();
goLogstore.p = logstore.c_str();
goConfigName.n = pipelineName.size();
goConfigName.p = pipelineName.c_str();

return mUnloadPipelineFun(goProject, goLogstore, goConfigName) == 0;
return mUnloadPipelineFun(goConfigName) == 0;
}

return false;
Expand All @@ -139,7 +131,7 @@ void LogtailPlugin::StopAll(bool withInputFlag) {
mStopAllFun(withInputFlag ? 1 : 0);
auto stopAllCost = GetCurrentTimeInMilliSeconds() - stopAllStart;
LOG_INFO(sLogger, ("Go pipelines stop all", "succeeded")("cost", ToString(stopAllCost) + "ms"));
if (stopAllCost >= 60 * 1000) {
if (stopAllCost >= 10 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(HOLD_ON_TOO_SLOW_ALARM,
"Stopping all Go pipelines took " + ToString(stopAllCost) + "ms");
}
Expand All @@ -156,13 +148,21 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
mStopFun(goConfigName, removedFlag ? 1 : 0);
auto stopCost = GetCurrentTimeInMilliSeconds() - stopStart;
LOG_INFO(sLogger, ("Go pipelines stop", "succeeded")("config", configName)("cost", ToString(stopCost) + "ms"));
if (stopCost >= 30 * 1000) {
if (stopCost >= 10 * 1000) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
LogtailAlarm::GetInstance()->SendAlarm(
HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
}

void LogtailPlugin::StopBuiltIn() {
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop built-in", "starts"));
mStopBuiltInFun();
LOG_INFO(sLogger, ("Go pipelines stop built-in", "succeeded"));
}
}

void LogtailPlugin::Start(const std::string& configName) {
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
Expand Down Expand Up @@ -397,6 +397,12 @@ bool LogtailPlugin::LoadPluginBase() {
LOG_ERROR(sLogger, ("load Stop error, Message", error));
return mPluginValid;
}
// 停止内置功能
mStopBuiltInFun = (StopBuiltInFun)loader.LoadMethod("StopBuiltIn", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load StopBuiltIn error, Message", error));
return mPluginValid;
}
// 插件恢复
mStartFun = (StartFun)loader.LoadMethod("Start", error);
if (!error.empty()) {
Expand Down
7 changes: 5 additions & 2 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ struct K8sContainerMeta {
// Methods export by plugin.
typedef GoInt (*LoadGlobalConfigFun)(GoString);
typedef GoInt (*LoadPipelineFun)(GoString p, GoString l, GoString c, GoInt64 k, GoString p2);
typedef GoInt (*UnloadPipelineFun)(GoString p, GoString l, GoString c);
typedef GoInt (*UnloadPipelineFun)(GoString c);
typedef void (*StopAllFun)(GoInt);
typedef void (*StopFun)(GoString, GoInt);
typedef void (*StopBuiltInFun)();
typedef void (*StartFun)(GoString);
typedef GoInt (*InitPluginBaseFun)();
typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg);
Expand Down Expand Up @@ -212,9 +213,10 @@ class LogtailPlugin {
const std::string& logstore = "",
const std::string& region = "",
logtail::QueueKey logstoreKey = 0);
bool UnloadPipeline(const std::string& project, const std::string& logstore, const std::string& pipelineName);
bool UnloadPipeline(const std::string& pipelineName);
void StopAll(bool withInputFlag);
void Stop(const std::string& configName, bool removingFlag);
void StopBuiltIn();
void Start(const std::string& configName);

bool IsPluginOpened() { return mPluginValid; }
Expand Down Expand Up @@ -264,6 +266,7 @@ class LogtailPlugin {
UnloadPipelineFun mUnloadPipelineFun;
StopAllFun mStopAllFun;
StopFun mStopFun;
StopBuiltInFun mStopBuiltInFun;
StartFun mStartFun;
volatile bool mPluginValid;
logtail::FlusherSLS mPluginAlarmConfig;
Expand Down
11 changes: 5 additions & 6 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void Pipeline::Stop(bool isRemoving) {
}

ProcessQueueManager::GetInstance()->DisablePop(mName, isRemoving);
WaitAllInProcessFinish();
WaitAllItemsInProcessFinished();

if (!isRemoving) {
FlushBatch();
Expand Down Expand Up @@ -514,8 +514,7 @@ bool Pipeline::LoadGoPipelines() const {
mContext.GetLogstoreName(),
mContext.GetRegion());
if (!mGoPipelineWithoutInput.isNull()) {
LogtailPlugin::GetInstance()->UnloadPipeline(
mContext.GetProjectName(), mContext.GetLogstoreName(), GetConfigNameOfGoPipelineWithoutInput());
LogtailPlugin::GetInstance()->UnloadPipeline(GetConfigNameOfGoPipelineWithoutInput());
}
return false;
}
Expand All @@ -539,13 +538,13 @@ PluginInstance::PluginMeta Pipeline::GenNextPluginMeta(bool lastOne) {
std::to_string(mPluginID.load()), std::to_string(mPluginID.load()), std::to_string(childNodeID));
}

void Pipeline::WaitAllInProcessFinish() {
void Pipeline::WaitAllItemsInProcessFinished() {
uint64_t startTime = GetCurrentTimeInMilliSeconds();
bool alarmOnce = false;
while (mInProcessCnt.load() != 0) {
usleep(1000 * 10); // 10ms
this_thread::sleep_for(chrono::milliseconds(100)); // 100ms
uint64_t duration = GetCurrentTimeInMilliSeconds() - startTime;
if (!alarmOnce && duration > 1000) { // 1s
if (!alarmOnce && duration > 10000) { // 10s
LOG_ERROR(sLogger, ("pipeline stop", "too slow")("config", mName)("cost", duration));
LogtailAlarm::GetInstance()->SendAlarm(CONFIG_UPDATE_ALARM,
string("pipeline stop too slow, config: ") + mName
Expand Down
3 changes: 1 addition & 2 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Pipeline {
Json::Value& dst);
void CopyNativeGlobalParamToGoPipeline(Json::Value& root);
bool ShouldAddPluginToGoPipelineWithInput() const { return mInputs.empty() && mProcessorLine.empty(); }
void WaitAllInProcessFinish();
void WaitAllItemsInProcessFinished();

std::string mName;
std::vector<std::unique_ptr<InputInstance>> mInputs;
Expand All @@ -110,7 +110,6 @@ class Pipeline {
CounterPtr mProcessorsInGroupsCnt;
CounterPtr mProcessorsInGroupDataSizeBytes;
CounterPtr mProcessorsTotalDelayMs;
CounterPtr mLoadDelayMs;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineMock;
Expand Down
7 changes: 4 additions & 3 deletions core/pipeline/PipelineManager.cpp
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(name, ConfigFeedbackStatus::DELETED);
}
for (auto& config : diff.mModified) {
auto iter = mPipelineNameEntityMap.find(config.mName);
auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue
if (!p) {
LOG_WARNING(sLogger,
LOG_WARNING(sLogger,
("failed to build pipeline for existing config",
"keep current pipeline running")("config", config.mName));
LogtailAlarm::GetInstance()->SendAlarm(
Expand All @@ -115,6 +114,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
LOG_INFO(sLogger,
("pipeline building for existing config succeeded",
"stop the old pipeline and start the new one")("config", config.mName));
auto iter = mPipelineNameEntityMap.find(config.mName);
iter->second->Stop(false);
DecreasePluginUsageCnt(iter->second->GetPluginStatistics());

Expand All @@ -126,7 +126,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
for (auto& config : diff.mAdded) {
auto p = BuildPipeline(std::move(config));
if (!p) {
LOG_WARNING(sLogger,
LOG_WARNING(sLogger,
("failed to build pipeline for new config", "skip current object")("config", config.mName));
LogtailAlarm::GetInstance()->SendAlarm(
CATEGORY_CONFIG_ALARM,
Expand Down Expand Up @@ -231,6 +231,7 @@ void PipelineManager::StopAllPipelines() {
FileServer::GetInstance()->Stop();

LogtailPlugin::GetInstance()->StopAll(true);
LogtailPlugin::GetInstance()->StopBuiltIn();

bool logProcessFlushFlag = false;
for (int i = 0; !logProcessFlushFlag && i < 500; ++i) {
Expand Down
10 changes: 4 additions & 6 deletions core/pipeline/plugin/interface/Flusher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ bool Flusher::Stop(bool isPipelineRemoving) {
}

void Flusher::SetPipelineForItemsWhenStop() {
vector<SenderQueueItem*> items;
SenderQueueManager::GetInstance()->GetAllAvailableItems(mQueueKey, items, false);
if (HasContext()) {
auto pipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName());
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
for (auto item : items) {
item->mPipeline = pipeline;
if (!pipeline) {
LOG_ERROR(sLogger, ("failed to get pipeline context", "context not found")("action", "not set pipeline"));
return;
}
} else {
LOG_ERROR(sLogger, ("failed to get pipeline context", "context not found")("action", "discard data"));
SenderQueueManager::GetInstance()->SetPipelineForItems(mQueueKey, pipeline);
}
}

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class Flusher : public Plugin {
#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
#endif
};

Expand Down
3 changes: 1 addition & 2 deletions core/pipeline/queue/BoundedProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ bool BoundedProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
return true;
}

void BoundedProcessQueue::SetPipelineForItems(const std::string& name) const {
auto p = PipelineManager::GetInstance()->FindConfigByName(name);
void BoundedProcessQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
for (auto& item : mQueue) {
if (!item->mPipeline) {
item->mPipeline = p;
Expand Down
5 changes: 3 additions & 2 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ namespace logtail {
class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<ProcessQueueItem>>,
public ProcessQueueInterface {
public:
BoundedProcessQueue(size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const PipelineContext& ctx);
BoundedProcessQueue(
size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const PipelineContext& ctx);

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
void SetPipelineForItems(const std::string& name) const override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

void SetUpStreamFeedbacks(std::vector<FeedbackInterface*>&& feedbacks);

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/BoundedSenderQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface<std::unique_ptr

void SetRateLimiter(uint32_t maxRate);
void SetConcurrencyLimiters(std::vector<std::shared_ptr<ConcurrencyLimiter>>&& limiters);
virtual void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const = 0;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

#ifdef APSARA_UNIT_TEST_MAIN
std::optional<RateLimiter>& GetRateLimiter() { return mRateLimiter; }
Expand Down
3 changes: 1 addition & 2 deletions core/pipeline/queue/CircularProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ bool CircularProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
return true;
}

void CircularProcessQueue::SetPipelineForItems(const std::string& name) const {
auto p = PipelineManager::GetInstance()->FindConfigByName(name);
void CircularProcessQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
for (auto& item : mQueue) {
if (!item->mPipeline) {
item->mPipeline = p;
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/CircularProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class CircularProcessQueue : virtual public QueueInterface<std::unique_ptr<Proce

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
void SetPipelineForItems(const std::string& name) const override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;

void Reset(size_t cap);

Expand Down
17 changes: 14 additions & 3 deletions core/pipeline/queue/ExactlyOnceQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

#include "common/Flags.h"
#include "common/TimeUtil.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/input/InputFile.h"
#include "logger/Logger.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/input/InputFile.h"

DEFINE_FLAG_INT32(logtail_queue_gc_threshold_sec, "2min", 2 * 60);
DEFINE_FLAG_INT64(logtail_queue_max_used_time_per_round_in_msec, "500ms", 500);
Expand Down Expand Up @@ -154,7 +154,10 @@ void ExactlyOnceQueueManager::DisablePopProcessQueue(const string& configName, b
if (iter.second->GetConfigName() == configName) {
iter.second->DisablePop();
if (!isPipelineRemoving) {
iter.second->SetPipelineForItems(configName);
auto p = PipelineManager::GetInstance()->FindConfigByName(configName);
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
if (p) {
iter.second->SetPipelineForItems(p);
}
}
}
}
Expand Down Expand Up @@ -276,6 +279,14 @@ uint32_t ExactlyOnceQueueManager::GetProcessQueueCnt() const {
return mProcessQueues.size();
}

void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, std::shared_ptr<Pipeline>& p) {
lock_guard<mutex> lock(mSenderQueueMux);
auto iter = mSenderQueues.find(key);
if (iter != mSenderQueues.end()) {
iter->second.SetPipelineForItems(p);
}
}

#ifdef APSARA_UNIT_TEST_MAIN
void ExactlyOnceQueueManager::Clear() {
{
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ExactlyOnceQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ExactlyOnceQueueManager {
void GetAllAvailableSenderQueueItems(std::vector<SenderQueueItem*>& item, bool withLimits = true);
bool RemoveSenderQueueItem(QueueKey key, SenderQueueItem* item);
bool IsAllSenderQueueEmpty() const;
void SetPipelineForSenderItems(QueueKey key, std::shared_ptr<Pipeline>& p);

void ClearTimeoutQueues();

Expand Down
8 changes: 8 additions & 0 deletions core/pipeline/queue/ExactlyOnceSenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,12 @@ void ExactlyOnceSenderQueue::Reset(const vector<RangeCheckpointPtr>& checkpoints
mRangeCheckpoints = checkpoints;
}

void ExactlyOnceSenderQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
for (size_t index = 0; index < mCapacity; ++index) {
if (!mQueue[index]) {
mQueue[index]->mPipeline = p;
}
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
}
}

} // namespace logtail
5 changes: 4 additions & 1 deletion core/pipeline/queue/ExactlyOnceSenderQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ namespace logtail {
// not thread-safe, should be protected explicitly by queue manager
class ExactlyOnceSenderQueue : public BoundedSenderQueueInterface {
public:
ExactlyOnceSenderQueue(const std::vector<RangeCheckpointPtr>& checkpoints, QueueKey key, const PipelineContext& ctx);
ExactlyOnceSenderQueue(const std::vector<RangeCheckpointPtr>& checkpoints,
QueueKey key,
const PipelineContext& ctx);

bool Push(std::unique_ptr<SenderQueueItem>&& item) override;
bool Remove(SenderQueueItem* item) override;
void GetAllAvailableItems(std::vector<SenderQueueItem*>& items, bool withLimits = true) override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;

void Reset(const std::vector<RangeCheckpointPtr>& checkpoints);

Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/ProcessQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ProcessQueueInterface : virtual public QueueInterface<std::unique_ptr<Proc
void DisablePop() { mValidToPop = false; }
void EnablePop() { mValidToPop = true; }

virtual void SetPipelineForItems(const std::string& name) const = 0;
virtual void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const = 0;

void Reset() { mDownStreamQueues.clear(); }

Expand Down
5 changes: 4 additions & 1 deletion core/pipeline/queue/ProcessQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ void ProcessQueueManager::DisablePop(const string& configName, bool isPipelineRe
if (iter != mQueues.end()) {
(*iter->second.first)->DisablePop();
if (!isPipelineRemoving) {
(*iter->second.first)->SetPipelineForItems(configName);
auto p = PipelineManager::GetInstance()->FindConfigByName(configName);
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
if (p) {
(*iter->second.first)->SetPipelineForItems(p);
}
}
}
} else {
Expand Down
8 changes: 8 additions & 0 deletions core/pipeline/queue/SenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,12 @@ void SenderQueue::GetAllAvailableItems(vector<SenderQueueItem*>& items, bool wit
}
}

void SenderQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
for (auto& item : mQueue) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
if (!item->mPipeline) {
item->mPipeline = p;
}
}
}

} // namespace logtail
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class SenderQueue : public BoundedSenderQueueInterface {
bool Push(std::unique_ptr<SenderQueueItem>&& item) override;
bool Remove(SenderQueueItem* item) override;
void GetAllAvailableItems(std::vector<SenderQueueItem*>& items, bool withLimits = true) override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;

private:
size_t Size() const override { return mSize; }
Expand Down
Loading