Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reload pipeline config independently #1713

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9f501d7
feat: reload Go pipeline config independently
Abingcbc Aug 22, 2024
f8d7a0a
fix unittest
Abingcbc Aug 23, 2024
750f732
fix unittest
Abingcbc Aug 23, 2024
8f73fa7
fix
Abingcbc Aug 23, 2024
3496d95
fix
Abingcbc Aug 25, 2024
c5206af
fix
Abingcbc Sep 12, 2024
a6556f3
fix
Abingcbc Sep 12, 2024
5a3c10f
fix
Abingcbc Sep 13, 2024
d8d4c5b
unittest
Abingcbc Sep 15, 2024
96955d9
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 16, 2024
62f3a35
fix
Abingcbc Sep 16, 2024
d602845
fix
Abingcbc Sep 16, 2024
8c1ad10
fix
Abingcbc Sep 16, 2024
fd478b7
fix
Abingcbc Sep 16, 2024
9ddb08d
fix
Abingcbc Sep 17, 2024
409f575
self telemetry
Abingcbc Sep 18, 2024
9e76b8a
fix
Abingcbc Sep 18, 2024
d307d95
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 19, 2024
82baa18
fix
Abingcbc Sep 20, 2024
d8c28cd
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 20, 2024
6442438
fix
Abingcbc Sep 20, 2024
fad1bfe
fix
Abingcbc Sep 20, 2024
0d9a2a7
fix
Abingcbc Sep 20, 2024
9f0424e
fix
Abingcbc Sep 23, 2024
4e022c5
fix
Abingcbc Sep 23, 2024
5155278
fix
Abingcbc Sep 23, 2024
0a05f02
fix
Abingcbc Sep 23, 2024
b29b57f
fix
Abingcbc Sep 24, 2024
e770c59
fix
Abingcbc Sep 24, 2024
5d46f61
fix
Abingcbc Sep 24, 2024
7714655
fix
Abingcbc Sep 24, 2024
f5c4bf4
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 24, 2024
bea03ba
test: calculate incremental unittest coverage
Abingcbc Sep 25, 2024
2bf161e
fix
Abingcbc Sep 25, 2024
1cda480
Merge remote-tracking branch 'origin/coverage' into go-config
Abingcbc Sep 25, 2024
98e14ec
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 25, 2024
7634196
fix
Abingcbc Sep 26, 2024
25b10d2
fix
Abingcbc Sep 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/watcher/ConfigWatcher.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
#include "monitor/Monitor.h"
#include "pipeline/InstanceConfigManager.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "runner/FlusherRunner.h"
#include "runner/ProcessorRunner.h"
#include "runner/sink/http/HttpSink.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
Expand Down Expand Up @@ -269,7 +269,7 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

LogProcess::GetInstance()->Start();
ProcessorRunner::GetInstance()->Init();

time_t curTime = 0, lastProfilingCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
lastCheckTagsTime = 0, lastQueueGCTime = 0;
Expand Down
4 changes: 1 addition & 3 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <string>
#include <vector>

#include "config/PipelineConfig.h"
#include "config/InstanceConfig.h"
#include "config/PipelineConfig.h"

namespace logtail {

Expand All @@ -29,7 +29,6 @@ class PipelineConfigDiff {
std::vector<PipelineConfig> mAdded;
std::vector<PipelineConfig> mModified;
std::vector<std::string> mRemoved;
std::vector<std::string> mUnchanged; // 过渡使用,仅供插件系统用
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

Expand All @@ -38,7 +37,6 @@ class InstanceConfigDiff {
std::vector<InstanceConfig> mAdded;
std::vector<InstanceConfig> mModified;
std::vector<std::string> mRemoved;
std::vector<std::string> mUnchanged; // 过渡使用,仅供插件系统用
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

Expand Down
59 changes: 31 additions & 28 deletions core/config/watcher/ConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <unordered_set>

#include "logger/Logger.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/InstanceConfigManager.h"
#include "pipeline/PipelineManager.h"

using namespace std;

Expand Down Expand Up @@ -51,12 +51,15 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
continue;
}
if (!filesystem::exists(s)) {
LOG_WARNING(sLogger, ("config dir path not existed", "skip current object")("dir path", dir.string())("configType", configType));
LOG_WARNING(sLogger,
("config dir path not existed", "skip current object")("dir path", dir.string())("configType",
configType));
continue;
}
if (!filesystem::is_directory(s)) {
LOG_WARNING(sLogger,
("config dir path is not a directory", "skip current object")("dir path", dir.string())("configType", configType));
("config dir path is not a directory",
"skip current object")("dir path", dir.string())("configType", configType));
continue;
}
for (auto const& entry : filesystem::directory_iterator(dir, ec)) {
Expand All @@ -72,13 +75,15 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
const string& configName = path.stem().string();
const string& filepath = path.string();
if (!filesystem::is_regular_file(entry.status(ec))) {
LOG_DEBUG(sLogger, ("config file is not a regular file", "skip current object")("filepath", filepath)("configType", configType));
LOG_DEBUG(sLogger,
("config file is not a regular file",
"skip current object")("filepath", filepath)("configType", configType));
continue;
}
if (configSet.find(configName) != configSet.end()) {
LOG_WARNING(
sLogger,
("more than 1 config with the same name is found", "skip current config")("filepath", filepath)("configType", configType));
LOG_WARNING(sLogger,
("more than 1 config with the same name is found",
"skip current config")("filepath", filepath)("configType", configType));
continue;
}
configSet.insert(configName);
Expand All @@ -93,12 +98,16 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName)("configType", configType));
LOG_INFO(sLogger,
("new config found and disabled",
"skip current object")("config", configName)("configType", configType));
continue;
}
ConfigType config(configName, std::move(detail));
if (!config.Parse()) {
LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName)("configType", configType));
LOG_ERROR(sLogger,
("new config found but invalid",
"skip current object")("config", configName)("configType", configType));
LogtailAlarm::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM,
"new config found but invalid: skip current object, config: "
+ configName + ", configType: " + configType,
Expand All @@ -108,29 +117,27 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
continue;
}
diff.mAdded.push_back(std::move(config));
LOG_INFO(
sLogger,
("new config found and passed topology check", "prepare to build config")("config", configName)("configType", configType));
LOG_INFO(sLogger,
("new config found and passed topology check",
"prepare to build config")("config", configName)("configType", configType));
} else if (iter->second.first != size || iter->second.second != mTime) {
// for config currently running, we leave it untouched if new config is invalid
fileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
if (configManager->FindConfigByName(configName)) {
diff.mUnchanged.push_back(configName);
}
continue;
}
if (!IsConfigEnabled(configName, *detail)) {
if (configManager->FindConfigByName(configName)) {
diff.mRemoved.push_back(configName);
LOG_INFO(sLogger,
("existing valid config modified and disabled",
"prepare to stop current running config")("config", configName)("configType", configType));
LOG_INFO(
sLogger,
("existing valid config modified and disabled",
"prepare to stop current running config")("config", configName)("configType", configType));
} else {
LOG_INFO(sLogger,
("existing invalid config modified and disabled", "skip current object")("config",
configName)("configType", configType));
("existing invalid config modified and disabled",
"skip current object")("config", configName)("configType", configType));
}
continue;
}
Expand All @@ -157,7 +164,6 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
} else if (*detail != p->GetConfig()) {
ConfigType config(configName, std::move(detail));
if (!config.Parse()) {
diff.mUnchanged.push_back(configName);
LOG_ERROR(sLogger,
("existing valid config modified and becomes invalid",
"keep current config running")("config", configName)("configType", configType));
Expand All @@ -175,15 +181,11 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
("existing valid config modified and passed topology check",
"prepare to rebuild config")("config", configName)("configType", configType));
} else {
diff.mUnchanged.push_back(configName);
LOG_DEBUG(sLogger,
("existing valid config file modified, but no change found", "skip current object")("configType", configType));
("existing valid config file modified, but no change found",
"skip current object")("configType", configType));
}
} else {
// 为了插件系统过渡使用
if (configManager->FindConfigByName(configName)) {
diff.mUnchanged.push_back(configName);
}
LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")("configType", configType));
}
}
Expand All @@ -192,7 +194,8 @@ ConfigDiffType ConfigWatcher::CheckConfigDiff(
if (configSet.find(name) == configSet.end()) {
diff.mRemoved.push_back(name);
LOG_INFO(sLogger,
("existing valid config is removed", "prepare to stop current running config")("config", name)("configType", configType));
("existing valid config is removed",
"prepare to stop current running config")("config", name)("configType", configType));
}
}
for (const auto& item : fileInfoMap) {
Expand Down
10 changes: 5 additions & 5 deletions core/file_server/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
#include <string>
#include <vector>

#include "file_server/event_handler/LogInput.h"
#include "app_config/AppConfig.h"
#include "common/FileSystemUtil.h"
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/event_handler/LogInput.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "runner/ProcessorRunner.h"

using namespace std;
using namespace sls_logs;
Expand Down Expand Up @@ -1090,7 +1090,7 @@ int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* lo
time(NULL));
PipelineEventGroup group = LogFileReader::GenerateEventGroup(reader, logBuffer);

while (!LogProcess::GetInstance()->PushBuffer(reader->GetQueueKey(), 0, std::move(group))) // 10ms
while (!ProcessorRunner::GetInstance()->PushQueue(reader->GetQueueKey(), 0, std::move(group))) // 10ms
{
++pushRetry;
if (pushRetry % 10 == 0)
Expand Down
8 changes: 4 additions & 4 deletions core/file_server/event_handler/HistoryFileImporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
#include "common/Thread.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/reader/LogFileReader.h"
#include "logger/Logger.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "file_server/reader/LogFileReader.h"
#include "runner/ProcessorRunner.h"

namespace logtail {

Expand Down Expand Up @@ -59,7 +59,7 @@ void HistoryFileImporter::LoadCheckPoint() {
}

void HistoryFileImporter::ProcessEvent(const HistoryFileEvent& event, const std::vector<std::string>& fileNames) {
static LogProcess* logProcess = LogProcess::GetInstance();
static ProcessorRunner* logProcess = ProcessorRunner::GetInstance();

LOG_INFO(sLogger, ("begin load history files, count", fileNames.size())("file list", ToString(fileNames)));
for (size_t i = 0; i < fileNames.size(); ++i) {
Expand Down Expand Up @@ -116,7 +116,7 @@ void HistoryFileImporter::ProcessEvent(const HistoryFileEvent& event, const std:

// TODO: currently only 1 input is allowed, so we assume 0 here. It should be the actual input seq after
// refactorization.
logProcess->PushBuffer(readerSharePtr->GetQueueKey(), 0, std::move(group), 100000000);
logProcess->PushQueue(readerSharePtr->GetQueueKey(), 0, std::move(group), 100000000);
} else {
// when ReadLog return false, retry once
if (doneFlag) {
Expand Down
Loading
Loading