Skip to content

Commit

Permalink
change go exporter (#1727)
Browse files Browse the repository at this point in the history
* change go exporter

* fix lint

* delete agent-level funcs

* fix lint

* change name

* fix bug

* change cgo and add exit flush metrics

* change prama name

* fix bug

* change params name

* change space to \t
  • Loading branch information
Takuka0311 committed Sep 12, 2024
1 parent eb657cb commit eaeeb4a
Show file tree
Hide file tree
Showing 18 changed files with 197 additions and 135 deletions.
8 changes: 4 additions & 4 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ void LogInput::Start() {

mInteruptFlag = false;

mGlobalOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mGlobalRegisterHandlerTotal
mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mAgentRegisterHandlerTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_REGISTER_HANDLER_TOTAL);

new Thread([this]() { ProcessLoop(); });
Expand Down Expand Up @@ -347,10 +347,10 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
int32_t openFdTotal = GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize();
LogtailMonitor::GetInstance()->UpdateMetric("open_fd", openFdTotal);
mGlobalOpenFdTotal->Set(openFdTotal);
mAgentOpenFdTotal->Set(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mGlobalRegisterHandlerTotal->Set(handlerCount);
mAgentRegisterHandlerTotal->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
mEventProcessCount = 0;
Expand Down
4 changes: 2 additions & 2 deletions core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ class LogInput : public LogRunnable {
volatile bool mIdleFlag;
int32_t mEventProcessCount;
int32_t mLastUpdateMetricTime;
IntGaugePtr mGlobalOpenFdTotal;
IntGaugePtr mGlobalRegisterHandlerTotal;
IntGaugePtr mAgentOpenFdTotal;
IntGaugePtr mAgentRegisterHandlerTotal;

std::atomic_int mLastReadEventTime{0};
mutable std::mutex mThreadRunningMux;
Expand Down
12 changes: 6 additions & 6 deletions core/file_server/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ static const int64_t NANO_CONVERTING = 1000000000;

void PollingDirFile::Start() {
ClearCache();
mGlobalConfigTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL);
mGlobalPollingDirCacheSizeTotal
mAgentConfigTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL);
mAgentPollingDirCacheSizeTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL);
mGlobalPollingFileCacheSizeTotal
mAgentPollingFileCacheSizeTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL);
mRuningFlag = true;
mThreadPtr = CreateThread([this]() { Polling(); });
Expand Down Expand Up @@ -152,15 +152,15 @@ void PollingDirFile::Polling() {

size_t configTotal = nameConfigMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal);
mGlobalConfigTotal->Set(configTotal);
mAgentConfigTotal->Set(configTotal);
{
ScopedSpinLock lock(mCacheLock);
size_t pollingDirCacheSize = mDirCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize);
mGlobalPollingDirCacheSizeTotal->Set(pollingDirCacheSize);
mAgentPollingDirCacheSizeTotal->Set(pollingDirCacheSize);
size_t pollingFileCacheSize = mFileCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize);
mGlobalPollingFileCacheSizeTotal->Set(pollingFileCacheSize);
mAgentPollingFileCacheSizeTotal->Set(pollingFileCacheSize);
}

// Iterate all normal configs, make sure stat count will not exceed limit.
Expand Down
6 changes: 3 additions & 3 deletions core/file_server/polling/PollingDirFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ class PollingDirFile : public LogRunnable {
// The sequence number of current round, uint64_t is used to avoid overflow.
uint64_t mCurrentRound;

IntGaugePtr mGlobalConfigTotal;
IntGaugePtr mGlobalPollingDirCacheSizeTotal;
IntGaugePtr mGlobalPollingFileCacheSizeTotal;
IntGaugePtr mAgentConfigTotal;
IntGaugePtr mAgentPollingDirCacheSizeTotal;
IntGaugePtr mAgentPollingFileCacheSizeTotal;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PollingUnittest;
Expand Down
4 changes: 2 additions & 2 deletions core/file_server/polling/PollingModify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ PollingModify::~PollingModify() {

void PollingModify::Start() {
ClearCache();
mGlobalPollingModifySizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL);
mAgentPollingModifySizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL);

mRuningFlag = true;
mThreadPtr = CreateThread([this]() { Polling(); });
Expand Down Expand Up @@ -251,7 +251,7 @@ void PollingModify::Polling() {
int32_t statCount = 0;
size_t pollingModifySizeTotal = mModifyCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal);
mGlobalPollingModifySizeTotal->Set(pollingModifySizeTotal);
mAgentPollingModifySizeTotal->Set(pollingModifySizeTotal);
for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) {
if (!mRuningFlag || mHoldOnFlag)
break;
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/polling/PollingModify.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class PollingModify : public LogRunnable {

ModifyCheckCacheMap mModifyCacheMap;

IntGaugePtr mGlobalPollingModifySizeTotal;
IntGaugePtr mAgentPollingModifySizeTotal;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PollingUnittest;
Expand Down
15 changes: 9 additions & 6 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,10 @@ bool LogtailPlugin::LoadPluginBase() {
LOG_ERROR(sLogger, ("load ProcessLogGroup error, Message", error));
return mPluginValid;
}
// 获取golang插件部分统计信息
mGetPipelineMetricsFun = (GetPipelineMetricsFun)loader.LoadMethod("GetPipelineMetrics", error);
// 获取golang部分指标信息
mGetGoMetricsFun = (GetGoMetricsFun)loader.LoadMethod("GetGoMetrics", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load GetPipelineMetrics error, Message", error));
LOG_ERROR(sLogger, ("load GetGoMetrics error, Message", error));
return mPluginValid;
}

Expand Down Expand Up @@ -470,9 +470,12 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
}
}

void LogtailPlugin::GetPipelineMetrics(std::vector<std::map<std::string, std::string>>& metircsList) {
if (mGetPipelineMetricsFun != nullptr) {
auto metrics = mGetPipelineMetricsFun();
void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList, const string& metricType) {
if (mGetGoMetricsFun != nullptr) {
GoString type;
type.n = metricType.size();
type.p = metricType.c_str();
auto metrics = mGetGoMetricsFun(type);
if (metrics != nullptr) {
for (int i = 0; i < metrics->count; ++i) {
std::map<std::string, std::string> item;
Expand Down
6 changes: 3 additions & 3 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg);
typedef GoInt (*ProcessLogsFun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags);
typedef GoInt (*ProcessLogGroupFun)(GoString c, GoSlice l, GoString p);
typedef struct innerContainerMeta* (*GetContainerMetaFun)(GoString containerID);
typedef InnerPluginMetrics* (*GetPipelineMetricsFun)();
typedef InnerPluginMetrics* (*GetGoMetricsFun)(GoString metricType);

// Methods export by adapter.
typedef int (*IsValidToSendFun)(long long logstoreKey);
Expand Down Expand Up @@ -264,7 +264,7 @@ class LogtailPlugin {

K8sContainerMeta GetContainerMeta(const std::string& containerID);

void GetPipelineMetrics(std::vector<std::map<std::string, std::string>>& metircsList);
void GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList, const std::string& metricType);

private:
void* mPluginBasePtr;
Expand All @@ -284,7 +284,7 @@ class LogtailPlugin {
ProcessLogsFun mProcessLogsFun;
ProcessLogGroupFun mProcessLogGroupFun;
GetContainerMetaFun mGetContainerMetaFun;
GetPipelineMetricsFun mGetPipelineMetricsFun;
GetGoMetricsFun mGetGoMetricsFun;

// Configuration for plugin system in JSON format.
Json::Value mPluginCfg;
Expand Down
2 changes: 1 addition & 1 deletion core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const std::string METRIC_LABEL_INSTANCE_ID = "instance_id";
const std::string METRIC_LABEL_IP = "ip";
const std::string METRIC_LABEL_OS = "os";
const std::string METRIC_LABEL_OS_DETAIL = "os_detail";
const std::string METRIC_LABEL_PROJECTS = "projects";
const std::string METRIC_LABEL_USER_DEFINED_ID = "user_defined_id";
const std::string METRIC_LABEL_UUID = "uuid";
const std::string METRIC_LABEL_VERSION = "version";
Expand All @@ -44,6 +43,7 @@ const std::string METRIC_AGENT_CPU = "agent_cpu_percent";
const std::string METRIC_AGENT_CPU_GO = "agent_go_cpu_percent";
const std::string METRIC_AGENT_MEMORY = "agent_memory_used_mb";
const std::string METRIC_AGENT_MEMORY_GO = "agent_go_memory_used_mb";
const std::string METRIC_AGENT_GO_ROUTINES_TOTAL = "agent_go_routines_total";
const std::string METRIC_AGENT_OPEN_FD_TOTAL = "agent_open_fd_total";
const std::string METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL = "agent_polling_dir_cache_size_total";
const std::string METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL = "agent_polling_file_cache_size_total";
Expand Down
2 changes: 1 addition & 1 deletion core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ extern const std::string METRIC_LABEL_INSTANCE_ID;
extern const std::string METRIC_LABEL_IP;
extern const std::string METRIC_LABEL_OS;
extern const std::string METRIC_LABEL_OS_DETAIL;
extern const std::string METRIC_LABEL_PROJECTS;
extern const std::string METRIC_LABEL_USER_DEFINED_ID;
extern const std::string METRIC_LABEL_UUID;
extern const std::string METRIC_LABEL_VERSION;
Expand All @@ -46,6 +45,7 @@ extern const std::string METRIC_AGENT_CPU;
extern const std::string METRIC_AGENT_CPU_GO;
extern const std::string METRIC_AGENT_MEMORY;
extern const std::string METRIC_AGENT_MEMORY_GO;
extern const std::string METRIC_AGENT_GO_ROUTINES_TOTAL;
extern const std::string METRIC_AGENT_OPEN_FD_TOTAL;
extern const std::string METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL;
extern const std::string METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL;
Expand Down
127 changes: 64 additions & 63 deletions core/monitor/MetricExportor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ DECLARE_FLAG_STRING(metrics_report_method);

namespace logtail {

const std::string agentLevelMetricKey = "metric-level";
const std::string agentLevelMetricValue = "agent";
const std::string METRIC_EXPORT_TYPE_GO = "direct";
const std::string METRIC_EXPORT_TYPE_CPP = "cpp_provided";

MetricExportor::MetricExportor() : mSendInterval(60), mLastSendTime(time(NULL) - (rand() % (mSendInterval / 10)) * 10) {
// mGlobalCpuGo = LoongCollectorMonitor::GetInstance()->GetDoubleGauge(METRIC_AGENT_CPU_GO);
mGlobalMemGo = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_MEMORY_GO);
// mAgentCpuGo = LoongCollectorMonitor::GetInstance()->GetDoubleGauge(METRIC_AGENT_CPU_GO);
mAgentMemGo = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_MEMORY_GO);
mAgentGoRoutines = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL);
}

void MetricExportor::PushMetrics(bool forceSend) {
Expand Down Expand Up @@ -72,37 +73,13 @@ void MetricExportor::PushCppMetrics() {
}

void MetricExportor::PushGoMetrics() {
std::vector<std::map<std::string, std::string>> goMetircsList;
LogtailPlugin::GetInstance()->GetPipelineMetrics(goMetircsList);
std::vector<std::map<std::string, std::string>> goDirectMetircsList;
LogtailPlugin::GetInstance()->GetGoMetrics(goDirectMetircsList, METRIC_EXPORT_TYPE_GO);
std::vector<std::map<std::string, std::string>> goCppProvidedMetircsList;
LogtailPlugin::GetInstance()->GetGoMetrics(goCppProvidedMetircsList, METRIC_EXPORT_TYPE_CPP);

// filter agent or plugin level metrics
std::vector<std::map<std::string, std::string>> goPluginMetircsList;
for (auto goMetrics : goMetircsList) {
if (goMetrics.find(agentLevelMetricKey) != goMetrics.end()) {
// Go agent-level metrics
if (goMetrics.at(agentLevelMetricKey) == agentLevelMetricValue) {
SendGoAgentLevelMetrics(goMetrics);
continue;
}
} else {
// Go plugin-level metrics
goPluginMetircsList.push_back(std::move(goMetrics));
}
}
if (goPluginMetircsList.size() == 0) {
return;
}

// send plugin-level metrics
if ("sls" == STRING_FLAG(metrics_report_method)) {
std::map<std::string, sls_logs::LogGroup*> goPluginMetircsLogGroupMap;
SerializeGoPluginMetricsListToLogGroupMap(goPluginMetircsList, goPluginMetircsLogGroupMap);
SendToSLS(goPluginMetircsLogGroupMap);
} else if ("file" == STRING_FLAG(metrics_report_method)) {
std::string goPluginMetircsContent;
SerializeGoPluginMetricsListToString(goPluginMetircsList, goPluginMetircsContent);
SendToLocalFile(goPluginMetircsContent, "self-metrics-go");
}
PushGoCppProvidedMetrics(goCppProvidedMetircsList);
PushGoDirectMetrics(goDirectMetircsList);
}

void MetricExportor::SendToSLS(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) {
Expand Down Expand Up @@ -171,32 +148,56 @@ void MetricExportor::SendToLocalFile(std::string& metricsContent, const std::str
}
}

void MetricExportor::SendGoAgentLevelMetrics(std::map<std::string, std::string>& metrics) {
for (auto metric : metrics) {
if (metric.first == agentLevelMetricKey) {
continue;
}
// if (metric.first == METRIC_AGENT_CPU_GO) {
// mGlobalCpuGo->Set(std::stod(metric.second));
// }
if (metric.first == METRIC_AGENT_MEMORY_GO) {
mGlobalMemGo->Set(std::stoi(metric.second));
// metrics from Go that are directly outputted
void MetricExportor::PushGoDirectMetrics(std::vector<std::map<std::string, std::string>>& metricsList) {
if (metricsList.size() == 0) {
return;
}

if ("sls" == STRING_FLAG(metrics_report_method)) {
std::map<std::string, sls_logs::LogGroup*> logGroupMap;
SerializeGoDirectMetricsListToLogGroupMap(metricsList, logGroupMap);
SendToSLS(logGroupMap);
} else if ("file" == STRING_FLAG(metrics_report_method)) {
std::string metricsContent;
SerializeGoDirectMetricsListToString(metricsList, metricsContent);
SendToLocalFile(metricsContent, "self-metrics-go");
}
}

// metrics from Go that are provided by cpp
void MetricExportor::PushGoCppProvidedMetrics(std::vector<std::map<std::string, std::string>>& metricsList) {
if (metricsList.size() == 0) {
return;
}

for (auto metrics : metricsList) {
for (auto metric : metrics) {
// if (metric.first == METRIC_AGENT_CPU_GO) {
// mAgentCpuGo->Set(std::stod(metric.second));
// }
if (metric.first == METRIC_AGENT_MEMORY_GO) {
mAgentMemGo->Set(std::stoi(metric.second));
}
if (metric.first == METRIC_AGENT_GO_ROUTINES_TOTAL) {
mAgentGoRoutines->Set(std::stoi(metric.second));
}
LogtailMonitor::GetInstance()->UpdateMetric(metric.first, metric.second);
}
LogtailMonitor::GetInstance()->UpdateMetric(metric.first, metric.second);
}
}

void MetricExportor::SerializeGoPluginMetricsListToLogGroupMap(
std::vector<std::map<std::string, std::string>>& goPluginMetircsList,
std::map<std::string, sls_logs::LogGroup*>& goLogGroupMap) {
for (auto& item : goPluginMetircsList) {
void MetricExportor::SerializeGoDirectMetricsListToLogGroupMap(
std::vector<std::map<std::string, std::string>>& metricsList,
std::map<std::string, sls_logs::LogGroup*>& logGroupMap) {
for (auto& metrics : metricsList) {
std::string configName = "";
std::string region = METRIC_REGION_DEFAULT;
{
// get the config_name label
for (const auto& pair : item) {
if (pair.first == "label.config_name") {
configName = pair.second;
for (const auto& metric : metrics) {
if (metric.first == "label.config_name") {
configName = metric.second;
break;
}
}
Expand All @@ -213,37 +214,37 @@ void MetricExportor::SerializeGoPluginMetricsListToLogGroupMap(
}
}
Log* logPtr = nullptr;
auto LogGroupIter = goLogGroupMap.find(region);
if (LogGroupIter != goLogGroupMap.end()) {
auto LogGroupIter = logGroupMap.find(region);
if (LogGroupIter != logGroupMap.end()) {
sls_logs::LogGroup* logGroup = LogGroupIter->second;
logPtr = logGroup->add_logs();
} else {
sls_logs::LogGroup* logGroup = new sls_logs::LogGroup();
logPtr = logGroup->add_logs();
goLogGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(region, logGroup));
logGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(region, logGroup));
}
auto now = GetCurrentLogtailTime();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec);
for (const auto& pair : item) {
for (const auto& metric : metrics) {
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(pair.first);
contentPtr->set_value(pair.second);
contentPtr->set_key(metric.first);
contentPtr->set_value(metric.second);
}
}
}

void MetricExportor::SerializeGoPluginMetricsListToString(
std::vector<std::map<std::string, std::string>>& goPluginMetircsList, std::string& metricsContent) {
void MetricExportor::SerializeGoDirectMetricsListToString(std::vector<std::map<std::string, std::string>>& metricsList,
std::string& metricsContent) {
std::ostringstream oss;

for (auto& item : goPluginMetircsList) {
for (auto& metrics : metricsList) {
Json::Value metricsRecordValue;
auto now = GetCurrentLogtailTime();
metricsRecordValue["time"]
= AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec;
for (const auto& pair : item) {
metricsRecordValue[pair.first] = pair.second;
for (const auto& metric : metrics) {
metricsRecordValue[metric.first] = metric.second;
}
Json::StreamWriterBuilder writer;
writer["indentation"] = "";
Expand Down
Loading

0 comments on commit eaeeb4a

Please sign in to comment.