Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reload pipeline config independently #1713

Merged
merged 43 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
9f501d7
feat: reload Go pipeline config independently
Abingcbc Aug 22, 2024
f8d7a0a
fix unittest
Abingcbc Aug 23, 2024
750f732
fix unittest
Abingcbc Aug 23, 2024
8f73fa7
fix
Abingcbc Aug 23, 2024
3496d95
fix
Abingcbc Aug 25, 2024
c5206af
fix
Abingcbc Sep 12, 2024
a6556f3
fix
Abingcbc Sep 12, 2024
5a3c10f
fix
Abingcbc Sep 13, 2024
d8d4c5b
unittest
Abingcbc Sep 15, 2024
96955d9
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 16, 2024
62f3a35
fix
Abingcbc Sep 16, 2024
d602845
fix
Abingcbc Sep 16, 2024
8c1ad10
fix
Abingcbc Sep 16, 2024
fd478b7
fix
Abingcbc Sep 16, 2024
9ddb08d
fix
Abingcbc Sep 17, 2024
409f575
self telemetry
Abingcbc Sep 18, 2024
9e76b8a
fix
Abingcbc Sep 18, 2024
d307d95
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 19, 2024
82baa18
fix
Abingcbc Sep 20, 2024
d8c28cd
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 20, 2024
6442438
fix
Abingcbc Sep 20, 2024
fad1bfe
fix
Abingcbc Sep 20, 2024
0d9a2a7
fix
Abingcbc Sep 20, 2024
9f0424e
fix
Abingcbc Sep 23, 2024
4e022c5
fix
Abingcbc Sep 23, 2024
5155278
fix
Abingcbc Sep 23, 2024
0a05f02
fix
Abingcbc Sep 23, 2024
b29b57f
fix
Abingcbc Sep 24, 2024
e770c59
fix
Abingcbc Sep 24, 2024
5d46f61
fix
Abingcbc Sep 24, 2024
7714655
fix
Abingcbc Sep 24, 2024
f5c4bf4
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 24, 2024
bea03ba
test: calculate incremental unittest coverage
Abingcbc Sep 25, 2024
2bf161e
fix
Abingcbc Sep 25, 2024
1cda480
Merge remote-tracking branch 'origin/coverage' into go-config
Abingcbc Sep 25, 2024
98e14ec
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 25, 2024
7634196
fix
Abingcbc Sep 26, 2024
25b10d2
fix
Abingcbc Sep 27, 2024
1d97013
fix
Abingcbc Sep 30, 2024
c932d94
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Sep 30, 2024
385e565
fix conflict
Abingcbc Sep 30, 2024
8d9c1da
Merge remote-tracking branch 'alibaba/main' into go-config
Abingcbc Oct 12, 2024
eb27f26
fix log
Abingcbc Oct 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 51 additions & 24 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
#include "common/JsonUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/TimeUtil.h"
#include "pipeline/compression/CompressorFactory.h"
#include "container_manager/ConfigContainerInfoUpdateCmd.h"
#include "file_server/ConfigManager.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "pipeline/PipelineManager.h"
#include "profile_sender/ProfileSender.h"
#include "pipeline/compression/CompressorFactory.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "profile_sender/ProfileSender.h"

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand All @@ -46,8 +46,9 @@ LogtailPlugin::LogtailPlugin() {
mPluginAdapterPtr = NULL;
mPluginBasePtr = NULL;
mLoadConfigFun = NULL;
mHoldOnFun = NULL;
mResumeFun = NULL;
mStopAllFun = NULL;
mStopFun = NULL;
mStartFun = NULL;
mLoadGlobalConfigFun = NULL;
mProcessRawLogFun = NULL;
mPluginValid = false;
Expand Down Expand Up @@ -108,25 +109,45 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
return false;
}

void LogtailPlugin::HoldOn(bool exitFlag) {
if (mPluginValid && mHoldOnFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines pause", "starts"));
auto holdOnStart = GetCurrentTimeInMilliSeconds();
mHoldOnFun(exitFlag ? 1 : 0);
auto holdOnCost = GetCurrentTimeInMilliSeconds() - holdOnStart;
LOG_INFO(sLogger, ("Go pipelines pause", "succeeded")("cost", ToString(holdOnCost) + "ms"));
if (holdOnCost >= 60 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(HOLD_ON_TOO_SLOW_ALARM,
"Pausing Go pipelines took " + ToString(holdOnCost) + "ms");
/// Stops all Go pipelines through the plugin's exported `StopAll` function.
///
/// Nothing happens unless the plugin is marked valid and the `StopAll` symbol
/// has been resolved. The call is timed in milliseconds; the cost is logged and
/// STOP_ALL_TOO_SLOW_ALARM is sent when it reaches 60 seconds.
///
/// @param exitFlag       passed to the Go side as 1 (true) or 0 (false).
/// @param withInputFlag  passed to the Go side as 1 (true) or 0 (false).
void LogtailPlugin::StopAll(bool exitFlag, bool withInputFlag) {
    if (!mPluginValid || mStopAllFun == nullptr) {
        return;
    }

    LOG_INFO(sLogger, ("Go pipelines stop all", "starts"));

    const auto beginMs = GetCurrentTimeInMilliSeconds();
    mStopAllFun(exitFlag ? 1 : 0, withInputFlag ? 1 : 0);
    const auto elapsedMs = GetCurrentTimeInMilliSeconds() - beginMs;

    LOG_INFO(sLogger, ("Go pipelines stop all", "succeeded")("cost", ToString(elapsedMs) + "ms"));

    // Anything at or above one minute is reported as an alarm.
    if (elapsedMs >= 60 * 1000) {
        LogtailAlarm::GetInstance()->SendAlarm(STOP_ALL_TOO_SLOW_ALARM,
                                               "Stopping all Go pipelines took " + ToString(elapsedMs) + "ms");
    }
}

void LogtailPlugin::Resume() {
if (mPluginValid && mResumeFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines resume", "starts"));
mResumeFun();
LOG_INFO(sLogger, ("Go pipelines resume", "succeeded"));
void LogtailPlugin::Stop(const std::string& configName, bool removingFlag) {
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
GoString goConfigName;
goConfigName.n = configName.size();
goConfigName.p = configName.c_str();
mStopFun(goConfigName, removingFlag ? 1 : 0);
auto stopCost = GetCurrentTimeInMilliSeconds() - stopStart;
LOG_INFO(sLogger, ("Go pipelines stop", "succeeded")("config", configName)("cost", ToString(stopCost) + "ms"));
if (stopCost >= 10 * 1000) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
LogtailAlarm::GetInstance()->SendAlarm(
STOP_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
}

void LogtailPlugin::Start(const std::string& configName) {
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
goConfigName.n = configName.size();
goConfigName.p = configName.c_str();
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
}

Expand Down Expand Up @@ -341,16 +362,22 @@ bool LogtailPlugin::LoadPluginBase() {
LOG_ERROR(sLogger, ("load UnloadConfig error, Message", error));
return mPluginValid;
}
// 插件暂停
mHoldOnFun = (HoldOnFun)loader.LoadMethod("HoldOn", error);
// 停止所有插件
mStopAllFun = (StopAllFun)loader.LoadMethod("StopAll", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load StopAll error, Message", error));
return mPluginValid;
}
// 停止单个插件
mStopFun = (StopFun)loader.LoadMethod("Stop", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load HoldOn error, Message", error));
LOG_ERROR(sLogger, ("load Stop error, Message", error));
return mPluginValid;
}
// 插件恢复
mResumeFun = (ResumeFun)loader.LoadMethod("Resume", error);
mStartFun = (StartFun)loader.LoadMethod("Start", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load Resume error, Message", error));
LOG_ERROR(sLogger, ("load Start error, Message", error));
return mPluginValid;
}
// C++传递原始二进制数据到golang插件,v1和v2的区别:是否传递tag
Expand Down
15 changes: 9 additions & 6 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ typedef GoInt (*LoadConfigFun)(GoString p, GoString l, GoString c, GoInt64 k, Go
typedef GoInt (*UnloadConfigFun)(GoString p, GoString l, GoString c);
typedef GoInt (*ProcessRawLogFun)(GoString c, GoSlice l, GoString p, GoString t);
typedef GoInt (*ProcessRawLogV2Fun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
typedef void (*HoldOnFun)(GoInt);
typedef void (*ResumeFun)();
typedef void (*StopAllFun)(GoInt, GoInt);
typedef void (*StopFun)(GoString, GoInt);
typedef void (*StartFun)(GoString);
typedef GoInt (*InitPluginBaseFun)();
typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg);
typedef GoInt (*ProcessLogsFun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
Expand Down Expand Up @@ -214,8 +215,9 @@ class LogtailPlugin {
const std::string& logstore = "",
const std::string& region = "",
logtail::QueueKey logstoreKey = 0);
void HoldOn(bool exitFlag);
void Resume();
void StopAll(bool exitFlag, bool withInputFlag);
void Stop(const std::string& configName, bool removingFlag);
void Start(const std::string& configName);

bool IsPluginOpened() { return mPluginValid; }

Expand Down Expand Up @@ -273,8 +275,9 @@ class LogtailPlugin {
LoadGlobalConfigFun mLoadGlobalConfigFun;
LoadConfigFun mLoadConfigFun;
UnloadConfigFun mUnloadConfigFun;
HoldOnFun mHoldOnFun;
ResumeFun mResumeFun;
StopAllFun mStopAllFun;
StopFun mStopFun;
StartFun mStartFun;
ProcessRawLogFun mProcessRawLogFun;
ProcessRawLogV2Fun mProcessRawLogV2Fun;
volatile bool mPluginValid;
Expand Down
5 changes: 5 additions & 0 deletions core/monitor/LogtailAlarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ LogtailAlarm::LogtailAlarm() {
mMessageType[OBSERVER_RUNTIME_ALARM] = "OBSERVER_RUNTIME_ALARM";
mMessageType[OBSERVER_STOP_ALARM] = "OBSERVER_STOP_ALARM";
mMessageType[INVALID_CONTAINER_PATH_ALARM] = "INVALID_CONTAINER_PATH_ALARM";
mMessageType[COMPRESS_FAIL_ALARM] = "COMPRESS_FAIL_ALARM";
mMessageType[SERIALIZE_FAIL_ALARM] = "SERIALIZE_FAIL_ALARM";
mMessageType[RELABEL_METRIC_FAIL_ALARM] = "RELABEL_METRIC_FAIL_ALARM";
mMessageType[STOP_TOO_SLOW_ALARM] = "STOP_TOO_SLOW_ALARM";
mMessageType[STOP_ALL_TOO_SLOW_ALARM] = "STOP_ALL_TOO_SLOW_ALARM";
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
}

void LogtailAlarm::Init() {
Expand Down
10 changes: 6 additions & 4 deletions core/monitor/LogtailAlarm.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ enum LogtailAlarmType {
OBSERVER_RUNTIME_ALARM = 62,
OBSERVER_STOP_ALARM = 63,
INVALID_CONTAINER_PATH_ALARM = 64,
ALL_LOGTAIL_ALARM_NUM = 65,
COMPRESS_FAIL_ALARM = 66,
SERIALIZE_FAIL_ALARM = 67,
RELABEL_METRIC_FAIL_ALARM = 68
COMPRESS_FAIL_ALARM = 65,
SERIALIZE_FAIL_ALARM = 66,
RELABEL_METRIC_FAIL_ALARM = 67,
STOP_TOO_SLOW_ALARM = 68,
STOP_ALL_TOO_SLOW_ALARM = 69,
ALL_LOGTAIL_ALARM_NUM = 70
};

struct LogtailAlarmMessage {
Expand Down
47 changes: 37 additions & 10 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,16 @@ void AddExtendedGlobalParamToGoPipeline(const Json::Value& extendedParams, Json:
}

bool Pipeline::Init(PipelineConfig&& config) {
mInitTime = std::chrono::system_clock::now();
mName = config.mName;
mConfig = std::move(config.mDetail);
mContext.SetConfigName(mName);
mContext.SetCreateTime(config.mCreateTime);
mContext.SetPipeline(*this);
mContext.SetIsFirstProcessorJsonFlag(config.mIsFirstProcessorJson);
mMetricsRecordRef.AddLabels({{METRIC_LABEL_PROJECT, config.mProject}});
mMetricsRecordRef.AddLabels({{METRIC_LABEL_CONFIG_NAME, mName}});
WriteMetrics::GetInstance()->CommitMetricsRecordRef(mMetricsRecordRef);

// for special treatment below
const InputFile* inputFile = nullptr;
Expand All @@ -88,6 +92,7 @@ bool Pipeline::Init(PipelineConfig&& config) {
}

mPluginID.store(0);
mProcessingCnt.store(0);
for (size_t i = 0; i < config.mInputs.size(); ++i) {
const Json::Value& detail = *config.mInputs[i];
string pluginType = detail["Type"].asString();
Expand Down Expand Up @@ -324,20 +329,21 @@ void Pipeline::Start() {
}

if (!mGoPipelineWithoutInput.isNull()) {
// TODO: 加载该Go流水线
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput());
}

// TODO: 启用Process中改流水线对应的输入队列
ProcessQueueManager::GetInstance()->ValidatePop(mContext.GetConfigName());

if (!mGoPipelineWithInput.isNull()) {
// TODO: 加载该Go流水线
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput());
}

for (const auto& input : mInputs) {
input->Start();
}

LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName));
mLoadDelayMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - mInitTime).count());
}

void Pipeline::Process(vector<PipelineEventGroup>& logGroupList, size_t inputIndex) {
Expand Down Expand Up @@ -387,23 +393,42 @@ void Pipeline::Stop(bool isRemoving) {
}

if (!mGoPipelineWithInput.isNull()) {
// TODO: 卸载该Go流水线
// Go pipeline `Stop` will stop and delete
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving);
}

ProcessQueueManager::GetInstance()->InvalidatePop(mContext.GetConfigName());

// Wait for all processing item finish
uint64_t startTime = GetCurrentTimeInMilliSeconds();
bool alarmFlag = false;
while (mProcessingCnt.load() != 0) {
usleep(1000 * 10); // 10ms
uint64_t duration = GetCurrentTimeInMilliSeconds() - startTime;
if (!alarmFlag && duration > 1000) { // 1s
LOG_ERROR(sLogger, ("pipeline stop", "too slow")("config", mName)("cost", duration));
LogtailAlarm::GetInstance()->SendAlarm(CONFIG_UPDATE_ALARM,
string("pipeline stop too slow, config: ") + mName
+ "; cost:" + std::to_string(duration),
mContext.GetProjectName(),
mContext.GetLogstoreName(),
mContext.GetRegion());
alarmFlag = true;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

抽成一个函数,评估一下需不需要再加一个condition variable来通知而不是死等

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

condition variable要配合unique lock一起。这里只需要保证这一个变量的原子性,用atomic比unique lock更轻量级

}

// TODO: 禁用Process中改流水线对应的输入队列

if (!isRemoving) {
FlushBatch();
}

if (!mGoPipelineWithoutInput.isNull()) {
// TODO: 卸载该Go流水线
// Go pipeline `Stop` will stop and delete
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
}

for (const auto& flusher : mFlushers) {
flusher->Stop(isRemoving);
}

LOG_INFO(sLogger, ("pipeline stop", "succeeded")("config", mName));
}

Expand Down Expand Up @@ -457,7 +482,8 @@ bool Pipeline::LoadGoPipelines() const {
// 目前按照从后往前顺序加载,即便without成功with失败导致without残留在插件系统中,也不会有太大的问题,但最好改成原子的。
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
if (!LogtailPlugin::GetInstance()->LoadPipeline(mName + "/2",
string goConfigName = GetConfigNameOfGoPipelineWithoutInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
content,
mContext.GetProjectName(),
mContext.GetLogstoreName(),
Expand All @@ -476,7 +502,8 @@ bool Pipeline::LoadGoPipelines() const {
}
if (!mGoPipelineWithInput.isNull()) {
string content = mGoPipelineWithInput.toStyledString();
if (!LogtailPlugin::GetInstance()->LoadPipeline(mName + "/1",
string goConfigName = GetConfigNameOfGoPipelineWithInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
content,
mContext.GetProjectName(),
mContext.GetLogstoreName(),
Expand Down
36 changes: 33 additions & 3 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@
#include <vector>

#include "config/PipelineConfig.h"
#include "plugin/input/InputContainerStdio.h"
#include "plugin/input/InputFile.h"
#include "models/PipelineEventGroup.h"
#include "monitor/MetricConstants.h"
#include "pipeline/PipelineContext.h"
#include "pipeline/plugin/instance/FlusherInstance.h"
#include "pipeline/plugin/instance/InputInstance.h"
#include "pipeline/plugin/instance/ProcessorInstance.h"
#include "pipeline/route/Router.h"
#include "plugin/input/InputContainerStdio.h"
#include "plugin/input/InputFile.h"

namespace logtail {

class Pipeline {
public:
// Sets up the pipeline's self-observation metrics.
// The metrics record is created here with an empty `{}` second argument
// (presumably an empty label list - confirm against WriteMetrics); Init() adds
// the project and config-name labels and commits the record.
Pipeline() {
WriteMetrics::GetInstance()->CreateMetricsRecordRef(mMetricsRecordRef, {});
// Counter that Start() increases by the milliseconds elapsed since Init() recorded mInitTime.
mLoadDelayMs = mMetricsRecordRef.CreateCounter("config_load_delay_ms");
}
// copy/move control functions are deleted because of mContext
bool Init(PipelineConfig&& config);
void Start();
Expand All @@ -45,6 +50,20 @@ class Pipeline {
bool Send(std::vector<PipelineEventGroup>&& groupList);
bool FlushBatch();
void RemoveProcessQueue() const;
// Counts one more item as being processed.
// Call this before, or at the moment, an item is popped from the ProcessorQueue;
// the caller must hold the ProcessorQueue lock.
void AddInProcessingCnt() { ++mProcessingCnt; }
// Counts one item as no longer being processed.
// Call this when, or after, the item has been pushed to the SenderQueue.
// The counter never goes below zero: if it is already 0 the call is a no-op.
void SubInProcessingCnt() {
    uint16_t observed = mProcessingCnt.load(std::memory_order_relaxed);
    while (observed != 0) {
        // On failure compare_exchange_weak stores the current counter value into
        // `observed` (relaxed), so the next iteration retries with a fresh value.
        if (mProcessingCnt.compare_exchange_weak(
                observed, observed - 1, std::memory_order_release, std::memory_order_relaxed)) {
            return;
        }
    }
}
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

// Returns the pipeline's name (mName, set from the config name in Init()).
const std::string& Name() const { return mName; }
// Returns a non-const reference to the pipeline context even though the method is const.
// NOTE(review): this requires mContext to be declared mutable; its declaration is not visible here - confirm.
PipelineContext& GetContext() const { return mContext; }
Expand All @@ -54,7 +73,6 @@ class Pipeline {
// Returns the nested plugin counter map (string -> string -> uint32_t count).
// NOTE(review): the outer/inner keys are presumably plugin category and plugin type - confirm against where mPluginCntMap is filled.
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& GetPluginStatistics() const {
return mPluginCntMap;
}
bool LoadGoPipelines() const; // 应当放在private,过渡期间放在public

// Returns the pipeline's input plugin instances.
// Only for input_observer_network, kept for compatibility.
const std::vector<std::unique_ptr<InputInstance>>& GetInputs() const { return mInputs; }
Expand All @@ -63,7 +81,13 @@ class Pipeline {
static std::string GenPluginTypeWithID(std::string pluginType, std::string pluginID);
PluginInstance::PluginMeta GenNextPluginMeta(bool lastOne);

// True when mGoPipelineWithInput holds a non-null JSON value.
bool HasGoPipelineWithInput() const { return !mGoPipelineWithInput.isNull(); }
// True when mGoPipelineWithoutInput holds a non-null JSON value.
bool HasGoPipelineWithoutInput() const { return !mGoPipelineWithoutInput.isNull(); }
// Config name of the Go pipeline that has inputs: the pipeline name with "/1" appended.
std::string GetConfigNameOfGoPipelineWithInput() const { return mName + "/1"; }
// Config name of the Go pipeline that has no inputs: the pipeline name with "/2" appended.
std::string GetConfigNameOfGoPipelineWithoutInput() const { return mName + "/2"; }

private:
bool LoadGoPipelines() const;
void MergeGoPipeline(const Json::Value& src, Json::Value& dst);
void AddPluginToGoPipeline(const std::string& type,
const Json::Value& plugin,
Expand All @@ -83,6 +107,12 @@ class Pipeline {
std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>> mPluginCntMap;
std::unique_ptr<Json::Value> mConfig;
std::atomic_uint16_t mPluginID;
std::atomic_uint16_t mProcessingCnt;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

// self-observation
mutable MetricsRecordRef mMetricsRecordRef;
std::chrono::system_clock::time_point mInitTime;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
CounterPtr mLoadDelayMs;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineMock;
Expand Down
Loading
Loading