Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 24, 2024
1 parent e770c59 commit 5d46f61
Show file tree
Hide file tree
Showing 26 changed files with 165 additions and 79 deletions.
8 changes: 4 additions & 4 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
}
}

void LogtailPlugin::StopBuiltIn() {
void LogtailPlugin::StopBuiltInModules() {
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop built-in", "starts"));
mStopBuiltInFun();
mStopBuiltInModulesFun();
LOG_INFO(sLogger, ("Go pipelines stop built-in", "succeeded"));
}
}
Expand Down Expand Up @@ -398,9 +398,9 @@ bool LogtailPlugin::LoadPluginBase() {
return mPluginValid;
}
// 停止内置功能
mStopBuiltInFun = (StopBuiltInFun)loader.LoadMethod("StopBuiltIn", error);
mStopBuiltInModulesFun = (StopBuiltInModulesFun)loader.LoadMethod("StopBuiltInModules", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load StopBuiltIn error, Message", error));
LOG_ERROR(sLogger, ("load StopBuiltInModules error, Message", error));
return mPluginValid;
}
// 插件恢复
Expand Down
6 changes: 3 additions & 3 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ typedef GoInt (*LoadPipelineFun)(GoString p, GoString l, GoString c, GoInt64 k,
typedef GoInt (*UnloadPipelineFun)(GoString c);
typedef void (*StopAllFun)(GoInt);
typedef void (*StopFun)(GoString, GoInt);
typedef void (*StopBuiltInFun)();
typedef void (*StopBuiltInModulesFun)();
typedef void (*StartFun)(GoString);
typedef GoInt (*InitPluginBaseFun)();
typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg);
Expand Down Expand Up @@ -216,7 +216,7 @@ class LogtailPlugin {
bool UnloadPipeline(const std::string& pipelineName);
void StopAll(bool withInputFlag);
void Stop(const std::string& configName, bool removingFlag);
void StopBuiltIn();
void StopBuiltInModules();
void Start(const std::string& configName);

bool IsPluginOpened() { return mPluginValid; }
Expand Down Expand Up @@ -266,7 +266,7 @@ class LogtailPlugin {
UnloadPipelineFun mUnloadPipelineFun;
StopAllFun mStopAllFun;
StopFun mStopFun;
StopBuiltInFun mStopBuiltInFun;
StopBuiltInModulesFun mStopBuiltInModulesFun;
StartFun mStartFun;
volatile bool mPluginValid;
logtail::FlusherSLS mPluginAlarmConfig;
Expand Down
42 changes: 3 additions & 39 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
#include "prometheus/PrometheusInputRunner.h"
#if defined(__linux__) && !defined(__ANDROID__)
#include "ebpf/eBPFServer.h"
#include "observer/ObserverManager.h"
#endif
#include "runner/LogProcess.h"
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
#include "app_config/AppConfig.h"
#include "shennong/ShennongManager.h"
#include "streamlog/StreamLogManager.h"
#endif
#include "config/feedbacker/ConfigFeedbackReceiver.h"
#include "pipeline/queue/ProcessQueueManager.h"
Expand All @@ -42,9 +40,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用
static bool isFileServerStarted = false, isInputObserverStarted = false;
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
static bool isInputStreamStarted = false;
#endif
bool isInputObserverChanged = false, isInputFileChanged = false, isInputStreamChanged = false,
isInputContainerStdioChanged = false;
for (const auto& name : diff.mRemoved) {
Expand Down Expand Up @@ -73,14 +68,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
ShennongManager::GetInstance()->Pause();
}
if (isInputStreamStarted && isInputStreamChanged) {
StreamLogManager::GetInstance()->ShutdownConfigUsage();
}
#endif
#if defined(__linux__) && !defined(__ANDROID__)
if (isInputObserverStarted && isInputObserverChanged) {
ObserverManager::GetInstance()->HoldOn(false);
}
#endif
if (isFileServerStarted && (isInputFileChanged || isInputContainerStdioChanged)) {
FileServer::GetInstance()->Pause();
Expand All @@ -98,7 +85,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
for (auto& config : diff.mModified) {
auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue
if (!p) {
LOG_WARNING(sLogger,
LOG_WARNING(sLogger,
("failed to build pipeline for existing config",
"keep current pipeline running")("config", config.mName));
LogtailAlarm::GetInstance()->SendAlarm(
Expand Down Expand Up @@ -126,7 +113,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
for (auto& config : diff.mAdded) {
auto p = BuildPipeline(std::move(config));
if (!p) {
LOG_WARNING(sLogger,
LOG_WARNING(sLogger,
("failed to build pipeline for new config", "skip current object")("config", config.mName));
LogtailAlarm::GetInstance()->SendAlarm(
CATEGORY_CONFIG_ALARM,
Expand Down Expand Up @@ -159,29 +146,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
}
}

#if defined(__linux__) && !defined(__ANDROID__)
if (isInputObserverChanged) {
if (isInputObserverStarted) {
ObserverManager::GetInstance()->Resume();
} else {
// input_observer_network always relies on PluginBase
LogtailPlugin::GetInstance()->LoadPluginBase();
ObserverManager::GetInstance()->Reload();
isInputObserverStarted = true;
}
}
#endif
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (isInputStreamChanged) {
if (isInputStreamStarted) {
StreamLogManager::GetInstance()->StartupConfigUsage();
} else {
if (AppConfig::GetInstance()->GetOpenStreamLog()) {
StreamLogManager::GetInstance()->Init();
isInputStreamStarted = true;
}
}
}
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
ShennongManager::GetInstance()->Resume();
}
Expand Down Expand Up @@ -225,13 +190,12 @@ void PipelineManager::StopAllPipelines() {
#endif
PrometheusInputRunner::GetInstance()->Stop();
#if defined(__linux__) && !defined(__ANDROID__)
ObserverManager::GetInstance()->HoldOn(true);
ebpf::eBPFServer::GetInstance()->Stop();
#endif
FileServer::GetInstance()->Stop();

LogtailPlugin::GetInstance()->StopAll(true);
LogtailPlugin::GetInstance()->StopBuiltIn();
LogtailPlugin::GetInstance()->StopBuiltInModules();

bool logProcessFlushFlag = false;
for (int i = 0; !logProcessFlushFlag && i < 500; ++i) {
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class PipelineManager {
friend class BoundedProcessQueueUnittest;
friend class CircularProcessQueueUnittest;
friend class CommonConfigProviderUnittest;
friend class FlusherUnittest;
#endif
};

Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/plugin/interface/Flusher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bool Flusher::Stop(bool isPipelineRemoving) {

void Flusher::SetPipelineForItemsWhenStop() {
if (HasContext()) {
auto pipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName());
const auto& pipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName());
if (!pipeline) {
LOG_ERROR(sLogger, ("failed to get pipeline context", "context not found")("action", "not set pipeline"));
return;
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/BoundedProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ bool BoundedProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
return true;
}

void BoundedProcessQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
void BoundedProcessQueue::SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const {
for (auto& item : mQueue) {
if (!item->mPipeline) {
item->mPipeline = p;
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;
void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const override;

void SetUpStreamFeedbacks(std::vector<FeedbackInterface*>&& feedbacks);

Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/BoundedSenderQueueInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void BoundedSenderQueueInterface::GiveFeedback() const {
}

void BoundedSenderQueueInterface::Reset(size_t cap, size_t low, size_t high) {
queue<unique_ptr<SenderQueueItem>>().swap(mExtraBuffer);
deque<unique_ptr<SenderQueueItem>>().swap(mExtraBuffer);
mRateLimiter.reset();
mConcurrencyLimiters.clear();
BoundedQueueInterface::Reset(low, high);
Expand Down
8 changes: 6 additions & 2 deletions core/pipeline/queue/BoundedSenderQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface<std::unique_ptr

void SetRateLimiter(uint32_t maxRate);
void SetConcurrencyLimiters(std::vector<std::shared_ptr<ConcurrencyLimiter>>&& limiters);
virtual void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const = 0;
virtual void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const = 0;

#ifdef APSARA_UNIT_TEST_MAIN
std::optional<RateLimiter>& GetRateLimiter() { return mRateLimiter; }
Expand All @@ -63,10 +63,14 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface<std::unique_ptr
std::optional<RateLimiter> mRateLimiter;
std::vector<std::shared_ptr<ConcurrencyLimiter>> mConcurrencyLimiters;

std::queue<std::unique_ptr<SenderQueueItem>> mExtraBuffer;
std::deque<std::unique_ptr<SenderQueueItem>> mExtraBuffer;

IntGaugePtr mExtraBufferSize;
IntGaugePtr mExtraBufferDataSizeBytes;

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherUnittest;
#endif
};

} // namespace logtail
2 changes: 1 addition & 1 deletion core/pipeline/queue/CircularProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ bool CircularProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
return true;
}

void CircularProcessQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
void CircularProcessQueue::SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const {
for (auto& item : mQueue) {
if (!item->mPipeline) {
item->mPipeline = p;
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/CircularProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class CircularProcessQueue : virtual public QueueInterface<std::unique_ptr<Proce

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;
void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const override;

void Reset(size_t cap);

Expand Down
4 changes: 2 additions & 2 deletions core/pipeline/queue/ExactlyOnceQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void ExactlyOnceQueueManager::DisablePopProcessQueue(const string& configName, b
if (iter.second->GetConfigName() == configName) {
iter.second->DisablePop();
if (!isPipelineRemoving) {
auto p = PipelineManager::GetInstance()->FindConfigByName(configName);
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
iter.second->SetPipelineForItems(p);
}
Expand Down Expand Up @@ -279,7 +279,7 @@ uint32_t ExactlyOnceQueueManager::GetProcessQueueCnt() const {
return mProcessQueues.size();
}

void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, std::shared_ptr<Pipeline>& p) {
void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, const std::shared_ptr<Pipeline>& p) {
lock_guard<mutex> lock(mSenderQueueMux);
auto iter = mSenderQueues.find(key);
if (iter != mSenderQueues.end()) {
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/ExactlyOnceQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ExactlyOnceQueueManager {
void GetAllAvailableSenderQueueItems(std::vector<SenderQueueItem*>& item, bool withLimits = true);
bool RemoveSenderQueueItem(QueueKey key, SenderQueueItem* item);
bool IsAllSenderQueueEmpty() const;
void SetPipelineForSenderItems(QueueKey key, std::shared_ptr<Pipeline>& p);
void SetPipelineForSenderItems(QueueKey key, const std::shared_ptr<Pipeline>& p);

void ClearTimeoutQueues();

Expand Down
22 changes: 17 additions & 5 deletions core/pipeline/queue/ExactlyOnceSenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ bool ExactlyOnceSenderQueue::Push(unique_ptr<SenderQueueItem>&& item) {
}
if (!eo->IsComplete()) {
item->mEnqueTime = chrono::system_clock::now();
mExtraBuffer.push(std::move(item));
mExtraBuffer.push_back(std::move(item));
return true;
}
}
Expand All @@ -102,7 +102,7 @@ bool ExactlyOnceSenderQueue::Remove(SenderQueueItem* item) {

if (!mExtraBuffer.empty()) {
Push(std::move(mExtraBuffer.front()));
mExtraBuffer.pop();
mExtraBuffer.pop_front();
return true;
}
if (ChangeStateIfNeededAfterPop()) {
Expand Down Expand Up @@ -153,10 +153,22 @@ void ExactlyOnceSenderQueue::Reset(const vector<RangeCheckpointPtr>& checkpoints
mRangeCheckpoints = checkpoints;
}

void ExactlyOnceSenderQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
void ExactlyOnceSenderQueue::SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const {
if (Empty()) {
return;
}
for (size_t index = 0; index < mCapacity; ++index) {
if (!mQueue[index]) {
mQueue[index]->mPipeline = p;
SenderQueueItem* item = mQueue[index].get();
if (item == nullptr) {
continue;
}
if (!item->mPipeline) {
item->mPipeline = p;
}
}
for (auto& item : mExtraBuffer) {
if (!item->mPipeline) {
item->mPipeline = p;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/ExactlyOnceSenderQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ExactlyOnceSenderQueue : public BoundedSenderQueueInterface {
bool Push(std::unique_ptr<SenderQueueItem>&& item) override;
bool Remove(SenderQueueItem* item) override;
void GetAllAvailableItems(std::vector<SenderQueueItem*>& items, bool withLimits = true) override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;
void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const override;

void Reset(const std::vector<RangeCheckpointPtr>& checkpoints);

Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/ProcessQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ProcessQueueInterface : virtual public QueueInterface<std::unique_ptr<Proc
void DisablePop() { mValidToPop = false; }
void EnablePop() { mValidToPop = true; }

virtual void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const = 0;
virtual void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const = 0;

void Reset() { mDownStreamQueues.clear(); }

Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/ProcessQueueItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ struct ProcessQueueItem {
if (mPipeline) {
mPipeline->AddInProcessCnt();
} else {
auto p = PipelineManager::GetInstance()->FindConfigByName(configName);
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/ProcessQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ void ProcessQueueManager::DisablePop(const string& configName, bool isPipelineRe
if (iter != mQueues.end()) {
(*iter->second.first)->DisablePop();
if (!isPipelineRemoving) {
auto p = PipelineManager::GetInstance()->FindConfigByName(configName);
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
(*iter->second.first)->SetPipelineForItems(p);
}
Expand Down
20 changes: 16 additions & 4 deletions core/pipeline/queue/SenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ bool SenderQueue::Push(unique_ptr<SenderQueueItem>&& item) {
mInItemDataSizeBytes->Add(size);

if (Full()) {
mExtraBuffer.push(std::move(item));
mExtraBuffer.push_back(std::move(item));

mExtraBufferSize->Set(mExtraBuffer.size());
mExtraBufferDataSizeBytes->Add(size);
Expand Down Expand Up @@ -91,7 +91,7 @@ bool SenderQueue::Remove(SenderQueueItem* item) {
if (!mExtraBuffer.empty()) {
auto newSize = mExtraBuffer.front()->mData.size();
Push(std::move(mExtraBuffer.front()));
mExtraBuffer.pop();
mExtraBuffer.pop_front();

mExtraBufferSize->Set(mExtraBuffer.size());
mExtraBufferDataSizeBytes->Sub(newSize);
Expand Down Expand Up @@ -142,8 +142,20 @@ void SenderQueue::GetAllAvailableItems(vector<SenderQueueItem*>& items, bool wit
}
}

void SenderQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
for (auto& item : mQueue) {
void SenderQueue::SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const {
if (Empty()) {
return;
}
for (auto index = mRead; index < mWrite; ++index) {
SenderQueueItem* item = mQueue[index % mCapacity].get();
if (item == nullptr) {
continue;
}
if (!item->mPipeline) {
item->mPipeline = p;
}
}
for (auto& item : mExtraBuffer) {
if (!item->mPipeline) {
item->mPipeline = p;
}
Expand Down
Loading

0 comments on commit 5d46f61

Please sign in to comment.