From c5206af04c1bf697357b3f64e38e14fdef710ee5 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Thu, 12 Sep 2024 15:18:28 +0800 Subject: [PATCH] fix --- core/go_pipeline/LogtailPlugin.cpp | 4 +- core/pipeline/Pipeline.cpp | 170 +++++++++---------- core/pipeline/Pipeline.h | 17 +- core/pipeline/PipelineManager.cpp | 69 ++------ core/pipeline/plugin/interface/Flusher.cpp | 8 + core/pipeline/queue/BoundedProcessQueue.cpp | 14 ++ core/pipeline/queue/BoundedProcessQueue.h | 1 + core/pipeline/queue/CircularProcessQueue.cpp | 16 +- core/pipeline/queue/CircularProcessQueue.h | 1 + core/pipeline/queue/ProcessQueueInterface.h | 6 +- core/pipeline/queue/ProcessQueueItem.h | 12 ++ core/pipeline/queue/SenderQueueItem.h | 12 ++ core/pipeline/queue/SenderQueueManager.h | 4 +- core/plugin/flusher/sls/FlusherSLS.cpp | 26 +-- core/runner/LogProcess.cpp | 4 +- pkg/logtail/libPluginAdapter.so | Bin 20768 -> 29968 bytes plugin_main/plugin_export.go | 6 +- pluginmanager/logstore_config.go | 140 ++++++--------- pluginmanager/plugin_manager.go | 27 +-- pluginmanager/plugin_runner_v1.go | 4 +- pluginmanager/plugin_runner_v2.go | 4 +- 21 files changed, 265 insertions(+), 280 deletions(-) diff --git a/core/go_pipeline/LogtailPlugin.cpp b/core/go_pipeline/LogtailPlugin.cpp index 7e69ce3b82..9b78fb5a1a 100644 --- a/core/go_pipeline/LogtailPlugin.cpp +++ b/core/go_pipeline/LogtailPlugin.cpp @@ -22,15 +22,15 @@ #include "common/JsonUtil.h" #include "common/LogtailCommonFlags.h" #include "common/TimeUtil.h" -#include "pipeline/compression/CompressorFactory.h" #include "container_manager/ConfigContainerInfoUpdateCmd.h" #include "file_server/ConfigManager.h" #include "logger/Logger.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" #include "pipeline/PipelineManager.h" -#include "profile_sender/ProfileSender.h" +#include "pipeline/compression/CompressorFactory.h" #include "pipeline/queue/SenderQueueManager.h" +#include "profile_sender/ProfileSender.h" DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false); DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect, diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index b0450a2bb0..c4614004b0 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -19,41 +19,22 @@ #include #include -#include "pipeline/batch/TimeoutFlushManager.h" +#include "batch/TimeoutFlushManager.h" #include "common/Flags.h" #include "common/ParamExtractor.h" -#include "plugin/flusher/sls/FlusherSLS.h" +#include "flusher/sls/FlusherSLS.h" #include "go_pipeline/LogtailPlugin.h" -#include "plugin/input/InputFeedbackInterfaceRegistry.h" -#include "pipeline/plugin/PluginRegistry.h" -#include "plugin/processor/ProcessorParseApsaraNative.h" -#include "pipeline/queue/ProcessQueueManager.h" -#include "pipeline/queue/QueueKeyManager.h" -#include "pipeline/queue/SenderQueueManager.h" +#include "input/InputFeedbackInterfaceRegistry.h" +#include "plugin/PluginRegistry.h" +#include "processor/ProcessorParseApsaraNative.h" +#include "queue/ProcessQueueManager.h" +#include "queue/QueueKeyManager.h" +#include "queue/SenderQueueManager.h" DECLARE_FLAG_INT32(default_plugin_log_queue_size); using namespace std; -namespace { -class AggregatorDefaultConfig { -public: - static AggregatorDefaultConfig& Instance() { - static AggregatorDefaultConfig instance; - return instance; - } - - Json::Value* GetJsonConfig() { return &aggregatorDefault; } - -private: - Json::Value aggregatorDefault; - AggregatorDefaultConfig() { aggregatorDefault["Type"] = "aggregator_default"; } - - AggregatorDefaultConfig(AggregatorDefaultConfig const&) = delete; - void operator=(AggregatorDefaultConfig const&) = delete; -}; -} // namespace - namespace logtail { void AddExtendedGlobalParamToGoPipeline(const Json::Value& extendedParams, Json::Value& pipeline) { @@ -78,21 +59,20 @@ bool Pipeline::Init(PipelineConfig&& config) { const InputContainerStdio* inputContainerStdio = nullptr; bool hasFlusherSLS = false; - // to send alarm and init MetricsRecord before flusherSLS is built, a temporary object is made, which will be +#ifdef __ENTERPRISE__ + // to send alarm before flusherSLS is built, a temporary object is made, which will be overriden shortly after. unique_ptr SLSTmp = make_unique(); - if (!config.mProject.empty()) { - SLSTmp->mProject = config.mProject; - SLSTmp->mLogstore = config.mLogstore; - SLSTmp->mRegion = config.mRegion; - mContext.SetSLSInfo(SLSTmp.get()); - } + SLSTmp->mProject = config.mProject; + SLSTmp->mLogstore = config.mLogstore; + SLSTmp->mRegion = config.mRegion; + mContext.SetSLSInfo(SLSTmp.get()); +#endif mPluginID.store(0); for (size_t i = 0; i < config.mInputs.size(); ++i) { const Json::Value& detail = *config.mInputs[i]; - string pluginType = detail["Type"].asString(); - unique_ptr input - = PluginRegistry::GetInstance()->CreateInput(pluginType, GenNextPluginMeta(false)); + string name = detail["Type"].asString(); + unique_ptr input = PluginRegistry::GetInstance()->CreateInput(name, GenNextPluginMeta(false)); if (input) { Json::Value optionalGoPipeline; if (!input->Init(detail, mContext, i, optionalGoPipeline)) { @@ -103,20 +83,19 @@ bool Pipeline::Init(PipelineConfig&& config) { MergeGoPipeline(optionalGoPipeline, mGoPipelineWithInput); } // for special treatment below - if (pluginType == InputFile::sName) { + if (name == InputFile::sName) { inputFile = static_cast(mInputs[0]->GetPlugin()); - } else if (pluginType == InputContainerStdio::sName) { + } else if (name == InputContainerStdio::sName) { inputContainerStdio = static_cast(mInputs[0]->GetPlugin()); } } else { - AddPluginToGoPipeline(pluginType, detail, "inputs", mGoPipelineWithInput); + AddPluginToGoPipeline(detail, "inputs", mGoPipelineWithInput); } - ++mPluginCntMap["inputs"][pluginType]; + ++mPluginCntMap["inputs"][name]; } for (size_t i = 0; i < config.mProcessors.size(); ++i) { - const Json::Value& detail = *config.mProcessors[i]; - string pluginType = detail["Type"].asString(); + string name = (*config.mProcessors[i])["Type"].asString(); unique_ptr processor = PluginRegistry::GetInstance()->CreateProcessor(pluginType, GenNextPluginMeta(false)); if (processor) { @@ -125,44 +104,35 @@ bool Pipeline::Init(PipelineConfig&& config) { } mProcessorLine.emplace_back(std::move(processor)); // for special treatment of topicformat in apsara mode - if (i == 0 && pluginType == ProcessorParseApsaraNative::sName) { + if (i == 0 && name == ProcessorParseApsaraNative::sName) { mContext.SetIsFirstProcessorApsaraFlag(true); } } else { if (ShouldAddPluginToGoPipelineWithInput()) { AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithInput); } else { - AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*config.mProcessors[i], "processors", mGoPipelineWithoutInput); } } - ++mPluginCntMap["processors"][pluginType]; + ++mPluginCntMap["processors"][name]; } - if (config.mAggregators.empty() && config.IsFlushingThroughGoPipelineExisted()) { - // an aggregator_default plugin will be add to go pipeline when mAggregators is empty and need to send go data - // to cpp flusher. - config.mAggregators.push_back(AggregatorDefaultConfig::Instance().GetJsonConfig()); - } - for (size_t i = 0; i < config.mAggregators.size(); ++i) { - const Json::Value& detail = *config.mAggregators[i]; - string pluginType = detail["Type"].asString(); - GenNextPluginMeta(false); + for (auto detail : config.mAggregators) { if (ShouldAddPluginToGoPipelineWithInput()) { - AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithInput); + AddPluginToGoPipeline(*detail, "aggregators", mGoPipelineWithInput); } else { - AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*detail, "aggregators", mGoPipelineWithoutInput); } - ++mPluginCntMap["aggregators"][pluginType]; + ++mPluginCntMap["aggregators"][(*detail)["Type"].asString()]; } - for (size_t i = 0; i < config.mFlushers.size(); ++i) { - const Json::Value& detail = *config.mFlushers[i]; - string pluginType = detail["Type"].asString(); + for (auto detail : config.mFlushers) { + string name = (*detail)["Type"].asString(); unique_ptr flusher - = PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false)); + = PluginRegistry::GetInstance()->CreateFlusher(name, GenNextPluginMeta(false)); if (flusher) { Json::Value optionalGoPipeline; - if (!flusher->Init(detail, mContext, optionalGoPipeline)) { + if (!flusher->Init(*detail, mContext, optionalGoPipeline)) { return false; } mFlushers.emplace_back(std::move(flusher)); @@ -173,18 +143,18 @@ bool Pipeline::Init(PipelineConfig&& config) { MergeGoPipeline(optionalGoPipeline, mGoPipelineWithoutInput); } } - if (pluginType == FlusherSLS::sName) { + if (name == FlusherSLS::sName) { hasFlusherSLS = true; mContext.SetSLSInfo(static_cast(mFlushers.back()->GetPlugin())); } } else { if (ShouldAddPluginToGoPipelineWithInput()) { - AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithInput); + AddPluginToGoPipeline(*detail, "flushers", mGoPipelineWithInput); } else { - AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*detail, "flushers", mGoPipelineWithoutInput); } } - ++mPluginCntMap["flushers"][pluginType]; + ++mPluginCntMap["flushers"][name]; } // route is only enabled in native flushing mode, thus the index in config is the same as that in mFlushers @@ -192,17 +162,14 @@ bool Pipeline::Init(PipelineConfig&& config) { return false; } - for (size_t i = 0; i < config.mExtensions.size(); ++i) { - const Json::Value& detail = *config.mExtensions[i]; - string pluginType = detail["Type"].asString(); - GenNextPluginMeta(false); + for (auto detail : config.mExtensions) { if (!mGoPipelineWithInput.isNull()) { - AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithInput); + AddPluginToGoPipeline(*detail, "extensions", mGoPipelineWithInput); } if (!mGoPipelineWithoutInput.isNull()) { - AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*detail, "extensions", mGoPipelineWithoutInput); } - ++mPluginCntMap["extensions"][pluginType]; + ++mPluginCntMap["extensions"][(*detail)["Type"].asString()]; } // global module must be initialized at last, since native input or flusher plugin may generate global param in Go @@ -291,7 +258,7 @@ bool Pipeline::Init(PipelineConfig&& config) { ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(mContext.GetProcessQueueKey(), priority); } else { ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue( - mContext.GetProcessQueueKey(), priority, 1024); + mContext.GetProcessQueueKey(), priority, 100); } @@ -323,13 +290,13 @@ void Pipeline::Start() { } if (!mGoPipelineWithoutInput.isNull()) { - // TODO: 加载该Go流水线 + LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput()); } - // TODO: 启用Process中改流水线对应的输入队列 + ProcessQueueManager::GetInstance()->ValidatePop(mContext.GetProcessQueueKey()); if (!mGoPipelineWithInput.isNull()) { - // TODO: 加载该Go流水线 + LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput()); } for (const auto& input : mInputs) { @@ -367,6 +334,7 @@ bool Pipeline::Send(vector&& groupList) { } } } + pipeline->SubInProcessingCnt(); return allSucceeded; } @@ -386,23 +354,42 @@ void Pipeline::Stop(bool isRemoving) { } if (!mGoPipelineWithInput.isNull()) { - // TODO: 卸载该Go流水线 + // Go pipeline `Stop` will stop and delete + LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), isRemoving); + } + + ProcessQueueManager::GetInstance()->InvalidatePop(mContext->GetConfigName()); + + // Wait for all processing item finish + uint64_t startTime = GetCurrentTimeInMilliSeconds(); + bool alarmFlag = false; + while (mProcessingCnt.load() != 0) { + usleep(1000 * 10); // 10ms + uint64_t duration = GetCurrentTimeInMilliSeconds() - startTime; + if (!alarmFlag && duration > 1000) { // 1s + LOG_ERROR(sLogger, ("pipeline stop", "too slow")("config", mName)("cost", duration)); + LogtailAlarm::GetInstance()->SendAlarm(CONFIG_UPDATE_ALARM, + string("pipeline stop too slow, config: ") + mName + + "; cost:" + duration, + GetProject(), + GetLogstore(), + GetRegion()); + alarmFlag = true; + } } - // TODO: 禁用Process中改流水线对应的输入队列 - if (!isRemoving) { FlushBatch(); } if (!mGoPipelineWithoutInput.isNull()) { - // TODO: 卸载该Go流水线 + // Go pipeline `Stop` will stop and delete + LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), isRemoving); } for (const auto& flusher : mFlushers) { flusher->Stop(isRemoving); } - LOG_INFO(sLogger, ("pipeline stop", "succeeded")("config", mName)); } @@ -426,18 +413,10 @@ void Pipeline::MergeGoPipeline(const Json::Value& src, Json::Value& dst) { } } -std::string Pipeline::GenPluginTypeWithID(std::string pluginType, std::string pluginID) { - return pluginType + "/" + pluginID; -} - -// Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority. -void Pipeline::AddPluginToGoPipeline(const string& pluginType, - const Json::Value& plugin, - const string& module, - Json::Value& dst) { +void Pipeline::AddPluginToGoPipeline(const Json::Value& plugin, const string& module, Json::Value& dst) { Json::Value res(Json::objectValue), detail = plugin; detail.removeMember("Type"); - res["type"] = GenPluginTypeWithID(pluginType, GetNowPluginID()); + res["type"] = plugin["Type"]; res["detail"] = detail; dst[module].append(res); } @@ -501,6 +480,11 @@ std::string Pipeline::GetNowPluginID() { return std::to_string(mPluginID.load()); } +std::string Pipeline::GenNextPluginID() { + mPluginID.fetch_add(1); + return std::to_string(mPluginID.load()); +} + PluginInstance::PluginMeta Pipeline::GenNextPluginMeta(bool lastOne) { mPluginID.fetch_add(1); int32_t childNodeID = mPluginID.load(); diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index 55669559db..5ec31d4ab0 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -24,14 +24,14 @@ #include #include "config/PipelineConfig.h" -#include "plugin/input/InputContainerStdio.h" -#include "plugin/input/InputFile.h" #include "models/PipelineEventGroup.h" #include "pipeline/PipelineContext.h" #include "pipeline/plugin/instance/FlusherInstance.h" #include "pipeline/plugin/instance/InputInstance.h" #include "pipeline/plugin/instance/ProcessorInstance.h" #include "pipeline/route/Router.h" +#include "plugin/input/InputContainerStdio.h" +#include "plugin/input/InputFile.h" namespace logtail { @@ -45,6 +45,18 @@ class Pipeline { bool Send(std::vector&& groupList); bool FlushBatch(); void RemoveProcessQueue() const; + void AddInProcessingCnt() const { mProcessingCnt.fetch_add(1); } + void SubInProcessingCnt() const { + uint16_t currentVal; + do { + currentVal = atomic_val.load(std::memory_order_relaxed); + // cannot sub smaller than 0 + if (currentVal == 0) { + return; + } + } while (!atomic_val.compare_exchange_weak( + currentVal, currentVal - 1, std::memory_order_release, std::memory_order_relaxed)); + } const std::string& Name() const { return mName; } PipelineContext& GetContext() const { return mContext; } @@ -88,6 +100,7 @@ class Pipeline { std::unordered_map> mPluginCntMap; std::unique_ptr mConfig; std::atomic_uint16_t mPluginID; + std::atomic_uint16_t mProcessingCnt; #ifdef APSARA_UNIT_TEST_MAIN friend class PipelineMock; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index c99b458e4e..08d1e637b2 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -16,7 +16,7 @@ #include "pipeline/PipelineManager.h" -#include "file_server/ConfigManager.h" +#include "config_manager/ConfigManager.h" #include "file_server/FileServer.h" #include "go_pipeline/LogtailPlugin.h" #include "prometheus/PrometheusInputRunner.h" @@ -24,15 +24,15 @@ #include "ebpf/eBPFServer.h" #include "observer/ObserverManager.h" #endif -#include "runner/LogProcess.h" +#include "processor/daemon/LogProcess.h" #if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) #include "app_config/AppConfig.h" #include "shennong/ShennongManager.h" #include "streamlog/StreamLogManager.h" #endif #include "config/feedbacker/ConfigFeedbackReceiver.h" -#include "pipeline/queue/ProcessQueueManager.h" -#include "pipeline/queue/QueueKeyManager.h" +#include "queue/ProcessQueueManager.h" +#include "queue/QueueKeyManager.h" using namespace std; @@ -85,27 +85,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { if (isFileServerStarted && (isInputFileChanged || isInputContainerStdioChanged)) { FileServer::GetInstance()->Pause(); } - LogProcess::GetInstance()->HoldOn(); - LogtailPlugin::GetInstance()->HoldOn(false); - if (isInputPrometheusChanged) { - PrometheusInputRunner::GetInstance()->Start(); - } -#if defined(__linux__) && !defined(__ANDROID__) - // 和其它插件不同,ebpf需要init之后才能配置加载,最终状态这个init函数是在插件自己的start函数里面,目前暂时在此过渡。 - if (inputEbpfChanged) { - logtail::ebpf::eBPFServer::GetInstance()->Init(); - } - #endif for (const auto& name : diff.mRemoved) { auto iter = mPipelineNameEntityMap.find(name); - if (iter->second->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), true); - } - if (iter->second->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), true); - } iter->second->Stop(true); DecreasePluginUsageCnt(iter->second->GetPluginStatistics()); iter->second->RemoveProcessQueue(); @@ -114,13 +97,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { } for (auto& config : diff.mModified) { auto iter = mPipelineNameEntityMap.find(config.mName); - if (iter->second->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), false); - } - if (iter->second->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), false); - } - auto p = BuildPipeline(std::move(config)); + auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue if (!p) { LOG_WARNING(sLogger, ("failed to build pipeline for existing config", @@ -136,21 +113,16 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { ConfigFeedbackStatus::FAILED); continue; } - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); LOG_INFO(sLogger, ("pipeline building for existing config succeeded", "stop the old pipeline and start the new one")("config", config.mName)); iter->second->Stop(false); DecreasePluginUsageCnt(iter->second->GetPluginStatistics()); + mPipelineNameEntityMap[config.mName] = p; IncreasePluginUsageCnt(p->GetPluginStatistics()); p->Start(); - if (p->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput()); - } - if (p->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput()); - } + ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); } for (auto& config : diff.mAdded) { auto p = BuildPipeline(std::move(config)); @@ -169,24 +141,16 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { } LOG_INFO(sLogger, ("pipeline building for new config succeeded", "begin to start pipeline")("config", config.mName)); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); mPipelineNameEntityMap[config.mName] = p; IncreasePluginUsageCnt(p->GetPluginStatistics()); p->Start(); - if (p->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput()); - } - if (p->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput()); - } + ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); } #ifndef APSARA_UNIT_TEST_MAIN // 在Flusher改造完成前,先不执行如下步骤,不会造成太大影响 // Sender::CleanUnusedAk(); - // 过渡使用 - LogProcess::GetInstance()->Resume(); if (isInputFileChanged || isInputContainerStdioChanged) { if (isFileServerStarted) { FileServer::GetInstance()->Resume(); @@ -267,6 +231,8 @@ void PipelineManager::StopAllPipelines() { FileServer::GetInstance()->Stop(); PrometheusInputRunner::GetInstance()->Stop(); + LogtailPlugin::GetInstance()->StopAll(true, true); + bool logProcessFlushFlag = false; for (int i = 0; !logProcessFlushFlag && i < 500; ++i) { logProcessFlushFlag = LogProcess::GetInstance()->FlushOut(10); @@ -280,7 +246,6 @@ void PipelineManager::StopAllPipelines() { FlushAllBatch(); - LogtailPlugin::GetInstance()->StopAll(true, true); LogtailPlugin::GetInstance()->StopAll(true, false); // TODO: make it common @@ -325,7 +290,9 @@ void PipelineManager::CheckIfInputUpdated(const Json::Value& config, bool& isInputObserverChanged, bool& isInputFileChanged, bool& isInputStreamChanged, - bool& isInputContainerStdioChanged) { + bool& isInputContainerStdioChanged, + bool& isInputPrometheusChanged, + bool& isInputEbpfChanged) { string inputType = config["Type"].asString(); if (inputType == "input_observer_network") { isInputObserverChanged = true; @@ -337,12 +304,10 @@ void PipelineManager::CheckIfInputUpdated(const Json::Value& config, isInputContainerStdioChanged = true; } else if (inputType == "input_prometheus") { isInputPrometheusChanged = true; - } else if (inputType == "input_ebpf_processprobe_security" || - inputType == "input_ebpf_processprobe_observer" || - inputType == "input_ebpf_sockettraceprobe_security" || - inputType == "input_ebpf_sockettraceprobe_observer" || - inputType == "input_ebpf_fileprobe_security" || - inputType == "input_ebpf_profilingprobe_observer") { + } else if (inputType == "input_ebpf_processprobe_security" || inputType == "input_ebpf_processprobe_observer" + || inputType == "input_ebpf_sockettraceprobe_security" + || inputType == "input_ebpf_sockettraceprobe_observer" || inputType == "input_ebpf_fileprobe_security" + || inputType == "input_ebpf_profilingprobe_observer") { isInputEbpfChanged = true; } } diff --git a/core/pipeline/plugin/interface/Flusher.cpp b/core/pipeline/plugin/interface/Flusher.cpp index 432b184fd2..1eecba9d69 100644 --- a/core/pipeline/plugin/interface/Flusher.cpp +++ b/core/pipeline/plugin/interface/Flusher.cpp @@ -29,6 +29,14 @@ bool Flusher::Start() { } bool Flusher::Stop(bool isPipelineRemoving) { + if (HasContext()) { + unique_ptr tombStone = make_unique(); + tombStone->mPipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName()); + if (!tombStone->mPipeline) { + LOG_ERROR(sLogger, ("failed to find pipeline when stop flusher", mContext->GetConfigName())); + } + SenderQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(tombStone)); + } SenderQueueManager::GetInstance()->DeleteQueue(mQueueKey); return true; } diff --git a/core/pipeline/queue/BoundedProcessQueue.cpp b/core/pipeline/queue/BoundedProcessQueue.cpp index 470d134476..22ee0d2f1f 100644 --- a/core/pipeline/queue/BoundedProcessQueue.cpp +++ b/core/pipeline/queue/BoundedProcessQueue.cpp @@ -33,12 +33,26 @@ bool BoundedProcessQueue::Pop(unique_ptr& item) { } item = std::move(mQueue.front()); mQueue.pop(); + item->AddPipelineInProcessingCnt(); if (ChangeStateIfNeededAfterPop()) { GiveFeedback(); } return true; } +void BoundedProcessQueue::InvalidatePop() { + mValidToPop = false; + std::size_t size = mQueue.size(); + auto pipeline = PipelineManager::GetInstance()->FindConfigByName(mConfigName); + if (pipeline) { + for (std::size_t i = 0; i < size; ++i) { + mQueue.front()->mPipeline = pipeline; + mQueue.push(std::move(mQueue.front())); + mQueue.pop(); + } + } +} + void BoundedProcessQueue::SetUpStreamFeedbacks(std::vector&& feedbacks) { mUpStreamFeedbacks.clear(); for (auto& item : feedbacks) { diff --git a/core/pipeline/queue/BoundedProcessQueue.h b/core/pipeline/queue/BoundedProcessQueue.h index 8b65c17193..5c7ccffef3 100644 --- a/core/pipeline/queue/BoundedProcessQueue.h +++ b/core/pipeline/queue/BoundedProcessQueue.h @@ -38,6 +38,7 @@ class BoundedProcessQueue : public BoundedQueueInterface&& item) override; bool Pop(std::unique_ptr& item) override; + void InvalidatePop() override; void SetUpStreamFeedbacks(std::vector&& feedbacks); diff --git a/core/pipeline/queue/CircularProcessQueue.cpp b/core/pipeline/queue/CircularProcessQueue.cpp index 5962bc6c05..21ffaaac72 100644 --- a/core/pipeline/queue/CircularProcessQueue.cpp +++ b/core/pipeline/queue/CircularProcessQueue.cpp @@ -40,14 +40,24 @@ bool CircularProcessQueue::Pop(unique_ptr& item) { return false; } item = std::move(mQueue.front()); - mQueue.pop_front(); + item->AddPipelineInProcessingCnt(); mEventCnt -= item->mEventGroup.GetEvents().size(); return true; } +void CircularProcessQueue::InvalidatePop() { + mValidToPop = false; + auto pipeline = PipelineManager::GetInstance()->FindConfigByName(mConfigName); + if (pipeline) { + for (auto it = mQueue.begin(); it != mQueue.end(); ++it) { + (*it)->mPipeline = pipeline; + } + } +} + void CircularProcessQueue::Reset(size_t cap) { - // it seems more reasonable to retain extra items and process them immediately, however this contray to current framework design - // so we simply discard extra items, considering that it is a rare case to change capacity + // it seems more reasonable to retain extra items and process them immediately, however this contray to current + // framework design so we simply discard extra items, considering that it is a rare case to change capacity uint32_t cnt = 0; while (!mQueue.empty() && mEventCnt > cap) { mEventCnt -= mQueue.front()->mEventGroup.GetEvents().size(); diff --git a/core/pipeline/queue/CircularProcessQueue.h b/core/pipeline/queue/CircularProcessQueue.h index 45f50a1959..bf246e8efe 100644 --- a/core/pipeline/queue/CircularProcessQueue.h +++ b/core/pipeline/queue/CircularProcessQueue.h @@ -35,6 +35,7 @@ class CircularProcessQueue : virtual public QueueInterface&& item) override; bool Pop(std::unique_ptr& item) override; + void InvalidatePop() override; void Reset(size_t cap); diff --git a/core/pipeline/queue/ProcessQueueInterface.h b/core/pipeline/queue/ProcessQueueInterface.h index 363bc55f27..fd564ea0ce 100644 --- a/core/pipeline/queue/ProcessQueueInterface.h +++ b/core/pipeline/queue/ProcessQueueInterface.h @@ -43,12 +43,10 @@ class ProcessQueueInterface : virtual public QueueInterface&& ques); - void InvalidatePop() { mValidToPop = false; } + virtual void InvalidatePop() { mValidToPop = false; } void ValidatePop() { mValidToPop = true; } - void Reset() { - mDownStreamQueues.clear(); - } + void Reset() { mDownStreamQueues.clear(); } protected: bool IsValidToPop() const; diff --git a/core/pipeline/queue/ProcessQueueItem.h b/core/pipeline/queue/ProcessQueueItem.h index 3b90db0906..a4595dcfd1 100644 --- a/core/pipeline/queue/ProcessQueueItem.h +++ b/core/pipeline/queue/ProcessQueueItem.h @@ -19,6 +19,7 @@ #include #include "models/PipelineEventGroup.h" +#include "pipeline/PipelineManager.h" namespace logtail { @@ -30,6 +31,17 @@ struct ProcessQueueItem { size_t mInputIndex = 0; // index of the input in the pipeline ProcessQueueItem(PipelineEventGroup&& group, size_t index) : mEventGroup(std::move(group)), mInputIndex(index) {} + + AddPipelineInProcessingCnt(std::string& configName) const { + if (mPipeline) { + mPipeline->AddInProcessingCnt(); + } else { + auto p = PipelineManager::GetInstance()->FindConfigByName(configName); + if (p) { + p->AddInProcessingCnt(); + } + } + } }; } // namespace logtail diff --git a/core/pipeline/queue/SenderQueueItem.h b/core/pipeline/queue/SenderQueueItem.h index 9e3b83c979..88ffeade77 100644 --- a/core/pipeline/queue/SenderQueueItem.h +++ b/core/pipeline/queue/SenderQueueItem.h @@ -21,6 +21,7 @@ #include #include +#include "pipeline/PipelineManager.h" #include "pipeline/queue/QueueKey.h" namespace logtail { @@ -60,6 +61,17 @@ struct SenderQueueItem { virtual ~SenderQueueItem() = default; virtual SenderQueueItem* Clone() { return new SenderQueueItem(*this); } + + void SubInProcessingCnt(std::string& configName) const { + if (mPipeline) { + mPipeline->SubInProcessingCnt(); + } else { + auto p = PipelineManager::GetInstance()->FindConfigByName(configName); + if (p != nullptr) { + p->SubInProcessingCnt(); + } + } + } }; } // namespace logtail diff --git a/core/pipeline/queue/SenderQueueManager.h b/core/pipeline/queue/SenderQueueManager.h index 08b5508794..2e301cb06d 100644 --- a/core/pipeline/queue/SenderQueueManager.h +++ b/core/pipeline/queue/SenderQueueManager.h @@ -24,11 +24,11 @@ #include #include "common/FeedbackInterface.h" +#include "pipeline/limiter/ConcurrencyLimiter.h" +#include "pipeline/limiter/RateLimiter.h" #include "pipeline/queue/QueueParam.h" #include "pipeline/queue/SenderQueue.h" #include "pipeline/queue/SenderQueueItem.h" -#include "pipeline/limiter/ConcurrencyLimiter.h" -#include "pipeline/limiter/RateLimiter.h" namespace logtail { diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 797739ef2f..ad9d0cb402 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -18,28 +18,28 @@ #include "config/provider/EnterpriseConfigProvider.h" #endif #include "app_config/AppConfig.h" -#include "pipeline/batch/FlushStrategy.h" #include "common/EndpointUtil.h" #include "common/HashUtil.h" #include "common/LogtailCommonFlags.h" #include "common/ParamExtractor.h" #include "common/TimeUtil.h" +#include "pipeline/Pipeline.h" +#include "pipeline/batch/FlushStrategy.h" #include "pipeline/compression/CompressorFactory.h" +#include "pipeline/queue/QueueKeyManager.h" +#include "pipeline/queue/SLSSenderQueueItem.h" +#include "pipeline/queue/SenderQueueManager.h" #include "plugin/flusher/sls/PackIdManager.h" #include "plugin/flusher/sls/SLSClientManager.h" #include "plugin/flusher/sls/SLSResponse.h" #include "plugin/flusher/sls/SendResult.h" -#include "pipeline/Pipeline.h" #include "profile_sender/ProfileSender.h" -#include "pipeline/queue/QueueKeyManager.h" -#include "pipeline/queue/SLSSenderQueueItem.h" -#include "pipeline/queue/SenderQueueManager.h" -#include "sdk/Common.h" #include "runner/FlusherRunner.h" +#include "sdk/Common.h" #include "sls_control/SLSControl.h" // TODO: temporarily used here -#include "plugin/flusher/sls/DiskBufferWriter.h" #include "pipeline/PipelineManager.h" +#include "plugin/flusher/sls/DiskBufferWriter.h" using namespace std; @@ -1001,18 +1001,6 @@ bool FlusherSLS::SerializeAndPush(vector&& groupLists) { } bool FlusherSLS::PushToQueue(QueueKey key, unique_ptr&& item, uint32_t retryTimes) { -#ifndef APSARA_UNIT_TEST_MAIN - // TODO: temporarily set here, should be removed after independent config update refactor - if (item->mFlusher->HasContext()) { - item->mPipeline - = PipelineManager::GetInstance()->FindConfigByName(item->mFlusher->GetContext().GetConfigName()); - if (!item->mPipeline) { - // should not happen - return false; - } - } -#endif - const string& str = QueueKeyManager::GetInstance()->GetName(key); for (size_t i = 0; i < retryTimes; ++i) { int rst = SenderQueueManager::GetInstance()->PushQueue(key, std::move(item)); diff --git a/core/runner/LogProcess.cpp b/core/runner/LogProcess.cpp index cd65998408..295ac9369e 100644 --- a/core/runner/LogProcess.cpp +++ b/core/runner/LogProcess.cpp @@ -15,12 +15,12 @@ #include "runner/LogProcess.h" #include "app_config/AppConfig.h" -#include "pipeline/batch/TimeoutFlushManager.h" #include "common/Flags.h" #include "go_pipeline/LogtailPlugin.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" #include "pipeline/PipelineManager.h" +#include "pipeline/batch/TimeoutFlushManager.h" #include "pipeline/queue/ExactlyOnceQueueManager.h" #include "pipeline/queue/ProcessQueueManager.h" #include "pipeline/queue/QueueKeyManager.h" @@ -294,6 +294,8 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { } pipeline->Send(std::move(eventGroupList)); } + // When item is in sender queue, we could decrease the count + pipeline->SubInProcessingCnt(); } } LOG_WARNING(sLogger, ("runner/LogProcess.hread", "Exit")("threadNo", threadNo)); diff --git a/pkg/logtail/libPluginAdapter.so b/pkg/logtail/libPluginAdapter.so index 823180a8bdfae00a1ebbdcabc75a7a659a0fe6fb..c64c6832d6ec3e9c7bf5f8cb0eecca957d7a6a83 100755 GIT binary patch literal 29968 zcmeHwdwf$>zVA*_F!D&^1E-@nZZT*p(gzB*3YyRqb|A$Hg<*7hM`Z{gG!`GBa z%W)f`Jph_RORQ8#sQ@_XKk}4)SmY+xPXVDkNaa>Zf+*l+n zuj1*;S7cF2#P#rZIF)i%L@B9T$aM=jNmXLNC8?c96)GeBm5KCh>K1&G5-(lVA4cja z++V?>C(ujCaJVFua@RsmRDm;m%cn4>vRLGo%PtR#@=GeRXLUOhdD!qbf6-Fq?)JUI zFF)N_HQdm*;foI+d%X0KXJ$|lPQ%|6{E?bd@TY9rRNz@PWum*srQEhL?|YO+Hn<3X zm*8(Y{^%;gUoi`fe3Xs^S%_tT(2d(RKYi2wPnPX}{Zq$h(Pv8TpLXwu`~PwOgMZk2 z)>99>b>m#$@xO2Xed3T~()WKh^QkR=dgsnJ9tFyHBiYyG zIm*+XL(c;_@KbZ>*`9-cZw~yb9D06*f}f^*M;WZ+HIV-AO@Z&u!p{PJx-vnTd==-H zpZ5v=p=^GZ?|8){9K&5g;Azm)mGMgV1su3V;QIy6oC?C!|LD3FdQMZ$QZ@N}o0AMi;hKP2z7>Y<(+)yL8ztq#;~$jNC(;* zBcbM&icm|-+)&*rr7Hr>ol2v(KGXr_{;p-Qp zk(P>9#*#zzvKd*z%Vv_vH(g*&^N+uM|u=9Tp=MP2Ph zGbo5k)lHj5(ODF2k+o-AAy^H5(lF6^}_$mQssA&&>y=ffu4 zyx!hy!f9LxfL~Am|gdJVH3FN#)gF%R_|AtpGw^ zsHIPS51Y?}>~pY|M~MHsJjgzG*YXJA=kOr=9IE9J!s*^iUPdhs5q_=&K@ZmQ2;pK& zgaJoP_<5X#e~R%<{Hh7JoAC2Z_{rc&3!JpTNei5`z)1_7w7^LVoV3763!JpTNei5` z!2cf>&|)9kwfLC3uT~VTw?8t*ScrOSvFGjEdHqu}-$PPr>Ie9Ap65Y`;!Tv897^Gu z`WD446dz>qT@T@RWpikGnXPbf}X97&bMZ=^VF zZ6sYR-br!Vno24xegnm6iz9jPIDn}uDNb7$$srcMj^ea+ksM_4#T2J4i)25GFQ7PW zRU|jF_*{z97DaLsi(f%;+L}oAvG@#%)0RZCo5e4tIBi8FJ6L=Q#c2y7Sl-0E9qhJaTKR5h-3+iTPaRk4@s59zrr_hO|<2Zbg}qn6sN6*q=L9H4Id3f z&hvVJV7n%HKUACIT4K&$Jm5VOIZ*#V{0Cn2pv0KPM9PmuCZT&Rnu9R9*Qu!?=Xu?v zXuF``FJ$~Prw}PTuIUH0ZJ%7NZ98JqEIYN=jz`Xff^&reduph`d7f`XzI)E!k-ZWv zUaH0B9G{J#rhgcj0JT5ALQ#x~RD~uq+s-ksfzPr+<|}FPCM25c6Cd*$w0LX$`k-I` zda0%#()7P+iJ7m%WJ@&tbxr?DiM|gavdJNQe-<0HHN3 zlz;sS?+>>(jATak@-Af=9rNoiQl4KR!#9uxa(aofPRu-re8nIw<5_5w1uKK$;CX*^ zCCmGAo;T(AArM}QGt#9vL|l444Q?ulmu#Vc_YuP5tbNX5ImN~tLm#oE22r1*6k`4W zk#{Ka1dR%a@(uy{btCdqAnyX>+r;RL*SPb&iC(hy&nUR` zV^2aMdJyxs=T1lG)Z*i{#7(ME{xv#K?1&|D%0NXvFDK7gI%@sNbO%miAwt9*>1P1}fOTaB06~ z)tSR@%J%Rs^Dgz)EM20ZA(jNKyg%ArS$bgjQ?pkMIjl08H z_<*|`GS0$>+fXfYt zpc6_2nqSp>ATleUf2t*F+!zy|7^{z<;S+OyifPHKKW|)wYQWt`muu^vAA}GZfjgXr zK5$(RE+dav^Z}X_Esg{w#Gz<9~daN{RW!yUh6t*^vA4H<@Y zAkpHUK$Y~CU%J>he2nVzG&5U3|3uTb8}AdOKc}JUmsaVA{Q7R>j=G~LYf%(5-nHb6 z*FR6{sb`aiMF$h*@Kd=5Q2+Rt(pg~GReD$(@UU-e;K~O)lgX(!W)8gmuo~bCz%V_$i;LP7Tm$;rF7SgU^Ok&;|s9@3! z`kC%0>8Ud~9pW%PgVFRIrTx7J)-LkvAC&e7^!K*Xx=a7m*oic>)Hq`W0o(Dt=an8FsF=)J;Sds@pb*Bo%H}?dQz(1!Zx8^)jg&bd!a>H*b)gK(7hM_vS6pPrB66_b1W*X{D@1IGHVAP~QD zc!$ZNeQ=+=kNgIydj9p9*?VY;Kl&`KUX8m*Jd2~D5B`!RFVR0IS8lYBngV@!9yv6) zvvj&97#4zo<){}7c=AYde3~(fbjPo^iv;6B(jsPNbBWMjkO?pFVv<24OvF@;ytecC zjus+3BIaY_XXT$F`Dtz^9_F92`Gt3leVni9H@oe|%U>Xu#9E8dMS(a*2u7o(ZK#6WUQzASaI}#U*GF5{K6mm+8WK9T@;z4>93+I z8J*PZ`nG`OT}^+7*dKRpsj%#{T%aYopkkY)r$74eP79Sa9}Ze9QMzxn6h)ueY55Ux z4DaY;?Zdj)T<&Bd=f)M?#!XZ#pwm%fy~2*h?JMmcewX|P>dg?-{Q1ZXw8bqGdQ;Jh zwfHSc;_{l+}^R{R_7hJLC(9> zmuNc{NCaKp8gJ~w<8U~oDdhH*KR&H0zR<28_3FD~1J;1P+Z+4CaVf9GJC2Z55+1w1 z{Lrn_0&&m5*nX>~cVJe5Pxc=4mG6rFMXTGTCGy^&ZtqV_$g4{D^5A$2{ju$?D&6ia ze>-}QFqpafV7z(1@7Gh_Wq$p*XzBGI-wP#heF2OwXU{F9Q9n{yn&SP*8?SD{1Y7Hm zdpctKEn1=jE^_vp>H2DMSA1?h^V;#nYC=P2nG22yFASf``j-~#$Nl8#{_;NuoPn>% zVU}-;?x#h4`InJ#T5P9%_-XFPJ@zV$>pS5;x8QEc`1~{K9yB31dwz@CwbH{+(oLLk zKkz3JfGy_y3mB2^AH)Qx8QYk|4kj@G2}m0gO*zWmiIL{KkfI&Zmos49a9g#L>JAo9g(Wa z__C%-*veOWV8mtnZ7(AuYDU z;w^7$ioW2B|In@Z<8Aqsy?Y{;IkzlDb&T$>ilcUf(F~QAIA;0bvj`d+OIkFHN~m#e zarM3!$?Mq_-3O;{I8d)Zyxv_<)%$_7Cm%~x=avtN#F{gn|ER3_#Gi<&2#^ce~QF z%h^+fmZjwea)g;_7S`XUR>&WA1oZcP`hH@Xi&5?)_Yz3dS-QWP8vUTM=YX?k8eppG z1<>i&2fW?KI-EV914QT_oV^nesw{o6`zGs6&Ypk7wgT2>0sYlV{TMBiTw0-_6%Km~ zcNp(~O7c4sKAT0~GdD52pi)0<+zK$=FQDCc2Dt=ad6z#i`@DeuS-|pnKz}7*Ichve zDf<3^zK`7Bg2e0+qYn^@7SNB)-I-UQ7#+}tYA1^@`KHHv-P!PuTJxUrJKitDx@g;x z^R#V8&oA7b$lK%7-!I(m?A2MGH`x8L9rjAJ;~j5e=^^E@>e@>4cWnEqNY{+n$irK{ z_s1W}uH@`p2lnKnOvVFKwP1Zieq2LKyNpNKOz!Nt0X>{M&9BJG(Wtu^ri;_~>QNDz z6RNLkboM-lyl5K0N^lPCdSIQ?OpZK0^E$XsT9u#A#{HAD149e!g*0#IV`fvTz7vY0 z9~c+GXToKC4{P2Bu#*B+JBTo~PVESEPGUN#fLDH!N>#;I!#Nyw_TEajM$|*Gt|GT3 zS5@9%@(l19pB$j(iMaEz7`X#`Hs~r={W)5?d}+)eMf#!SQCNc4)Fo?r0hTrKlJ&U_I?L3&hROD4>G`XVo<#0^IXoJ+hHkbfnT85C~YH~B;f?8)0SaL_anTzF#3|t z7>^>N0|k0tj5@Wr&%HbbIg4iTx!3Y}`eJ0tve&kf$*D}5UgC}a$YnUGJg8pTv#9$o z^?c(OXgx37j$8uv?uu3=SE04i{uw#g{qbcTOt%)lge_Rd1buzlNmj`iq?zqIJCio}^$>(zPQ#QaoMqUCvv5Nr^P_}{AdY&X*Yhi5llVS}`+ zs{CX4N4ir-B{bv~y$hAYEavPv!gf$*CW^6imRejGtU+C;r?gPV7i2sAS* zVp|ZM)TpO8bdq5+ER%ef*2XQ*u=r$#IonC;c_{?X-z~9 z1&gey;6Aw1hMAE3o$X^3>PRYb-A-Ha2vc+qAJ%W6Yoj{7g>1l#aa@d2V9x}jcIHN! z_h`q;7#fCSxCb0`znoZWHSXLm9QyVotuD77pdNs{z62xkivG-KrBK;vk=tpWARjOr zcltEGjvFgsC3O7SA_DqnB~JT;=h;xZ z9(x9uR0uxE;B9oz*O9r-?o{Jvb_F-P)mNMKB8oP`UBkrV9 z@$N{SmF3HJCXC54^Ruz|fJ>uUmKVz^Qyl4($vnk?%VO4`{=j9_(mk?w2lRTkIy_14 z-=eQ#M4jsFc>}e^7=A}ZCT~YR;;Q~xs+tK^`di!HJB^Rya+kAr3Lwt4Msh8q4DGKa z6SMyaZBXTq=xa;7XxfyYHt8^DM$a6c#n-dA z`SI)XkNC?!jYz*RU^m9{45ROs_KVc$dx9VJ!Pw8|mFk^_shbv5D!FbCRUIEHD>e6? zKg2!L6BwGTa1Wx)G;REdyeVtSh`WUMpG#)Y+LyL(N_bD|`77onEivgnnrZ?xJ@ZNS zC3qo>)jYIL@*d2aJom)JYT~OAe3d#aFcWjyz-;^lm4Tbo_tC8PGL;35?4bhnEa}`^XqcdsL!fa{Y8>acR zcaO8T4gp#SV-0*3^Wrp6z%;!YT9cJXO$#DWKpK*K!2$o8JR7qk?@$+tvh_4jhf*+$ zIeYukzS}NSy&`pjv$r#yx*uMMwI=OikSw}t4N_@pt4g$~-lu6I^2b+T;O>cNJj35g zp#+M=AYZ)9_!o{w>flbwKXUfcw-NEEk~ni#`E$4vSFcbhJFCitOFB2H~^_&VI^-Vd9Q??J+(g-A#~ z1YTagNwR#m1H#Q-@&+`zH@>=w?sl$(-AtmVkZ3c{W&-IKJ+m1-5Iyq-STFfCTqoN6 zuc#2nlbqZQ!n&(ZAMQP%B_=)qx8&6k8uK7RKBsS3b5_2PoC)1BKk8FYVP_|Kh|fz; zpSPKEDZL z;yk|eqF>!(asU70d}BI{k0tfM@LuVE;Fj$T%Yk;)(Ta=;-@x5+tjHY{6B8ji`TSwuIlOt;We=0 zX7P5`^!o71Xk&3*duOC4`a0suqh1Yh3JDXd>ZIMumGBsK>HL4a&?J88} zS6s6!Sh;xdHH(Au{DDd(B#`C)nxMCarzoA%R^k=&y7p)r-da;zo4Z;=k-8?OF4Wf6 z9#PpFav^$eFWOeeUasqkwzh^k*D00sx}I{)>TqX6OZytGxq5NsJpc0GlA6W-YBQot ztyela+wpE+b6evTQ@gHMvZ@*7&wwg;hVUFB2 zHkG|HNPA;+u@~;zSF34$o=n+^R~FN?Q>UwBv${yQUY*(nV-;Q2Rj4+izUkPwZ_+1g zt7c&yV zweMh_!-A;;l%B-t2CV_D{%b1Lf%JY*dSBc#luG>xv>Wte!a?oebG@5N6%aqD2ecNn z8*~%sX3&G6yFoqgrBZpAM20|TgI2$vO5IFx(0f7E4^yd^K{tcCumBhWb%PFKHCPW? zf>mNSXf-G;6*@ql0PO=E0v!UiV*%pAolXI$2UG(+2-*W$l1!x@0__GJ1XVGO*-`Hy ztVDkX>cZ-FGw3GJK~TEnyFr#47b}*vF3UL+PO%iY+*q?5FmAMXA!n)9rztQG%s2lR;I|^cBu62}Hs~iPhhw63Yg8`yc#)t7&4>`KG zRhBqh%b=0w>VEuf1b#Ir@ce5W>LSXF%DxwWn~}C)B+ch2nC~b72kFGBQW?t9N#7`( ziTc~oC*=EhgcXoY(4W{B`^1C4*}xQFFm?LE#+On9d^hy6_XR5*t_@b7BR^&fIMn6L z8eWIpyL~4YAv^bg-vGatAXsaI^%~-bO>U6<%+8O3&xLXY(tI#c%r?)V2FFRp`PgwF zJMRYXW#GLx&08UKT5m`*`^Nhau5h?eB5k}f8@zO`#{-ZHlN`){hjp$aKVh5eP&ejz z9R(Z4cpW9NvDT__jslbqX}pQN$d4?LL5xd!b_r>A*$xJOb=-VDAxt3^!U~pABU5Sf0<}u|9>Oz`DHe`o=GG zs7Mz*nfhHB#&JEyc@4<}k6Blvp8oNaR)e&Mk;dNNV`;C>w7uXPguaW2&*yM0MG5DRpU3i} z^skXlzDlMqV=cwg%aA@f555KIkC8lVl%T#K+6#Rl2HShA8+lJi4_5R9(!CtA_3(Qh zl4Uk%f%b~=WC`*aYmwfA^!XxvgH`tBNVqz@QWTJ>7{asc*aSM^~nwgjDR54P!TsN#xyOyAxfCo&sX$_U!)ih+m*~rzbjn z*uJK^E7wwgArHp=6#4GuNTYjYMWZxp8;rGp@}+S@KK%ux|DMw4lYd~gLVMpJ+q-i6 zP7Phxwt>oU@;YgO|I;iW&w-HVKFD((vd?v(sevw!2*~)ttoU_V@u2AVlAq43pi9o- zYeXDhSh4G0MO@D3ipuecoTSg=!EB7^$?2_kLknf{QrvL4Ds-5Ds zB0mgeb{)j#5nW;#rnumX>Ef$uypJvT$p_FyK7=m828ArD2sXtK0a};PCDf}WDmwT4V zk$HkH6SP6l)q-vi^d3R~P0%fZ?iBP*LH7xISkSTOE9^Q;&~;V6S0zqh}R+n$ED=Mx~3vl9Mp;}&aX;G;&FYQbXc>u8|} z$EVLIWEyAVllL{@`Ba|KBu>(PA4|tkFD^d?X7wbAB%|%OiC%_4LDmZc3uC@fCbBe8FAg!Egl}FUs zUR5mCJYXw`=f9Fi+ejYLZnldo3#~YYiZYP4vrQA&2_-ES1ubsGeo)bab+(L?c({xOBu_a+Uz=rIw7#_BoX6U!9QKM- zDk0&U1jBiL=##qOd&#V0Yrrf_Ax=Tr(~)izxRYOyyUnFjyh4OxI6fh|%ouE2NPgx!#fY z3oi*=u45#AK;SNsuUwyxgMQLel7*`Rug=2D1m2N_FA}(%|7E@CVLGapynmDTOZN+0 z-p9%MzQpvLrO0)nFsnl6y;|tJ(UqtOE{Pu(xLij{{46Wym+MN2Un20KP^KQgz&B;# zs{~$~h12!ZFhM%U$^9?2c zmqL%cKbE)+3sVd3Cgu5w63gucHZe6zqk z0= z^ZN&nsJ+&S>F9QfZvIps!unJ82M{G*+7eO(ScZ8`9tOH96YlGVoJ*`s#UpvVYDK`0lK7 zUY8@^)*SfhIqcA#gTD~tdbIP$6?Rf{QM(4p2i3>;@tH*$Dk>@Jz=`2SB|NFVJy;)Y z?NFc&r%mHgi43@?uDzvRoD5x3l;ha$Z%fB{-27PTk|Oc6Q07tIS&8CE?@`ecRq;c` z(-~wuha{exQG)XW*Ua?>g2Arn%Ah}3Qc_w{5-bVkdUPq1-zWG>zk#10x?MDS<|0w# zmQmiIGzhPAx^`K(s9hQz&3wUJtz%M}tNpMy(G@}C7u6{vhsFmZtw9_;-WKjc_UI*z zE$u5qEx~#`*V7dYMc3k)tJaPd9B^J=bor$-W{u9oj-wBTIy*z_f_Mf8&s{ZihFZha zgYy=9t15$)i+psxy*SyN&QBL%HYDT&T>|6dCfe53+}IYTu8SR( zV5KHhX}-l$n4Y`|2SfF;dj~tiOjf3IMdHD-beFhh-n=E1HNhJ1+(2czOzgOJbD2!p zn+9_v)ELCM?Tt9WT`=>b-Gf0K(4VeleOG(1Db!X^NA>e6^Mn0)n3pQ6&YkV;>Eqj( zP12?2LhR&up^OH55Ur{NeLq;_t@2m!gX29Od5HTAoZQY1gwKQ^ZvM3ac8+}}2j!B9 zh%D*&eEz!xIe#;NA7!5@%}&kFg!sAnnb_W%vApK1esVPcZ4Iq_rQNLB9n>#5J4u!e`FyOGym{`s3)7*_)jfl zwlM#_2>yczBU=zI+A*HBmU27z?5){9gz-W^U;c3!vQDa-Qh&>-W z9lzedVojm0CZ(u;T^l4hjdb!9F(qe2f=I(tuOSi;p^lb_Qp84hQ6#(;VfF(aMV;+z zaxV%u2_M;14_O)G65LsG2^l6S$^r^laYC)lbtnS-1AP>++_>hI_!S6D@2&LQEm-LR zU?{<_Ou*rbXmeWwY2isLS9XS1%NRWuEW@yn3<~j47JdJ9h<5m}q{2NTzvTY1q~dl( zq-LIdPhb)L@H#@fl&=*5NgoxGHbLb&QAFAA&0y7;&fg^%B^|=LjxHj&#Yyj{$^9)4 zFcOjZ%YAK0pO6XQp^J2wFAWjw*{91d_rE1=63nvvQk|qrkxt)%%edSpm$XF4XO~~V zY#Wc#q`ZfWflE^Py}Qg`+E3QM4ZI{jmxOT1eRoOa_x>c0-H{XQw-y-L1j}uHshKo; zXPELbUtNfp8QeM(6kfpC`q`droqp1oYu5Xin4m?zTDKEbVSu5maKBSAT z?D(s|sP9R6`F)A}{XkV1R@zO<%X008tT})Ay~?I*kbtX}eU5xcc}d?!l36}2P|aoN zbRzR5~QB2Lnuviy=>kt6>hA>W;rHo>U$-aYN`jY$6Kc>bjfl{h`QHk@)fH%2fZI*zOtgq{{%G4k=+0Q literal 20768 zcmeHP3vgW3c|LbnS6XRTZ(A!_*j^JmwS%=Q$q!^ec=a-?Ml!}PObWSP?OsVcUhVGe z!`LtsHzoyDxV6&`F@vWykV!~LW=NY!UFgFEhF8-mkaqA;n(?Hh6@*F5qXGKx_WRFc zb??<`Af4$w&^Kr@# z2ZVx?jtQ74s_brp9o6&GCKdMGvb?3@a!l4=Q5jy|zGWzky6J|U``G02!N<)z541k{ zzI!w$J>tL|xH_uXIBKv*q)_4qBj`H!~z%ZER_ z?*0$%VUP8N-+1Ybwx|B$#Jdeoe*YhS{PvH2(eV5K_)X7yg}>^4``))7z2oU8{_@VZ zf4(1PWfRpiP)2ga5_sPd_-^1=mhtldSD zCGi@mKPdGR5??3r&Q<~3Eb;e1$$y>Hr}j{yer}-%s}AQVW1FO(tn}lRcpKOEv11aK zFpDD7%-UFASRiVg-y!j^1HS;A>T_JS>t12T&VrJkU10$zKle*K;=q|Dhu_IA(5KSiTFxk{V zJeq;^Xl_4%_}=s$GZo)aNRfRaX(amDrh5ZYmZta>Vrz9xLHCpFr6SW21vZMsKh;xP^t%nS3ZI+8c_S znOq3AqIL#kz{vZOsZj8SdqY>b+OaHD5>MM8ZS+9T-_&+#|nR0IymXT z6@PBK@JGi=>6!!QEqJ3);oBE1I32@Odf9@jI+N^z1(&BVZgSCrt7A3^<}J9~b-Df} z3(n7z9A)ey3vS&-+rvVEj;Gdk1uVF=-1QdxS2=HpQYza@1S%1zM4%FZN(3qq_;rpz z;_ooG$u__Lm8M5~HhegaW(-J94pt_~wd{4@#Qx>!V7_d4-3 z<#_95&c8}LO_km{4?c3D|3v_i6Z#K`Z2Ks`9L4RVJc_F=UTj=_47%(YNumEB*WdCL zA{#D8W-mpa{`vOE(;vAb+H;XtF6WoQ!Aj}CQ@l9YxO#ZO{#gGl5V1mNU*vfI0QW1SSxfz3ZdMY~fO5cHcY-Jr8%0 zIs6yEjbDn)?fYmHwL7up&l{+i=g(0?=Ktd|T0N)7xbi;OH-R$+2T#PM1vd}w3$9)$7~bcF?4qq2O}x=_ z`lDP(y;P_2&$w?1S%1zM4%FZN(3qqs6?O=fl34_5vWAq|0Dvx zWqY!7TR=TB3rrSLfpr=2U&n0l;og!2nL4I*y049(cJvJ9jb9XtanM&T6^rytZ~ME& z;(k!R-LnkMdIl3^X+Tu4|~i+LeGOA*cGy zLC&8^mgHY5$?t*uJ;?bpR3*=GG57y9$m`G!{!CWMUGb9rvyktBJOjE={-KimO~8*r z&Y#_u?01*sRcP<)kn?AcS z74vLZ0Md*_q+DG`())0f{DQC-B%tueM9*?+Oist{)gaPx4^S&P2)83ZV%lV;OY+{qw@_n zT*&d&A~p09J5wyxvgUF#d4<;Y9ea^JM7;#2ruP+_jkFmd%X z)`Pe`trP+(eXHG(y{61XyPEqI{~)w#JT)^!G|viDsm4`9%OGm1YO)-xzJi1LAQ6v# z8g{&Us_yTg!-nhYz^hJlQohzx_XuPc^j~^50`>j~y84GTJQiN+P5r-WDMTLcQ^fo? zXPUfEKv%!anRags5!Z{H3HVOHsm?Uct@q8sx2_4U+c!ZjT!QQKxk%G3xUg>nY3hQD z_%@QuYQgRD{WXkmk!NS0D9avMqXBGjb8?LpYJNN`>fV@ zH|R;<$7C1MYUjY6_R*l$y9IaFS3_Kn;2!fe5Z5cXCw$9^+oaV|kDv2>flAmS>UrLG z1<9_~Jah2=#syg`Yw3h(P3MXa$#jfVR&kY<6@;(m__Eib zuC>r>9jw;f7Y0FEb`u;N5X^wrNs~0@@CpWYu&rT z9AwL{BM&$8o!+@BLOav;0UDx>bE$Lfg?6S6r!)Tx@S}~drelQb)U^M7&W5Oku2XS8 z4H|8HNU#U}f56%OLivFIZti_TNcXa~K{D>=r8!rOXlK4xDy@Hwt+eYqRB3dY)wWCN z0sl9s!C@&q=zo~J4|LKF>^il}e=V_tQaa{;h?IvU%lr>eO^2l`kAELYN2IigwS9(+ zyLnBWYgM!}VVn&`T)ORX>G9iY+AF29rkkWx)^u}+t)^R~RMvE>WEpF_np(Yq*TuOG zMmv+Gy3jdV+pv~S5a=V;zMdMls@l0qMLY9%q)o+Yt9odpQIE!`>8n3SL(FyRpudT1 z0~={83%2pt6oTfouM1b?!=mrKFt^Zv7$U_HnAUe@MCIe*CuzKL;Rc z*48z&dfg434b56hO%rA^t~TawxW;>}cbj(%*!)X%eB!~4T!EY5>&5+;Cinc z2kaV|ZNM@6vs_q9R=APfs<|7yUg3>wz4aF7ebDepK&R3Os9z`^d#@(TY+M7kA&8q= z;ov$<7@Ic>Puy^O2SF_o;x!jg>l#F5nH_hvTT^m8|ty8&Y)!#TIlw4_E0VpjhVSnLN0kR zrV{z-!bB*R&YGbNUk)Ue#AK(j8s>05olfS=eAl*6Hl5Cg3b`yTj|pL=P3TD9R%7et zuB;iKj^?|PiB#c8Xj>?eiX{tilW|qageDUCoDIQ?W@5#S?Ey|OmdQ{>%gUxnf~7xk zb22zhr~drWjG03g0~u9Hq>;rShDIrSG-qJdQWEQ{gc{T1tk$K*w7X5yeV%@Om8+{& z_rIpMy{2og>3stL3jjBButVWJ*Xvh4rR#sJH{YVKJ*;qHm523& ztG9*l_3pZ$-cXXPKCGu)+geF7)2*-YoRSt-9M*4gt>qS1Y|?$7*R@+*F7%jTq$ej0 ztP~r)(a4(7xRJ?cnGu_gCQ>Yibzcz47|AtNMs7Nt%}*32S!QAnEex}XvB-)%^%^}` ziAD>#uo4YC)QyQ~&Sc5-R4$MH|Gas0sij@eNlgX#hUTT2iNV6;q=`yRCX!|<&0^DL z><}#}D^(c_&sZ#-noLaXjLu+5+hj62l`E}Fvn>!Ky9HHVe^o#7V#EqrBbH7UX5c=z z(5cKwC*u?;3{W;DM$*DkHu5kN%^pG}X-pt+)6r}^63tEX0&?)YzlTcZjVh#aiK!H> z0jPCkx?^-~#K8WLqK8e3)eFU%Z}}!o`4-+yv|tosb?%;)f9O*?qUs>#T(hl zvUpQz-Kf=@8!M32eJVTN!qhrZJKoC5-6wJ5Z3VTS)UJ=K&#)B{c{kQQcbCQ0-6^|` zn_>P^Cb=0VL1mJ=bQjJpbhA!7AUQXGo53!^3;}b1cZ+rulRyA2tmi)H=wHkyIeyZC zuaUUAuc`c9jf`fA1C>W<2YyaXLq2TWYPX5DTA*6dnlpd6RoY&5DWm@h# z0Z#)Da7+8PoQ_1v1G-<>!rOX-8!V%2vW->`L zZ^naHZ|dDzUWtDc!H8zF(W8c$%4d(VNlZdaBVL%9ISLaC$G@UrQ(baWVHi7h4~&l( zBRhxb^9u%MWX4EDf|23f%(!9f#=(Ix8GjOl! zbvU$Po~J=cX~yXTkW<6f6bBB@~w&mLHX{G6rN>GZ`fVnad>e zEXcpt5zNzUFUWf{m`%r{`6vsT({e~o$6>2DVIjt*uuyrjq9WjcS0_4?h@n1dIL0Q( zOPk2$SP(nQ40Zr0nG=O6m`77n*s~;?NKKNx5KT;E&BKbL3lo(`Y!>hA{|ki9y!5OI zhXbC}yid`!5?91IoeM(nOFTSMz^G$CRR^&hpO!x7w@g1Vz`9(2|_x(~~2b z>?3KU2M~23p zvRCs;^*ytI46KTqDp!qz3$V48uda8_g+vJvK}uU?ujoHJ>{(cRE?@gJm0}y)%9nt`YV})xeRJQ1(`; z_FZc|56JHrwaWwJDhTOZr}|Hwf9Uw4`rc}v84=tOhjUe~(rsSCe*OlbLDxgp 0 { diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index 23ed88a859..195d804c17 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -375,7 +375,7 @@ func (p *pluginv2Runner) runFlusherInternal(cc *pipeline.AsyncControl) { } break } - if !p.LogstoreConfig.FlushOutFlag { + if !p.LogstoreConfig.FlushOutFlag.Load() { time.Sleep(time.Duration(10) * time.Millisecond) continue } @@ -405,7 +405,7 @@ func (p *pluginv2Runner) Stop(exit bool) error { p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") - p.LogstoreConfig.FlushOutFlag = true + p.LogstoreConfig.FlushOutFlag.Store(true) p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 {