diff --git a/core/go_pipeline/LogtailPlugin.cpp b/core/go_pipeline/LogtailPlugin.cpp
index ca2d640b03..26c9e64e4f 100644
--- a/core/go_pipeline/LogtailPlugin.cpp
+++ b/core/go_pipeline/LogtailPlugin.cpp
@@ -22,15 +22,15 @@
 #include "common/JsonUtil.h"
 #include "common/LogtailCommonFlags.h"
 #include "common/TimeUtil.h"
-#include "pipeline/compression/CompressorFactory.h"
 #include "container_manager/ConfigContainerInfoUpdateCmd.h"
 #include "file_server/ConfigManager.h"
 #include "logger/Logger.h"
 #include "monitor/LogFileProfiler.h"
 #include "monitor/LogtailAlarm.h"
 #include "pipeline/PipelineManager.h"
-#include "profile_sender/ProfileSender.h"
+#include "pipeline/compression/CompressorFactory.h"
 #include "pipeline/queue/SenderQueueManager.h"
+#include "profile_sender/ProfileSender.h"
 DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
 DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp
index 0610563906..820e14a85f 100644
--- a/core/pipeline/Pipeline.cpp
+++ b/core/pipeline/Pipeline.cpp
@@ -19,41 +19,22 @@
 #include
 #include
-#include "pipeline/batch/TimeoutFlushManager.h"
+#include "batch/TimeoutFlushManager.h"
 #include "common/Flags.h"
 #include "common/ParamExtractor.h"
-#include "plugin/flusher/sls/FlusherSLS.h"
+#include "flusher/sls/FlusherSLS.h"
 #include "go_pipeline/LogtailPlugin.h"
-#include "plugin/input/InputFeedbackInterfaceRegistry.h"
-#include "pipeline/plugin/PluginRegistry.h"
-#include "plugin/processor/ProcessorParseApsaraNative.h"
-#include "pipeline/queue/ProcessQueueManager.h"
-#include "pipeline/queue/QueueKeyManager.h"
-#include "pipeline/queue/SenderQueueManager.h"
+#include "input/InputFeedbackInterfaceRegistry.h"
+#include "plugin/PluginRegistry.h"
+#include "processor/ProcessorParseApsaraNative.h"
+#include "queue/ProcessQueueManager.h"
+#include "queue/QueueKeyManager.h"
+#include "queue/SenderQueueManager.h"
 DECLARE_FLAG_INT32(default_plugin_log_queue_size);
 using namespace std;
-namespace {
-class AggregatorDefaultConfig {
-public:
-    static AggregatorDefaultConfig& Instance() {
-        static AggregatorDefaultConfig instance;
-        return instance;
-    }
-
-    Json::Value* GetJsonConfig() { return &aggregatorDefault; }
-
-private:
-    Json::Value aggregatorDefault;
-    AggregatorDefaultConfig() { aggregatorDefault["Type"] = "aggregator_default"; }
-
-    AggregatorDefaultConfig(AggregatorDefaultConfig const&) = delete;
-    void operator=(AggregatorDefaultConfig const&) = delete;
-};
-} // namespace
-
 namespace logtail {
 void AddExtendedGlobalParamToGoPipeline(const Json::Value& extendedParams, Json::Value& pipeline) {
@@ -78,21 +59,20 @@ bool Pipeline::Init(PipelineConfig&& config) {
     const InputContainerStdio* inputContainerStdio = nullptr;
     bool hasFlusherSLS = false;
-    // to send alarm and init MetricsRecord before flusherSLS is built, a temporary object is made, which will be
+#ifdef __ENTERPRISE__
+    // to send alarm before flusherSLS is built, a temporary object is made, which will be overridden shortly after.
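Assuming the temporary object is a `FlusherSLS` (the surrounding comment and the `mProject`/`mLogstore`/`mRegion` assignments point that way), the enterprise-only block added at the top of `Pipeline::Init` is, in sketch form:

```cpp
#ifdef __ENTERPRISE__
    // Temporary FlusherSLS so alarms raised while the pipeline is still being
    // built carry the right project/logstore/region; it is overridden as soon
    // as the real FlusherSLS plugin instance is constructed below.
    unique_ptr<FlusherSLS> SLSTmp = make_unique<FlusherSLS>();
    SLSTmp->mProject = config.mProject;
    SLSTmp->mLogstore = config.mLogstore;
    SLSTmp->mRegion = config.mRegion;
    mContext.SetSLSInfo(SLSTmp.get());
#endif
```

This is a reconstruction for readability, not authoritative patch text; the actual hunk follows.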
unique_ptr SLSTmp = make_unique(); - if (!config.mProject.empty()) { - SLSTmp->mProject = config.mProject; - SLSTmp->mLogstore = config.mLogstore; - SLSTmp->mRegion = config.mRegion; - mContext.SetSLSInfo(SLSTmp.get()); - } + SLSTmp->mProject = config.mProject; + SLSTmp->mLogstore = config.mLogstore; + SLSTmp->mRegion = config.mRegion; + mContext.SetSLSInfo(SLSTmp.get()); +#endif mPluginID.store(0); for (size_t i = 0; i < config.mInputs.size(); ++i) { const Json::Value& detail = *config.mInputs[i]; - string pluginType = detail["Type"].asString(); - unique_ptr input - = PluginRegistry::GetInstance()->CreateInput(pluginType, GenNextPluginMeta(false)); + string name = detail["Type"].asString(); + unique_ptr input = PluginRegistry::GetInstance()->CreateInput(name, GenNextPluginMeta(false)); if (input) { Json::Value optionalGoPipeline; if (!input->Init(detail, mContext, i, optionalGoPipeline)) { @@ -103,66 +83,56 @@ bool Pipeline::Init(PipelineConfig&& config) { MergeGoPipeline(optionalGoPipeline, mGoPipelineWithInput); } // for special treatment below - if (pluginType == InputFile::sName) { + if (name == InputFile::sName) { inputFile = static_cast(mInputs[0]->GetPlugin()); - } else if (pluginType == InputContainerStdio::sName) { + } else if (name == InputContainerStdio::sName) { inputContainerStdio = static_cast(mInputs[0]->GetPlugin()); } } else { - AddPluginToGoPipeline(pluginType, detail, "inputs", mGoPipelineWithInput); + AddPluginToGoPipeline(detail, "inputs", mGoPipelineWithInput); } - ++mPluginCntMap["inputs"][pluginType]; + ++mPluginCntMap["inputs"][name]; } for (size_t i = 0; i < config.mProcessors.size(); ++i) { - const Json::Value& detail = *config.mProcessors[i]; - string pluginType = detail["Type"].asString(); + string name = (*config.mProcessors[i])["Type"].asString(); unique_ptr processor = PluginRegistry::GetInstance()->CreateProcessor(name, GenNextPluginMeta(false)); - if (processor) { + if (processor) { if (!processor->Init(*config.mProcessors[i], mContext)) { return false; } mProcessorLine.emplace_back(std::move(processor)); // for special treatment of topicformat in apsara mode - if (i == 0 && pluginType == ProcessorParseApsaraNative::sName) { + if (i == 0 && name == ProcessorParseApsaraNative::sName) { mContext.SetIsFirstProcessorApsaraFlag(true); } } else { - if (ShouldAddPluginToGoPipelineWithInput()) { + if (ShouldAddPluginToGoPipelineWithInput()) { AddPluginToGoPipeline(*config.mProcessors[i], "processors", mGoPipelineWithInput); } else { - AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*config.mProcessors[i], "processors", mGoPipelineWithoutInput); } } - ++mPluginCntMap["processors"][pluginType]; + ++mPluginCntMap["processors"][name]; } - if (config.mAggregators.empty() && config.IsFlushingThroughGoPipelineExisted()) { - // an aggregator_default plugin will be add to go pipeline when mAggregators is empty and need to send go data - // to cpp flusher. 
- config.mAggregators.push_back(AggregatorDefaultConfig::Instance().GetJsonConfig()); - } - for (size_t i = 0; i < config.mAggregators.size(); ++i) { - const Json::Value& detail = *config.mAggregators[i]; - string pluginType = detail["Type"].asString(); - GenNextPluginMeta(false); + for (auto detail : config.mAggregators) { if (ShouldAddPluginToGoPipelineWithInput()) { - AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithInput); + AddPluginToGoPipeline(*detail, "aggregators", mGoPipelineWithInput); } else { - AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*detail, "aggregators", mGoPipelineWithoutInput); } - ++mPluginCntMap["aggregators"][pluginType]; + ++mPluginCntMap["aggregators"][(*detail)["Type"].asString()]; } - for (size_t i = 0; i < config.mFlushers.size(); ++i) { - const Json::Value& detail = *config.mFlushers[i]; - string pluginType = detail["Type"].asString(); + for (auto detail : config.mFlushers) { + string name = (*detail)["Type"].asString(); unique_ptr flusher - = PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false)); + = PluginRegistry::GetInstance()->CreateFlusher(name, GenNextPluginMeta(false)); if (flusher) { Json::Value optionalGoPipeline; - if (!flusher->Init(detail, mContext, optionalGoPipeline)) { + if (!flusher->Init(*detail, mContext, optionalGoPipeline)) { return false; } mFlushers.emplace_back(std::move(flusher)); @@ -173,18 +143,18 @@ bool Pipeline::Init(PipelineConfig&& config) { MergeGoPipeline(optionalGoPipeline, mGoPipelineWithoutInput); } } - if (pluginType == FlusherSLS::sName) { + if (name == FlusherSLS::sName) { hasFlusherSLS = true; mContext.SetSLSInfo(static_cast(mFlushers.back()->GetPlugin())); } } else { if (ShouldAddPluginToGoPipelineWithInput()) { - AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithInput); + AddPluginToGoPipeline(*detail, "flushers", mGoPipelineWithInput); } else { - AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*detail, "flushers", mGoPipelineWithoutInput); } } - ++mPluginCntMap["flushers"][pluginType]; + ++mPluginCntMap["flushers"][name]; } // route is only enabled in native flushing mode, thus the index in config is the same as that in mFlushers @@ -192,17 +162,14 @@ bool Pipeline::Init(PipelineConfig&& config) { return false; } - for (size_t i = 0; i < config.mExtensions.size(); ++i) { - const Json::Value& detail = *config.mExtensions[i]; - string pluginType = detail["Type"].asString(); - GenNextPluginMeta(false); + for (auto detail : config.mExtensions) { if (!mGoPipelineWithInput.isNull()) { - AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithInput); + AddPluginToGoPipeline(*detail, "extensions", mGoPipelineWithInput); } if (!mGoPipelineWithoutInput.isNull()) { - AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithoutInput); + AddPluginToGoPipeline(*detail, "extensions", mGoPipelineWithoutInput); } - ++mPluginCntMap["extensions"][pluginType]; + ++mPluginCntMap["extensions"][(*detail)["Type"].asString()]; } // global module must be initialized at last, since native input or flusher plugin may generate global param in Go @@ -291,7 +258,7 @@ bool Pipeline::Init(PipelineConfig&& config) { ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(mContext.GetProcessQueueKey(), priority); } else { ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue( - mContext.GetProcessQueueKey(), priority, 
1024); + mContext.GetProcessQueueKey(), priority, 100); } @@ -323,13 +290,13 @@ void Pipeline::Start() { } if (!mGoPipelineWithoutInput.isNull()) { - // TODO: 加载该Go流水线 + LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput()); } - // TODO: 启用Process中改流水线对应的输入队列 + ProcessQueueManager::GetInstance()->ValidatePop(mContext.GetProcessQueueKey()); if (!mGoPipelineWithInput.isNull()) { - // TODO: 加载该Go流水线 + LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput()); } for (const auto& input : mInputs) { @@ -367,6 +334,7 @@ bool Pipeline::Send(vector&& groupList) { } } } + pipeline->SubInProcessingCnt(); return allSucceeded; } @@ -386,23 +354,42 @@ void Pipeline::Stop(bool isRemoving) { } if (!mGoPipelineWithInput.isNull()) { - // TODO: 卸载该Go流水线 + // Go pipeline `Stop` will stop and delete + LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), isRemoving); + } + + ProcessQueueManager::GetInstance()->InvalidatePop(mContext->GetConfigName()); + + // Wait for all processing item finish + uint64_t startTime = GetCurrentTimeInMilliSeconds(); + bool alarmFlag = false; + while (mProcessingCnt.load() != 0) { + usleep(1000 * 10); // 10ms + uint64_t duration = GetCurrentTimeInMilliSeconds() - startTime; + if (!alarmFlag && duration > 1000) { // 1s + LOG_ERROR(sLogger, ("pipeline stop", "too slow")("config", mName)("cost", duration)); + LogtailAlarm::GetInstance()->SendAlarm(CONFIG_UPDATE_ALARM, + string("pipeline stop too slow, config: ") + mName + + "; cost:" + duration, + GetProject(), + GetLogstore(), + GetRegion()); + alarmFlag = true; + } } - // TODO: 禁用Process中改流水线对应的输入队列 - if (!isRemoving) { FlushBatch(); } if (!mGoPipelineWithoutInput.isNull()) { - // TODO: 卸载该Go流水线 + // Go pipeline `Stop` will stop and delete + LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), isRemoving); } for (const auto& flusher : mFlushers) { flusher->Stop(isRemoving); } - LOG_INFO(sLogger, ("pipeline stop", "succeeded")("config", mName)); } @@ -426,18 +413,10 @@ void Pipeline::MergeGoPipeline(const Json::Value& src, Json::Value& dst) { } } -std::string Pipeline::GenPluginTypeWithID(std::string pluginType, std::string pluginID) { - return pluginType + "/" + pluginID; -} - -// Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority. 
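The new wait in `Pipeline::Stop()` blocks until every event group already popped from the process queue has been handed over to a sender queue, and raises an alarm once if draining takes longer than a second. A minimal, self-contained sketch of that drain-with-timeout pattern (10 ms poll and 1 s threshold as in the hunk above; the `LOG_ERROR`/`LogtailAlarm` pair is stubbed with stderr, and the worker thread exists only to make the example runnable):

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

// Drain loop: wait until no event group of this pipeline is still "in
// processing" (popped from the process queue but not yet in a sender queue).
// Warn once if draining takes longer than one second.
void WaitForInFlightItems(std::atomic<uint16_t>& processingCnt) {
    using Clock = std::chrono::steady_clock;
    const auto start = Clock::now();
    bool alarmed = false;
    while (processingCnt.load() != 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        const auto costMs =
            std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start).count();
        if (!alarmed && costMs > 1000) {
            // Real code: LOG_ERROR + LogtailAlarm::SendAlarm(CONFIG_UPDATE_ALARM, ...).
            std::cerr << "pipeline stop too slow, cost(ms)=" << costMs << '\n';
            alarmed = true;
        }
    }
}

int main() {
    std::atomic<uint16_t> processingCnt{3};
    // Simulated processor thread finishing its in-flight groups.
    std::thread worker([&] {
        for (int i = 0; i < 3; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            processingCnt.fetch_sub(1);
        }
    });
    WaitForInFlightItems(processingCnt);
    worker.join();
    std::cout << "pipeline stop: all in-flight items drained\n";
}
```

Polling keeps the hot path (Send) free of locks and condition variables; the cost is up to one poll interval of extra latency on shutdown.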
-void Pipeline::AddPluginToGoPipeline(const string& pluginType, - const Json::Value& plugin, - const string& module, - Json::Value& dst) { +void Pipeline::AddPluginToGoPipeline(const Json::Value& plugin, const string& module, Json::Value& dst) { Json::Value res(Json::objectValue), detail = plugin; detail.removeMember("Type"); - res["type"] = GenPluginTypeWithID(pluginType, GetNowPluginID()); + res["type"] = plugin["Type"]; res["detail"] = detail; dst[module].append(res); } @@ -501,6 +480,11 @@ std::string Pipeline::GetNowPluginID() { return std::to_string(mPluginID.load()); } +std::string Pipeline::GenNextPluginID() { + mPluginID.fetch_add(1); + return std::to_string(mPluginID.load()); +} + PluginInstance::PluginMeta Pipeline::GenNextPluginMeta(bool lastOne) { mPluginID.fetch_add(1); int32_t childNodeID = mPluginID.load(); diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index 23055a99cd..43a989fcd6 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -24,14 +24,14 @@ #include #include "config/PipelineConfig.h" -#include "plugin/input/InputContainerStdio.h" -#include "plugin/input/InputFile.h" #include "models/PipelineEventGroup.h" #include "pipeline/PipelineContext.h" #include "pipeline/plugin/instance/FlusherInstance.h" #include "pipeline/plugin/instance/InputInstance.h" #include "pipeline/plugin/instance/ProcessorInstance.h" #include "pipeline/route/Router.h" +#include "plugin/input/InputContainerStdio.h" +#include "plugin/input/InputFile.h" namespace logtail { @@ -45,6 +45,18 @@ class Pipeline { bool Send(std::vector&& groupList); bool FlushBatch(); void RemoveProcessQueue() const; + void AddInProcessingCnt() const { mProcessingCnt.fetch_add(1); } + void SubInProcessingCnt() const { + uint16_t currentVal; + do { + currentVal = atomic_val.load(std::memory_order_relaxed); + // cannot sub smaller than 0 + if (currentVal == 0) { + return; + } + } while (!atomic_val.compare_exchange_weak( + currentVal, currentVal - 1, std::memory_order_release, std::memory_order_relaxed)); + } const std::string& Name() const { return mName; } PipelineContext& GetContext() const { return mContext; } @@ -88,6 +100,7 @@ class Pipeline { std::unordered_map> mPluginCntMap; std::unique_ptr mConfig; std::atomic_uint16_t mPluginID; + std::atomic_uint16_t mProcessingCnt; #ifdef APSARA_UNIT_TEST_MAIN friend class PipelineMock; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 3bbe4a57ff..08d1e637b2 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -16,7 +16,7 @@ #include "pipeline/PipelineManager.h" -#include "file_server/ConfigManager.h" +#include "config_manager/ConfigManager.h" #include "file_server/FileServer.h" #include "go_pipeline/LogtailPlugin.h" #include "prometheus/PrometheusInputRunner.h" @@ -24,15 +24,15 @@ #include "ebpf/eBPFServer.h" #include "observer/ObserverManager.h" #endif -#include "runner/LogProcess.h" +#include "processor/daemon/LogProcess.h" #if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) #include "app_config/AppConfig.h" #include "shennong/ShennongManager.h" #include "streamlog/StreamLogManager.h" #endif #include "config/feedbacker/ConfigFeedbackReceiver.h" -#include "pipeline/queue/ProcessQueueManager.h" -#include "pipeline/queue/QueueKeyManager.h" +#include "queue/ProcessQueueManager.h" +#include "queue/QueueKeyManager.h" using namespace std; @@ -85,18 +85,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { if 
(isFileServerStarted && (isInputFileChanged || isInputContainerStdioChanged)) { FileServer::GetInstance()->Pause(); } - LogProcess::GetInstance()->HoldOn(); - LogtailPlugin::GetInstance()->HoldOn(false); #endif for (const auto& name : diff.mRemoved) { auto iter = mPipelineNameEntityMap.find(name); - if (iter->second->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), true); - } - if (iter->second->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), true); - } iter->second->Stop(true); DecreasePluginUsageCnt(iter->second->GetPluginStatistics()); iter->second->RemoveProcessQueue(); @@ -105,13 +97,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { } for (auto& config : diff.mModified) { auto iter = mPipelineNameEntityMap.find(config.mName); - if (iter->second->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), false); - } - if (iter->second->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), false); - } - auto p = BuildPipeline(std::move(config)); + auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue if (!p) { LOG_WARNING(sLogger, ("failed to build pipeline for existing config", @@ -127,21 +113,16 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { ConfigFeedbackStatus::FAILED); continue; } - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); LOG_INFO(sLogger, ("pipeline building for existing config succeeded", "stop the old pipeline and start the new one")("config", config.mName)); iter->second->Stop(false); DecreasePluginUsageCnt(iter->second->GetPluginStatistics()); + mPipelineNameEntityMap[config.mName] = p; IncreasePluginUsageCnt(p->GetPluginStatistics()); p->Start(); - if (p->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput()); - } - if (p->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput()); - } + ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); } for (auto& config : diff.mAdded) { auto p = BuildPipeline(std::move(config)); @@ -160,24 +141,16 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { } LOG_INFO(sLogger, ("pipeline building for new config succeeded", "begin to start pipeline")("config", config.mName)); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); mPipelineNameEntityMap[config.mName] = p; IncreasePluginUsageCnt(p->GetPluginStatistics()); p->Start(); - if (p->HasGoPipelineWithoutInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput()); - } - if (p->HasGoPipelineWithInput()) { - LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput()); - } + ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); } #ifndef APSARA_UNIT_TEST_MAIN // 在Flusher改造完成前,先不执行如下步骤,不会造成太大影响 // Sender::CleanUnusedAk(); - // 过渡使用 - LogProcess::GetInstance()->Resume(); if (isInputFileChanged || isInputContainerStdioChanged) { if (isFileServerStarted) { FileServer::GetInstance()->Resume(); @@ 
-258,6 +231,8 @@ void PipelineManager::StopAllPipelines() { FileServer::GetInstance()->Stop(); PrometheusInputRunner::GetInstance()->Stop(); + LogtailPlugin::GetInstance()->StopAll(true, true); + bool logProcessFlushFlag = false; for (int i = 0; !logProcessFlushFlag && i < 500; ++i) { logProcessFlushFlag = LogProcess::GetInstance()->FlushOut(10); @@ -271,7 +246,6 @@ void PipelineManager::StopAllPipelines() { FlushAllBatch(); - LogtailPlugin::GetInstance()->StopAll(true, true); LogtailPlugin::GetInstance()->StopAll(true, false); // TODO: make it common @@ -316,7 +290,9 @@ void PipelineManager::CheckIfInputUpdated(const Json::Value& config, bool& isInputObserverChanged, bool& isInputFileChanged, bool& isInputStreamChanged, - bool& isInputContainerStdioChanged) { + bool& isInputContainerStdioChanged, + bool& isInputPrometheusChanged, + bool& isInputEbpfChanged) { string inputType = config["Type"].asString(); if (inputType == "input_observer_network") { isInputObserverChanged = true; @@ -326,6 +302,13 @@ void PipelineManager::CheckIfInputUpdated(const Json::Value& config, isInputStreamChanged = true; } else if (inputType == "input_container_stdio") { isInputContainerStdioChanged = true; + } else if (inputType == "input_prometheus") { + isInputPrometheusChanged = true; + } else if (inputType == "input_ebpf_processprobe_security" || inputType == "input_ebpf_processprobe_observer" + || inputType == "input_ebpf_sockettraceprobe_security" + || inputType == "input_ebpf_sockettraceprobe_observer" || inputType == "input_ebpf_fileprobe_security" + || inputType == "input_ebpf_profilingprobe_observer") { + isInputEbpfChanged = true; } } diff --git a/core/pipeline/plugin/interface/Flusher.cpp b/core/pipeline/plugin/interface/Flusher.cpp index 432b184fd2..1eecba9d69 100644 --- a/core/pipeline/plugin/interface/Flusher.cpp +++ b/core/pipeline/plugin/interface/Flusher.cpp @@ -29,6 +29,14 @@ bool Flusher::Start() { } bool Flusher::Stop(bool isPipelineRemoving) { + if (HasContext()) { + unique_ptr tombStone = make_unique(); + tombStone->mPipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName()); + if (!tombStone->mPipeline) { + LOG_ERROR(sLogger, ("failed to find pipeline when stop flusher", mContext->GetConfigName())); + } + SenderQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(tombStone)); + } SenderQueueManager::GetInstance()->DeleteQueue(mQueueKey); return true; } diff --git a/core/pipeline/queue/BoundedProcessQueue.cpp b/core/pipeline/queue/BoundedProcessQueue.cpp index 470d134476..22ee0d2f1f 100644 --- a/core/pipeline/queue/BoundedProcessQueue.cpp +++ b/core/pipeline/queue/BoundedProcessQueue.cpp @@ -33,12 +33,26 @@ bool BoundedProcessQueue::Pop(unique_ptr& item) { } item = std::move(mQueue.front()); mQueue.pop(); + item->AddPipelineInProcessingCnt(); if (ChangeStateIfNeededAfterPop()) { GiveFeedback(); } return true; } +void BoundedProcessQueue::InvalidatePop() { + mValidToPop = false; + std::size_t size = mQueue.size(); + auto pipeline = PipelineManager::GetInstance()->FindConfigByName(mConfigName); + if (pipeline) { + for (std::size_t i = 0; i < size; ++i) { + mQueue.front()->mPipeline = pipeline; + mQueue.push(std::move(mQueue.front())); + mQueue.pop(); + } + } +} + void BoundedProcessQueue::SetUpStreamFeedbacks(std::vector&& feedbacks) { mUpStreamFeedbacks.clear(); for (auto& item : feedbacks) { diff --git a/core/pipeline/queue/BoundedProcessQueue.h b/core/pipeline/queue/BoundedProcessQueue.h index 8b65c17193..5c7ccffef3 100644 --- 
a/core/pipeline/queue/BoundedProcessQueue.h +++ b/core/pipeline/queue/BoundedProcessQueue.h @@ -38,6 +38,7 @@ class BoundedProcessQueue : public BoundedQueueInterface&& item) override; bool Pop(std::unique_ptr& item) override; + void InvalidatePop() override; void SetUpStreamFeedbacks(std::vector&& feedbacks); diff --git a/core/pipeline/queue/CircularProcessQueue.cpp b/core/pipeline/queue/CircularProcessQueue.cpp index 5962bc6c05..21ffaaac72 100644 --- a/core/pipeline/queue/CircularProcessQueue.cpp +++ b/core/pipeline/queue/CircularProcessQueue.cpp @@ -40,14 +40,24 @@ bool CircularProcessQueue::Pop(unique_ptr& item) { return false; } item = std::move(mQueue.front()); - mQueue.pop_front(); + item->AddPipelineInProcessingCnt(); mEventCnt -= item->mEventGroup.GetEvents().size(); return true; } +void CircularProcessQueue::InvalidatePop() { + mValidToPop = false; + auto pipeline = PipelineManager::GetInstance()->FindConfigByName(mConfigName); + if (pipeline) { + for (auto it = mQueue.begin(); it != mQueue.end(); ++it) { + (*it)->mPipeline = pipeline; + } + } +} + void CircularProcessQueue::Reset(size_t cap) { - // it seems more reasonable to retain extra items and process them immediately, however this contray to current framework design - // so we simply discard extra items, considering that it is a rare case to change capacity + // it seems more reasonable to retain extra items and process them immediately, however this contray to current + // framework design so we simply discard extra items, considering that it is a rare case to change capacity uint32_t cnt = 0; while (!mQueue.empty() && mEventCnt > cap) { mEventCnt -= mQueue.front()->mEventGroup.GetEvents().size(); diff --git a/core/pipeline/queue/CircularProcessQueue.h b/core/pipeline/queue/CircularProcessQueue.h index 45f50a1959..bf246e8efe 100644 --- a/core/pipeline/queue/CircularProcessQueue.h +++ b/core/pipeline/queue/CircularProcessQueue.h @@ -35,6 +35,7 @@ class CircularProcessQueue : virtual public QueueInterface&& item) override; bool Pop(std::unique_ptr& item) override; + void InvalidatePop() override; void Reset(size_t cap); diff --git a/core/pipeline/queue/ProcessQueueInterface.h b/core/pipeline/queue/ProcessQueueInterface.h index 363bc55f27..fd564ea0ce 100644 --- a/core/pipeline/queue/ProcessQueueInterface.h +++ b/core/pipeline/queue/ProcessQueueInterface.h @@ -43,12 +43,10 @@ class ProcessQueueInterface : virtual public QueueInterface&& ques); - void InvalidatePop() { mValidToPop = false; } + virtual void InvalidatePop() { mValidToPop = false; } void ValidatePop() { mValidToPop = true; } - void Reset() { - mDownStreamQueues.clear(); - } + void Reset() { mDownStreamQueues.clear(); } protected: bool IsValidToPop() const; diff --git a/core/pipeline/queue/ProcessQueueItem.h b/core/pipeline/queue/ProcessQueueItem.h index 3b90db0906..a4595dcfd1 100644 --- a/core/pipeline/queue/ProcessQueueItem.h +++ b/core/pipeline/queue/ProcessQueueItem.h @@ -19,6 +19,7 @@ #include #include "models/PipelineEventGroup.h" +#include "pipeline/PipelineManager.h" namespace logtail { @@ -30,6 +31,17 @@ struct ProcessQueueItem { size_t mInputIndex = 0; // index of the input in the pipeline ProcessQueueItem(PipelineEventGroup&& group, size_t index) : mEventGroup(std::move(group)), mInputIndex(index) {} + + AddPipelineInProcessingCnt(std::string& configName) const { + if (mPipeline) { + mPipeline->AddInProcessingCnt(); + } else { + auto p = PipelineManager::GetInstance()->FindConfigByName(configName); + if (p) { + p->AddInProcessingCnt(); + } + } + } }; } 
// namespace logtail diff --git a/core/pipeline/queue/SenderQueueItem.h b/core/pipeline/queue/SenderQueueItem.h index 9e3b83c979..88ffeade77 100644 --- a/core/pipeline/queue/SenderQueueItem.h +++ b/core/pipeline/queue/SenderQueueItem.h @@ -21,6 +21,7 @@ #include #include +#include "pipeline/PipelineManager.h" #include "pipeline/queue/QueueKey.h" namespace logtail { @@ -60,6 +61,17 @@ struct SenderQueueItem { virtual ~SenderQueueItem() = default; virtual SenderQueueItem* Clone() { return new SenderQueueItem(*this); } + + void SubInProcessingCnt(std::string& configName) const { + if (mPipeline) { + mPipeline->SubInProcessingCnt(); + } else { + auto p = PipelineManager::GetInstance()->FindConfigByName(configName); + if (p != nullptr) { + p->SubInProcessingCnt(); + } + } + } }; } // namespace logtail diff --git a/core/pipeline/queue/SenderQueueManager.h b/core/pipeline/queue/SenderQueueManager.h index 08b5508794..2e301cb06d 100644 --- a/core/pipeline/queue/SenderQueueManager.h +++ b/core/pipeline/queue/SenderQueueManager.h @@ -24,11 +24,11 @@ #include #include "common/FeedbackInterface.h" +#include "pipeline/limiter/ConcurrencyLimiter.h" +#include "pipeline/limiter/RateLimiter.h" #include "pipeline/queue/QueueParam.h" #include "pipeline/queue/SenderQueue.h" #include "pipeline/queue/SenderQueueItem.h" -#include "pipeline/limiter/ConcurrencyLimiter.h" -#include "pipeline/limiter/RateLimiter.h" namespace logtail { diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 797739ef2f..ad9d0cb402 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -18,28 +18,28 @@ #include "config/provider/EnterpriseConfigProvider.h" #endif #include "app_config/AppConfig.h" -#include "pipeline/batch/FlushStrategy.h" #include "common/EndpointUtil.h" #include "common/HashUtil.h" #include "common/LogtailCommonFlags.h" #include "common/ParamExtractor.h" #include "common/TimeUtil.h" +#include "pipeline/Pipeline.h" +#include "pipeline/batch/FlushStrategy.h" #include "pipeline/compression/CompressorFactory.h" +#include "pipeline/queue/QueueKeyManager.h" +#include "pipeline/queue/SLSSenderQueueItem.h" +#include "pipeline/queue/SenderQueueManager.h" #include "plugin/flusher/sls/PackIdManager.h" #include "plugin/flusher/sls/SLSClientManager.h" #include "plugin/flusher/sls/SLSResponse.h" #include "plugin/flusher/sls/SendResult.h" -#include "pipeline/Pipeline.h" #include "profile_sender/ProfileSender.h" -#include "pipeline/queue/QueueKeyManager.h" -#include "pipeline/queue/SLSSenderQueueItem.h" -#include "pipeline/queue/SenderQueueManager.h" -#include "sdk/Common.h" #include "runner/FlusherRunner.h" +#include "sdk/Common.h" #include "sls_control/SLSControl.h" // TODO: temporarily used here -#include "plugin/flusher/sls/DiskBufferWriter.h" #include "pipeline/PipelineManager.h" +#include "plugin/flusher/sls/DiskBufferWriter.h" using namespace std; @@ -1001,18 +1001,6 @@ bool FlusherSLS::SerializeAndPush(vector&& groupLists) { } bool FlusherSLS::PushToQueue(QueueKey key, unique_ptr&& item, uint32_t retryTimes) { -#ifndef APSARA_UNIT_TEST_MAIN - // TODO: temporarily set here, should be removed after independent config update refactor - if (item->mFlusher->HasContext()) { - item->mPipeline - = PipelineManager::GetInstance()->FindConfigByName(item->mFlusher->GetContext().GetConfigName()); - if (!item->mPipeline) { - // should not happen - return false; - } - } -#endif - const string& str = QueueKeyManager::GetInstance()->GetName(key); for (size_t 
i = 0; i < retryTimes; ++i) { int rst = SenderQueueManager::GetInstance()->PushQueue(key, std::move(item)); diff --git a/core/runner/LogProcess.cpp b/core/runner/LogProcess.cpp index 08b4dfb47c..dfd7b5e87e 100644 --- a/core/runner/LogProcess.cpp +++ b/core/runner/LogProcess.cpp @@ -15,12 +15,12 @@ #include "runner/LogProcess.h" #include "app_config/AppConfig.h" -#include "pipeline/batch/TimeoutFlushManager.h" #include "common/Flags.h" #include "go_pipeline/LogtailPlugin.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" #include "pipeline/PipelineManager.h" +#include "pipeline/batch/TimeoutFlushManager.h" #include "pipeline/queue/ExactlyOnceQueueManager.h" #include "pipeline/queue/ProcessQueueManager.h" #include "pipeline/queue/QueueKeyManager.h" @@ -56,7 +56,8 @@ LogProcess::~LogProcess() { void LogProcess::Start() { if (mInitialized) return; - mGlobalProcessQueueFullTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL); + mGlobalProcessQueueFullTotal + = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL); mGlobalProcessQueueTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_TOTAL); mInitialized = true; @@ -294,6 +295,8 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { } pipeline->Send(std::move(eventGroupList)); } + // When item is in sender queue, we could decrease the count + pipeline->SubInProcessingCnt(); } } LOG_WARNING(sLogger, ("runner/LogProcess.hread", "Exit")("threadNo", threadNo)); diff --git a/pkg/logtail/libPluginAdapter.so b/pkg/logtail/libPluginAdapter.so index 823180a8bd..c64c6832d6 100755 Binary files a/pkg/logtail/libPluginAdapter.so and b/pkg/logtail/libPluginAdapter.so differ diff --git a/plugin_main/plugin_export.go b/plugin_main/plugin_export.go index 5f02dabbfa..e306c7d305 100644 --- a/plugin_main/plugin_export.go +++ b/plugin_main/plugin_export.go @@ -211,9 +211,9 @@ func StopAll(exitFlag int, withInputFlag int) { } //export Stop -func Stop(configName string, removingFlag int) { - logger.Info(context.Background(), "Stop", "start", "config", configName, "removing", removingFlag) - err := pluginmanager.Stop(configName, removingFlag != 0) +func Stop(configName string, exitFlag int) { + logger.Info(context.Background(), "Stop", "start", "config", configName, "exit", exitFlag) + err := pluginmanager.Stop(configName, exitFlag != 0) if err != nil { logger.Error(context.Background(), "PLUGIN_ALARM", "stop error", err) } diff --git a/pluginmanager/logstore_config.go b/pluginmanager/logstore_config.go index 70c38eac51..4576ea8a97 100644 --- a/pluginmanager/logstore_config.go +++ b/pluginmanager/logstore_config.go @@ -92,7 +92,7 @@ type LogstoreConfig struct { ConfigName string ConfigNameWithSuffix string LogstoreKey int64 - FlushOutFlag bool + FlushOutFlag atomic.Bool // Each LogstoreConfig can have its independent GlobalConfig if the "global" field // is offered in configuration, see build-in StatisticsConfig and AlarmConfig. GlobalConfig *config.GlobalConfig @@ -133,7 +133,7 @@ func (p *LogstoreStatistics) Init(context pipeline.Context) { // 4. Start inputs (including metrics and services), just like aggregator, each input // has its own goroutine. func (lc *LogstoreConfig) Start() { - lc.FlushOutFlag = false + lc.FlushOutFlag.Store(false) logger.Info(lc.Context.GetRuntimeContext(), "config start", "begin") lc.PluginRunner.Run() @@ -151,9 +151,9 @@ func (lc *LogstoreConfig) Start() { // 5. Set stopping flag, stop flusher goroutine. 
// 6. If Logtail is exiting and there are remaining data, try to flush once. // 7. Stop flusher plugins. -func (lc *LogstoreConfig) Stop(exitFlag bool) error { - logger.Info(lc.Context.GetRuntimeContext(), "config stop", "begin", "exit", exitFlag) - if err := lc.PluginRunner.Stop(exitFlag); err != nil { +func (lc *LogstoreConfig) Stop(removingFlag bool) error { + logger.Info(lc.Context.GetRuntimeContext(), "config stop", "begin", "removing", removingFlag) + if err := lc.PluginRunner.Stop(removingFlag); err != nil { return err } logger.Info(lc.Context.GetRuntimeContext(), "Plugin Runner stop", "done") @@ -323,15 +323,12 @@ func hasDockerStdoutInput(plugins map[string]interface{}) bool { if !valid { continue } - pluginTypeWithID, valid := cfg["type"] + typeName, valid := cfg["type"] if !valid { continue } - if val, valid := pluginTypeWithID.(string); valid { - pluginType := getPluginType(val) - if pluginType == input.ServiceDockerStdoutPluginName { - return true - } + if val, valid := typeName.(string); valid && val == input.ServiceDockerStdoutPluginName { + return true } } return false @@ -362,11 +359,6 @@ func createLogstoreConfig(project string, logstore string, configName string, lo return nil, err } - if lastUnsendBuffer, hasLastConfig := LastUnsendBuffer[configName]; hasLastConfig { - // Move unsent LogGroups from last config to new config. - logstoreC.PluginRunner.Merge(lastUnsendBuffer) - } - logstoreC.ContainerLabelSet = make(map[string]struct{}) logstoreC.EnvSet = make(map[string]struct{}) logstoreC.K8sLabelSet = make(map[string]struct{}) @@ -380,16 +372,11 @@ func createLogstoreConfig(project string, logstore string, configName string, lo if !valid { continue } - pluginTypeWithID, valid := cfg["type"] - if !valid { - continue - } - val, valid := pluginTypeWithID.(string) + typeName, valid := cfg["type"] if !valid { continue } - pluginType := getPluginType(val) - if pluginType == input.ServiceDockerStdoutPluginName || pluginType == input.MetricDocierFilePluginName { + if val, valid := typeName.(string); valid && (val == input.ServiceDockerStdoutPluginName || val == input.MetricDocierFilePluginName) { configDetail, valid := cfg["detail"] if !valid { continue @@ -485,19 +472,16 @@ func createLogstoreConfig(project string, logstore string, configName string, lo if !ok { return nil, fmt.Errorf("invalid extension type") } - if pluginTypeWithID, ok := extension["type"]; ok { - pluginTypeWithIDStr, ok := pluginTypeWithID.(string) - if !ok { - return nil, fmt.Errorf("invalid extension type") - } - pluginType := getPluginType(pluginTypeWithIDStr) - logger.Debug(contextImp.GetRuntimeContext(), "add extension", pluginType) - err = loadExtension(logstoreC.genPluginMeta(pluginTypeWithIDStr, false, false), logstoreC, extension["detail"]) - if err != nil { - return nil, err - } - contextImp.AddPlugin(pluginType) + typeName, ok := extension["type"].(string) + if !ok { + return nil, fmt.Errorf("invalid extension type") + } + logger.Debug(contextImp.GetRuntimeContext(), "add extension", typeName) + err = loadExtension(logstoreC.genPluginMeta(typeName, false, false), logstoreC, extension["detail"]) + if err != nil { + return nil, err } + contextImp.AddPlugin(typeName) } } @@ -508,21 +492,20 @@ func createLogstoreConfig(project string, logstore string, configName string, lo for _, inputInterface := range inputs { input, ok := inputInterface.(map[string]interface{}) if ok { - if pluginTypeWithID, ok := input["type"]; ok { - if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { - pluginType := 
getPluginType(pluginTypeWithIDStr) - if _, isMetricInput := pipeline.MetricInputs[pluginType]; isMetricInput { + if typeName, ok := input["type"]; ok { + if typeNameStr, ok := typeName.(string); ok { + if _, isMetricInput := pipeline.MetricInputs[typeNameStr]; isMetricInput { // Load MetricInput plugin defined in pipeline.MetricInputs // pipeline.MetricInputs will be renamed in a future version - err = loadMetric(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), logstoreC, input["detail"]) - } else if _, isServiceInput := pipeline.ServiceInputs[pluginType]; isServiceInput { + err = loadMetric(logstoreC.genPluginMeta(typeNameStr, true, false), logstoreC, input["detail"]) + } else if _, isServiceInput := pipeline.ServiceInputs[typeNameStr]; isServiceInput { // Load ServiceInput plugin defined in pipeline.ServiceInputs - err = loadService(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), logstoreC, input["detail"]) + err = loadService(logstoreC.genPluginMeta(typeNameStr, true, false), logstoreC, input["detail"]) } if err != nil { return nil, err } - contextImp.AddPlugin(pluginType) + contextImp.AddPlugin(typeNameStr) continue } } @@ -540,15 +523,14 @@ func createLogstoreConfig(project string, logstore string, configName string, lo for i, processorInterface := range processors { processor, ok := processorInterface.(map[string]interface{}) if ok { - if pluginTypeWithID, ok := processor["type"]; ok { - if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { - pluginType := getPluginType(pluginTypeWithIDStr) - logger.Debug(contextImp.GetRuntimeContext(), "add processor", pluginType) - err = loadProcessor(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), i, logstoreC, processor["detail"]) + if typeName, ok := processor["type"]; ok { + if typeNameStr, ok := typeName.(string); ok { + logger.Debug(contextImp.GetRuntimeContext(), "add processor", typeNameStr) + err = loadProcessor(logstoreC.genPluginMeta(typeNameStr, true, false), i, logstoreC, processor["detail"]) if err != nil { return nil, err } - contextImp.AddPlugin(pluginType) + contextImp.AddPlugin(typeNameStr) continue } } @@ -567,15 +549,14 @@ func createLogstoreConfig(project string, logstore string, configName string, lo for _, aggregatorInterface := range aggregators { aggregator, ok := aggregatorInterface.(map[string]interface{}) if ok { - if pluginTypeWithID, ok := aggregator["type"]; ok { - if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { - pluginType := getPluginType(pluginTypeWithIDStr) - logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", pluginType) - err = loadAggregator(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), logstoreC, aggregator["detail"]) + if typeName, ok := aggregator["type"]; ok { + if typeNameStr, ok := typeName.(string); ok { + logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", typeNameStr) + err = loadAggregator(logstoreC.genPluginMeta(typeNameStr, true, false), logstoreC, aggregator["detail"]) if err != nil { return nil, err } - contextImp.AddPlugin(pluginType) + contextImp.AddPlugin(typeNameStr) continue } } @@ -598,19 +579,18 @@ func createLogstoreConfig(project string, logstore string, configName string, lo for num, flusherInterface := range flushers { flusher, ok := flusherInterface.(map[string]interface{}) if ok { - if pluginTypeWithID, ok := flusher["type"]; ok { - if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok { - pluginType := getPluginType(pluginTypeWithIDStr) - logger.Debug(contextImp.GetRuntimeContext(), "add 
flusher", pluginType) + if typeName, ok := flusher["type"]; ok { + if typeNameStr, ok := typeName.(string); ok { + logger.Debug(contextImp.GetRuntimeContext(), "add flusher", typeNameStr) lastOne := false if num == flushersLen-1 { lastOne = true } - err = loadFlusher(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, lastOne), logstoreC, flusher["detail"]) + err = loadFlusher(logstoreC.genPluginMeta(typeNameStr, true, lastOne), logstoreC, flusher["detail"]) if err != nil { return nil, err } - contextImp.AddPlugin(pluginType) + contextImp.AddPlugin(typeNameStr) continue } } @@ -668,14 +648,14 @@ func LoadLogstoreConfig(project string, logstore string, configName string, logs if err != nil { return err } - LogtailConfig.Store(configName, logstoreC) + ToStartLogtailConfig = logstoreC return nil } func loadBuiltinConfig(name string, project string, logstore string, configName string, cfgStr string) (*LogstoreConfig, error) { logger.Infof(context.Background(), "load built-in config %v, config name: %v, logstore: %v", name, configName, logstore) - return createLogstoreConfig(project, logstore, configName, -1, cfgStr) + return createLogstoreConfig(project, logstore, configName, 0, cfgStr) } // loadMetric creates a metric plugin object and append to logstoreConfig.MetricPlugins. @@ -783,21 +763,14 @@ func applyPluginConfig(plugin interface{}, pluginConfig interface{}) error { return err } -// Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority. -func getPluginType(pluginTypeWithID string) string { - if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 { - return pluginTypeWithID[:ids] - } - return pluginTypeWithID -} - -func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool, lastOne bool) *pipeline.PluginMeta { +// Rule: pluginName=pluginType/pluginID#pluginPriority. 
+func (lc *LogstoreConfig) genPluginMeta(pluginName string, genNodeID bool, lastOne bool) *pipeline.PluginMeta { nodeID := "" childNodeID := "" - if isPluginTypeWithID(pluginTypeWithID) { - pluginTypeWithID := pluginTypeWithID - if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 { - pluginTypeWithID = pluginTypeWithID[:idx] + if isPluginTypeWithID(pluginName) { + pluginTypeWithID := pluginName + if idx := strings.IndexByte(pluginName, '#'); idx != -1 { + pluginTypeWithID = pluginName[:idx] } if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 { if genNodeID { @@ -815,31 +788,30 @@ func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool, } } } - pluginType := pluginTypeWithID pluginID := lc.genPluginID() if genNodeID { nodeID, childNodeID = lc.genNodeID(lastOne) } - pluginTypeWithID = fmt.Sprintf("%s/%s", pluginType, pluginID) + pluginTypeWithID := fmt.Sprintf("%s/%s", pluginName, pluginID) return &pipeline.PluginMeta{ PluginTypeWithID: pluginTypeWithID, - PluginType: pluginType, + PluginType: pluginName, PluginID: pluginID, NodeID: nodeID, ChildNodeID: childNodeID, } } -func isPluginTypeWithID(pluginTypeWithID string) bool { - if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 { +func isPluginTypeWithID(pluginName string) bool { + if idx := strings.IndexByte(pluginName, '/'); idx != -1 { return true } return false } -func GetPluginPriority(pluginTypeWithID string) int { - if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 { - val, err := strconv.Atoi(pluginTypeWithID[idx+1:]) +func GetPluginPriority(pluginName string) int { + if idx := strings.IndexByte(pluginName, '#'); idx != -1 { + val, err := strconv.Atoi(pluginName[idx+1:]) if err != nil { return 0 } diff --git a/pluginmanager/plugin_manager.go b/pluginmanager/plugin_manager.go index f9dcb253da..8ef19ec609 100644 --- a/pluginmanager/plugin_manager.go +++ b/pluginmanager/plugin_manager.go @@ -31,6 +31,7 @@ import ( // Following variables are exported so that tests of main package can reference them. var LogtailConfig sync.Map +var ToStartLogtailConfig *LogstoreConfig var LastUnsendBuffer map[string]PluginRunner var ContainerConfig *LogstoreConfig @@ -133,11 +134,11 @@ func Init() (err error) { // timeoutStop wrappers LogstoreConfig.Stop with timeout (5s by default). // @return true if Stop returns before timeout, otherwise false. -func timeoutStop(config *LogstoreConfig, exitFlag bool) bool { +func timeoutStop(config *LogstoreConfig, removingFlag bool) bool { done := make(chan int) go func() { logger.Info(config.Context.GetRuntimeContext(), "Stop config in goroutine", "begin") - _ = config.Stop(exitFlag) + _ = config.Stop(removingFlag) close(done) logger.Info(config.Context.GetRuntimeContext(), "Stop config in goroutine", "end") }() @@ -219,15 +220,15 @@ func StopAll(exitFlag, withInput bool) error { } // Stop stop the given config. 
-func Stop(configName string, removingFlag bool) error { +func Stop(configName string, exitFlag bool) error { defer panicRecover("Run plugin") if object, exists := LogtailConfig.Load(configName); exists { if config, ok := object.(*LogstoreConfig); ok { - if hasStopped := timeoutStop(config, true); !hasStopped { + if hasStopped := timeoutStop(config, exitFlag); !hasStopped { logger.Error(config.Context.GetRuntimeContext(), "CONFIG_STOP_TIMEOUT_ALARM", "timeout when stop config, goroutine might leak") } - if !removingFlag { + if !exitFlag { LastUnsendBuffer[configName] = config.PluginRunner } LogtailConfig.Delete(configName) @@ -240,13 +241,17 @@ func Stop(configName string, removingFlag bool) error { // Start starts the given config. func Start(configName string) error { defer panicRecover("Run plugin") - if object, exists := LogtailConfig.Load(configName); exists { - if config, ok := object.(*LogstoreConfig); ok { - config.Start() - return nil - } + if ToStartLogtailConfig == nil { + return fmt.Errorf("no pipeline for the config is created: %s", configName) } - return fmt.Errorf("config not found: %s", configName) + if ToStartLogtailConfig.ConfigName != configName { + // should never happen + return fmt.Errorf("config unmatch with to start pipeline: given %s, expect %s", configName, ToStartLogtailConfig.ConfigName) + } + ToStartLogtailConfig.Start() + LogtailConfig.Store(configName, ToStartLogtailConfig) + ToStartLogtailConfig = nil + return nil } func GetLogtailConfigSize() int { diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index 343d146752..d93238e068 100644 --- a/pluginmanager/plugin_runner_v1.go +++ b/pluginmanager/plugin_runner_v1.go @@ -354,7 +354,7 @@ func (p *pluginv1Runner) runFlusherInternal(cc *pipeline.AsyncControl) { } break } - if !p.LogstoreConfig.FlushOutFlag { + if !p.LogstoreConfig.FlushOutFlag.Load() { time.Sleep(time.Duration(10) * time.Millisecond) continue } @@ -384,7 +384,7 @@ func (p *pluginv1Runner) Stop(exit bool) error { p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") - p.LogstoreConfig.FlushOutFlag = true + p.LogstoreConfig.FlushOutFlag.Store(true) p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 { diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index 23ed88a859..195d804c17 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -375,7 +375,7 @@ func (p *pluginv2Runner) runFlusherInternal(cc *pipeline.AsyncControl) { } break } - if !p.LogstoreConfig.FlushOutFlag { + if !p.LogstoreConfig.FlushOutFlag.Load() { time.Sleep(time.Duration(10) * time.Millisecond) continue } @@ -405,7 +405,7 @@ func (p *pluginv2Runner) Stop(exit bool) error { p.AggregateControl.WaitCancel() logger.Info(p.LogstoreConfig.Context.GetRuntimeContext(), "aggregator plugins stop", "done") - p.LogstoreConfig.FlushOutFlag = true + p.LogstoreConfig.FlushOutFlag.Store(true) p.FlushControl.WaitCancel() if exit && p.FlushOutStore.Len() > 0 {
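Taken together, the C++ changes above add per-pipeline accounting of in-flight event groups: a process-queue Pop increments the owning pipeline's counter, and once the resulting data sits in a sender queue the counter is decremented again, either through the pipeline pinned on the item during a config update or through a lookup by config name. A self-contained sketch of that item-side bookkeeping (stub types; the saturating decrement mirrors `SubInProcessingCnt`, and member/type names here are illustrative):

```cpp
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>

// Stand-in for logtail::Pipeline: an in-flight counter with a saturating
// decrement so duplicate completions can never wrap the count below zero.
struct Pipeline {
    void AddInProcessingCnt() { mProcessingCnt.fetch_add(1); }
    void SubInProcessingCnt() {
        uint16_t cur = mProcessingCnt.load(std::memory_order_relaxed);
        while (cur != 0) {
            if (mProcessingCnt.compare_exchange_weak(cur, static_cast<uint16_t>(cur - 1),
                                                     std::memory_order_release,
                                                     std::memory_order_relaxed)) {
                return;  // decremented exactly once
            }
            // On failure, cur is reloaded with the latest value; retry unless it hit zero.
        }
    }
    std::atomic<uint16_t> mProcessingCnt{0};
};

// Stand-in for PipelineManager::FindConfigByName().
struct PipelineManager {
    static PipelineManager& GetInstance() { static PipelineManager m; return m; }
    std::shared_ptr<Pipeline> FindConfigByName(const std::string& name) {
        auto it = mPipelines.find(name);
        if (it == mPipelines.end()) {
            return nullptr;
        }
        return it->second;
    }
    std::unordered_map<std::string, std::shared_ptr<Pipeline>> mPipelines;
};

// Pattern used by ProcessQueueItem::AddPipelineInProcessingCnt and
// SenderQueueItem::SubInProcessingCnt: prefer the pipeline pinned on the item
// (set by InvalidatePop() / Flusher::Stop() while a config update is in
// flight), otherwise resolve the currently loaded pipeline by config name.
struct QueueItem {
    std::shared_ptr<Pipeline> mPipeline;  // pinned old pipeline, may be null

    void AddPipelineInProcessingCnt(const std::string& configName) {
        if (auto p = Resolve(configName)) p->AddInProcessingCnt();
    }
    void SubPipelineInProcessingCnt(const std::string& configName) {
        if (auto p = Resolve(configName)) p->SubInProcessingCnt();
    }

private:
    std::shared_ptr<Pipeline> Resolve(const std::string& configName) {
        return mPipeline ? mPipeline : PipelineManager::GetInstance().FindConfigByName(configName);
    }
};
```

The fallback lookup matters during config updates: items produced by the old pipeline may only finish flushing after the new pipeline object has replaced it in the manager, and the pinned pointer covers exactly that window.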