Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick self monitor: merge 1.8.x #1773

Merged
merged 19 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 37 additions & 39 deletions core/config/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ bool Config::Parse() {
string key, errorMsg;
const Json::Value* itr = nullptr;
LogtailAlarm& alarm = *LogtailAlarm::GetInstance();
#ifdef __ENTERPRISE__
// to send alarm, project, logstore and region should be extracted first.
// to send alarm and init MetricsRecord, project, logstore and region should be extracted first.
key = "flushers";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (itr && itr->isArray()) {
Expand All @@ -122,7 +121,6 @@ bool Config::Parse() {
}
}
}
#endif

if (!GetOptionalUIntParam(*mDetail, "createTime", mCreateTime, errorMsg)) {
PARAM_WARNING_DEFAULT(sLogger, alarm, errorMsg, mCreateTime, noModule, mName, mProject, mLogstore, mRegion);
Expand Down Expand Up @@ -198,19 +196,19 @@ bool Config::Parse() {
mLogstore,
mRegion);
}
const string pluginName = it->asString();
const string pluginType = it->asString();
if (i == 0) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoInput = true;
} else if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginName)) {
} else if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
mHasNativeInput = true;
} else {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported input plugin", pluginName, mName, mProject, mLogstore, mRegion);
sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
} else {
if (mHasGoInput) {
if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginName)) {
if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native and extended input plugins coexist",
Expand All @@ -219,12 +217,12 @@ bool Config::Parse() {
mProject,
mLogstore,
mRegion);
} else if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
} else if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported input plugin", pluginName, mName, mProject, mLogstore, mRegion);
sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
} else {
if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginName)) {
if (PluginRegistry::GetInstance()->IsValidNativeInputPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"more than 1 native input plugin is given",
Expand All @@ -233,7 +231,7 @@ bool Config::Parse() {
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native and extended input plugins coexist",
Expand All @@ -244,15 +242,15 @@ bool Config::Parse() {
mRegion);
} else {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported input plugin", pluginName, mName, mProject, mLogstore, mRegion);
sLogger, alarm, "unsupported input plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
}
}
mInputs.push_back(&plugin);
if (pluginName == "input_observer_network") {
if (pluginType == "input_observer_network") {
hasObserverInput = true;
#ifdef __ENTERPRISE__
} else if (pluginName == "input_stream") {
} else if (pluginType == "input_stream") {
if (!AppConfig::GetInstance()->GetOpenStreamLog()) {
PARAM_ERROR_RETURN(
sLogger, alarm, "stream log is not enabled", noModule, mName, mProject, mLogstore, mRegion);
Expand Down Expand Up @@ -322,9 +320,9 @@ bool Config::Parse() {
mLogstore,
mRegion);
}
const string pluginName = it->asString();
const string pluginType = it->asString();
if (mHasGoInput) {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginName)) {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugins coexist with extended input plugins",
Expand All @@ -333,33 +331,33 @@ bool Config::Parse() {
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoProcessor = true;
} else {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginName,
pluginType,
mName,
mProject,
mLogstore,
mRegion);
}
} else {
if (isCurrentPluginNative) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
isCurrentPluginNative = false;
mHasGoProcessor = true;
} else if (!PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginName)) {
} else if (!PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginName,
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (pluginName == "processor_spl") {
} else if (pluginType == "processor_spl") {
isSPL = true;
if (i != 0 || itr->size() != 1) {
PARAM_ERROR_RETURN(sLogger,
Expand All @@ -385,22 +383,22 @@ bool Config::Parse() {
mHasNativeProcessor = true;
}
} else {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginName)) {
if (PluginRegistry::GetInstance()->IsValidNativeProcessorPlugin(pluginType)) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"native processor plugin comes after extended processor plugin",
pluginName,
pluginType,
mName,
mProject,
mLogstore,
mRegion);
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
} else if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoProcessor = true;
} else {
PARAM_ERROR_RETURN(sLogger,
alarm,
"unsupported processor plugin",
pluginName,
pluginType,
mName,
mProject,
mLogstore,
Expand All @@ -410,7 +408,7 @@ bool Config::Parse() {
}
mProcessors.push_back(&plugin);
if (i == 0) {
if (pluginName == "processor_parse_json_native" || pluginName == "processor_json") {
if (pluginType == "processor_parse_json_native" || pluginType == "processor_json") {
mIsFirstProcessorJson = true;
}
}
Expand Down Expand Up @@ -471,10 +469,10 @@ bool Config::Parse() {
mLogstore,
mRegion);
}
const string pluginName = it->asString();
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
const string pluginType = it->asString();
if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
mHasGoFlusher = true;
} else if (PluginRegistry::GetInstance()->IsValidNativeFlusherPlugin(pluginName)) {
} else if (PluginRegistry::GetInstance()->IsValidNativeFlusherPlugin(pluginType)) {
mHasNativeFlusher = true;
// processor spl could change pipelineEventGroup tags and affect the merge logic of aggregator
// so force specify the MergeType as logstore, loggroup will be merged into a List.
Expand All @@ -484,10 +482,10 @@ bool Config::Parse() {
}
} else {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported flusher plugin", pluginName, mName, mProject, mLogstore, mRegion);
sLogger, alarm, "unsupported flusher plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
#ifdef __ENTERPRISE__
if (hasStreamInput && pluginName != "flusher_sls") {
if (hasStreamInput && pluginType != "flusher_sls") {
PARAM_ERROR_RETURN(sLogger,
alarm,
"flusher plugins other than flusher_sls coexist with input_stream",
Expand Down Expand Up @@ -562,10 +560,10 @@ bool Config::Parse() {
mLogstore,
mRegion);
}
const string pluginName = it->asString();
if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
const string pluginType = it->asString();
if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported aggregator plugin", pluginName, mName, mProject, mLogstore, mRegion);
sLogger, alarm, "unsupported aggregator plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
mAggregators.push_back(&plugin);
}
Expand Down Expand Up @@ -628,10 +626,10 @@ bool Config::Parse() {
mLogstore,
mRegion);
}
const string pluginName = it->asString();
if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginName)) {
const string pluginType = it->asString();
if (!PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported extension plugin", pluginName, mName, mProject, mLogstore, mRegion);
sLogger, alarm, "unsupported extension plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
mExtensions.push_back(&plugin);
}
Expand Down
2 changes: 1 addition & 1 deletion core/config/provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void CommonConfigProvider::Stop() {
void CommonConfigProvider::CheckUpdateThread() {
LOG_INFO(sLogger, ("common config provider", "started"));
usleep((rand() % 10) * 100 * 1000);
int32_t lastCheckTime = 0;
int32_t lastCheckTime = time(NULL);
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
int32_t curTime = time(NULL);
Expand Down
1 change: 1 addition & 0 deletions core/container_manager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ file(GLOB LIB_SOURCE_FILES *.cpp *.h)
append_source_files(LIB_SOURCE_FILES)
add_library(${PROJECT_NAME} STATIC ${LIB_SOURCE_FILES})
target_link_libraries(${PROJECT_NAME} common)
target_link_libraries(${PROJECT_NAME} pipeline)
Loading
Loading