Skip to content

Commit

Permalink
feat: reload Go pipeline config independently
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Aug 22, 2024
1 parent f67bca3 commit 075bdd7
Show file tree
Hide file tree
Showing 23 changed files with 297 additions and 428 deletions.
71 changes: 49 additions & 22 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ LogtailPlugin::LogtailPlugin() {
mPluginAdapterPtr = NULL;
mPluginBasePtr = NULL;
mLoadConfigFun = NULL;
mHoldOnFun = NULL;
mResumeFun = NULL;
mStopAllFun = NULL;
mStopFun = NULL;
mStartFun = NULL;
mLoadGlobalConfigFun = NULL;
mProcessRawLogFun = NULL;
mPluginValid = false;
Expand Down Expand Up @@ -101,25 +102,45 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
return false;
}

void LogtailPlugin::HoldOn(bool exitFlag) {
if (mPluginValid && mHoldOnFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines pause", "starts"));
auto holdOnStart = GetCurrentTimeInMilliSeconds();
mHoldOnFun(exitFlag ? 1 : 0);
auto holdOnCost = GetCurrentTimeInMilliSeconds() - holdOnStart;
LOG_INFO(sLogger, ("Go pipelines pause", "succeeded")("cost", ToString(holdOnCost) + "ms"));
if (holdOnCost >= 60 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(HOLD_ON_TOO_SLOW_ALARM,
"Pausing Go pipelines took " + ToString(holdOnCost) + "ms");
void LogtailPlugin::StopAll(bool exitFlag, bool withInputFlag) {
if (mPluginValid && mStopAllFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop all", "starts"));
auto stopAllStart = GetCurrentTimeInMilliSeconds();
mStopAllFun(exitFlag ? 1 : 0, withInputFlag ? 1 : 0);
auto stopAllCost = GetCurrentTimeInMilliSeconds() - stopAllStart;
LOG_INFO(sLogger, ("Go pipelines stop all", "succeeded")("cost", ToString(stopAllCost) + "ms"));
if (stopAllCost >= 60 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(STOP_ALL_TOO_SLOW_ALARM,
"Stopping all Go pipelines took " + ToString(stopAllCost) + "ms");
}
}
}

void LogtailPlugin::Resume() {
if (mPluginValid && mResumeFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines resume", "starts"));
mResumeFun();
LOG_INFO(sLogger, ("Go pipelines resume", "succeeded"));
void LogtailPlugin::Stop(const std::string& configName, bool removingFlag) {
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
GoString goConfigName;
goConfigName.n = configName.size();
goConfigName.p = configName.c_str();
mStopFun(goConfigName, removingFlag ? 1 : 0);
auto stopCost = GetCurrentTimeInMilliSeconds() - stopStart;
LOG_INFO(sLogger, ("Go pipelines stop", "succeeded")("config", configName)("cost", ToString(stopCost) + "ms"));
if (stopCost >= 10 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(
STOP_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
}

void LogtailPlugin::Start(const std::string& configName) {
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts"));
GoString goConfigName;
goConfigName.n = configName.size();
goConfigName.p = configName.c_str();
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded"));
}
}

Expand Down Expand Up @@ -328,16 +349,22 @@ bool LogtailPlugin::LoadPluginBase() {
LOG_ERROR(sLogger, ("load UnloadConfig error, Message", error));
return mPluginValid;
}
// 插件暂停
mHoldOnFun = (HoldOnFun)loader.LoadMethod("HoldOn", error);
// 停止所有插件
mStopAllFun = (StopAllFun)loader.LoadMethod("StopAll", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load StopAll error, Message", error));
return mPluginValid;
}
// 停止单个插件
mStopFun = (StopFun)loader.LoadMethod("Stop", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load HoldOn error, Message", error));
LOG_ERROR(sLogger, ("load Stop error, Message", error));
return mPluginValid;
}
// 插件恢复
mResumeFun = (ResumeFun)loader.LoadMethod("Resume", error);
mStartFun = (StartFun)loader.LoadMethod("Start", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load Resume error, Message", error));
LOG_ERROR(sLogger, ("load Start error, Message", error));
return mPluginValid;
}
// C++传递原始二进制数据到golang插件,v1和v2的区别:是否传递tag
Expand Down
15 changes: 9 additions & 6 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ typedef GoInt (*LoadConfigFun)(GoString p, GoString l, GoString c, GoInt64 k, Go
typedef GoInt (*UnloadConfigFun)(GoString p, GoString l, GoString c);
typedef GoInt (*ProcessRawLogFun)(GoString c, GoSlice l, GoString p, GoString t);
typedef GoInt (*ProcessRawLogV2Fun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
typedef void (*HoldOnFun)(GoInt);
typedef void (*ResumeFun)();
typedef void (*StopAllFun)(GoInt, GoInt);
typedef void (*StopFun)(GoString, GoInt);
typedef void (*StartFun)(GoString);
typedef GoInt (*InitPluginBaseFun)();
typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg);
typedef GoInt (*ProcessLogsFun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
Expand Down Expand Up @@ -214,8 +215,9 @@ class LogtailPlugin {
const std::string& logstore = "",
const std::string& region = "",
logtail::QueueKey logstoreKey = 0);
void HoldOn(bool exitFlag);
void Resume();
void StopAll(bool exitFlag, bool withInputFlag);
void Stop(const std::string& configName, bool removingFlag);
void Start(const std::string& configName);

bool IsPluginOpened() { return mPluginValid; }

Expand Down Expand Up @@ -273,8 +275,9 @@ class LogtailPlugin {
LoadGlobalConfigFun mLoadGlobalConfigFun;
LoadConfigFun mLoadConfigFun;
UnloadConfigFun mUnloadConfigFun;
HoldOnFun mHoldOnFun;
ResumeFun mResumeFun;
StopAllFun mStopAllFun;
StopFun mStopFun;
StartFun mStartFun;
ProcessRawLogFun mProcessRawLogFun;
ProcessRawLogV2Fun mProcessRawLogV2Fun;
volatile bool mPluginValid;
Expand Down
2 changes: 1 addition & 1 deletion core/monitor/LogtailAlarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ LogtailAlarm::LogtailAlarm() {
mMessageType[PROCESS_TOO_SLOW_ALARM] = "PROCESS_TOO_SLOW_ALARM";
mMessageType[LOAD_LOCAL_EVENT_ALARM] = "LOAD_LOCAL_EVENT_ALARM";
mMessageType[WINDOWS_WORKER_START_HINTS_ALARM] = "WINDOWS_WORKER_START_HINTS_ALARM";
mMessageType[HOLD_ON_TOO_SLOW_ALARM] = "HOLD_ON_TOO_SLOW_ALARM";
mMessageType[STOP_ALL_TOO_SLOW_ALARM] = "STOP_ALL_TOO_SLOW_ALARM";
mMessageType[INNER_PROFILE_ALARM] = "INNER_PROFILE_ALARM";
mMessageType[FUSE_FILE_TRUNCATE_ALARM] = "FUSE_FILE_TRUNCATE_ALARM";
mMessageType[SENDING_COSTS_TOO_MUCH_TIME_ALARM] = "SENDING_COSTS_TOO_MUCH_TIME_ALARM";
Expand Down
5 changes: 3 additions & 2 deletions core/monitor/LogtailAlarm.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ enum LogtailAlarmType {
PROCESS_TOO_SLOW_ALARM = 48,
LOAD_LOCAL_EVENT_ALARM = 49,
WINDOWS_WORKER_START_HINTS_ALARM = 50,
HOLD_ON_TOO_SLOW_ALARM = 51,
STOP_ALL_TOO_SLOW_ALARM = 51,
INNER_PROFILE_ALARM = 52,
FUSE_FILE_TRUNCATE_ALARM = 53,
SENDING_COSTS_TOO_MUCH_TIME_ALARM = 54,
Expand All @@ -99,7 +99,8 @@ enum LogtailAlarmType {
ALL_LOGTAIL_ALARM_NUM = 65,
COMPRESS_FAIL_ALARM = 66,
SERIALIZE_FAIL_ALARM = 67,
RELABEL_METRIC_FAIL_ALARM = 68
RELABEL_METRIC_FAIL_ALARM = 68,
STOP_TOO_SLOW_ALARM = 69,
};

struct LogtailAlarmMessage {
Expand Down
10 changes: 6 additions & 4 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ bool Pipeline::Init(PipelineConfig&& config) {
string name = (*config.mProcessors[i])["Type"].asString();
unique_ptr<ProcessorInstance> processor
= PluginRegistry::GetInstance()->CreateProcessor(name, GenNextPluginMeta(false));
if (processor) {
if (processor) {
if (!processor->Init(*config.mProcessors[i], mContext)) {
return false;
}
Expand All @@ -108,7 +108,7 @@ bool Pipeline::Init(PipelineConfig&& config) {
mContext.SetIsFirstProcessorApsaraFlag(true);
}
} else {
if (ShouldAddPluginToGoPipelineWithInput()) {
if (ShouldAddPluginToGoPipelineWithInput()) {
AddPluginToGoPipeline(*config.mProcessors[i], "processors", mGoPipelineWithInput);
} else {
AddPluginToGoPipeline(*config.mProcessors[i], "processors", mGoPipelineWithoutInput);
Expand Down Expand Up @@ -415,7 +415,8 @@ bool Pipeline::LoadGoPipelines() const {
// 目前按照从后往前顺序加载,即便without成功with失败导致without残留在插件系统中,也不会有太大的问题,但最好改成原子的。
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
if (!LogtailPlugin::GetInstance()->LoadPipeline(mName + "/2",
string goConfigName = GetConfigNameOfGoPipelineWithoutInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
content,
mContext.GetProjectName(),
mContext.GetLogstoreName(),
Expand All @@ -434,7 +435,8 @@ bool Pipeline::LoadGoPipelines() const {
}
if (!mGoPipelineWithInput.isNull()) {
string content = mGoPipelineWithInput.toStyledString();
if (!LogtailPlugin::GetInstance()->LoadPipeline(mName + "/1",
string goConfigName = GetConfigNameOfGoPipelineWithInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
content,
mContext.GetProjectName(),
mContext.GetLogstoreName(),
Expand Down
5 changes: 5 additions & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ class Pipeline {
std::string GenNextPluginID();
PluginInstance::PluginMeta GenNextPluginMeta(bool lastOne);

bool HasGoPipelineWithInput() const { return !mGoPipelineWithInput.isNull(); }
bool HasGoPipelineWithoutInput() const { return !mGoPipelineWithoutInput.isNull(); }
std::string GetConfigNameOfGoPipelineWithInput() const { return mName + "/1"; }
std::string GetConfigNameOfGoPipelineWithoutInput() const { return mName + "/2"; }

private:
void MergeGoPipeline(const Json::Value& src, Json::Value& dst);
void AddPluginToGoPipeline(const Json::Value& plugin, const std::string& module, Json::Value& dst);
Expand Down
48 changes: 32 additions & 16 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include "go_pipeline/LogtailPlugin.h"
#include "prometheus/PrometheusInputRunner.h"
#if defined(__linux__) && !defined(__ANDROID__)
#include "observer/ObserverManager.h"
#include "ebpf/eBPFServer.h"
#include "observer/ObserverManager.h"
#endif
#include "processor/daemon/LogProcess.h"
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
Expand Down Expand Up @@ -92,7 +92,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
FileServer::GetInstance()->Pause();
}
LogProcess::GetInstance()->HoldOn();
LogtailPlugin::GetInstance()->HoldOn(false);
if (isInputPrometheusChanged) {
PrometheusInputRunner::GetInstance()->Start();
}
Expand All @@ -107,13 +106,26 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {

for (const auto& name : diff.mRemoved) {
auto iter = mPipelineNameEntityMap.find(name);
if (iter->second->HasGoPipelineWithInput()) {
LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), true);
}
if (iter->second->HasGoPipelineWithoutInput()) {
LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), true);
}
iter->second->Stop(true);
DecreasePluginUsageCnt(iter->second->GetPluginStatistics());
iter->second->RemoveProcessQueue();
mPipelineNameEntityMap.erase(iter);
ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(name, ConfigFeedbackStatus::DELETED);
}
for (auto& config : diff.mModified) {
auto iter = mPipelineNameEntityMap.find(config.mName);
if (iter->second->HasGoPipelineWithInput()) {
LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithInput(), false);
}
if (iter->second->HasGoPipelineWithoutInput()) {
LogtailPlugin::GetInstance()->Stop(iter->second->GetConfigNameOfGoPipelineWithoutInput(), false);
}
auto p = BuildPipeline(std::move(config));
if (!p) {
LOG_WARNING(sLogger,
Expand All @@ -134,13 +146,17 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
LOG_INFO(sLogger,
("pipeline building for existing config succeeded",
"stop the old pipeline and start the new one")("config", config.mName));

auto iter = mPipelineNameEntityMap.find(config.mName);
iter->second->Stop(false);
DecreasePluginUsageCnt(iter->second->GetPluginStatistics());
mPipelineNameEntityMap[config.mName] = p;
IncreasePluginUsageCnt(p->GetPluginStatistics());
p->Start();
if (p->HasGoPipelineWithoutInput()) {
LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput());
}
if (p->HasGoPipelineWithInput()) {
LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput());
}
}
for (auto& config : diff.mAdded) {
auto p = BuildPipeline(std::move(config));
Expand All @@ -163,18 +179,19 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
mPipelineNameEntityMap[config.mName] = p;
IncreasePluginUsageCnt(p->GetPluginStatistics());
p->Start();
if (p->HasGoPipelineWithoutInput()) {
LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithoutInput());
}
if (p->HasGoPipelineWithInput()) {
LogtailPlugin::GetInstance()->Start(p->GetConfigNameOfGoPipelineWithInput());
}
}

#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用,有变更的流水线的Go流水线加载在BuildPipeline中完成
for (auto& name : diff.mUnchanged) {
mPipelineNameEntityMap[name]->LoadGoPipelines();
}
// 在Flusher改造完成前,先不执行如下步骤,不会造成太大影响
// Sender::CleanUnusedAk();

// 过渡使用
LogtailPlugin::GetInstance()->Resume();
LogProcess::GetInstance()->Resume();
if (isInputFileChanged || isInputContainerStdioChanged) {
if (isFileServerStarted) {
Expand Down Expand Up @@ -269,7 +286,8 @@ void PipelineManager::StopAllPipelines() {

FlushAllBatch();

LogtailPlugin::GetInstance()->HoldOn(true);
LogtailPlugin::GetInstance()->StopAll(true, true);
LogtailPlugin::GetInstance()->StopAll(true, false);

// TODO: make it common
FlusherSLS::RecycleResourceIfNotUsed();
Expand Down Expand Up @@ -327,12 +345,10 @@ void PipelineManager::CheckIfInputUpdated(const Json::Value& config,
isInputContainerStdioChanged = true;
} else if (inputType == "input_prometheus") {
isInputPrometheusChanged = true;
} else if (inputType == "input_ebpf_processprobe_security" ||
inputType == "input_ebpf_processprobe_observer" ||
inputType == "input_ebpf_sockettraceprobe_security" ||
inputType == "input_ebpf_sockettraceprobe_observer" ||
inputType == "input_ebpf_fileprobe_security" ||
inputType == "input_ebpf_profilingprobe_observer") {
} else if (inputType == "input_ebpf_processprobe_security" || inputType == "input_ebpf_processprobe_observer"
|| inputType == "input_ebpf_sockettraceprobe_security"
|| inputType == "input_ebpf_sockettraceprobe_observer" || inputType == "input_ebpf_fileprobe_security"
|| inputType == "input_ebpf_profilingprobe_observer") {
isInputEbpfChanged = true;
}
}
Expand Down
Loading

0 comments on commit 075bdd7

Please sign in to comment.