Skip to content

Commit

Permalink
Merge branch 'main' into feat/raw_event
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 committed Nov 13, 2024
2 parents 65e345b + 6e7991d commit 3ac462b
Show file tree
Hide file tree
Showing 167 changed files with 2,410 additions and 2,315 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ jobs:
strategy:
matrix:
go-version: [ 1.19.10 ]
python-version: [ 3.8 ]
runner: [ ubuntu-latest ]
fail-fast: true
permissions:
Expand All @@ -62,6 +63,11 @@ jobs:
with:
go-version: ${{ matrix.go-version }}

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Check out code
uses: actions/checkout@v2
with:
Expand All @@ -83,6 +89,7 @@ jobs:
BUILD_LOGTAIL_UT: OFF
WITHOUTGDB: ON
run: |
pip3 install -r test/requirements.txt
make benchmark
git stash
Expand Down
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants profile_sender models
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants monitor/profile_sender models
config config/watcher constants
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
Expand Down
6 changes: 3 additions & 3 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include "json/value.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/AlarmManager.h"
#include "monitor/Monitor.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
Expand Down Expand Up @@ -660,10 +660,10 @@ void AppConfig::loadAppConfigLogtailMode(const std::string& ilogtailConfigFile)
confJson.clear();
if (res == CONFIG_NOT_EXIST) {
LOG_ERROR(sLogger, ("can not find start config", ilogtailConfigFile));
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CONFIG_ALARM, "can not find start config");
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CONFIG_ALARM, "can not find start config");
} else if (res == CONFIG_INVALID_FORMAT) {
LOG_ERROR(sLogger, ("start config is not valid json", ilogtailConfigFile));
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CONFIG_ALARM, "start config is not valid json");
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CONFIG_ALARM, "start config is not valid json");
}
}
}
Expand Down
19 changes: 10 additions & 9 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,16 @@ void Application::Init() {
// Initialize basic information: IP, hostname, etc.
LogFileProfiler::GetInstance();
#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->Init("enterprise");
EnterpriseConfigProvider::GetInstance()->LoadRegionConfig();
if (GlobalConf::Instance()->mStartWorkerStatus == "Crash") {
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM, "Logtail Restart");
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM, "Logtail Restart");
}
// get last crash info
string backTraceStr = GetCrashBackTrace();
if (!backTraceStr.empty()) {
LOG_ERROR(sLogger, ("last logtail crash stack", backTraceStr));
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_STACK_ALARM, backTraceStr);
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_STACK_ALARM, backTraceStr);
}
if (BOOL_FLAG(ilogtail_disable_core)) {
InitCrashBackTrace();
Expand Down Expand Up @@ -222,13 +223,13 @@ void Application::Start() { // GCOVR_EXCL_START
}

#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->Init("enterprise");
EnterpriseConfigProvider::GetInstance()->Start();
LegacyConfigProvider::GetInstance()->Init("legacy");
#else
InitRemoteConfigProviders();
#endif

LogtailAlarm::GetInstance()->Init();
AlarmManager::GetInstance()->Init();
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();

Expand Down Expand Up @@ -367,7 +368,7 @@ void Application::Exit() {

LogtailMonitor::GetInstance()->Stop();
LoongCollectorMonitor::GetInstance()->Stop();
LogtailAlarm::GetInstance()->Stop();
AlarmManager::GetInstance()->Stop();
LogtailPlugin::GetInstance()->StopBuiltInModules();
// from now on, alarm should not be used.

Expand All @@ -390,9 +391,9 @@ void Application::CheckCriticalCondition(int32_t curTime) {
// force to exit if config update thread is block more than 1 hour
if (lastGetConfigTime > 0 && curTime - lastGetConfigTime > 3600) {
LOG_ERROR(sLogger, ("last config get time is too old", lastGetConfigTime)("prepare force exit", ""));
LogtailAlarm::GetInstance()->SendAlarm(
AlarmManager::GetInstance()->SendAlarm(
LOGTAIL_CRASH_ALARM, "last config get time is too old: " + ToString(lastGetConfigTime) + " force exit");
LogtailAlarm::GetInstance()->ForceToSend();
AlarmManager::GetInstance()->ForceToSend();
sleep(10);
_exit(1);
}
Expand All @@ -401,9 +402,9 @@ void Application::CheckCriticalCondition(int32_t curTime) {
// work around for no network when docker start
if (BOOL_FLAG(send_prefer_real_ip) && !BOOL_FLAG(global_network_success) && curTime - mStartTime > 7200) {
LOG_ERROR(sLogger, ("network is fail", "prepare force exit"));
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM,
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM,
"network is fail since " + ToString(mStartTime) + " force exit");
LogtailAlarm::GetInstance()->ForceToSend();
AlarmManager::GetInstance()->ForceToSend();
sleep(10);
_exit(1);
}
Expand Down
6 changes: 3 additions & 3 deletions core/checkpoint/AdhocCheckpointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/AlarmManager.h"
#include "common/Thread.h"
#include "common/HashUtil.h"

Expand Down Expand Up @@ -149,7 +149,7 @@ void AdhocCheckpointManager::LoadAdhocCheckpoint() {
std::vector<std::string> jobList;
if (!GetAllFiles(adhocCheckpointDir, "*", jobList)) {
LOG_WARNING(sLogger, ("get all adhoc checkpoint files", "failed"));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "Load adhoc check point files failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "Load adhoc check point files failed");
return;
}

Expand All @@ -161,7 +161,7 @@ void AdhocCheckpointManager::LoadAdhocCheckpoint() {
}
} else if (!Mkdir(adhocCheckpointDir)) {
LOG_WARNING(sLogger, ("Create adhoc checkpoint dir", "failed"));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "Create adhoc check point dir failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "Create adhoc check point dir failed");
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/checkpoint/AdhocJobCheckpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "AdhocJobCheckpoint.h"
#include "common/FileSystemUtil.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/AlarmManager.h"

namespace logtail {

Expand Down Expand Up @@ -86,7 +86,7 @@ bool AdhocJobCheckpoint::Load(const std::string& path) {
std::ifstream ifs(path);
if (!ifs.is_open()) {
LOG_ERROR(sLogger, ("open adhoc check point file error when load, file path", path));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open check point file failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open check point file failed");
return false;
}

Expand Down Expand Up @@ -157,7 +157,7 @@ void AdhocJobCheckpoint::Dump(const std::string& path, bool isAutoDump) {

if (!Mkdirs(ParentPath(path))) {
LOG_ERROR(sLogger, ("open adhoc check point file dir error when dump, file path", path));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open adhoc check point file dir failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open adhoc check point file dir failed");
return;
}

Expand Down Expand Up @@ -209,7 +209,7 @@ void AdhocJobCheckpoint::Dump(const std::string& path, bool isAutoDump) {
std::ofstream ofs(path);
if (!ofs.is_open()) {
LOG_ERROR(sLogger, ("open adhoc check point file error, file path", path));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open adhoc check point file failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open adhoc check point file failed");
return;
}
ofs << jsonString;
Expand Down
24 changes: 12 additions & 12 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "file_server/ConfigManager.h"
#include "file_server/FileDiscoveryOptions.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/AlarmManager.h"

using namespace std;
DECLARE_FLAG_STRING(check_point_filename);
Expand Down Expand Up @@ -134,7 +134,7 @@ void CheckPointManager::LoadCheckPoint() {
LOG_ERROR(sLogger,
("load check point file fail, file content is not valid json",
AppConfig::GetInstance()->GetCheckPointFilePath()));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "content of check point file is not valid json");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "content of check point file is not valid json");
}
return;
}
Expand Down Expand Up @@ -171,11 +171,11 @@ void CheckPointManager::LoadDirCheckPoint(const Json::Value& root) {
}
} catch (const exception& e) {
LOG_ERROR(sLogger, ("failed to parse dir checkpoint", e.what()));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
"failed to parse dir checkpoint:" + string(e.what()));
} catch (...) {
LOG_ERROR(sLogger, ("failed to parse dir checkpoint", "unknown exception"));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
"failed to parse dir checkpoint, unkonw exception");
}
}
Expand Down Expand Up @@ -301,11 +301,11 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
}
} catch (const exception& e) {
LOG_ERROR(sLogger, ("failed to parse file checkpoint", e.what()));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
"failed to parse file checkpoint:" + string(e.what()));
} catch (...) {
LOG_ERROR(sLogger, ("failed to parse file checkpoint", "unknown exception"));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
"failed to parse file checkpoint, unkonw exception");
}
}
Expand All @@ -317,7 +317,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {

if (!Mkdirs(ParentPath(checkPointFile))) {
LOG_ERROR(sLogger, ("open check point file dir error", checkPointFile));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open check point file dir failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open check point file dir failed");
return false;
}

Expand Down Expand Up @@ -379,7 +379,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
= leaf;
}
LOG_WARNING(sLogger, ("Too many check point", mDevInodeCheckPointPtrMap.size()));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
"Too many check point:" + ToString(mDevInodeCheckPointPtrMap.size()));
}

Expand All @@ -400,7 +400,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
std::ofstream fout(checkPointTempFile.c_str());
if (!fout) {
LOG_ERROR(sLogger, ("open check point file error", checkPointFile));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open check point file failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "open check point file failed");
return false;
}
Json::Value result;
Expand All @@ -410,7 +410,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
fout << result.toStyledString();
if (!fout.good()) {
LOG_ERROR(sLogger, ("dump check point to file failed", checkPointFile));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "dump check point to file failed");
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "dump check point to file failed");
fout.close();
return false;
}
Expand All @@ -422,7 +422,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
#endif
if (rename(checkPointTempFile.c_str(), checkPointFile.c_str()) == -1) {
LOG_ERROR(sLogger, ("rename check point file fail, errno", errno));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM,
std::string("rename check point file fail, errno ") + ToString(errno));
return false;
}
Expand Down Expand Up @@ -494,7 +494,7 @@ boost::optional<std::string> SearchFilePathByDevInodeInDirectory(const std::stri

fsutil::Dir dir(dirPath);
if (!dir.Open()) {
LogtailAlarm::GetInstance()->SendAlarm(
AlarmManager::GetInstance()->SendAlarm(
CHECKPOINT_ALARM, string("Failed to open dir : ") + dirPath + ";\terrno : " + ToString(GetErrno()));
LOG_ERROR(sLogger, METHOD_LOG_PATTERN("message", "open dir error")("dir", dirPath));
continue;
Expand Down
4 changes: 2 additions & 2 deletions core/checkpoint/CheckpointManagerV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "common/ScopeInvoker.h"
#include "common/TimeUtil.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/AlarmManager.h"
#include "app_config/AppConfig.h"
#include "checkpoint/CheckPointManager.h"

Expand All @@ -45,7 +45,7 @@ namespace detail {
std::string msg;
msg.append("op:").append(op).append(", key:").append(key).append(", status:").append(s.ToString());
LOG_ERROR(sLogger, (title, msg));
LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_V2_ALARM, title + ", " + msg);
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_V2_ALARM, title + ", " + msg);
}

// Range key is represented by data pointer and size to avoid copy.
Expand Down
4 changes: 2 additions & 2 deletions core/common/EncodingConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

#include "EncodingConverter.h"
#include "LogtailAlarm.h"
#include "AlarmManager.h"
#include "logger/Logger.h"
#if defined(__linux__)
#include <iconv.h>
Expand Down Expand Up @@ -78,7 +78,7 @@ size_t EncodingConverter::ConvertGbk2Utf8(
if (ret == (size_t)(-1)) {
LOG_ERROR(sLogger, ("convert GBK to UTF8 fail, errno", strerror(errno)));
iconv(mGbk2Utf8Cd, NULL, NULL, NULL, NULL); // Clear status.
LogtailAlarm::GetInstance()->SendAlarm(ENCODING_CONVERT_ALARM, "convert GBK to UTF8 fail");
AlarmManager::GetInstance()->SendAlarm(ENCODING_CONVERT_ALARM, "convert GBK to UTF8 fail");
// use memcpy
memcpy(originDes + destIndex, originSrc + beginIndex, endIndex - beginIndex + 1);
destIndex += endIndex - beginIndex + 1;
Expand Down
2 changes: 1 addition & 1 deletion core/common/ParamExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

#include "common/StringTools.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/AlarmManager.h"

#define PARAM_ERROR_RETURN(logger, alarm, msg, module, config, project, logstore, region) \
if (module.empty()) { \
Expand Down
4 changes: 2 additions & 2 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ namespace logtail {

void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_COMPONENT, std::move(labels), std::move(dynamicLabels));
mInItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_TOTAL);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES);
mOutItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_TOTAL);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_SIZE_BYTES);
mTotalProcessMs = mMetricsRecordRef.CreateTimeCounter(METRIC_COMPONENT_TOTAL_PROCESS_TIME_MS);
mDiscardedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_TOTAL);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_SIZE_BYTES);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
Expand Down
2 changes: 1 addition & 1 deletion core/common/compression/Compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include <string>

#include "monitor/LogtailMetric.h"
#include "monitor/MetricManager.h"
#include "common/compression/CompressType.h"

namespace logtail {
Expand Down
3 changes: 1 addition & 2 deletions core/common/compression/CompressorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include "common/compression/CompressorFactory.h"

#include "common/ParamExtractor.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "common/compression/LZ4Compressor.h"
#include "common/compression/ZstdCompressor.h"
#include "monitor/metric_constants/MetricConstants.h"

using namespace std;

Expand Down Expand Up @@ -64,7 +64,6 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
compressor->SetMetricRecordRef({{METRIC_LABEL_KEY_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_KEY_PIPELINE_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_COMPRESSOR},
{METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_COMPONENT},
{METRIC_LABEL_KEY_FLUSHER_PLUGIN_ID, flusherId}});
return compressor;
}
Expand Down
2 changes: 1 addition & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ bool PipelineConfig::Parse() {

string key, errorMsg;
const Json::Value* itr = nullptr;
LogtailAlarm& alarm = *LogtailAlarm::GetInstance();
AlarmManager& alarm = *AlarmManager::GetInstance();
// to send alarm and init MetricsRecord, project, logstore and region should be extracted first.
key = "flushers";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
Expand Down
Loading

0 comments on commit 3ac462b

Please sign in to comment.