From eaeeb4a6aed537b5241218f896c691f4190516da Mon Sep 17 00:00:00 2001 From: Takuka0311 <1914426213@qq.com> Date: Thu, 12 Sep 2024 14:26:39 +0800 Subject: [PATCH] change go exporter (#1727) * change go exporter * fix lint * delete agent-level funcs * fix lint * change name * fix bug * change cgo and add exit flush metrics * change prama name * fix bug * change params name * change space to \t --- core/file_server/event_handler/LogInput.cpp | 8 +- core/file_server/event_handler/LogInput.h | 4 +- core/file_server/polling/PollingDirFile.cpp | 12 +- core/file_server/polling/PollingDirFile.h | 6 +- core/file_server/polling/PollingModify.cpp | 4 +- core/file_server/polling/PollingModify.h | 2 +- core/go_pipeline/LogtailPlugin.cpp | 15 ++- core/go_pipeline/LogtailPlugin.h | 6 +- core/monitor/MetricConstants.cpp | 2 +- core/monitor/MetricConstants.h | 2 +- core/monitor/MetricExportor.cpp | 127 ++++++++++---------- core/monitor/MetricExportor.h | 16 +-- core/monitor/Monitor.cpp | 22 ++-- core/monitor/Monitor.h | 6 +- core/runner/LogProcess.cpp | 8 +- core/runner/LogProcess.h | 4 +- plugin_main/plugin_export.go | 6 +- pluginmanager/metric_export.go | 82 ++++++++++--- 18 files changed, 197 insertions(+), 135 deletions(-) diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index c48d77a3cf..a5ea529bec 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -88,8 +88,8 @@ void LogInput::Start() { mInteruptFlag = false; - mGlobalOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); - mGlobalRegisterHandlerTotal + mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); + mAgentRegisterHandlerTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_REGISTER_HANDLER_TOTAL); new Thread([this]() { ProcessLoop(); }); @@ -347,10 +347,10 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) { 1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime)); int32_t openFdTotal = GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize(); LogtailMonitor::GetInstance()->UpdateMetric("open_fd", openFdTotal); - mGlobalOpenFdTotal->Set(openFdTotal); + mAgentOpenFdTotal->Set(openFdTotal); size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount(); LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount); - mGlobalRegisterHandlerTotal->Set(handlerCount); + mAgentRegisterHandlerTotal->Set(handlerCount); LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount()); LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig()); mEventProcessCount = 0; diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index 3940030190..21217086a3 100644 --- a/core/file_server/event_handler/LogInput.h +++ b/core/file_server/event_handler/LogInput.h @@ -79,8 +79,8 @@ class LogInput : public LogRunnable { volatile bool mIdleFlag; int32_t mEventProcessCount; int32_t mLastUpdateMetricTime; - IntGaugePtr mGlobalOpenFdTotal; - IntGaugePtr mGlobalRegisterHandlerTotal; + IntGaugePtr mAgentOpenFdTotal; + IntGaugePtr mAgentRegisterHandlerTotal; std::atomic_int mLastReadEventTime{0}; mutable std::mutex mThreadRunningMux; diff --git a/core/file_server/polling/PollingDirFile.cpp b/core/file_server/polling/PollingDirFile.cpp index 4905bcd8d9..2bc809de7c 100644 --- a/core/file_server/polling/PollingDirFile.cpp +++ b/core/file_server/polling/PollingDirFile.cpp @@ -68,10 +68,10 @@ static const int64_t NANO_CONVERTING = 1000000000; void PollingDirFile::Start() { ClearCache(); - mGlobalConfigTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL); - mGlobalPollingDirCacheSizeTotal + mAgentConfigTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL); + mAgentPollingDirCacheSizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL); - mGlobalPollingFileCacheSizeTotal + mAgentPollingFileCacheSizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL); mRuningFlag = true; mThreadPtr = CreateThread([this]() { Polling(); }); @@ -152,15 +152,15 @@ void PollingDirFile::Polling() { size_t configTotal = nameConfigMap.size(); LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal); - mGlobalConfigTotal->Set(configTotal); + mAgentConfigTotal->Set(configTotal); { ScopedSpinLock lock(mCacheLock); size_t pollingDirCacheSize = mDirCacheMap.size(); LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize); - mGlobalPollingDirCacheSizeTotal->Set(pollingDirCacheSize); + mAgentPollingDirCacheSizeTotal->Set(pollingDirCacheSize); size_t pollingFileCacheSize = mFileCacheMap.size(); LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize); - mGlobalPollingFileCacheSizeTotal->Set(pollingFileCacheSize); + mAgentPollingFileCacheSizeTotal->Set(pollingFileCacheSize); } // Iterate all normal configs, make sure stat count will not exceed limit. diff --git a/core/file_server/polling/PollingDirFile.h b/core/file_server/polling/PollingDirFile.h index b9af8c8988..40a4ebfb8b 100644 --- a/core/file_server/polling/PollingDirFile.h +++ b/core/file_server/polling/PollingDirFile.h @@ -136,9 +136,9 @@ class PollingDirFile : public LogRunnable { // The sequence number of current round, uint64_t is used to avoid overflow. uint64_t mCurrentRound; - IntGaugePtr mGlobalConfigTotal; - IntGaugePtr mGlobalPollingDirCacheSizeTotal; - IntGaugePtr mGlobalPollingFileCacheSizeTotal; + IntGaugePtr mAgentConfigTotal; + IntGaugePtr mAgentPollingDirCacheSizeTotal; + IntGaugePtr mAgentPollingFileCacheSizeTotal; #ifdef APSARA_UNIT_TEST_MAIN friend class PollingUnittest; diff --git a/core/file_server/polling/PollingModify.cpp b/core/file_server/polling/PollingModify.cpp index 4b6e4c0995..6dac3afad4 100644 --- a/core/file_server/polling/PollingModify.cpp +++ b/core/file_server/polling/PollingModify.cpp @@ -47,7 +47,7 @@ PollingModify::~PollingModify() { void PollingModify::Start() { ClearCache(); - mGlobalPollingModifySizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL); + mAgentPollingModifySizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL); mRuningFlag = true; mThreadPtr = CreateThread([this]() { Polling(); }); @@ -251,7 +251,7 @@ void PollingModify::Polling() { int32_t statCount = 0; size_t pollingModifySizeTotal = mModifyCacheMap.size(); LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal); - mGlobalPollingModifySizeTotal->Set(pollingModifySizeTotal); + mAgentPollingModifySizeTotal->Set(pollingModifySizeTotal); for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) { if (!mRuningFlag || mHoldOnFlag) break; diff --git a/core/file_server/polling/PollingModify.h b/core/file_server/polling/PollingModify.h index 9be9827ffc..c82337a898 100644 --- a/core/file_server/polling/PollingModify.h +++ b/core/file_server/polling/PollingModify.h @@ -101,7 +101,7 @@ class PollingModify : public LogRunnable { ModifyCheckCacheMap mModifyCacheMap; - IntGaugePtr mGlobalPollingModifySizeTotal; + IntGaugePtr mAgentPollingModifySizeTotal; #ifdef APSARA_UNIT_TEST_MAIN friend class PollingUnittest; diff --git a/core/go_pipeline/LogtailPlugin.cpp b/core/go_pipeline/LogtailPlugin.cpp index 4549cde308..2be93a3d97 100644 --- a/core/go_pipeline/LogtailPlugin.cpp +++ b/core/go_pipeline/LogtailPlugin.cpp @@ -382,10 +382,10 @@ bool LogtailPlugin::LoadPluginBase() { LOG_ERROR(sLogger, ("load ProcessLogGroup error, Message", error)); return mPluginValid; } - // 获取golang插件部分统计信息 - mGetPipelineMetricsFun = (GetPipelineMetricsFun)loader.LoadMethod("GetPipelineMetrics", error); + // 获取golang部分指标信息 + mGetGoMetricsFun = (GetGoMetricsFun)loader.LoadMethod("GetGoMetrics", error); if (!error.empty()) { - LOG_ERROR(sLogger, ("load GetPipelineMetrics error, Message", error)); + LOG_ERROR(sLogger, ("load GetGoMetrics error, Message", error)); return mPluginValid; } @@ -470,9 +470,12 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName, } } -void LogtailPlugin::GetPipelineMetrics(std::vector>& metircsList) { - if (mGetPipelineMetricsFun != nullptr) { - auto metrics = mGetPipelineMetricsFun(); +void LogtailPlugin::GetGoMetrics(std::vector>& metircsList, const string& metricType) { + if (mGetGoMetricsFun != nullptr) { + GoString type; + type.n = metricType.size(); + type.p = metricType.c_str(); + auto metrics = mGetGoMetricsFun(type); if (metrics != nullptr) { for (int i = 0; i < metrics->count; ++i) { std::map item; diff --git a/core/go_pipeline/LogtailPlugin.h b/core/go_pipeline/LogtailPlugin.h index d59750e57e..5ed3d22242 100644 --- a/core/go_pipeline/LogtailPlugin.h +++ b/core/go_pipeline/LogtailPlugin.h @@ -143,7 +143,7 @@ typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg); typedef GoInt (*ProcessLogsFun)(GoString c, GoSlice l, GoString p, GoString t, GoSlice tags); typedef GoInt (*ProcessLogGroupFun)(GoString c, GoSlice l, GoString p); typedef struct innerContainerMeta* (*GetContainerMetaFun)(GoString containerID); -typedef InnerPluginMetrics* (*GetPipelineMetricsFun)(); +typedef InnerPluginMetrics* (*GetGoMetricsFun)(GoString metricType); // Methods export by adapter. typedef int (*IsValidToSendFun)(long long logstoreKey); @@ -264,7 +264,7 @@ class LogtailPlugin { K8sContainerMeta GetContainerMeta(const std::string& containerID); - void GetPipelineMetrics(std::vector>& metircsList); + void GetGoMetrics(std::vector>& metircsList, const std::string& metricType); private: void* mPluginBasePtr; @@ -284,7 +284,7 @@ class LogtailPlugin { ProcessLogsFun mProcessLogsFun; ProcessLogGroupFun mProcessLogGroupFun; GetContainerMetaFun mGetContainerMetaFun; - GetPipelineMetricsFun mGetPipelineMetricsFun; + GetGoMetricsFun mGetGoMetricsFun; // Configuration for plugin system in JSON format. Json::Value mPluginCfg; diff --git a/core/monitor/MetricConstants.cpp b/core/monitor/MetricConstants.cpp index 547cd26ed0..8284b8406a 100644 --- a/core/monitor/MetricConstants.cpp +++ b/core/monitor/MetricConstants.cpp @@ -33,7 +33,6 @@ const std::string METRIC_LABEL_INSTANCE_ID = "instance_id"; const std::string METRIC_LABEL_IP = "ip"; const std::string METRIC_LABEL_OS = "os"; const std::string METRIC_LABEL_OS_DETAIL = "os_detail"; -const std::string METRIC_LABEL_PROJECTS = "projects"; const std::string METRIC_LABEL_USER_DEFINED_ID = "user_defined_id"; const std::string METRIC_LABEL_UUID = "uuid"; const std::string METRIC_LABEL_VERSION = "version"; @@ -44,6 +43,7 @@ const std::string METRIC_AGENT_CPU = "agent_cpu_percent"; const std::string METRIC_AGENT_CPU_GO = "agent_go_cpu_percent"; const std::string METRIC_AGENT_MEMORY = "agent_memory_used_mb"; const std::string METRIC_AGENT_MEMORY_GO = "agent_go_memory_used_mb"; +const std::string METRIC_AGENT_GO_ROUTINES_TOTAL = "agent_go_routines_total"; const std::string METRIC_AGENT_OPEN_FD_TOTAL = "agent_open_fd_total"; const std::string METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL = "agent_polling_dir_cache_size_total"; const std::string METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL = "agent_polling_file_cache_size_total"; diff --git a/core/monitor/MetricConstants.h b/core/monitor/MetricConstants.h index 1cd5dc561d..32c6adb601 100644 --- a/core/monitor/MetricConstants.h +++ b/core/monitor/MetricConstants.h @@ -35,7 +35,6 @@ extern const std::string METRIC_LABEL_INSTANCE_ID; extern const std::string METRIC_LABEL_IP; extern const std::string METRIC_LABEL_OS; extern const std::string METRIC_LABEL_OS_DETAIL; -extern const std::string METRIC_LABEL_PROJECTS; extern const std::string METRIC_LABEL_USER_DEFINED_ID; extern const std::string METRIC_LABEL_UUID; extern const std::string METRIC_LABEL_VERSION; @@ -46,6 +45,7 @@ extern const std::string METRIC_AGENT_CPU; extern const std::string METRIC_AGENT_CPU_GO; extern const std::string METRIC_AGENT_MEMORY; extern const std::string METRIC_AGENT_MEMORY_GO; +extern const std::string METRIC_AGENT_GO_ROUTINES_TOTAL; extern const std::string METRIC_AGENT_OPEN_FD_TOTAL; extern const std::string METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL; extern const std::string METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL; diff --git a/core/monitor/MetricExportor.cpp b/core/monitor/MetricExportor.cpp index 153fe621ca..f8a1b10298 100644 --- a/core/monitor/MetricExportor.cpp +++ b/core/monitor/MetricExportor.cpp @@ -34,12 +34,13 @@ DECLARE_FLAG_STRING(metrics_report_method); namespace logtail { -const std::string agentLevelMetricKey = "metric-level"; -const std::string agentLevelMetricValue = "agent"; +const std::string METRIC_EXPORT_TYPE_GO = "direct"; +const std::string METRIC_EXPORT_TYPE_CPP = "cpp_provided"; MetricExportor::MetricExportor() : mSendInterval(60), mLastSendTime(time(NULL) - (rand() % (mSendInterval / 10)) * 10) { - // mGlobalCpuGo = LoongCollectorMonitor::GetInstance()->GetDoubleGauge(METRIC_AGENT_CPU_GO); - mGlobalMemGo = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_MEMORY_GO); + // mAgentCpuGo = LoongCollectorMonitor::GetInstance()->GetDoubleGauge(METRIC_AGENT_CPU_GO); + mAgentMemGo = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_MEMORY_GO); + mAgentGoRoutines = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL); } void MetricExportor::PushMetrics(bool forceSend) { @@ -72,37 +73,13 @@ void MetricExportor::PushCppMetrics() { } void MetricExportor::PushGoMetrics() { - std::vector> goMetircsList; - LogtailPlugin::GetInstance()->GetPipelineMetrics(goMetircsList); + std::vector> goDirectMetircsList; + LogtailPlugin::GetInstance()->GetGoMetrics(goDirectMetircsList, METRIC_EXPORT_TYPE_GO); + std::vector> goCppProvidedMetircsList; + LogtailPlugin::GetInstance()->GetGoMetrics(goCppProvidedMetircsList, METRIC_EXPORT_TYPE_CPP); - // filter agent or plugin level metrics - std::vector> goPluginMetircsList; - for (auto goMetrics : goMetircsList) { - if (goMetrics.find(agentLevelMetricKey) != goMetrics.end()) { - // Go agent-level metrics - if (goMetrics.at(agentLevelMetricKey) == agentLevelMetricValue) { - SendGoAgentLevelMetrics(goMetrics); - continue; - } - } else { - // Go plugin-level metrics - goPluginMetircsList.push_back(std::move(goMetrics)); - } - } - if (goPluginMetircsList.size() == 0) { - return; - } - - // send plugin-level metrics - if ("sls" == STRING_FLAG(metrics_report_method)) { - std::map goPluginMetircsLogGroupMap; - SerializeGoPluginMetricsListToLogGroupMap(goPluginMetircsList, goPluginMetircsLogGroupMap); - SendToSLS(goPluginMetircsLogGroupMap); - } else if ("file" == STRING_FLAG(metrics_report_method)) { - std::string goPluginMetircsContent; - SerializeGoPluginMetricsListToString(goPluginMetircsList, goPluginMetircsContent); - SendToLocalFile(goPluginMetircsContent, "self-metrics-go"); - } + PushGoCppProvidedMetrics(goCppProvidedMetircsList); + PushGoDirectMetrics(goDirectMetircsList); } void MetricExportor::SendToSLS(std::map& logGroupMap) { @@ -171,32 +148,56 @@ void MetricExportor::SendToLocalFile(std::string& metricsContent, const std::str } } -void MetricExportor::SendGoAgentLevelMetrics(std::map& metrics) { - for (auto metric : metrics) { - if (metric.first == agentLevelMetricKey) { - continue; - } - // if (metric.first == METRIC_AGENT_CPU_GO) { - // mGlobalCpuGo->Set(std::stod(metric.second)); - // } - if (metric.first == METRIC_AGENT_MEMORY_GO) { - mGlobalMemGo->Set(std::stoi(metric.second)); +// metrics from Go that are directly outputted +void MetricExportor::PushGoDirectMetrics(std::vector>& metricsList) { + if (metricsList.size() == 0) { + return; + } + + if ("sls" == STRING_FLAG(metrics_report_method)) { + std::map logGroupMap; + SerializeGoDirectMetricsListToLogGroupMap(metricsList, logGroupMap); + SendToSLS(logGroupMap); + } else if ("file" == STRING_FLAG(metrics_report_method)) { + std::string metricsContent; + SerializeGoDirectMetricsListToString(metricsList, metricsContent); + SendToLocalFile(metricsContent, "self-metrics-go"); + } +} + +// metrics from Go that are provided by cpp +void MetricExportor::PushGoCppProvidedMetrics(std::vector>& metricsList) { + if (metricsList.size() == 0) { + return; + } + + for (auto metrics : metricsList) { + for (auto metric : metrics) { + // if (metric.first == METRIC_AGENT_CPU_GO) { + // mAgentCpuGo->Set(std::stod(metric.second)); + // } + if (metric.first == METRIC_AGENT_MEMORY_GO) { + mAgentMemGo->Set(std::stoi(metric.second)); + } + if (metric.first == METRIC_AGENT_GO_ROUTINES_TOTAL) { + mAgentGoRoutines->Set(std::stoi(metric.second)); + } + LogtailMonitor::GetInstance()->UpdateMetric(metric.first, metric.second); } - LogtailMonitor::GetInstance()->UpdateMetric(metric.first, metric.second); } } -void MetricExportor::SerializeGoPluginMetricsListToLogGroupMap( - std::vector>& goPluginMetircsList, - std::map& goLogGroupMap) { - for (auto& item : goPluginMetircsList) { +void MetricExportor::SerializeGoDirectMetricsListToLogGroupMap( + std::vector>& metricsList, + std::map& logGroupMap) { + for (auto& metrics : metricsList) { std::string configName = ""; std::string region = METRIC_REGION_DEFAULT; { // get the config_name label - for (const auto& pair : item) { - if (pair.first == "label.config_name") { - configName = pair.second; + for (const auto& metric : metrics) { + if (metric.first == "label.config_name") { + configName = metric.second; break; } } @@ -213,37 +214,37 @@ void MetricExportor::SerializeGoPluginMetricsListToLogGroupMap( } } Log* logPtr = nullptr; - auto LogGroupIter = goLogGroupMap.find(region); - if (LogGroupIter != goLogGroupMap.end()) { + auto LogGroupIter = logGroupMap.find(region); + if (LogGroupIter != logGroupMap.end()) { sls_logs::LogGroup* logGroup = LogGroupIter->second; logPtr = logGroup->add_logs(); } else { sls_logs::LogGroup* logGroup = new sls_logs::LogGroup(); logPtr = logGroup->add_logs(); - goLogGroupMap.insert(std::pair(region, logGroup)); + logGroupMap.insert(std::pair(region, logGroup)); } auto now = GetCurrentLogtailTime(); SetLogTime(logPtr, AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec); - for (const auto& pair : item) { + for (const auto& metric : metrics) { Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key(pair.first); - contentPtr->set_value(pair.second); + contentPtr->set_key(metric.first); + contentPtr->set_value(metric.second); } } } -void MetricExportor::SerializeGoPluginMetricsListToString( - std::vector>& goPluginMetircsList, std::string& metricsContent) { +void MetricExportor::SerializeGoDirectMetricsListToString(std::vector>& metricsList, + std::string& metricsContent) { std::ostringstream oss; - for (auto& item : goPluginMetircsList) { + for (auto& metrics : metricsList) { Json::Value metricsRecordValue; auto now = GetCurrentLogtailTime(); metricsRecordValue["time"] = AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec; - for (const auto& pair : item) { - metricsRecordValue[pair.first] = pair.second; + for (const auto& metric : metrics) { + metricsRecordValue[metric.first] = metric.second; } Json::StreamWriterBuilder writer; writer["indentation"] = ""; diff --git a/core/monitor/MetricExportor.h b/core/monitor/MetricExportor.h index 254d539017..dfb524814a 100644 --- a/core/monitor/MetricExportor.h +++ b/core/monitor/MetricExportor.h @@ -39,19 +39,21 @@ class MetricExportor { // Send Methods void SendToSLS(std::map& logGroupMap); void SendToLocalFile(std::string& metricsContent, const std::string metricsFileNamePrefix); - void SendGoAgentLevelMetrics(std::map& metrics); - // inner methods - void SerializeGoPluginMetricsListToLogGroupMap(std::vector>& goPluginMetircsList, - std::map& goLogGroupMap); - void SerializeGoPluginMetricsListToString(std::vector>& goPluginMetircsList, + // go metrics + void PushGoDirectMetrics(std::vector>& metricsList); + void PushGoCppProvidedMetrics(std::vector>& metricsList); + void SerializeGoDirectMetricsListToLogGroupMap(std::vector>& metricsList, + std::map& logGroupMap); + void SerializeGoDirectMetricsListToString(std::vector>& metricsList, std::string& metricsContent); int32_t mSendInterval; int32_t mLastSendTime; // go process-level metrics - DoubleGaugePtr mGlobalCpuGo; - IntGaugePtr mGlobalMemGo; + DoubleGaugePtr mAgentCpuGo; + IntGaugePtr mAgentMemGo; + IntGaugePtr mAgentGoRoutines; }; } // namespace logtail \ No newline at end of file diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 2af97b0cf3..54cf59b0fb 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -33,12 +33,13 @@ #include "common/TimeUtil.h" #include "common/version.h" #include "file_server/event_handler/LogInput.h" -#include "plugin/flusher/sls/FlusherSLS.h" #include "go_pipeline/LogtailPlugin.h" -#include "protobuf/sls/sls_logs.pb.h" #include "logger/Logger.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" +#include "monitor/MetricExportor.h" +#include "plugin/flusher/sls/FlusherSLS.h" +#include "protobuf/sls/sls_logs.pb.h" #include "runner/FlusherRunner.h" #if defined(__linux__) && !defined(__ANDROID__) #include "ObserverManager.h" @@ -113,9 +114,9 @@ bool LogtailMonitor::Init() { #endif // init metrics - mGlobalCpuGauge = LoongCollectorMonitor::GetInstance()->GetDoubleGauge(METRIC_AGENT_CPU); - mGlobalMemoryGauge = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_MEMORY); - mGlobalUsedSendingConcurrency + mAgentCpuGauge = LoongCollectorMonitor::GetInstance()->GetDoubleGauge(METRIC_AGENT_CPU); + mAgentMemoryGauge = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_MEMORY); + mAgentUsedSendingConcurrency = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_USED_SENDING_CONCURRENCY); // Initialize monitor thread. @@ -177,6 +178,7 @@ void LogtailMonitor::Monitor() { lastCheckHardLimitTime = monitorTime; GetMemStat(); + CalCpuStat(curCpuStat, mCpuStat); if (CheckHardMemLimit()) { LOG_ERROR(sLogger, ("Resource used by program exceeds hard limit", @@ -266,14 +268,14 @@ bool LogtailMonitor::SendStatusProfile(bool suicide) { SetLogTime(logPtr, AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec); // CPU usage of Logtail process. AddLogContent(logPtr, "cpu", mCpuStat.mCpuUsage); - mGlobalCpuGauge->Set(mCpuStat.mCpuUsage); + mAgentCpuGauge->Set(mCpuStat.mCpuUsage); #if defined(__linux__) // TODO: Remove this if auto scale is available on Windows. // CPU usage of system. AddLogContent(logPtr, "os_cpu", mOsCpuStatForScale.mOsCpuUsage); #endif // Memory usage of Logtail process. AddLogContent(logPtr, "mem", mMemStat.mRss); - mGlobalMemoryGauge->Set(mMemStat.mRss); + mAgentMemoryGauge->Set(mMemStat.mRss); // The version, uuid of Logtail. AddLogContent(logPtr, "version", ILOGTAIL_VERSION); AddLogContent(logPtr, "uuid", Application::GetInstance()->GetUUID()); @@ -318,7 +320,7 @@ bool LogtailMonitor::SendStatusProfile(bool suicide) { } int32_t usedSendingConcurrency = FlusherRunner::GetInstance()->GetSendingBufferCount(); UpdateMetric("used_sending_concurrency", usedSendingConcurrency); - mGlobalUsedSendingConcurrency->Set(usedSendingConcurrency); + mAgentUsedSendingConcurrency->Set(usedSendingConcurrency); AddLogContent(logPtr, "metric_json", MetricToString()); AddLogContent(logPtr, "status", CheckLogtailStatus()); @@ -712,7 +714,7 @@ void LoongCollectorMonitor::Init() { labels.emplace_back(METRIC_LABEL_UUID, Application::GetInstance()->GetUUID()); labels.emplace_back(METRIC_LABEL_VERSION, ILOGTAIL_VERSION); DynamicMetricLabels dynamicLabels; - dynamicLabels.emplace_back(METRIC_LABEL_PROJECTS, []() -> std::string { return FlusherSLS::GetAllProjects(); }); + dynamicLabels.emplace_back(METRIC_LABEL_PROJECT, []() -> std::string { return FlusherSLS::GetAllProjects(); }); #ifdef __ENTERPRISE__ dynamicLabels.emplace_back(METRIC_LABEL_ALIUIDS, []() -> std::string { return EnterpriseConfigProvider::GetInstance()->GetAliuidSet(); }); @@ -727,6 +729,7 @@ void LoongCollectorMonitor::Init() { // mDoubleGauges[METRIC_AGENT_CPU_GO] = mMetricsRecordRef.CreateDoubleGauge(METRIC_AGENT_CPU_GO); mIntGauges[METRIC_AGENT_MEMORY] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_MEMORY); mIntGauges[METRIC_AGENT_MEMORY_GO] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_MEMORY_GO); + mIntGauges[METRIC_AGENT_GO_ROUTINES_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL); mIntGauges[METRIC_AGENT_OPEN_FD_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); mIntGauges[METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL); @@ -759,6 +762,7 @@ void LoongCollectorMonitor::Init() { } void LoongCollectorMonitor::Stop() { + MetricExportor::GetInstance()->PushMetrics(true); } CounterPtr LoongCollectorMonitor::GetCounter(std::string key) { diff --git a/core/monitor/Monitor.h b/core/monitor/Monitor.h index cb8744442c..d0cac3314e 100644 --- a/core/monitor/Monitor.h +++ b/core/monitor/Monitor.h @@ -161,12 +161,12 @@ class LogtailMonitor : public MetricStore { CpuStat mRealtimeCpuStat; // Use to calculate CPU limit, updated regularly (30s by default). CpuStat mCpuStat; - DoubleGaugePtr mGlobalCpuGauge; + DoubleGaugePtr mAgentCpuGauge; // Memory usage statistics. MemStat mMemStat; - IntGaugePtr mGlobalMemoryGauge; + IntGaugePtr mAgentMemoryGauge; - IntGaugePtr mGlobalUsedSendingConcurrency; + IntGaugePtr mAgentUsedSendingConcurrency; // Current scale up level, updated by CheckScaledCpuUsageUpLimit. float mScaledCpuUsageUpLimit; diff --git a/core/runner/LogProcess.cpp b/core/runner/LogProcess.cpp index 08b4dfb47c..cd65998408 100644 --- a/core/runner/LogProcess.cpp +++ b/core/runner/LogProcess.cpp @@ -56,8 +56,8 @@ LogProcess::~LogProcess() { void LogProcess::Start() { if (mInitialized) return; - mGlobalProcessQueueFullTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL); - mGlobalProcessQueueTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_TOTAL); + mAgentProcessQueueFullTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL); + mAgentProcessQueueTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_TOTAL); mInitialized = true; mThreadCount = AppConfig::GetInstance()->GetProcessThreadCount(); @@ -168,10 +168,10 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { // update process queue status uint32_t InvalidProcessQueueTotal = ProcessQueueManager::GetInstance()->GetInvalidCnt(); sMonitor->UpdateMetric("process_queue_full", InvalidProcessQueueTotal); - mGlobalProcessQueueFullTotal->Set(InvalidProcessQueueTotal); + mAgentProcessQueueFullTotal->Set(InvalidProcessQueueTotal); uint32_t ProcessQueueTotal = ProcessQueueManager::GetInstance()->GetCnt(); sMonitor->UpdateMetric("process_queue_total", ProcessQueueTotal); - mGlobalProcessQueueTotal->Set(ProcessQueueTotal); + mAgentProcessQueueTotal->Set(ProcessQueueTotal); if (ExactlyOnceQueueManager::GetInstance()->GetProcessQueueCnt() > 0) { sMonitor->UpdateMetric("eo_process_queue_full", ExactlyOnceQueueManager::GetInstance()->GetInvalidProcessQueueCnt()); diff --git a/core/runner/LogProcess.h b/core/runner/LogProcess.h index c5ddf02c67..c0bd77e9ad 100644 --- a/core/runner/LogProcess.h +++ b/core/runner/LogProcess.h @@ -60,8 +60,8 @@ class LogProcess : public LogRunnable { std::atomic_bool* mThreadFlags; ReadWriteLock mAccessProcessThreadRWL; - IntGaugePtr mGlobalProcessQueueFullTotal; - IntGaugePtr mGlobalProcessQueueTotal; + IntGaugePtr mAgentProcessQueueFullTotal; + IntGaugePtr mAgentProcessQueueTotal; }; } // namespace logtail \ No newline at end of file diff --git a/plugin_main/plugin_export.go b/plugin_main/plugin_export.go index 49a76f4b5e..66350d8973 100644 --- a/plugin_main/plugin_export.go +++ b/plugin_main/plugin_export.go @@ -296,9 +296,9 @@ func GetContainerMeta(containerID string) *C.struct_containerMeta { return returnStruct } -//export GetPipelineMetrics -func GetPipelineMetrics() *C.PluginMetrics { - results := pluginmanager.GetMetrics() +//export GetGoMetrics +func GetGoMetrics(metricType string) *C.PluginMetrics { + results := pluginmanager.GetMetrics(metricType) // 统计所有键值对的总数,用于分配内存 numMetrics := len(results) diff --git a/pluginmanager/metric_export.go b/pluginmanager/metric_export.go index c5a9b2fad7..43bfbd61fa 100644 --- a/pluginmanager/metric_export.go +++ b/pluginmanager/metric_export.go @@ -19,18 +19,68 @@ import ( "strings" ) -func GetMetrics() []map[string]string { +const ( + MetricExportTypeGo = "direct" + MetricExportTypeCpp = "cpp_provided" +) + +func GetMetrics(metricType string) []map[string]string { + if metricType == MetricExportTypeGo { + return GetGoDirectMetrics() + } + if metricType == MetricExportTypeCpp { + return GetGoCppProvidedMetrics() + } + return []map[string]string{} +} + +// 直接输出的go指标,例如go插件指标 +// +// []map[string]string{ +// { +// "label.plugin_name": "processor_test", +// "value.proc_in_records_total": "100", +// }, +// { +// "label.plugin_name": "flusher_stdout", +// "value.flusher_in_records_total": "100", +// }, +// } +func GetGoDirectMetrics() []map[string]string { + metrics := make([]map[string]string, 0) + // go plugin metrics + metrics = append(metrics, GetGoPluginMetrics()...) + return metrics +} + +// 由C++定义的指标,go把值传过去,例如go的进程级指标 +// +// []map[string]string{ +// { +// "agent_go_memory_used_mb": "100", +// "agent_go_routines_total": "20" +// } +// } +func GetGoCppProvidedMetrics() []map[string]string { + metrics := make([]map[string]string, 0) + // agent-level metrics + metrics = append(metrics, GetAgentStat()...) + return metrics +} + +// go 插件指标,直接输出 +func GetGoPluginMetrics() []map[string]string { metrics := make([]map[string]string, 0) for _, config := range LogtailConfig { metrics = append(metrics, config.Context.ExportMetricRecords()...) } - metrics = append(metrics, GetAgentStat()) return metrics } -func GetAgentStat() map[string]string { - recrods := map[string]string{} - recrods["metric-level"] = "agent" +// go 进程级指标,由C++部分注册 +func GetAgentStat() []map[string]string { + metrics := []map[string]string{} + metric := map[string]string{} // key is the metric key in runtime/metrics, value is agent's metric key metricNames := map[string]string{ // cpu @@ -50,20 +100,22 @@ func GetAgentStat() map[string]string { // push results to recrods for _, sample := range samples { - recordName := metricNames[sample.Name] - recordValue := sample.Value - recordValueString := "" - switch recordValue.Kind() { + key := metricNames[sample.Name] + value := sample.Value + valueStr := "" + switch value.Kind() { case goruntimemetrics.KindUint64: - if strings.HasSuffix(recordName, "_mb") { - recordValueString = strconv.FormatUint(recordValue.Uint64()/1024/1024, 10) + if strings.HasSuffix(key, "_mb") { + valueStr = strconv.FormatUint(value.Uint64()/1024/1024, 10) } else { - recordValueString = strconv.FormatUint(recordValue.Uint64(), 10) + valueStr = strconv.FormatUint(value.Uint64(), 10) } case goruntimemetrics.KindFloat64: - recordValueString = strconv.FormatFloat(recordValue.Float64(), 'g', -1, 64) + valueStr = strconv.FormatFloat(value.Float64(), 'g', -1, 64) } - recrods[recordName] = recordValueString + metric[key] = valueStr } - return recrods + + metrics = append(metrics, metric) + return metrics }