From 02e1a409e535eda92c9da47d039383140180052f Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Tue, 24 Sep 2024 14:26:34 +0800 Subject: [PATCH 1/2] add metric for runner (#1766) --- core/common/compression/Compressor.cpp | 14 +-- core/file_server/FileServer.cpp | 9 +- core/file_server/FileServer.h | 9 +- core/file_server/event_handler/LogInput.cpp | 22 +++-- core/file_server/event_handler/LogInput.h | 6 +- core/file_server/polling/PollingDirFile.cpp | 19 ++-- core/file_server/polling/PollingDirFile.h | 4 +- core/file_server/polling/PollingModify.cpp | 7 +- core/file_server/polling/PollingModify.h | 2 +- core/monitor/LogtailMetric.cpp | 27 +----- core/monitor/LogtailMetric.h | 10 +- core/monitor/MetricConstants.cpp | 96 +++++++++++++++---- core/monitor/MetricConstants.h | 94 ++++++++++++++---- core/monitor/Monitor.cpp | 19 ---- core/monitor/Monitor.h | 2 - core/pipeline/Pipeline.cpp | 13 +-- core/pipeline/batch/Batcher.h | 16 ++-- .../plugin/instance/FlusherInstance.cpp | 4 +- core/pipeline/plugin/interface/Plugin.h | 20 ++-- core/pipeline/queue/BoundedQueueInterface.h | 2 +- .../queue/BoundedSenderQueueInterface.cpp | 4 +- core/pipeline/queue/CircularProcessQueue.cpp | 4 +- core/pipeline/queue/CircularProcessQueue.h | 2 +- core/pipeline/queue/QueueInterface.h | 12 +-- core/pipeline/route/Router.cpp | 4 +- core/pipeline/serializer/Serializer.h | 14 +-- core/runner/FlusherRunner.cpp | 22 +++++ core/runner/FlusherRunner.h | 10 ++ core/runner/LogProcess.cpp | 65 +++++-------- core/runner/LogProcess.h | 8 +- core/runner/sink/http/HttpSink.cpp | 44 +++++++-- core/runner/sink/http/HttpSink.h | 10 ++ core/unittest/pipeline/PipelineUnittest.cpp | 6 +- .../queue/CircularProcessQueueUnittest.cpp | 2 +- 34 files changed, 372 insertions(+), 230 deletions(-) diff --git a/core/common/compression/Compressor.cpp b/core/common/compression/Compressor.cpp index 39a7af4b57..7a6d95951e 100644 --- a/core/common/compression/Compressor.cpp +++ b/core/common/compression/Compressor.cpp @@ 
-25,13 +25,13 @@ namespace logtail { void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) { WriteMetrics::GetInstance()->PrepareMetricsRecordRef( mMetricsRecordRef, std::move(labels), std::move(dynamicLabels)); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); - mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); - mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES); - mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt"); - mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_size_bytes"); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT); + mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT); + mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT); + mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES); } bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) { diff --git a/core/file_server/FileServer.cpp b/core/file_server/FileServer.cpp index 49b1953b72..0e03c3f80d 100644 --- a/core/file_server/FileServer.cpp +++ b/core/file_server/FileServer.cpp @@ -18,12 +18,12 @@ #include "common/Flags.h" #include "common/StringTools.h" #include "common/TimeUtil.h" +#include "file_server/ConfigManager.h" #include "file_server/EventDispatcher.h" #include "file_server/event_handler/LogInput.h" -#include "file_server/ConfigManager.h" -#include 
"plugin/input/InputFile.h" #include "file_server/polling/PollingDirFile.h" #include "file_server/polling/PollingModify.h" +#include "plugin/input/InputFile.h" DEFINE_FLAG_BOOL(enable_polling_discovery, "", true); @@ -31,6 +31,11 @@ using namespace std; namespace logtail { +FileServer::FileServer() { + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, + {{METRIC_LABEL_KEY_RUNNER_NAME, "file_server"}}); +} + // 启动文件服务,包括加载配置、处理检查点、注册事件等 void FileServer::Start() { ConfigManager::GetInstance()->LoadDockerConfig(); diff --git a/core/file_server/FileServer.h b/core/file_server/FileServer.h index 7daee73fc5..d77b6a13a1 100644 --- a/core/file_server/FileServer.h +++ b/core/file_server/FileServer.h @@ -23,9 +23,11 @@ #include "common/Lock.h" #include "file_server/FileDiscoveryOptions.h" #include "file_server/MultilineOptions.h" +#include "file_server/reader/FileReaderOptions.h" +#include "monitor/LogtailMetric.h" #include "monitor/PluginMetricManager.h" #include "pipeline/PipelineContext.h" -#include "file_server/reader/FileReaderOptions.h" + namespace logtail { @@ -78,6 +80,7 @@ class FileServer { // for reader, event_handler ... 
ReentrantMetricsRecordRef GetOrCreateReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels); void ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels); + MetricsRecordRef& GetMetricsRecordRef() { return mMetricsRecordRef; } // 过渡使用 void Resume(bool isConfigUpdate = true); @@ -88,7 +91,7 @@ class FileServer { void RemoveExactlyOnceConcurrency(const std::string& name); private: - FileServer() = default; + FileServer(); ~FileServer() = default; void PauseInner(); @@ -102,6 +105,8 @@ class FileServer { std::unordered_map mPipelineNamePluginMetricManagersMap; // 过渡使用 std::unordered_map mPipelineNameEOConcurrencyMap; + + mutable MetricsRecordRef mMetricsRecordRef; }; } // namespace logtail diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index a5ea529bec..96a3ac31af 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -16,8 +16,6 @@ #include -#include "file_server/event_handler/EventHandler.h" -#include "file_server/event_handler/HistoryFileImporter.h" #include "app_config/AppConfig.h" #include "application/Application.h" #include "checkpoint/CheckPointManager.h" @@ -27,18 +25,20 @@ #include "common/RuntimeUtil.h" #include "common/StringTools.h" #include "common/TimeUtil.h" +#include "file_server/ConfigManager.h" #include "file_server/EventDispatcher.h" #include "file_server/event/BlockEventManager.h" -#include "file_server/ConfigManager.h" -#include "logger/Logger.h" -#include "monitor/LogtailAlarm.h" -#include "monitor/Monitor.h" +#include "file_server/event_handler/EventHandler.h" +#include "file_server/event_handler/HistoryFileImporter.h" #include "file_server/polling/PollingCache.h" #include "file_server/polling/PollingDirFile.h" #include "file_server/polling/PollingEventQueue.h" #include "file_server/polling/PollingModify.h" #include "file_server/reader/GloablFileDescriptorManager.h" #include 
"file_server/reader/LogFileReader.h" +#include "logger/Logger.h" +#include "monitor/LogtailAlarm.h" +#include "monitor/Monitor.h" #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" #endif @@ -88,9 +88,11 @@ void LogInput::Start() { mInteruptFlag = false; + mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); - mAgentRegisterHandlerTotal - = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_REGISTER_HANDLER_TOTAL); + mRegisterdHandlersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_CNT); + mActiveReadersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_CNT); + mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); new Thread([this]() { ProcessLoop(); }); } @@ -342,6 +344,7 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) { void LogInput::UpdateCriticalMetric(int32_t curTime) { LogtailMonitor::GetInstance()->UpdateMetric("last_read_event_time", GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S")); + mLastRunTime->Set(mLastReadEventTime.load()); LogtailMonitor::GetInstance()->UpdateMetric("event_tps", 1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime)); @@ -350,8 +353,9 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) { mAgentOpenFdTotal->Set(openFdTotal); size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount(); LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount); - mAgentRegisterHandlerTotal->Set(handlerCount); + mRegisterdHandlersCnt->Set(handlerCount); LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount()); + 
mActiveReadersCnt->Set(CheckPointManager::Instance()->GetReaderCount()); LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig()); mEventProcessCount = 0; } diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index 21217086a3..69f2ffb077 100644 --- a/core/file_server/event_handler/LogInput.h +++ b/core/file_server/event_handler/LogInput.h @@ -79,8 +79,12 @@ class LogInput : public LogRunnable { volatile bool mIdleFlag; int32_t mEventProcessCount; int32_t mLastUpdateMetricTime; + + IntGaugePtr mLastRunTime; IntGaugePtr mAgentOpenFdTotal; - IntGaugePtr mAgentRegisterHandlerTotal; + IntGaugePtr mRegisterdHandlersCnt; + IntGaugePtr mActiveReadersCnt; + IntGaugePtr mEnableFileIncludedByMultiConfigs; std::atomic_int mLastReadEventTime{0}; mutable std::mutex mThreadRunningMux; diff --git a/core/file_server/polling/PollingDirFile.cpp b/core/file_server/polling/PollingDirFile.cpp index 2bc809de7c..0a578f5ce8 100644 --- a/core/file_server/polling/PollingDirFile.cpp +++ b/core/file_server/polling/PollingDirFile.cpp @@ -21,8 +21,6 @@ #endif #include -#include "file_server/polling/PollingEventQueue.h" -#include "file_server/polling/PollingModify.h" #include "app_config/AppConfig.h" #include "common/ErrorUtil.h" #include "common/FileSystemUtil.h" @@ -30,10 +28,13 @@ #include "common/StringTools.h" #include "common/TimeUtil.h" #include "file_server/ConfigManager.h" -#include "file_server/event/Event.h" #include "file_server/FileServer.h" +#include "file_server/event/Event.h" +#include "file_server/polling/PollingEventQueue.h" +#include "file_server/polling/PollingModify.h" #include "logger/Logger.h" #include "monitor/LogtailAlarm.h" +#include "monitor/MetricConstants.h" // Control the check frequency to call ClearUnavailableFileAndDir. 
DEFINE_FLAG_INT32(check_not_exist_file_dir_round, "clear not exist file dir cache, round", 20); @@ -69,10 +70,10 @@ static const int64_t NANO_CONVERTING = 1000000000; void PollingDirFile::Start() { ClearCache(); mAgentConfigTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL); - mAgentPollingDirCacheSizeTotal - = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL); - mAgentPollingFileCacheSizeTotal - = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL); + mPollingDirCacheSize + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_DIR_CACHE_SIZE); + mPollingFileCacheSize + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_FILE_CACHE_SIZE); mRuningFlag = true; mThreadPtr = CreateThread([this]() { Polling(); }); } @@ -157,10 +158,10 @@ void PollingDirFile::Polling() { ScopedSpinLock lock(mCacheLock); size_t pollingDirCacheSize = mDirCacheMap.size(); LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize); - mAgentPollingDirCacheSizeTotal->Set(pollingDirCacheSize); + mPollingDirCacheSize->Set(pollingDirCacheSize); size_t pollingFileCacheSize = mFileCacheMap.size(); LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize); - mAgentPollingFileCacheSizeTotal->Set(pollingFileCacheSize); + mPollingFileCacheSize->Set(pollingFileCacheSize); } // Iterate all normal configs, make sure stat count will not exceed limit. 
diff --git a/core/file_server/polling/PollingDirFile.h b/core/file_server/polling/PollingDirFile.h index 40a4ebfb8b..3e381f80fa 100644 --- a/core/file_server/polling/PollingDirFile.h +++ b/core/file_server/polling/PollingDirFile.h @@ -137,8 +137,8 @@ class PollingDirFile : public LogRunnable { uint64_t mCurrentRound; IntGaugePtr mAgentConfigTotal; - IntGaugePtr mAgentPollingDirCacheSizeTotal; - IntGaugePtr mAgentPollingFileCacheSizeTotal; + IntGaugePtr mPollingDirCacheSize; + IntGaugePtr mPollingFileCacheSize; #ifdef APSARA_UNIT_TEST_MAIN friend class PollingUnittest; diff --git a/core/file_server/polling/PollingModify.cpp b/core/file_server/polling/PollingModify.cpp index 6dac3afad4..73ff76b3cd 100644 --- a/core/file_server/polling/PollingModify.cpp +++ b/core/file_server/polling/PollingModify.cpp @@ -24,9 +24,11 @@ #include "common/Flags.h" #include "common/StringTools.h" #include "common/TimeUtil.h" +#include "file_server/FileServer.h" #include "file_server/event/Event.h" #include "logger/Logger.h" #include "monitor/LogtailAlarm.h" +#include "monitor/MetricConstants.h" using namespace std; @@ -47,7 +49,8 @@ PollingModify::~PollingModify() { void PollingModify::Start() { ClearCache(); - mAgentPollingModifySizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL); + mPollingModifySize + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_MODIFY_CACHE_SIZE); mRuningFlag = true; mThreadPtr = CreateThread([this]() { Polling(); }); @@ -251,7 +254,7 @@ void PollingModify::Polling() { int32_t statCount = 0; size_t pollingModifySizeTotal = mModifyCacheMap.size(); LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal); - mAgentPollingModifySizeTotal->Set(pollingModifySizeTotal); + mPollingModifySize->Set(pollingModifySizeTotal); for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) { if (!mRuningFlag || mHoldOnFlag) break; 
diff --git a/core/file_server/polling/PollingModify.h b/core/file_server/polling/PollingModify.h index c82337a898..5e3738a92a 100644 --- a/core/file_server/polling/PollingModify.h +++ b/core/file_server/polling/PollingModify.h @@ -101,7 +101,7 @@ class PollingModify : public LogRunnable { ModifyCheckCacheMap mModifyCacheMap; - IntGaugePtr mAgentPollingModifySizeTotal; + IntGaugePtr mPollingModifySize; #ifdef APSARA_UNIT_TEST_MAIN friend class PollingUnittest; diff --git a/core/monitor/LogtailMetric.cpp b/core/monitor/LogtailMetric.cpp index d6ddc5e768..6b26300d8c 100644 --- a/core/monitor/LogtailMetric.cpp +++ b/core/monitor/LogtailMetric.cpp @@ -205,34 +205,11 @@ WriteMetrics::~WriteMetrics() { Clear(); } -void WriteMetrics::PreparePluginCommonLabels(const std::string& projectName, - const std::string& logstoreName, - const std::string& region, - const std::string& configName, - const std::string& pluginType, - const std::string& pluginID, - const std::string& nodeID, - const std::string& childNodeID, - MetricLabels& labels) { - labels.emplace_back(std::make_pair(METRIC_LABEL_PROJECT, projectName)); - labels.emplace_back(std::make_pair(METRIC_LABEL_LOGSTORE, logstoreName)); - labels.emplace_back(std::make_pair(METRIC_LABEL_REGION, region)); - labels.emplace_back(std::make_pair(METRIC_LABEL_CONFIG_NAME, configName)); - labels.emplace_back(std::make_pair(METRIC_LABEL_PLUGIN_NAME, pluginType)); - labels.emplace_back(std::make_pair(METRIC_LABEL_PLUGIN_ID, pluginID)); - labels.emplace_back(std::make_pair(METRIC_LABEL_NODE_ID, nodeID)); - labels.emplace_back(std::make_pair(METRIC_LABEL_CHILD_NODE_ID, childNodeID)); -} - void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) { - MetricsRecord* cur = new MetricsRecord(std::make_shared(labels), - std::make_shared(dynamicLabels)); - ref.SetMetricsRecord(cur); - std::lock_guard lock(mMutex); - cur->SetNext(mHead); - mHead = cur; + 
CreateMetricsRecordRef(ref, std::move(labels), std::move(dynamicLabels)); + CommitMetricsRecordRef(ref); } void WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref, diff --git a/core/monitor/LogtailMetric.h b/core/monitor/LogtailMetric.h index 6c98081b2a..6f4ef9e8ef 100644 --- a/core/monitor/LogtailMetric.h +++ b/core/monitor/LogtailMetric.h @@ -133,15 +133,7 @@ class WriteMetrics { static WriteMetrics* ptr = new WriteMetrics(); return ptr; } - void PreparePluginCommonLabels(const std::string& projectName, - const std::string& logstoreName, - const std::string& region, - const std::string& configName, - const std::string& pluginType, - const std::string& pluginID, - const std::string& nodeID, - const std::string& childNodeID, - MetricLabels& labels); + void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {}); void CreateMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {}); diff --git a/core/monitor/MetricConstants.cpp b/core/monitor/MetricConstants.cpp index 24212c9d3c..e0b43e87e3 100644 --- a/core/monitor/MetricConstants.cpp +++ b/core/monitor/MetricConstants.cpp @@ -45,21 +45,12 @@ const std::string METRIC_AGENT_MEMORY = "agent_memory_used_mb"; const std::string METRIC_AGENT_MEMORY_GO = "agent_go_memory_used_mb"; const std::string METRIC_AGENT_GO_ROUTINES_TOTAL = "agent_go_routines_total"; const std::string METRIC_AGENT_OPEN_FD_TOTAL = "agent_open_fd_total"; -const std::string METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL = "agent_polling_dir_cache_size_total"; -const std::string METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL = "agent_polling_file_cache_size_total"; -const std::string METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL = "agent_polling_modify_size_total"; -const std::string METRIC_AGENT_REGISTER_HANDLER_TOTAL = "agent_register_handler_total"; const std::string METRIC_AGENT_INSTANCE_CONFIG_TOTAL = "agent_instance_config_total"; const std::string 
METRIC_AGENT_PIPELINE_CONFIG_TOTAL = "agent_pipeline_config_total"; const std::string METRIC_AGENT_ENV_PIPELINE_CONFIG_TOTAL = "agent_env_pipeline_config_total"; const std::string METRIC_AGENT_CRD_PIPELINE_CONFIG_TOTAL = "agent_crd_pipeline_config_total"; const std::string METRIC_AGENT_CONSOLE_PIPELINE_CONFIG_TOTAL = "agent_console_pipeline_config_total"; const std::string METRIC_AGENT_PLUGIN_TOTAL = "agent_plugin_total"; -const std::string METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL = "agent_process_queue_full_total"; -const std::string METRIC_AGENT_PROCESS_QUEUE_TOTAL = "agent_process_queue_total"; -const std::string METRIC_AGENT_SEND_QUEUE_FULL_TOTAL = "agent_send_queue_full_total"; -const std::string METRIC_AGENT_SEND_QUEUE_TOTAL = "agent_send_queue_total"; -const std::string METRIC_AGENT_USED_SENDING_CONCURRENCY = "agent_used_sending_concurrency"; // common plugin labels const std::string METRIC_LABEL_PROJECT = "project"; @@ -72,6 +63,7 @@ const std::string METRIC_LABEL_NODE_ID = "node_id"; const std::string METRIC_LABEL_CHILD_NODE_ID = "child_node_id"; const std::string METRIC_LABEL_KEY_COMPONENT_NAME = "component_name"; +const std::string METRIC_LABEL_KEY_RUNNER_NAME = "runner_name"; const std::string METRIC_LABEL_KEY_QUEUE_TYPE = "queue_type"; const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG = "is_exactly_once"; const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID = "flusher_node_id"; @@ -149,14 +141,82 @@ const std::string METRIC_FLUSHER_QUOTA_ERROR_TOTAL = "flusher_quota_error_total" const std::string METRIC_FLUSHER_RETRIES_TOTAL = "flusher_retries_total"; const std::string METRIC_FLUSHER_RETRIES_ERROR_TOTAL = "flusher_retries_error_total"; -const std::string METRIC_IN_EVENTS_CNT = "in_events_cnt"; -const std::string METRIC_IN_ITEMS_CNT = "in_items_cnt"; -const std::string METRIC_IN_EVENT_GROUP_SIZE_BYTES = "in_event_group_data_size_bytes"; -const std::string METRIC_IN_ITEM_SIZE_BYTES = "in_item_data_size_bytes"; -const std::string METRIC_OUT_EVENTS_CNT = 
"out_events_cnt"; -const std::string METRIC_OUT_ITEMS_CNT = "out_items_cnt"; -const std::string METRIC_OUT_EVENT_GROUP_SIZE_BYTES = "out_event_group_data_size_bytes"; -const std::string METRIC_OUT_ITEM_SIZE_BYTES = "out_item_data_size_bytes"; -const std::string METRIC_TOTAL_DELAY_MS = "total_delay_ms"; +////////////////////////////////////////////////////////////////////////// +// plugin +////////////////////////////////////////////////////////////////////////// +// common metrics +const std::string METRIC_PLUGIN_IN_EVENTS_CNT = "plugin_in_events_total"; +const std::string METRIC_PLUGIN_IN_EVENT_GROUP_SIZE_BYTES = "plugin_in_event_group_size_bytes"; + +////////////////////////////////////////////////////////////////////////// +// component +////////////////////////////////////////////////////////////////////////// +// common metrics +const std::string METRIC_COMPONENT_IN_EVENTS_CNT = "component_in_events_total"; +const std::string METRIC_COMPONENT_IN_ITEMS_CNT = "component_in_items_total"; +const std::string METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES = "component_in_event_group_size_bytes"; +const std::string METRIC_COMPONENT_IN_ITEM_SIZE_BYTES = "component_in_item_size_bytes"; +const std::string METRIC_COMPONENT_OUT_EVENTS_CNT = "component_out_events_total"; +const std::string METRIC_COMPONENT_OUT_ITEMS_CNT = "component_out_items_total"; +const std::string METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES = "component_out_item_size_bytes"; +const std::string METRIC_COMPONENT_TOTAL_DELAY_MS = "component_total_delay_ms"; +const std::string METRIC_COMPONENT_DISCARDED_ITEMS_CNT = "component_discarded_items_total"; +const std::string METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES = "component_discarded_item_size_bytes"; + +// batcher metrics +const std::string METRIC_COMPONENT_BATCHER_EVENT_BATCHES_CNT = "component_event_batches_total"; +const std::string METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_CNT = "component_buffered_groups_total"; +const std::string 
METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_CNT = "component_buffered_events_total"; +const std::string METRIC_COMPONENT_BATCHER_BUFFERED_SIZE_BYTES = "component_buffered_size_bytes"; + +// queue metrics +const std::string METRIC_COMPONENT_QUEUE_SIZE_CNT = "component_queue_size"; +const std::string METRIC_COMPONENT_QUEUE_SIZE_BYTES = "component_queue_size_bytes"; +const std::string METRIC_COMPONENT_QUEUE_VALID_TO_PUSH_FLAG = "component_valid_to_push"; +const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE = "component_extra_buffer_size"; +const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES = "component_extra_buffer_size_bytes"; +const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_CNT = "component_discarded_events_total"; + +////////////////////////////////////////////////////////////////////////// +// pipeline +////////////////////////////////////////////////////////////////////////// +const std::string METRIC_PIPELINE_START_TIME = "pipeline_start_time"; +const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_CNT = "pipeline_processors_in_events_total"; +const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_CNT = "pipeline_processors_in_event_groups_total"; +const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUP_SIZE_BYTES + = "pipeline_processors_in_event_group_size_bytes"; +const std::string METRIC_PIPELINE_PROCESSORS_TOTAL_DELAY_MS = "pipeline_processors_total_delay_ms"; + +////////////////////////////////////////////////////////////////////////// +// runner +////////////////////////////////////////////////////////////////////////// +// common metrics +const std::string METRIC_RUNNER_IN_EVENTS_CNT = "runner_in_events_total"; +const std::string METRIC_RUNNER_IN_EVENT_GROUPS_CNT = "runner_in_event_groups_total"; +const std::string METRIC_RUNNER_IN_ITEMS_CNT = "runner_in_items_total"; +const std::string METRIC_RUNNER_IN_EVENT_GROUP_SIZE_BYTES = "runner_in_event_group_size_bytes"; +const std::string METRIC_RUNNER_IN_ITEM_SIZE_BYTES = 
"runner_in_item_size_bytes"; +const std::string METRIC_RUNNER_OUT_ITEMS_CNT = "runner_out_items_total"; +const std::string METRIC_RUNNER_TOTAL_DELAY_MS = "runner_total_delay_ms"; +const std::string METRIC_RUNNER_LAST_RUN_TIME = "runner_last_run_time"; + +// http sink metrics +const std::string METRIC_RUNNER_HTTP_SINK_OUT_SUCCESSFUL_ITEMS_CNT = "runner_out_successful_items_total"; +const std::string METRIC_RUNNER_HTTP_SINK_OUT_FAILED_ITEMS_CNT = "runner_out_failed_items_total"; +const std::string METRIC_RUNNER_HTTP_SINK_SENDING_ITEMS_CNT = "runner_sending_items_total"; +const std::string METRIC_RUNNER_HTTP_SINK_SEND_CONCURRENCY = "runner_send_concurrency"; + +// flusher runner metrics +const std::string METRIC_RUNNER_FLUSHER_IN_ITEM_RAW_SIZE_BYTES = "runner_in_item_raw_size_bytes"; +const std::string METRIC_RUNNER_FLUSHER_WAITING_ITEMS_CNT = "runner_waiting_items_total"; + +// file server metrics +const std::string METRIC_RUNNER_FILE_WATCHED_DIRS_CNT = "runner_watched_dirs_total"; +const std::string METRIC_RUNNER_FILE_ACTIVE_READERS_CNT = "runner_active_readers_total"; +const std::string METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG + = "runner_enable_file_included_by_multi_configs"; +const std::string METRIC_RUNNER_FILE_POLLING_MODIFY_CACHE_SIZE = "runner_polling_modify_cache_size"; +const std::string METRIC_RUNNER_FILE_POLLING_DIR_CACHE_SIZE = "runner_polling_dir_cache_size"; +const std::string METRIC_RUNNER_FILE_POLLING_FILE_CACHE_SIZE = "runner_polling_file_cache_size"; } // namespace logtail diff --git a/core/monitor/MetricConstants.h b/core/monitor/MetricConstants.h index 3b78a5eae0..cdf82bc734 100644 --- a/core/monitor/MetricConstants.h +++ b/core/monitor/MetricConstants.h @@ -47,21 +47,12 @@ extern const std::string METRIC_AGENT_MEMORY; extern const std::string METRIC_AGENT_MEMORY_GO; extern const std::string METRIC_AGENT_GO_ROUTINES_TOTAL; extern const std::string METRIC_AGENT_OPEN_FD_TOTAL; -extern const std::string 
METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL; -extern const std::string METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL; -extern const std::string METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL; -extern const std::string METRIC_AGENT_REGISTER_HANDLER_TOTAL; extern const std::string METRIC_AGENT_INSTANCE_CONFIG_TOTAL; extern const std::string METRIC_AGENT_PIPELINE_CONFIG_TOTAL; extern const std::string METRIC_AGENT_ENV_PIPELINE_CONFIG_TOTAL; extern const std::string METRIC_AGENT_CRD_PIPELINE_CONFIG_TOTAL; extern const std::string METRIC_AGENT_CONSOLE_PIPELINE_CONFIG_TOTAL; extern const std::string METRIC_AGENT_PLUGIN_TOTAL; -extern const std::string METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL; -extern const std::string METRIC_AGENT_PROCESS_QUEUE_TOTAL; -extern const std::string METRIC_AGENT_SEND_QUEUE_FULL_TOTAL; -extern const std::string METRIC_AGENT_SEND_QUEUE_TOTAL; -extern const std::string METRIC_AGENT_USED_SENDING_CONCURRENCY; // common plugin labels extern const std::string METRIC_LABEL_PROJECT; @@ -79,6 +70,7 @@ extern const std::string METRIC_LABEL_FILE_INODE; extern const std::string METRIC_LABEL_FILE_NAME; extern const std::string METRIC_LABEL_KEY_COMPONENT_NAME; +extern const std::string METRIC_LABEL_KEY_RUNNER_NAME; extern const std::string METRIC_LABEL_KEY_QUEUE_TYPE; extern const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG; extern const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID; @@ -142,14 +134,80 @@ extern const std::string METRIC_FLUSHER_QUOTA_ERROR_TOTAL; extern const std::string METRIC_FLUSHER_RETRIES_TOTAL; extern const std::string METRIC_FLUSHER_RETRIES_ERROR_TOTAL; -extern const std::string METRIC_IN_EVENTS_CNT; -extern const std::string METRIC_IN_ITEMS_CNT; -extern const std::string METRIC_IN_EVENT_GROUP_SIZE_BYTES; -extern const std::string METRIC_IN_ITEM_SIZE_BYTES; -extern const std::string METRIC_OUT_EVENTS_CNT; -extern const std::string METRIC_OUT_ITEMS_CNT; -extern const std::string METRIC_OUT_EVENT_GROUP_SIZE_BYTES; -extern const std::string 
METRIC_OUT_ITEM_SIZE_BYTES; -extern const std::string METRIC_TOTAL_DELAY_MS; +////////////////////////////////////////////////////////////////////////// +// plugin +////////////////////////////////////////////////////////////////////////// +// common metrics +extern const std::string METRIC_PLUGIN_IN_EVENTS_CNT; +extern const std::string METRIC_PLUGIN_IN_EVENT_GROUP_SIZE_BYTES; + +////////////////////////////////////////////////////////////////////////// +// component +////////////////////////////////////////////////////////////////////////// +// common metrics +extern const std::string METRIC_COMPONENT_IN_EVENTS_CNT; +extern const std::string METRIC_COMPONENT_IN_ITEMS_CNT; +extern const std::string METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_IN_ITEM_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_OUT_EVENTS_CNT; +extern const std::string METRIC_COMPONENT_OUT_ITEMS_CNT; +extern const std::string METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_TOTAL_DELAY_MS; +extern const std::string METRIC_COMPONENT_DISCARDED_ITEMS_CNT; +extern const std::string METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES; + +// batcher metrics +extern const std::string METRIC_COMPONENT_BATCHER_EVENT_BATCHES_CNT; +extern const std::string METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_CNT; +extern const std::string METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_CNT; +extern const std::string METRIC_COMPONENT_BATCHER_BUFFERED_SIZE_BYTES; + +// queue metrics +extern const std::string METRIC_COMPONENT_QUEUE_SIZE_CNT; +extern const std::string METRIC_COMPONENT_QUEUE_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_QUEUE_VALID_TO_PUSH_FLAG; +extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE; +extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_CNT; + +////////////////////////////////////////////////////////////////////////// 
+// pipeline +////////////////////////////////////////////////////////////////////////// +extern const std::string METRIC_PIPELINE_START_TIME; +extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_CNT; +extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_CNT; +extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_PIPELINE_PROCESSORS_TOTAL_DELAY_MS; + +////////////////////////////////////////////////////////////////////////// +// runner +////////////////////////////////////////////////////////////////////////// +// common metrics +extern const std::string METRIC_RUNNER_IN_EVENTS_CNT; +extern const std::string METRIC_RUNNER_IN_EVENT_GROUPS_CNT; +extern const std::string METRIC_RUNNER_IN_ITEMS_CNT; +extern const std::string METRIC_RUNNER_IN_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_RUNNER_IN_ITEM_SIZE_BYTES; +extern const std::string METRIC_RUNNER_OUT_ITEMS_CNT; +extern const std::string METRIC_RUNNER_TOTAL_DELAY_MS; +extern const std::string METRIC_RUNNER_LAST_RUN_TIME; + +// http sink metrics +extern const std::string METRIC_RUNNER_HTTP_SINK_OUT_SUCCESSFUL_ITEMS_CNT; +extern const std::string METRIC_RUNNER_HTTP_SINK_OUT_FAILED_ITEMS_CNT; +extern const std::string METRIC_RUNNER_HTTP_SINK_SENDING_ITEMS_CNT; +extern const std::string METRIC_RUNNER_HTTP_SINK_SEND_CONCURRENCY; + +// flusher runner metrics +extern const std::string METRIC_RUNNER_FLUSHER_IN_ITEM_RAW_SIZE_BYTES; +extern const std::string METRIC_RUNNER_FLUSHER_WAITING_ITEMS_CNT; + +// file server metrics +extern const std::string METRIC_RUNNER_FILE_WATCHED_DIRS_CNT; +extern const std::string METRIC_RUNNER_FILE_ACTIVE_READERS_CNT; +extern const std::string METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG; +extern const std::string METRIC_RUNNER_FILE_POLLING_MODIFY_CACHE_SIZE; +extern const std::string METRIC_RUNNER_FILE_POLLING_DIR_CACHE_SIZE; +extern const std::string 
METRIC_RUNNER_FILE_POLLING_FILE_CACHE_SIZE; } // namespace logtail diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index f3bfe2b391..2888af75e5 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -116,8 +116,6 @@ bool LogtailMonitor::Init() { // init metrics mAgentCpuGauge = LoongCollectorMonitor::GetInstance()->GetDoubleGauge(METRIC_AGENT_CPU); mAgentMemoryGauge = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_MEMORY); - mAgentUsedSendingConcurrency - = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_USED_SENDING_CONCURRENCY); // Initialize monitor thread. mThreadRes = async(launch::async, &LogtailMonitor::Monitor, this); @@ -320,7 +318,6 @@ bool LogtailMonitor::SendStatusProfile(bool suicide) { } int32_t usedSendingConcurrency = FlusherRunner::GetInstance()->GetSendingBufferCount(); UpdateMetric("used_sending_concurrency", usedSendingConcurrency); - mAgentUsedSendingConcurrency->Set(usedSendingConcurrency); AddLogContent(logPtr, "metric_json", MetricToString()); AddLogContent(logPtr, "status", CheckLogtailStatus()); @@ -731,14 +728,6 @@ void LoongCollectorMonitor::Init() { mIntGauges[METRIC_AGENT_MEMORY_GO] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_MEMORY_GO); mIntGauges[METRIC_AGENT_GO_ROUTINES_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL); mIntGauges[METRIC_AGENT_OPEN_FD_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); - mIntGauges[METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL] - = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL); - mIntGauges[METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL] - = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL); - mIntGauges[METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL] - = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL); - mIntGauges[METRIC_AGENT_REGISTER_HANDLER_TOTAL] - = 
mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_REGISTER_HANDLER_TOTAL); // mIntGauges[METRIC_AGENT_INSTANCE_CONFIG_TOTAL] = // mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_INSTANCE_CONFIG_TOTAL); mIntGauges[METRIC_AGENT_PIPELINE_CONFIG_TOTAL] @@ -750,14 +739,6 @@ void LoongCollectorMonitor::Init() { // mIntGauges[METRIC_AGENT_CONSOLE_PIPELINE_CONFIG_TOTAL] // = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_CONSOLE_PIPELINE_CONFIG_TOTAL); // mIntGauges[METRIC_AGENT_PLUGIN_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_PLUGIN_TOTAL); - mIntGauges[METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL] - = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL); - mIntGauges[METRIC_AGENT_PROCESS_QUEUE_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_PROCESS_QUEUE_TOTAL); - mIntGauges[METRIC_AGENT_SEND_QUEUE_FULL_TOTAL] - = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_SEND_QUEUE_FULL_TOTAL); - mIntGauges[METRIC_AGENT_SEND_QUEUE_TOTAL] = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_SEND_QUEUE_TOTAL); - mIntGauges[METRIC_AGENT_USED_SENDING_CONCURRENCY] - = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_USED_SENDING_CONCURRENCY); LOG_INFO(sLogger, ("LoongCollectorMonitor", "started")); } diff --git a/core/monitor/Monitor.h b/core/monitor/Monitor.h index 5fcc551dd5..0a4d557361 100644 --- a/core/monitor/Monitor.h +++ b/core/monitor/Monitor.h @@ -167,8 +167,6 @@ class LogtailMonitor : public MetricStore { MemStat mMemStat; IntGaugePtr mAgentMemoryGauge; - IntGaugePtr mAgentUsedSendingConcurrency; - // Current scale up level, updated by CheckScaledCpuUsageUpLimit. 
float mScaledCpuUsageUpLimit; #if defined(__linux__) diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 0f841798f1..1b58a5a044 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -317,11 +317,12 @@ bool Pipeline::Init(PipelineConfig&& config) { WriteMetrics::GetInstance()->PrepareMetricsRecordRef( mMetricsRecordRef, {{METRIC_LABEL_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_CONFIG_NAME, mName}}); - mStartTime = mMetricsRecordRef.CreateIntGauge("start_time"); - mProcessorsInEventsCnt = mMetricsRecordRef.CreateCounter("processors_in_events_cnt"); - mProcessorsInGroupsCnt = mMetricsRecordRef.CreateCounter("processors_in_event_groups_cnt"); - mProcessorsInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter("processors_in_event_group_data_size_bytes"); - mProcessorsTotalDelayMs = mMetricsRecordRef.CreateCounter("processors_total_delay_ms"); + mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME); + mProcessorsInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_CNT); + mProcessorsInGroupsCnt = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_CNT); + mProcessorsInGroupDataSizeBytes + = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUP_SIZE_BYTES); + mProcessorsTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_TOTAL_DELAY_MS); return true; } @@ -349,7 +350,7 @@ void Pipeline::Start() { mStartTime->Set(chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count()); #endif - LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName)("ptr", mStartTime.get())); + LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName)); } void Pipeline::Process(vector& logGroupList, size_t inputIndex) { diff --git a/core/pipeline/batch/Batcher.h b/core/pipeline/batch/Batcher.h index ca01986ca5..0c09990649 100644 --- a/core/pipeline/batch/Batcher.h +++ 
b/core/pipeline/batch/Batcher.h @@ -110,14 +110,14 @@ class Batcher { labels.emplace_back("enable_group_batch", "false"); } WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels)); - mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENTS_CNT); - mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); - mOutEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_EVENTS_CNT); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); - mEventBatchItemsCnt = mMetricsRecordRef.CreateIntGauge("event_batches_cnt"); - mBufferedGroupsCnt = mMetricsRecordRef.CreateIntGauge("buffered_groups_cnt"); - mBufferedEventsCnt = mMetricsRecordRef.CreateIntGauge("buffered_events_cnt"); - mBufferedDataSizeByte = mMetricsRecordRef.CreateIntGauge("buffered_data_size_bytes"); + mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES); + mOutEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_EVENTS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mEventBatchItemsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_EVENT_BATCHES_CNT); + mBufferedGroupsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_CNT); + mBufferedEventsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_CNT); + mBufferedDataSizeByte = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_SIZE_BYTES); return true; } diff --git a/core/pipeline/plugin/instance/FlusherInstance.cpp b/core/pipeline/plugin/instance/FlusherInstance.cpp index 997f75fe4b..003b279022 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.cpp +++ b/core/pipeline/plugin/instance/FlusherInstance.cpp @@ -25,8 +25,8 @@ bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, 
return false; } - mInEventsCnt = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_IN_EVENTS_CNT); - mInGroupDataSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); + mInEventsCnt = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENT_GROUP_SIZE_BYTES); return true; } diff --git a/core/pipeline/plugin/interface/Plugin.h b/core/pipeline/plugin/interface/Plugin.h index e63d098ae3..1ee9464022 100644 --- a/core/pipeline/plugin/interface/Plugin.h +++ b/core/pipeline/plugin/interface/Plugin.h @@ -21,6 +21,7 @@ #include #include "monitor/LogtailMetric.h" +#include "monitor/MetricConstants.h" #include "pipeline/PipelineContext.h" namespace logtail { @@ -39,18 +40,13 @@ class Plugin { const std::string& id, const std::string& nodeID, const std::string& childNodeID) { - std::vector> labels; - WriteMetrics::GetInstance()->PreparePluginCommonLabels(mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion(), - mContext->GetConfigName(), - name, - id, - nodeID, - childNodeID, - labels); - - WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels)); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, + {{METRIC_LABEL_PROJECT, mContext->GetProjectName()}, + {METRIC_LABEL_CONFIG_NAME, mContext->GetConfigName()}, + {METRIC_LABEL_PLUGIN_NAME, name}, + {METRIC_LABEL_PLUGIN_ID, id}, + {METRIC_LABEL_NODE_ID, nodeID}, + {METRIC_LABEL_CHILD_NODE_ID, childNodeID}}); } protected: diff --git a/core/pipeline/queue/BoundedQueueInterface.h b/core/pipeline/queue/BoundedQueueInterface.h index 350e22be37..c8102a5fbd 100644 --- a/core/pipeline/queue/BoundedQueueInterface.h +++ b/core/pipeline/queue/BoundedQueueInterface.h @@ -26,7 +26,7 @@ class BoundedQueueInterface : virtual public QueueInterface { BoundedQueueInterface(QueueKey key, size_t cap, size_t low, size_t 
high, const PipelineContext& ctx) : QueueInterface(key, cap, ctx), mLowWatermark(low), mHighWatermark(high) { this->mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_QUEUE_TYPE, "bounded"}}); - mValidToPushFlag = this->mMetricsRecordRef.CreateIntGauge("valid_to_push"); + mValidToPushFlag = this->mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_VALID_TO_PUSH_FLAG); } virtual ~BoundedQueueInterface() = default; diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.cpp b/core/pipeline/queue/BoundedSenderQueueInterface.cpp index 03e26ebac3..3e58d48e49 100644 --- a/core/pipeline/queue/BoundedSenderQueueInterface.cpp +++ b/core/pipeline/queue/BoundedSenderQueueInterface.cpp @@ -26,8 +26,8 @@ BoundedSenderQueueInterface::BoundedSenderQueueInterface( : QueueInterface(key, cap, ctx), BoundedQueueInterface>(key, cap, low, high, ctx) { mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_COMPONENT_NAME, "sender_queue"}}); mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}}); - mExtraBufferSize = mMetricsRecordRef.CreateIntGauge("extra_buffer_size"); - mExtraBufferDataSizeBytes = mMetricsRecordRef.CreateIntGauge("extra_buffer_data_size_bytes"); + mExtraBufferSize = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE); + mExtraBufferDataSizeBytes = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES); } void BoundedSenderQueueInterface::SetFeedback(FeedbackInterface* feedback) { diff --git a/core/pipeline/queue/CircularProcessQueue.cpp b/core/pipeline/queue/CircularProcessQueue.cpp index e7f8b279f7..d9b7b551c8 100644 --- a/core/pipeline/queue/CircularProcessQueue.cpp +++ b/core/pipeline/queue/CircularProcessQueue.cpp @@ -25,7 +25,7 @@ namespace logtail { CircularProcessQueue::CircularProcessQueue(size_t cap, int64_t key, uint32_t priority, const PipelineContext& ctx) : QueueInterface>(key, cap, ctx), ProcessQueueInterface(key, cap, priority, ctx) { 
mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_QUEUE_TYPE, "circular"}}); - mDroppedEventsCnt = mMetricsRecordRef.CreateCounter("dropped_events_cnt"); + mDiscardedEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_CNT); WriteMetrics::GetInstance()->CommitMetricsRecordRef(mMetricsRecordRef); } @@ -38,7 +38,7 @@ bool CircularProcessQueue::Push(unique_ptr&& item) { mQueue.pop_front(); mQueueSize->Set(Size()); mQueueDataSizeByte->Sub(size); - mDroppedEventsCnt->Add(cnt); + mDiscardedEventsCnt->Add(cnt); } if (mEventCnt + newCnt > mCapacity) { return false; diff --git a/core/pipeline/queue/CircularProcessQueue.h b/core/pipeline/queue/CircularProcessQueue.h index a58a34e035..8a7c8c1ee9 100644 --- a/core/pipeline/queue/CircularProcessQueue.h +++ b/core/pipeline/queue/CircularProcessQueue.h @@ -43,7 +43,7 @@ class CircularProcessQueue : virtual public QueueInterface> mQueue; size_t mEventCnt = 0; - CounterPtr mDroppedEventsCnt; + CounterPtr mDiscardedEventsCnt; #ifdef APSARA_UNIT_TEST_MAIN friend class CircularProcessQueueUnittest; diff --git a/core/pipeline/queue/QueueInterface.h b/core/pipeline/queue/QueueInterface.h index 1c450013ad..7754190000 100644 --- a/core/pipeline/queue/QueueInterface.h +++ b/core/pipeline/queue/QueueInterface.h @@ -33,12 +33,12 @@ class QueueInterface { {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, }); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); - mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); - mQueueSize = mMetricsRecordRef.CreateIntGauge("queue_size"); - mQueueDataSizeByte = mMetricsRecordRef.CreateIntGauge("queue_data_size_bytes"); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT); + mInItemDataSizeBytes = 
mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mQueueSize = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_SIZE_CNT); + mQueueDataSizeByte = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_SIZE_BYTES); } virtual ~QueueInterface() = default; diff --git a/core/pipeline/route/Router.cpp b/core/pipeline/route/Router.cpp index 73de16bcda..a83e1b4225 100644 --- a/core/pipeline/route/Router.cpp +++ b/core/pipeline/route/Router.cpp @@ -39,8 +39,8 @@ bool Router::Init(std::vector> configs, const P {{METRIC_LABEL_PROJECT, ctx.GetProjectName()}, {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, {METRIC_LABEL_KEY_COMPONENT_NAME, "router"}}); - mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENTS_CNT); - mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); + mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES); return true; } diff --git a/core/pipeline/serializer/Serializer.h b/core/pipeline/serializer/Serializer.h index b632124b7a..c4e7c674ae 100644 --- a/core/pipeline/serializer/Serializer.h +++ b/core/pipeline/serializer/Serializer.h @@ -53,13 +53,13 @@ class Serializer { {METRIC_LABEL_CONFIG_NAME, f->GetContext().GetConfigName()}, {METRIC_LABEL_KEY_COMPONENT_NAME, "serializer"}, {METRIC_LABEL_KEY_FLUSHER_NODE_ID, f->GetNodeID()}}); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); - mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); - mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES); - mDiscardedItemsCnt = 
mMetricsRecordRef.CreateCounter("discarded_items_cnt"); - mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_data_size_bytes"); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT); + mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT); + mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT); + mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES); } virtual ~Serializer() = default; diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 16af0a51df..3d176b75ef 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -41,6 +41,16 @@ namespace logtail { bool FlusherRunner::Init() { srand(time(nullptr)); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, + {{METRIC_LABEL_KEY_RUNNER_NAME, "flusher_runner"}}); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEMS_CNT); + mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_OUT_ITEMS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_TOTAL_DELAY_MS); + mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); + mInItemRawDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_FLUSHER_IN_ITEM_RAW_SIZE_BYTES); + mWaitingItemsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_FLUSHER_WAITING_ITEMS_CNT); + mThreadRes = async(launch::async, &FlusherRunner::Run, this); mLastCheckSendClientTime = 
time(nullptr); return true; @@ -99,12 +109,20 @@ void FlusherRunner::Run() { LOG_INFO(sLogger, ("flusher runner", "started")); while (true) { auto curTime = chrono::system_clock::now(); + mLastRunTime->Set(chrono::duration_cast(curTime.time_since_epoch()).count()); vector items; SenderQueueManager::GetInstance()->GetAllAvailableItems(items, !Application::GetInstance()->IsExiting()); if (items.empty()) { SenderQueueManager::GetInstance()->Wait(1000); } else { + for (auto itr = items.begin(); itr != items.end(); ++itr) { + mInItemDataSizeBytes->Add((*itr)->mData.size()); + mInItemRawDataSizeBytes->Add((*itr)->mRawSize); + } + mInItemsCnt->Add(items.size()); + mWaitingItemsCnt->Add(items.size()); + // smoothing send tps, walk around webserver load burst uint32_t bufferPackageCount = items.size(); if (!Application::GetInstance()->IsExiting() && AppConfig::GetInstance()->IsSendRandomSleep()) { @@ -132,6 +150,10 @@ void FlusherRunner::Run() { } Dispatch(*itr); + mWaitingItemsCnt->Sub(1); + mOutItemsCnt->Add(1); + mTotalDelayMs->Add( + chrono::duration_cast(chrono::system_clock::now() - curTime).count()); } // TODO: move the following logic to scheduler diff --git a/core/runner/FlusherRunner.h b/core/runner/FlusherRunner.h index fb7347cddb..5681c4b263 100644 --- a/core/runner/FlusherRunner.h +++ b/core/runner/FlusherRunner.h @@ -20,6 +20,7 @@ #include #include +#include "monitor/LogtailMetric.h" #include "pipeline/plugin/interface/Flusher.h" #include "pipeline/queue/SenderQueueItem.h" #include "runner/sink/SinkType.h" @@ -63,6 +64,15 @@ class FlusherRunner { int64_t mSendLastTime = 0; int32_t mSendLastByte = 0; + mutable MetricsRecordRef mMetricsRecordRef; + CounterPtr mInItemsCnt; + CounterPtr mInItemDataSizeBytes; + CounterPtr mInItemRawDataSizeBytes; + CounterPtr mOutItemsCnt; + CounterPtr mTotalDelayMs; + IntGaugePtr mWaitingItemsCnt; + IntGaugePtr mLastRunTime; + #ifdef APSARA_UNIT_TEST_MAIN friend class PluginRegistryUnittest; friend class FlusherRunnerUnittest; 
diff --git a/core/runner/LogProcess.cpp b/core/runner/LogProcess.cpp index cd65998408..dc3cb84ad1 100644 --- a/core/runner/LogProcess.cpp +++ b/core/runner/LogProcess.cpp @@ -15,12 +15,13 @@ #include "runner/LogProcess.h" #include "app_config/AppConfig.h" -#include "pipeline/batch/TimeoutFlushManager.h" #include "common/Flags.h" #include "go_pipeline/LogtailPlugin.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" +#include "monitor/MetricConstants.h" #include "pipeline/PipelineManager.h" +#include "pipeline/batch/TimeoutFlushManager.h" #include "pipeline/queue/ExactlyOnceQueueManager.h" #include "pipeline/queue/ProcessQueueManager.h" #include "pipeline/queue/QueueKeyManager.h" @@ -39,6 +40,12 @@ DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged bu namespace logtail { +thread_local MetricsRecordRef LogProcess::sMetricsRecordRef; +thread_local CounterPtr LogProcess::sInGroupsCnt; +thread_local CounterPtr LogProcess::sInEventsCnt; +thread_local CounterPtr LogProcess::sInGroupDataSizeBytes; +thread_local IntGaugePtr LogProcess::sLastRunTime; + LogProcess::LogProcess() : mAccessProcessThreadRWL(ReadWriteLock::PREFER_WRITER) { } @@ -56,8 +63,6 @@ LogProcess::~LogProcess() { void LogProcess::Start() { if (mInitialized) return; - mAgentProcessQueueFullTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_FULL_TOTAL); - mAgentProcessQueueTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PROCESS_QUEUE_TOTAL); mInitialized = true; mThreadCount = AppConfig::GetInstance()->GetProcessThreadCount(); @@ -137,13 +142,16 @@ bool LogProcess::FlushOut(int32_t waitMs) { } void* LogProcess::ProcessLoop(int32_t threadNo) { - LOG_DEBUG(sLogger, ("runner/LogProcess.hread", "Start")("threadNo", threadNo)); + LOG_DEBUG(sLogger, ("LogProcess", "Start")("threadNo", threadNo)); + // thread local metrics should be initialized in each thread + 
WriteMetrics::GetInstance()->PrepareMetricsRecordRef( + sMetricsRecordRef, {{METRIC_LABEL_KEY_RUNNER_NAME, "processor_runner"}, {"thread_no", ToString(threadNo)}}); + sInGroupsCnt = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENT_GROUPS_CNT); + sInEventsCnt = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENTS_CNT); + sInGroupDataSizeBytes = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENT_GROUP_SIZE_BYTES); + sLastRunTime = sMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); + static int32_t lastMergeTime = 0; - static atomic_int s_processCount{0}; - static atomic_long s_processBytes{0}; - static atomic_int s_processLines{0}; - // only thread 0 update metric - int32_t lastUpdateMetricTime = time(NULL); while (true) { mThreadFlags[threadNo] = false; @@ -153,35 +161,9 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { lastMergeTime = curTime; } - if (threadNo == 0 && curTime - lastUpdateMetricTime >= 40) { - static auto sMonitor = LogtailMonitor::GetInstance(); - - // atomic counter will be negative if process speed is too fast. 
- sMonitor->UpdateMetric("process_tps", 1.0 * s_processCount / (curTime - lastUpdateMetricTime)); - sMonitor->UpdateMetric("process_bytes_ps", 1.0 * s_processBytes / (curTime - lastUpdateMetricTime)); - sMonitor->UpdateMetric("process_lines_ps", 1.0 * s_processLines / (curTime - lastUpdateMetricTime)); - lastUpdateMetricTime = curTime; - s_processCount = 0; - s_processBytes = 0; - s_processLines = 0; - - // update process queue status - uint32_t InvalidProcessQueueTotal = ProcessQueueManager::GetInstance()->GetInvalidCnt(); - sMonitor->UpdateMetric("process_queue_full", InvalidProcessQueueTotal); - mAgentProcessQueueFullTotal->Set(InvalidProcessQueueTotal); - uint32_t ProcessQueueTotal = ProcessQueueManager::GetInstance()->GetCnt(); - sMonitor->UpdateMetric("process_queue_total", ProcessQueueTotal); - mAgentProcessQueueTotal->Set(ProcessQueueTotal); - if (ExactlyOnceQueueManager::GetInstance()->GetProcessQueueCnt() > 0) { - sMonitor->UpdateMetric("eo_process_queue_full", - ExactlyOnceQueueManager::GetInstance()->GetInvalidProcessQueueCnt()); - sMonitor->UpdateMetric("eo_process_queue_total", - ExactlyOnceQueueManager::GetInstance()->GetProcessQueueCnt()); - } - } - { ReadLock lock(mAccessProcessThreadRWL); + sLastRunTime->Set(curTime); unique_ptr item; string configName; @@ -189,6 +171,9 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { ProcessQueueManager::GetInstance()->Wait(100); continue; } + sInEventsCnt->Add(item->mEventGroup.GetEvents().size()); + sInGroupsCnt->Add(1); + sInGroupDataSizeBytes->Add(item->mEventGroup.DataSize()); mThreadFlags[threadNo] = true; auto pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); @@ -227,12 +212,6 @@ void* LogProcess::ProcessLoop(int32_t threadNo) { pipeline->GetContext().GetRegion()); } - s_processCount++; - if (isLog) { - s_processBytes += profile.readBytes; - s_processLines += profile.splitLines; - } - if (eventGroupList.empty()) { continue; } @@ -296,7 +275,7 @@ void* 
LogProcess::ProcessLoop(int32_t threadNo) { } } } - LOG_WARNING(sLogger, ("runner/LogProcess.hread", "Exit")("threadNo", threadNo)); + LOG_WARNING(sLogger, ("LogProcess", "Exit")("threadNo", threadNo)); return NULL; } diff --git a/core/runner/LogProcess.h b/core/runner/LogProcess.h index c0bd77e9ad..624bb09be8 100644 --- a/core/runner/LogProcess.h +++ b/core/runner/LogProcess.h @@ -23,6 +23,7 @@ #include "common/LogRunnable.h" #include "common/Thread.h" #include "models/PipelineEventGroup.h" +#include "monitor/LogtailMetric.h" #include "monitor/Monitor.h" #include "pipeline/queue/QueueKey.h" @@ -60,8 +61,11 @@ class LogProcess : public LogRunnable { std::atomic_bool* mThreadFlags; ReadWriteLock mAccessProcessThreadRWL; - IntGaugePtr mAgentProcessQueueFullTotal; - IntGaugePtr mAgentProcessQueueTotal; + thread_local static MetricsRecordRef sMetricsRecordRef; + thread_local static CounterPtr sInGroupsCnt; + thread_local static CounterPtr sInEventsCnt; + thread_local static CounterPtr sInGroupDataSizeBytes; + thread_local static IntGaugePtr sLastRunTime; }; } // namespace logtail \ No newline at end of file diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 0a5e7de3d6..68e276397f 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -18,6 +18,7 @@ #include "common/StringTools.h" #include "common/http/Curl.h" #include "logger/Logger.h" +#include "monitor/MetricConstants.h" #include "pipeline/plugin/interface/HttpFlusher.h" #include "pipeline/queue/QueueKeyManager.h" #include "pipeline/queue/SenderQueueItem.h" @@ -33,6 +34,19 @@ bool HttpSink::Init() { LOG_ERROR(sLogger, ("failed to init http sink", "failed to init curl multi client")); return false; } + + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, + {{METRIC_LABEL_KEY_RUNNER_NAME, "http sink"}}); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEMS_CNT); + mLastRunTime = 
mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); + mOutSuccessfulItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_HTTP_SINK_OUT_SUCCESSFUL_ITEMS_CNT); + mOutFailedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_HTTP_SINK_OUT_FAILED_ITEMS_CNT); + mSendingItemsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_HTTP_SINK_SENDING_ITEMS_CNT); + mSendConcurrency = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_HTTP_SINK_SEND_CONCURRENCY); + + // TODO: should be dynamic + mSendConcurrency->Set(AppConfig::GetInstance()->GetSendRequestConcurrency()); + mThreadRes = async(launch::async, &HttpSink::Run, this); return true; } @@ -48,9 +62,13 @@ void HttpSink::Stop() { } void HttpSink::Run() { + LOG_INFO(sLogger, ("http sink", "started")); while (true) { + mLastRunTime->Set( + chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count()); unique_ptr request; if (mQueue.WaitAndPop(request, 500)) { + mInItemsCnt->Add(1); LOG_DEBUG( sLogger, ("got item from flusher runner, item address", request->mItem)( @@ -59,6 +77,7 @@ void HttpSink::Run() { if (!AddRequestToClient(std::move(request))) { continue; } + mSendingItemsCnt->Add(1); } else if (mIsFlush && mQueue.Empty()) { break; } else { @@ -72,7 +91,7 @@ void HttpSink::Run() { } } -bool HttpSink::AddRequestToClient(std::unique_ptr&& request) { +bool HttpSink::AddRequestToClient(unique_ptr&& request) { curl_slist* headers = nullptr; CURL* curl = CreateCurlHandler(request->mMethod, request->mHTTPSFlag, @@ -90,6 +109,7 @@ bool HttpSink::AddRequestToClient(std::unique_ptr&& request) { if (curl == nullptr) { request->mItem->mStatus = SendingStatus::IDLE; FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); + mOutFailedItemsCnt->Add(1); LOG_ERROR(sLogger, ("failed to send request", "failed to init curl handler")( "action", "put sender queue item back to sender queue")("item address", request->mItem)( @@ -99,13 +119,14 @@ bool HttpSink::AddRequestToClient(std::unique_ptr&& request) { 
request->mPrivateData = headers; curl_easy_setopt(curl, CURLOPT_PRIVATE, request.get()); - request->mLastSendTime = std::chrono::system_clock::now(); + request->mLastSendTime = chrono::system_clock::now(); auto res = curl_multi_add_handle(mClient, curl); if (res != CURLM_OK) { request->mItem->mStatus = SendingStatus::IDLE; FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); curl_easy_cleanup(curl); + mOutFailedItemsCnt->Add(1); LOG_ERROR(sLogger, ("failed to send request", "failed to add the easy curl handle to multi_handle")("errMsg", curl_multi_strerror(res))( @@ -122,6 +143,8 @@ void HttpSink::DoRun() { CURLMcode mc; int runningHandlers = 1; while (runningHandlers) { + auto curTime = chrono::system_clock::now(); + mLastRunTime->Set(chrono::duration_cast(curTime.time_since_epoch()).count()); if ((mc = curl_multi_perform(mClient, &runningHandlers)) != CURLM_OK) { LOG_ERROR( sLogger, @@ -133,6 +156,7 @@ void HttpSink::DoRun() { unique_ptr request; if (mQueue.TryPop(request)) { + mInItemsCnt->Add(1); LOG_DEBUG( sLogger, ("got item from flusher runner, item address", request->mItem)( @@ -140,6 +164,7 @@ void HttpSink::DoRun() { "wait time", ToString(time(nullptr) - request->mEnqueTime))("try cnt", ToString(request->mTryCnt))); if (AddRequestToClient(std::move(request))) { ++runningHandlers; + mSendingItemsCnt->Add(1); } } @@ -191,10 +216,13 @@ void HttpSink::HandleCompletedRequests() { HttpSinkRequest* request = nullptr; curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request); LOG_DEBUG(sLogger, - ("send http request completed, item address", request->mItem) - ("config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey)) - ("response time",ToString(chrono::duration_cast(chrono::system_clock::now()- request->mLastSendTime).count()) + "ms") - ("try cnt", ToString(request->mTryCnt))); + ("send http request completed, item address", request->mItem)( + "config-flusher-dst", 
QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "response time", + ToString(chrono::duration_cast(chrono::system_clock::now() + - request->mLastSendTime) + .count()) + + "ms")("try cnt", ToString(request->mTryCnt))); switch (msg->data.result) { case CURLE_OK: { long statusCode = 0; @@ -202,6 +230,8 @@ void HttpSink::HandleCompletedRequests() { request->mResponse.mStatusCode = (int32_t)statusCode; static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); + mOutSuccessfulItemsCnt->Add(1); + mSendingItemsCnt->Sub(1); break; } default: @@ -225,6 +255,8 @@ void HttpSink::HandleCompletedRequests() { ->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); } + mOutFailedItemsCnt->Add(1); + mSendingItemsCnt->Sub(1); break; } curl_multi_remove_handle(mClient, handler); diff --git a/core/runner/sink/http/HttpSink.h b/core/runner/sink/http/HttpSink.h index 6e11b84855..56d9e01302 100644 --- a/core/runner/sink/http/HttpSink.h +++ b/core/runner/sink/http/HttpSink.h @@ -25,6 +25,7 @@ #include "runner/sink/Sink.h" #include "runner/sink/http/HttpSinkRequest.h" +#include "monitor/LogtailMetric.h" namespace logtail { @@ -55,6 +56,15 @@ class HttpSink : public Sink { std::future mThreadRes; std::atomic_bool mIsFlush = false; + mutable MetricsRecordRef mMetricsRecordRef; + CounterPtr mInItemsCnt; + CounterPtr mOutSuccessfulItemsCnt; + CounterPtr mOutFailedItemsCnt; + // CounterPtr mTotalDelayMs; // TODO: should record distribution instead of average + IntGaugePtr mSendingItemsCnt; + IntGaugePtr mSendConcurrency; + IntGaugePtr mLastRunTime; + #ifdef APSARA_UNIT_TEST_MAIN friend class FlusherRunnerUnittest; #endif diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index d4d76b5138..4c061b255d 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ 
b/core/unittest/pipeline/PipelineUnittest.cpp @@ -2688,10 +2688,10 @@ void PipelineUnittest::TestProcess() const { pipeline.mProcessorLine.emplace_back(std::move(processor)); WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, {}); - pipeline.mProcessorsInEventsCnt = pipeline.mMetricsRecordRef.CreateCounter("processors_in_events_cnt"); - pipeline.mProcessorsInGroupsCnt = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_groups_cnt"); + pipeline.mProcessorsInEventsCnt = pipeline.mMetricsRecordRef.CreateCounter("pipeline_processors_in_events_total"); + pipeline.mProcessorsInGroupsCnt = pipeline.mMetricsRecordRef.CreateCounter("pipeline_processors_in_event_groups_total"); pipeline.mProcessorsInGroupDataSizeBytes - = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_group_data_size_bytes"); + = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_group_size_bytes"); pipeline.mProcessorsTotalDelayMs = pipeline.mMetricsRecordRef.CreateCounter("processors_total_delay_ms"); vector groups; diff --git a/core/unittest/queue/CircularProcessQueueUnittest.cpp b/core/unittest/queue/CircularProcessQueueUnittest.cpp index 82fce43912..2e6e1411f5 100644 --- a/core/unittest/queue/CircularProcessQueueUnittest.cpp +++ b/core/unittest/queue/CircularProcessQueueUnittest.cpp @@ -172,7 +172,7 @@ void CircularProcessQueueUnittest::TestMetric() { APSARA_TEST_EQUAL(dataSize1 + dataSize2, mQueue->mInItemDataSizeBytes->GetValue()); APSARA_TEST_EQUAL(1U, mQueue->mQueueSize->GetValue()); APSARA_TEST_EQUAL(dataSize2, mQueue->mQueueDataSizeByte->GetValue()); - APSARA_TEST_EQUAL(2U, mQueue->mDroppedEventsCnt->GetValue()); + APSARA_TEST_EQUAL(2U, mQueue->mDiscardedEventsCnt->GetValue()); mQueue->Pop(item); APSARA_TEST_EQUAL(1U, mQueue->mOutItemsCnt->GetValue()); From 7cd66f4c61f34e9b4245ae1a336e6ea5faa03c0d Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Tue, 24 Sep 2024 15:20:07 +0800 Subject: [PATCH 2/2] fix deadlock in full drain mode on 
config update (#1776) --- core/file_server/event_handler/LogInput.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index 96a3ac31af..0ee7970ee7 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -106,7 +106,7 @@ void LogInput::Resume() { void LogInput::HoldOn() { LOG_INFO(sLogger, ("event handle daemon pause", "starts")); - if (BOOL_FLAG(enable_full_drain_mode)) { + if (BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()) { unique_lock lock(mThreadRunningMux); mStopCV.wait(lock, [this]() { return mInteruptFlag; }); } else {