Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 12, 2024
1 parent 13f67a8 commit 8d23744
Show file tree
Hide file tree
Showing 21 changed files with 272 additions and 268 deletions.
4 changes: 2 additions & 2 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
#include "common/JsonUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/TimeUtil.h"
#include "pipeline/compression/CompressorFactory.h"
#include "container_manager/ConfigContainerInfoUpdateCmd.h"
#include "file_server/ConfigManager.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "pipeline/PipelineManager.h"
#include "profile_sender/ProfileSender.h"
#include "pipeline/compression/CompressorFactory.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "profile_sender/ProfileSender.h"

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down
174 changes: 79 additions & 95 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,22 @@
#include <cstdint>
#include <utility>

#include "pipeline/batch/TimeoutFlushManager.h"
#include "batch/TimeoutFlushManager.h"
#include "common/Flags.h"
#include "common/ParamExtractor.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "flusher/sls/FlusherSLS.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "input/InputFeedbackInterfaceRegistry.h"
#include "plugin/PluginRegistry.h"
#include "processor/ProcessorParseApsaraNative.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"
#include "queue/SenderQueueManager.h"

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

using namespace std;

namespace {
class AggregatorDefaultConfig {
public:
static AggregatorDefaultConfig& Instance() {
static AggregatorDefaultConfig instance;
return instance;
}

Json::Value* GetJsonConfig() { return &aggregatorDefault; }

private:
Json::Value aggregatorDefault;
AggregatorDefaultConfig() { aggregatorDefault["Type"] = "aggregator_default"; }

AggregatorDefaultConfig(AggregatorDefaultConfig const&) = delete;
void operator=(AggregatorDefaultConfig const&) = delete;
};
} // namespace

namespace logtail {

void AddExtendedGlobalParamToGoPipeline(const Json::Value& extendedParams, Json::Value& pipeline) {
Expand All @@ -78,21 +59,20 @@ bool Pipeline::Init(PipelineConfig&& config) {
const InputContainerStdio* inputContainerStdio = nullptr;
bool hasFlusherSLS = false;

// to send alarm and init MetricsRecord before flusherSLS is built, a temporary object is made, which will be
#ifdef __ENTERPRISE__
// to send alarm before flusherSLS is built, a temporary object is made, which will be overriden shortly after.
unique_ptr<FlusherSLS> SLSTmp = make_unique<FlusherSLS>();
if (!config.mProject.empty()) {
SLSTmp->mProject = config.mProject;
SLSTmp->mLogstore = config.mLogstore;
SLSTmp->mRegion = config.mRegion;
mContext.SetSLSInfo(SLSTmp.get());
}
SLSTmp->mProject = config.mProject;
SLSTmp->mLogstore = config.mLogstore;
SLSTmp->mRegion = config.mRegion;
mContext.SetSLSInfo(SLSTmp.get());
#endif

mPluginID.store(0);
for (size_t i = 0; i < config.mInputs.size(); ++i) {
const Json::Value& detail = *config.mInputs[i];
string pluginType = detail["Type"].asString();
unique_ptr<InputInstance> input
= PluginRegistry::GetInstance()->CreateInput(pluginType, GenNextPluginMeta(false));
string name = detail["Type"].asString();
unique_ptr<InputInstance> input = PluginRegistry::GetInstance()->CreateInput(name, GenNextPluginMeta(false));
if (input) {
Json::Value optionalGoPipeline;
if (!input->Init(detail, mContext, i, optionalGoPipeline)) {
Expand All @@ -103,66 +83,56 @@ bool Pipeline::Init(PipelineConfig&& config) {
MergeGoPipeline(optionalGoPipeline, mGoPipelineWithInput);
}
// for special treatment below
if (pluginType == InputFile::sName) {
if (name == InputFile::sName) {
inputFile = static_cast<const InputFile*>(mInputs[0]->GetPlugin());
} else if (pluginType == InputContainerStdio::sName) {
} else if (name == InputContainerStdio::sName) {
inputContainerStdio = static_cast<const InputContainerStdio*>(mInputs[0]->GetPlugin());
}
} else {
AddPluginToGoPipeline(pluginType, detail, "inputs", mGoPipelineWithInput);
AddPluginToGoPipeline(detail, "inputs", mGoPipelineWithInput);
}
++mPluginCntMap["inputs"][pluginType];
++mPluginCntMap["inputs"][name];
}

for (size_t i = 0; i < config.mProcessors.size(); ++i) {
const Json::Value& detail = *config.mProcessors[i];
string pluginType = detail["Type"].asString();
string name = (*config.mProcessors[i])["Type"].asString();
unique_ptr<ProcessorInstance> processor
= PluginRegistry::GetInstance()->CreateProcessor(name, GenNextPluginMeta(false));
if (processor) {
if (processor) {
if (!processor->Init(*config.mProcessors[i], mContext)) {
return false;
}
mProcessorLine.emplace_back(std::move(processor));
// for special treatment of topicformat in apsara mode
if (i == 0 && pluginType == ProcessorParseApsaraNative::sName) {
if (i == 0 && name == ProcessorParseApsaraNative::sName) {
mContext.SetIsFirstProcessorApsaraFlag(true);
}
} else {
if (ShouldAddPluginToGoPipelineWithInput()) {
if (ShouldAddPluginToGoPipelineWithInput()) {
AddPluginToGoPipeline(*config.mProcessors[i], "processors", mGoPipelineWithInput);
} else {
AddPluginToGoPipeline(pluginType, detail, "processors", mGoPipelineWithoutInput);
AddPluginToGoPipeline(*config.mProcessors[i], "processors", mGoPipelineWithoutInput);
}
}
++mPluginCntMap["processors"][pluginType];
++mPluginCntMap["processors"][name];
}

if (config.mAggregators.empty() && config.IsFlushingThroughGoPipelineExisted()) {
// an aggregator_default plugin will be add to go pipeline when mAggregators is empty and need to send go data
// to cpp flusher.
config.mAggregators.push_back(AggregatorDefaultConfig::Instance().GetJsonConfig());
}
for (size_t i = 0; i < config.mAggregators.size(); ++i) {
const Json::Value& detail = *config.mAggregators[i];
string pluginType = detail["Type"].asString();
GenNextPluginMeta(false);
for (auto detail : config.mAggregators) {
if (ShouldAddPluginToGoPipelineWithInput()) {
AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithInput);
AddPluginToGoPipeline(*detail, "aggregators", mGoPipelineWithInput);
} else {
AddPluginToGoPipeline(pluginType, detail, "aggregators", mGoPipelineWithoutInput);
AddPluginToGoPipeline(*detail, "aggregators", mGoPipelineWithoutInput);
}
++mPluginCntMap["aggregators"][pluginType];
++mPluginCntMap["aggregators"][(*detail)["Type"].asString()];
}

for (size_t i = 0; i < config.mFlushers.size(); ++i) {
const Json::Value& detail = *config.mFlushers[i];
string pluginType = detail["Type"].asString();
for (auto detail : config.mFlushers) {
string name = (*detail)["Type"].asString();
unique_ptr<FlusherInstance> flusher
= PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false));
= PluginRegistry::GetInstance()->CreateFlusher(name, GenNextPluginMeta(false));
if (flusher) {
Json::Value optionalGoPipeline;
if (!flusher->Init(detail, mContext, optionalGoPipeline)) {
if (!flusher->Init(*detail, mContext, optionalGoPipeline)) {
return false;
}
mFlushers.emplace_back(std::move(flusher));
Expand All @@ -173,36 +143,33 @@ bool Pipeline::Init(PipelineConfig&& config) {
MergeGoPipeline(optionalGoPipeline, mGoPipelineWithoutInput);
}
}
if (pluginType == FlusherSLS::sName) {
if (name == FlusherSLS::sName) {
hasFlusherSLS = true;
mContext.SetSLSInfo(static_cast<const FlusherSLS*>(mFlushers.back()->GetPlugin()));
}
} else {
if (ShouldAddPluginToGoPipelineWithInput()) {
AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithInput);
AddPluginToGoPipeline(*detail, "flushers", mGoPipelineWithInput);
} else {
AddPluginToGoPipeline(pluginType, detail, "flushers", mGoPipelineWithoutInput);
AddPluginToGoPipeline(*detail, "flushers", mGoPipelineWithoutInput);
}
}
++mPluginCntMap["flushers"][pluginType];
++mPluginCntMap["flushers"][name];
}

// route is only enabled in native flushing mode, thus the index in config is the same as that in mFlushers
if (!mRouter.Init(config.mRouter, mContext)) {
return false;
}

for (size_t i = 0; i < config.mExtensions.size(); ++i) {
const Json::Value& detail = *config.mExtensions[i];
string pluginType = detail["Type"].asString();
GenNextPluginMeta(false);
for (auto detail : config.mExtensions) {
if (!mGoPipelineWithInput.isNull()) {
AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithInput);
AddPluginToGoPipeline(*detail, "extensions", mGoPipelineWithInput);
}
if (!mGoPipelineWithoutInput.isNull()) {
AddPluginToGoPipeline(pluginType, detail, "extensions", mGoPipelineWithoutInput);
AddPluginToGoPipeline(*detail, "extensions", mGoPipelineWithoutInput);
}
++mPluginCntMap["extensions"][pluginType];
++mPluginCntMap["extensions"][(*detail)["Type"].asString()];
}

// global module must be initialized at last, since native input or flusher plugin may generate global param in Go
Expand Down Expand Up @@ -291,7 +258,7 @@ bool Pipeline::Init(PipelineConfig&& config) {
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(mContext.GetProcessQueueKey(), priority);
} else {
ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue(
mContext.GetProcessQueueKey(), priority, 1024);
mContext.GetProcessQueueKey(), priority, 100);
}


Expand Down Expand Up @@ -323,13 +290,13 @@ void Pipeline::Start() {
}

if (!mGoPipelineWithoutInput.isNull()) {
// TODO: 加载该Go流水线
LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput());
}

// TODO: 启用Process中改流水线对应的输入队列
ProcessQueueManager::GetInstance()->ValidatePop(mContext.GetProcessQueueKey());

if (!mGoPipelineWithInput.isNull()) {
// TODO: 加载该Go流水线
LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput());
}

for (const auto& input : mInputs) {
Expand Down Expand Up @@ -367,6 +334,7 @@ bool Pipeline::Send(vector<PipelineEventGroup>&& groupList) {
}
}
}
pipeline->SubInProcessingCnt();
return allSucceeded;
}

Expand All @@ -386,23 +354,42 @@ void Pipeline::Stop(bool isRemoving) {
}

if (!mGoPipelineWithInput.isNull()) {
// TODO: 卸载该Go流水线
// Go pipeline `Stop` will stop and delete
LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), isRemoving);
}

ProcessQueueManager::GetInstance()->InvalidatePop(mContext->GetConfigName());

// Wait for all processing item finish
uint64_t startTime = GetCurrentTimeInMilliSeconds();
bool alarmFlag = false;
while (mProcessingCnt.load() != 0) {
usleep(1000 * 10); // 10ms
uint64_t duration = GetCurrentTimeInMilliSeconds() - startTime;
if (!alarmFlag && duration > 1000) { // 1s
LOG_ERROR(sLogger, ("pipeline stop", "too slow")("config", mName)("cost", duration));
LogtailAlarm::GetInstance()->SendAlarm(CONFIG_UPDATE_ALARM,
string("pipeline stop too slow, config: ") + mName
+ "; cost:" + duration,
GetProject(),
GetLogstore(),
GetRegion());
alarmFlag = true;
}
}

// TODO: 禁用Process中改流水线对应的输入队列

if (!isRemoving) {
FlushBatch();
}

if (!mGoPipelineWithoutInput.isNull()) {
// TODO: 卸载该Go流水线
// Go pipeline `Stop` will stop and delete
LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
}

for (const auto& flusher : mFlushers) {
flusher->Stop(isRemoving);
}

LOG_INFO(sLogger, ("pipeline stop", "succeeded")("config", mName));
}

Expand All @@ -426,18 +413,10 @@ void Pipeline::MergeGoPipeline(const Json::Value& src, Json::Value& dst) {
}
}

std::string Pipeline::GenPluginTypeWithID(std::string pluginType, std::string pluginID) {
return pluginType + "/" + pluginID;
}

// Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority.
void Pipeline::AddPluginToGoPipeline(const string& pluginType,
const Json::Value& plugin,
const string& module,
Json::Value& dst) {
void Pipeline::AddPluginToGoPipeline(const Json::Value& plugin, const string& module, Json::Value& dst) {
Json::Value res(Json::objectValue), detail = plugin;
detail.removeMember("Type");
res["type"] = GenPluginTypeWithID(pluginType, GetNowPluginID());
res["type"] = plugin["Type"];
res["detail"] = detail;
dst[module].append(res);
}
Expand Down Expand Up @@ -501,6 +480,11 @@ std::string Pipeline::GetNowPluginID() {
return std::to_string(mPluginID.load());
}

std::string Pipeline::GenNextPluginID() {
mPluginID.fetch_add(1);
return std::to_string(mPluginID.load());
}

PluginInstance::PluginMeta Pipeline::GenNextPluginMeta(bool lastOne) {
mPluginID.fetch_add(1);
int32_t childNodeID = mPluginID.load();
Expand Down
17 changes: 15 additions & 2 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
#include <vector>

#include "config/PipelineConfig.h"
#include "plugin/input/InputContainerStdio.h"
#include "plugin/input/InputFile.h"
#include "models/PipelineEventGroup.h"
#include "pipeline/PipelineContext.h"
#include "pipeline/plugin/instance/FlusherInstance.h"
#include "pipeline/plugin/instance/InputInstance.h"
#include "pipeline/plugin/instance/ProcessorInstance.h"
#include "pipeline/route/Router.h"
#include "plugin/input/InputContainerStdio.h"
#include "plugin/input/InputFile.h"

namespace logtail {

Expand All @@ -45,6 +45,18 @@ class Pipeline {
bool Send(std::vector<PipelineEventGroup>&& groupList);
bool FlushBatch();
void RemoveProcessQueue() const;
void AddInProcessingCnt() const { mProcessingCnt.fetch_add(1); }
void SubInProcessingCnt() const {
uint16_t currentVal;
do {
currentVal = atomic_val.load(std::memory_order_relaxed);
// cannot sub smaller than 0
if (currentVal == 0) {
return;
}
} while (!atomic_val.compare_exchange_weak(
currentVal, currentVal - 1, std::memory_order_release, std::memory_order_relaxed));
}

const std::string& Name() const { return mName; }
PipelineContext& GetContext() const { return mContext; }
Expand Down Expand Up @@ -88,6 +100,7 @@ class Pipeline {
std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>> mPluginCntMap;
std::unique_ptr<Json::Value> mConfig;
std::atomic_uint16_t mPluginID;
std::atomic_uint16_t mProcessingCnt;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineMock;
Expand Down
Loading

0 comments on commit 8d23744

Please sign in to comment.