Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reload pipeline config independently #1713

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9f501d7
feat: reload Go pipeline config independently
Abingcbc Aug 22, 2024
f8d7a0a
fix unittest
Abingcbc Aug 23, 2024
750f732
fix unittest
Abingcbc Aug 23, 2024
8f73fa7
fix
Abingcbc Aug 23, 2024
3496d95
fix
Abingcbc Aug 25, 2024
c5206af
fix
Abingcbc Sep 12, 2024
a6556f3
fix
Abingcbc Sep 12, 2024
5a3c10f
fix
Abingcbc Sep 13, 2024
d8d4c5b
unittest
Abingcbc Sep 15, 2024
96955d9
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 16, 2024
62f3a35
fix
Abingcbc Sep 16, 2024
d602845
fix
Abingcbc Sep 16, 2024
8c1ad10
fix
Abingcbc Sep 16, 2024
fd478b7
fix
Abingcbc Sep 16, 2024
9ddb08d
fix
Abingcbc Sep 17, 2024
409f575
self telemetry
Abingcbc Sep 18, 2024
9e76b8a
fix
Abingcbc Sep 18, 2024
d307d95
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 19, 2024
82baa18
fix
Abingcbc Sep 20, 2024
d8c28cd
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 20, 2024
6442438
fix
Abingcbc Sep 20, 2024
fad1bfe
fix
Abingcbc Sep 20, 2024
0d9a2a7
fix
Abingcbc Sep 20, 2024
9f0424e
fix
Abingcbc Sep 23, 2024
4e022c5
fix
Abingcbc Sep 23, 2024
5155278
fix
Abingcbc Sep 23, 2024
0a05f02
fix
Abingcbc Sep 23, 2024
b29b57f
fix
Abingcbc Sep 24, 2024
e770c59
fix
Abingcbc Sep 24, 2024
5d46f61
fix
Abingcbc Sep 24, 2024
7714655
fix
Abingcbc Sep 24, 2024
f5c4bf4
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 24, 2024
bea03ba
test: calculate incremental unittest coverage
Abingcbc Sep 25, 2024
2bf161e
fix
Abingcbc Sep 25, 2024
1cda480
Merge remote-tracking branch 'origin/coverage' into go-config
Abingcbc Sep 25, 2024
98e14ec
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 25, 2024
7634196
fix
Abingcbc Sep 26, 2024
25b10d2
fix
Abingcbc Sep 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <string>
#include <vector>

#include "config/PipelineConfig.h"
#include "config/InstanceConfig.h"
#include "config/PipelineConfig.h"

namespace logtail {

Expand All @@ -29,7 +29,6 @@ class PipelineConfigDiff {
std::vector<PipelineConfig> mAdded;
std::vector<PipelineConfig> mModified;
std::vector<std::string> mRemoved;
std::vector<std::string> mUnchanged; // 过渡使用,仅供插件系统用
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

Expand All @@ -38,7 +37,6 @@ class InstanceConfigDiff {
std::vector<InstanceConfig> mAdded;
std::vector<InstanceConfig> mModified;
std::vector<std::string> mRemoved;
std::vector<std::string> mUnchanged; // 过渡使用,仅供插件系统用
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

Expand Down
59 changes: 31 additions & 28 deletions core/config/watcher/ConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <unordered_set>

#include "logger/Logger.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/InstanceConfigManager.h"
#include "pipeline/PipelineManager.h"

using namespace std;

Expand Down Expand Up @@ -51,12 +51,15 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
continue;
}
if (!filesystem::exists(s)) {
LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string())("configType", configType));
LOG_WARNING(sLogger,
("config dir path not existed", "skip current object")("dir path", dir.string())("configType",
configType));
continue;
}
if (!filesystem::is_directory(s)) {
LOG_WARNING(sLogger,
("config dir path is not a directory", "skip current object")("dir path", dir.string())("configType", configType));
("config dir path is not a directory",
"skip current object")("dir path", dir.string())("configType", configType));
continue;
}
for (auto const& entry : filesystem::directory_iterator(dir, ec)) {
Expand All @@ -72,13 +75,15 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
const string& configName = path.stem().string();
const string& filepath = path.string();
if (!filesystem::is_regular_file(entry.status(ec))) {
LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath)("configType", configType));
LOG_DEBUG(sLogger,
("config file is not a regular file",
"skip current object")("filepath", filepath)("configType", configType));
continue;
}
if (configSet.find(configName) != configSet.end()) {
LOG_WARNING(
sLogger,
("more than 1 config with the same name is found", "skip current config")("filepath", filepath)("configType", configType));
LOG_WARNING(sLogger,
("more than 1 config with the same name is found",
"skip current config")("filepath", filepath)("configType", configType));
continue;
}
configSet.insert(configName);
Expand All @@ -93,12 +98,16 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName)("configType", configType));
LOG_INFO(sLogger,
("new config found and disabled",
"skip current object")("config", configName)("configType", configType));
continue;
}
ConfigType config(configName, std::move(detail));
if (!config.Parse()) {
LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName)("configType", configType));
LOG_ERROR(sLogger,
("new config found but invalid",
"skip current object")("config", configName)("configType", configType));
LogtailAlarm::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM,
"new config found but invalid: skip current object, config: "
+ configName + ", configType: " + configType,
Expand All @@ -108,29 +117,27 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
continue;
}
diff.mAdded.push_back(std::move(config));
LOG_INFO(
sLogger,
("new config found and passed topology check", "prepare to build config")("config", configName)("configType", configType));
LOG_INFO(sLogger,
("new config found and passed topology check",
"prepare to build config")("config", configName)("configType", configType));
} else if (iter->second.first != size || iter->second.second != mTime) {
// for config currently running, we leave it untouched if new config is invalid
fileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
if (configManager->FindConfigByName(configName)) {
diff.mUnchanged.push_back(configName);
}
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
if (configManager->FindConfigByName(configName)) {
diff.mRemoved.push_back(configName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running config")("config", configName)("configType", configType));
LOG_INFO(
sLogger,
("existing valid config modified and disabled",
"prepare to stop current running config")("config", configName)("configType", configType));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled", "skip current object")("config",
configName)("configType", configType));
("existing invalid config modified and disabled",
"skip current object")("config", configName)("configType", configType));
}
continue;
}
Expand All @@ -157,7 +164,6 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
} else if (*detail != p->GetConfig()) {
ConfigType config(configName, std::move(detail));
if (!config.Parse()) {
diff.mUnchanged.push_back(configName);
LOG_ERROR(sLogger,
("existing valid config modified and becomes invalid",
"keep current config running")("config", configName)("configType", configType));
Expand All @@ -175,15 +181,11 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
("existing valid config modified and passed topology check",
"prepare to rebuild config")("config", configName)("configType", configType));
} else {
diff.mUnchanged.push_back(configName);
LOG_DEBUG(sLogger,
("existing valid config file modified, but no change found", "skip current object")("configType", configType));
("existing valid config file modified, but no change found",
"skip current object")("configType", configType));
}
} else {
// 为了插件系统过渡使用
if (configManager->FindConfigByName(configName)) {
diff.mUnchanged.push_back(configName);
}
LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")("configType", configType));
}
}
Expand All @@ -192,7 +194,8 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
if (configSet.find(name) == configSet.end()) {
diff.mRemoved.push_back(name);
LOG_INFO(sLogger,
("existing valid config is removed", "prepare to stop current running config")("config", name)("configType", configType));
("existing valid config is removed",
"prepare to stop current running config")("config", name)("configType", configType));
}
}
for (const auto& item : fileInfoMap) {
Expand Down
122 changes: 82 additions & 40 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ LogtailPlugin* LogtailPlugin::s_instance = NULL;
LogtailPlugin::LogtailPlugin() {
mPluginAdapterPtr = NULL;
mPluginBasePtr = NULL;
mLoadConfigFun = NULL;
mHoldOnFun = NULL;
mResumeFun = NULL;
mLoadPipelineFun = NULL;
mUnloadPipelineFun = NULL;
mStopAllFun = NULL;
mStopFun = NULL;
mStartFun = NULL;
mLoadGlobalConfigFun = NULL;
mProcessRawLogFun = NULL;
mPluginValid = false;
mPluginAlarmConfig.mLogstore = "logtail_alarm";
mPluginAlarmConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid);
Expand Down Expand Up @@ -83,7 +84,7 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
LoadPluginBase();
}

if (mPluginValid && mLoadConfigFun != NULL) {
if (mPluginValid && mLoadPipelineFun != NULL) {
GoString goProject;
GoString goLogstore;
GoString goConfigName;
Expand All @@ -99,31 +100,77 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
goLogstore.p = logstore.c_str();
long long goLogStoreKey = static_cast<long long>(logstoreKey);

return mLoadConfigFun(goProject, goLogstore, goConfigName, goLogStoreKey, goPluginConfig) == 0;
return mLoadPipelineFun(goProject, goLogstore, goConfigName, goLogStoreKey, goPluginConfig) == 0;
}

return false;
}

void LogtailPlugin::HoldOn(bool exitFlag) {
if (mPluginValid && mHoldOnFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines pause", "starts"));
auto holdOnStart = GetCurrentTimeInMilliSeconds();
mHoldOnFun(exitFlag ? 1 : 0);
auto holdOnCost = GetCurrentTimeInMilliSeconds() - holdOnStart;
LOG_INFO(sLogger, ("Go pipelines pause", "succeeded")("cost", ToString(holdOnCost) + "ms"));
if (holdOnCost >= 60 * 1000) {
bool LogtailPlugin::UnloadPipeline(const std::string& project,
const std::string& logstore,
const std::string& pipelineName) {
if (!mPluginValid) {
LOG_ERROR(sLogger, ("UnloadPipeline", "plugin not valid"));
return false;
}

if (mPluginValid && mUnloadPipelineFun != NULL) {
GoString goProject;
GoString goLogstore;
GoString goConfigName;

goProject.n = project.size();
goProject.p = project.c_str();
goLogstore.n = logstore.size();
goLogstore.p = logstore.c_str();
goConfigName.n = pipelineName.size();
goConfigName.p = pipelineName.c_str();

return mUnloadPipelineFun(goProject, goLogstore, goConfigName) == 0;
}

return false;
}

void LogtailPlugin::StopAll(bool withInputFlag) {
if (mPluginValid && mStopAllFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop all", "starts"));
auto stopAllStart = GetCurrentTimeInMilliSeconds();
mStopAllFun(withInputFlag ? 1 : 0);
auto stopAllCost = GetCurrentTimeInMilliSeconds() - stopAllStart;
LOG_INFO(sLogger, ("Go pipelines stop all", "succeeded")("cost", ToString(stopAllCost) + "ms"));
if (stopAllCost >= 60 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(HOLD_ON_TOO_SLOW_ALARM,
"Pausing Go pipelines took " + ToString(holdOnCost) + "ms");
"Stopping all Go pipelines took " + ToString(stopAllCost) + "ms");
}
}
}

void LogtailPlugin::Resume() {
if (mPluginValid && mResumeFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines resume", "starts"));
mResumeFun();
LOG_INFO(sLogger, ("Go pipelines resume", "succeeded"));
void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
GoString goConfigName;
goConfigName.n = configName.size();
goConfigName.p = configName.c_str();
mStopFun(goConfigName, removedFlag ? 1 : 0);
auto stopCost = GetCurrentTimeInMilliSeconds() - stopStart;
LOG_INFO(sLogger, ("Go pipelines stop", "succeeded")("config", configName)("cost", ToString(stopCost) + "ms"));
if (stopCost >= 30 * 1000) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
LogtailAlarm::GetInstance()->SendAlarm(
HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
}

void LogtailPlugin::Start(const std::string& configName) {
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
goConfigName.n = configName.size();
goConfigName.p = configName.c_str();
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
}

Expand Down Expand Up @@ -326,39 +373,34 @@ bool LogtailPlugin::LoadPluginBase() {
LOG_ERROR(sLogger, ("load LoadGlobalConfig error, Message", error));
return mPluginValid;
}
// 加载单个配置,目前应该是Resume的时候,全量加载一次
mLoadConfigFun = (LoadConfigFun)loader.LoadMethod("LoadConfig", error);
// 加载单个配置
mLoadPipelineFun = (LoadPipelineFun)loader.LoadMethod("LoadPipeline", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load LoadConfig error, Message", error));
LOG_ERROR(sLogger, ("load LoadPipelineFun error, Message", error));
return mPluginValid;
}
// 更新配置,目前应该没有调用点
mUnloadConfigFun = (UnloadConfigFun)loader.LoadMethod("UnloadConfig", error);
// 卸载单个配置
mUnloadPipelineFun = (UnloadPipelineFun)loader.LoadMethod("UnloadPipeline", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load UnloadConfig error, Message", error));
LOG_ERROR(sLogger, ("load UnloadPipelineFun error, Message", error));
return mPluginValid;
}
// 插件暂停
mHoldOnFun = (HoldOnFun)loader.LoadMethod("HoldOn", error);
// 停止所有插件
mStopAllFun = (StopAllFun)loader.LoadMethod("StopAll", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load HoldOn error, Message", error));
LOG_ERROR(sLogger, ("load StopAll error, Message", error));
return mPluginValid;
}
// 插件恢复
mResumeFun = (ResumeFun)loader.LoadMethod("Resume", error);
// 停止单个插件
mStopFun = (StopFun)loader.LoadMethod("Stop", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load Resume error, Message", error));
LOG_ERROR(sLogger, ("load Stop error, Message", error));
return mPluginValid;
}
// C++传递原始二进制数据到golang插件,v1和v2的区别:是否传递tag
mProcessRawLogFun = (ProcessRawLogFun)loader.LoadMethod("ProcessRawLog", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load ProcessRawLog error, Message", error));
return mPluginValid;
}
mProcessRawLogV2Fun = (ProcessRawLogV2Fun)loader.LoadMethod("ProcessRawLogV2", error);
// 插件恢复
mStartFun = (StartFun)loader.LoadMethod("Start", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load ProcessRawLogV2 error, Message", error));
LOG_ERROR(sLogger, ("load Start error, Message", error));
return mPluginValid;
}
// C++获取容器信息的
Expand Down
Loading
Loading