diff --git a/core/common/compression/Compressor.cpp b/core/common/compression/Compressor.cpp index 39a7af4b57..7a6d95951e 100644 --- a/core/common/compression/Compressor.cpp +++ b/core/common/compression/Compressor.cpp @@ -25,13 +25,13 @@ namespace logtail { void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) { WriteMetrics::GetInstance()->PrepareMetricsRecordRef( mMetricsRecordRef, std::move(labels), std::move(dynamicLabels)); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); - mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); - mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES); - mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt"); - mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_size_bytes"); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT); + mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT); + mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT); + mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES); } bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) { diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index bddc656b08..96a3ac31af 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -88,11 +88,11 
@@ void LogInput::Start() { mInteruptFlag = false; - mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_LAST_RUN_TIME); + mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); - mRegisterdHandlersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge("registered_handlers_cnt"); - mReadersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge("readers_cnt"); - mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge("enable_file_included_by_multi_configs"); + mRegisterdHandlersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_CNT); + mActiveReadersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_CNT); + mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); new Thread([this]() { ProcessLoop(); }); } @@ -355,7 +355,7 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) { LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount); mRegisterdHandlersCnt->Set(handlerCount); LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount()); - mReadersCnt->Set(CheckPointManager::Instance()->GetReaderCount()); + mActiveReadersCnt->Set(CheckPointManager::Instance()->GetReaderCount()); LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig()); mEventProcessCount = 0; } diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index 1c4fb027d9..69f2ffb077 100644 --- a/core/file_server/event_handler/LogInput.h +++ 
b/core/file_server/event_handler/LogInput.h @@ -83,7 +83,7 @@ class LogInput : public LogRunnable { IntGaugePtr mLastRunTime; IntGaugePtr mAgentOpenFdTotal; IntGaugePtr mRegisterdHandlersCnt; - IntGaugePtr mReadersCnt; + IntGaugePtr mActiveReadersCnt; IntGaugePtr mEnableFileIncludedByMultiConfigs; std::atomic_int mLastReadEventTime{0}; diff --git a/core/monitor/LogtailMetric.cpp b/core/monitor/LogtailMetric.cpp index d6ddc5e768..6b26300d8c 100644 --- a/core/monitor/LogtailMetric.cpp +++ b/core/monitor/LogtailMetric.cpp @@ -205,34 +205,11 @@ WriteMetrics::~WriteMetrics() { Clear(); } -void WriteMetrics::PreparePluginCommonLabels(const std::string& projectName, - const std::string& logstoreName, - const std::string& region, - const std::string& configName, - const std::string& pluginType, - const std::string& pluginID, - const std::string& nodeID, - const std::string& childNodeID, - MetricLabels& labels) { - labels.emplace_back(std::make_pair(METRIC_LABEL_PROJECT, projectName)); - labels.emplace_back(std::make_pair(METRIC_LABEL_LOGSTORE, logstoreName)); - labels.emplace_back(std::make_pair(METRIC_LABEL_REGION, region)); - labels.emplace_back(std::make_pair(METRIC_LABEL_CONFIG_NAME, configName)); - labels.emplace_back(std::make_pair(METRIC_LABEL_PLUGIN_NAME, pluginType)); - labels.emplace_back(std::make_pair(METRIC_LABEL_PLUGIN_ID, pluginID)); - labels.emplace_back(std::make_pair(METRIC_LABEL_NODE_ID, nodeID)); - labels.emplace_back(std::make_pair(METRIC_LABEL_CHILD_NODE_ID, childNodeID)); -} - void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) { - MetricsRecord* cur = new MetricsRecord(std::make_shared(labels), - std::make_shared(dynamicLabels)); - ref.SetMetricsRecord(cur); - std::lock_guard lock(mMutex); - cur->SetNext(mHead); - mHead = cur; + CreateMetricsRecordRef(ref, std::move(labels), std::move(dynamicLabels)); + CommitMetricsRecordRef(ref); } void 
WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref, diff --git a/core/monitor/LogtailMetric.h b/core/monitor/LogtailMetric.h index 6c98081b2a..6f4ef9e8ef 100644 --- a/core/monitor/LogtailMetric.h +++ b/core/monitor/LogtailMetric.h @@ -133,15 +133,7 @@ class WriteMetrics { static WriteMetrics* ptr = new WriteMetrics(); return ptr; } - void PreparePluginCommonLabels(const std::string& projectName, - const std::string& logstoreName, - const std::string& region, - const std::string& configName, - const std::string& pluginType, - const std::string& pluginID, - const std::string& nodeID, - const std::string& childNodeID, - MetricLabels& labels); + void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {}); void CreateMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {}); diff --git a/core/monitor/MetricConstants.cpp b/core/monitor/MetricConstants.cpp index bc3a8280dc..cb6b9111df 100644 --- a/core/monitor/MetricConstants.cpp +++ b/core/monitor/MetricConstants.cpp @@ -141,16 +141,78 @@ const std::string METRIC_FLUSHER_QUOTA_ERROR_TOTAL = "flusher_quota_error_total" const std::string METRIC_FLUSHER_RETRIES_TOTAL = "flusher_retries_total"; const std::string METRIC_FLUSHER_RETRIES_ERROR_TOTAL = "flusher_retries_error_total"; -const std::string METRIC_IN_EVENTS_CNT = "in_events_cnt"; -const std::string METRIC_IN_EVENT_GROUPS_CNT = "in_event_groups_cnt"; -const std::string METRIC_IN_ITEMS_CNT = "in_items_cnt"; -const std::string METRIC_IN_EVENT_GROUP_SIZE_BYTES = "in_event_group_data_size_bytes"; -const std::string METRIC_IN_ITEM_SIZE_BYTES = "in_item_data_size_bytes"; -const std::string METRIC_OUT_EVENTS_CNT = "out_events_cnt"; -const std::string METRIC_OUT_ITEMS_CNT = "out_items_cnt"; -const std::string METRIC_OUT_EVENT_GROUP_SIZE_BYTES = "out_event_group_data_size_bytes"; -const std::string METRIC_OUT_ITEM_SIZE_BYTES = "out_item_data_size_bytes"; -const 
std::string METRIC_TOTAL_DELAY_MS = "total_delay_ms"; -const std::string METRIC_LAST_RUN_TIME = "last_run_time"; +////////////////////////////////////////////////////////////////////////// +// plugin +////////////////////////////////////////////////////////////////////////// +// common metrics +const std::string METRIC_PLUGIN_IN_EVENTS_CNT = "plugin_in_events_total"; +const std::string METRIC_PLUGIN_IN_EVENT_GROUP_SIZE_BYTES = "plugin_in_event_group_size_bytes"; + +////////////////////////////////////////////////////////////////////////// +// component +////////////////////////////////////////////////////////////////////////// +// common metrics +const std::string METRIC_COMPONENT_IN_EVENTS_CNT = "component_in_events_total"; +const std::string METRIC_COMPONENT_IN_ITEMS_CNT = "component_in_items_total"; +const std::string METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES = "component_in_event_group_size_bytes"; +const std::string METRIC_COMPONENT_IN_ITEM_SIZE_BYTES = "component_in_item_size_bytes"; +const std::string METRIC_COMPONENT_OUT_EVENTS_CNT = "component_out_events_total"; +const std::string METRIC_COMPONENT_OUT_ITEMS_CNT = "component_out_items_total"; +const std::string METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES = "component_out_item_size_bytes"; +const std::string METRIC_COMPONENT_TOTAL_DELAY_MS = "component_total_delay_ms"; +const std::string METRIC_COMPONENT_DISCARDED_ITEMS_CNT = "component_discarded_items_total"; +const std::string METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES = "component_discarded_item_size_bytes"; + +// batcher metrics +const std::string METRIC_COMPONENT_BATCHER_EVENT_BATCHES_CNT = "component_event_batches_total"; +const std::string METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_CNT = "component_buffered_groups_total"; +const std::string METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_CNT = "component_buffered_events_total"; +const std::string METRIC_COMPONENT_BATCHER_BUFFERED_SIZE_BYTES = "component_buffered_size_bytes"; + +// queue metrics +const std::string 
METRIC_COMPONENT_QUEUE_SIZE_CNT = "component_queue_size"; +const std::string METRIC_COMPONENT_QUEUE_SIZE_BYTES = "component_queue_size_bytes"; +const std::string METRIC_COMPONENT_QUEUE_VALID_TO_PUSH_FLAG = "component_valid_to_push"; +const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE = "component_extra_buffer_size"; +const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES = "component_extra_buffer_size_bytes"; +const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_CNT = "component_discarded_events_total"; + +////////////////////////////////////////////////////////////////////////// +// pipeline +////////////////////////////////////////////////////////////////////////// +const std::string METRIC_PIPELINE_START_TIME = "pipeline_start_time"; +const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_CNT = "pipeline_processors_in_events_total"; +const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_CNT = "pipeline_processors_in_event_groups_total"; +const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUP_SIZE_BYTES = "pipeline_processors_in_event_group_size_bytes"; +const std::string METRIC_PIPELINE_PROCESSORS_TOTAL_DELAY_MS = "pipeline_processors_total_delay_ms"; + +////////////////////////////////////////////////////////////////////////// +// runner +////////////////////////////////////////////////////////////////////////// +// common metrics +const std::string METRIC_RUNNER_IN_EVENTS_CNT = "runner_in_events_total"; +const std::string METRIC_RUNNER_IN_EVENT_GROUPS_CNT = "runner_in_event_groups_total"; +const std::string METRIC_RUNNER_IN_ITEMS_CNT = "runner_in_items_total"; +const std::string METRIC_RUNNER_IN_EVENT_GROUP_SIZE_BYTES = "runner_in_event_group_size_bytes"; +const std::string METRIC_RUNNER_IN_ITEM_SIZE_BYTES = "runner_in_item_size_bytes"; +const std::string METRIC_RUNNER_OUT_ITEMS_CNT = "runner_out_items_total"; +const std::string METRIC_RUNNER_TOTAL_DELAY_MS = "runner_total_delay_ms"; +const std::string METRIC_RUNNER_LAST_RUN_TIME = 
"runner_last_run_time"; + +// http sink metrics +const std::string METRIC_RUNNER_HTTP_SINK_OUT_SUCCESSFUL_ITEMS_CNT = "runner_out_successful_items_total"; +const std::string METRIC_RUNNER_HTTP_SINK_OUT_FAILED_ITEMS_CNT = "runner_out_failed_items_total"; +const std::string METRIC_RUNNER_HTTP_SINK_SENDING_ITEMS_CNT = "runner_sending_items_total"; +const std::string METRIC_RUNNER_HTTP_SINK_SEND_CONCURRENCY = "runner_send_concurrency"; + +// flusher runner metrics +const std::string METRIC_RUNNER_FLUSHER_IN_ITEM_RAW_SIZE_BYTES = "runner_in_item_raw_size_bytes"; +const std::string METRIC_RUNNER_FLUSHER_WAITING_ITEMS_CNT = "runner_waiting_items_total"; + +// file server metrics +const std::string METRIC_RUNNER_FILE_WATCHED_DIRS_CNT = "runner_watched_dirs_total"; +const std::string METRIC_RUNNER_FILE_ACTIVE_READERS_CNT = "runner_active_readers_total"; +const std::string METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG + = "runner_enable_file_included_by_multi_configs"; } // namespace logtail diff --git a/core/monitor/MetricConstants.h b/core/monitor/MetricConstants.h index b014de81b0..e8986e7c80 100644 --- a/core/monitor/MetricConstants.h +++ b/core/monitor/MetricConstants.h @@ -134,16 +134,77 @@ extern const std::string METRIC_FLUSHER_QUOTA_ERROR_TOTAL; extern const std::string METRIC_FLUSHER_RETRIES_TOTAL; extern const std::string METRIC_FLUSHER_RETRIES_ERROR_TOTAL; -extern const std::string METRIC_IN_EVENTS_CNT; -extern const std::string METRIC_IN_EVENT_GROUPS_CNT; -extern const std::string METRIC_IN_ITEMS_CNT; -extern const std::string METRIC_IN_EVENT_GROUP_SIZE_BYTES; -extern const std::string METRIC_IN_ITEM_SIZE_BYTES; -extern const std::string METRIC_OUT_EVENTS_CNT; -extern const std::string METRIC_OUT_ITEMS_CNT; -extern const std::string METRIC_OUT_EVENT_GROUP_SIZE_BYTES; -extern const std::string METRIC_OUT_ITEM_SIZE_BYTES; -extern const std::string METRIC_TOTAL_DELAY_MS; -extern const std::string METRIC_LAST_RUN_TIME; 
+////////////////////////////////////////////////////////////////////////// +// plugin +////////////////////////////////////////////////////////////////////////// +// common metrics +extern const std::string METRIC_PLUGIN_IN_EVENTS_CNT; +extern const std::string METRIC_PLUGIN_IN_EVENT_GROUP_SIZE_BYTES; + +////////////////////////////////////////////////////////////////////////// +// component +////////////////////////////////////////////////////////////////////////// +// common metrics +extern const std::string METRIC_COMPONENT_IN_EVENTS_CNT; +extern const std::string METRIC_COMPONENT_IN_ITEMS_CNT; +extern const std::string METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_IN_ITEM_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_OUT_EVENTS_CNT; +extern const std::string METRIC_COMPONENT_OUT_ITEMS_CNT; +extern const std::string METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_TOTAL_DELAY_MS; +extern const std::string METRIC_COMPONENT_DISCARDED_ITEMS_CNT; +extern const std::string METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES; + +// batcher metrics +extern const std::string METRIC_COMPONENT_BATCHER_EVENT_BATCHES_CNT; +extern const std::string METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_CNT; +extern const std::string METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_CNT; +extern const std::string METRIC_COMPONENT_BATCHER_BUFFERED_SIZE_BYTES; + +// queue metrics +extern const std::string METRIC_COMPONENT_QUEUE_SIZE_CNT; +extern const std::string METRIC_COMPONENT_QUEUE_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_QUEUE_VALID_TO_PUSH_FLAG; +extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE; +extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES; +extern const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_CNT; + +////////////////////////////////////////////////////////////////////////// +// pipeline 
+////////////////////////////////////////////////////////////////////////// +extern const std::string METRIC_PIPELINE_START_TIME; +extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_CNT; +extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_CNT; +extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_PIPELINE_PROCESSORS_TOTAL_DELAY_MS; + +////////////////////////////////////////////////////////////////////////// +// runner +////////////////////////////////////////////////////////////////////////// +// common metrics +extern const std::string METRIC_RUNNER_IN_EVENTS_CNT; +extern const std::string METRIC_RUNNER_IN_EVENT_GROUPS_CNT; +extern const std::string METRIC_RUNNER_IN_ITEMS_CNT; +extern const std::string METRIC_RUNNER_IN_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_RUNNER_IN_ITEM_SIZE_BYTES; +extern const std::string METRIC_RUNNER_OUT_ITEMS_CNT; +extern const std::string METRIC_RUNNER_TOTAL_DELAY_MS; +extern const std::string METRIC_RUNNER_LAST_RUN_TIME; + +// http sink metrics +extern const std::string METRIC_RUNNER_HTTP_SINK_OUT_SUCCESSFUL_ITEMS_CNT; +extern const std::string METRIC_RUNNER_HTTP_SINK_OUT_FAILED_ITEMS_CNT; +extern const std::string METRIC_RUNNER_HTTP_SINK_SENDING_ITEMS_CNT; +extern const std::string METRIC_RUNNER_HTTP_SINK_SEND_CONCURRENCY; + +// flusher runner metrics +extern const std::string METRIC_RUNNER_FLUSHER_IN_ITEM_RAW_SIZE_BYTES; +extern const std::string METRIC_RUNNER_FLUSHER_WAITING_ITEMS_CNT; + +// file server metrics +extern const std::string METRIC_RUNNER_FILE_WATCHED_DIRS_CNT; +extern const std::string METRIC_RUNNER_FILE_ACTIVE_READERS_CNT; +extern const std::string METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG; } // namespace logtail diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 6642cdcc6e..ec5913704c 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -317,11 
+317,12 @@ bool Pipeline::Init(PipelineConfig&& config) { WriteMetrics::GetInstance()->PrepareMetricsRecordRef( mMetricsRecordRef, {{METRIC_LABEL_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_CONFIG_NAME, mName}}); - mStartTime = mMetricsRecordRef.CreateIntGauge("start_time"); - mProcessorsInEventsCnt = mMetricsRecordRef.CreateCounter("processors_in_events_cnt"); - mProcessorsInGroupsCnt = mMetricsRecordRef.CreateCounter("processors_in_event_groups_cnt"); - mProcessorsInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter("processors_in_event_group_data_size_bytes"); - mProcessorsTotalDelayMs = mMetricsRecordRef.CreateCounter("processors_total_delay_ms"); + mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME); + mProcessorsInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_CNT); + mProcessorsInGroupsCnt = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_CNT); + mProcessorsInGroupDataSizeBytes + = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUP_SIZE_BYTES); + mProcessorsTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_TOTAL_DELAY_MS); return true; } diff --git a/core/pipeline/batch/Batcher.h b/core/pipeline/batch/Batcher.h index ca01986ca5..0c09990649 100644 --- a/core/pipeline/batch/Batcher.h +++ b/core/pipeline/batch/Batcher.h @@ -110,14 +110,14 @@ class Batcher { labels.emplace_back("enable_group_batch", "false"); } WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels)); - mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENTS_CNT); - mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); - mOutEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_EVENTS_CNT); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); - mEventBatchItemsCnt = mMetricsRecordRef.CreateIntGauge("event_batches_cnt"); - mBufferedGroupsCnt = 
mMetricsRecordRef.CreateIntGauge("buffered_groups_cnt"); - mBufferedEventsCnt = mMetricsRecordRef.CreateIntGauge("buffered_events_cnt"); - mBufferedDataSizeByte = mMetricsRecordRef.CreateIntGauge("buffered_data_size_bytes"); + mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES); + mOutEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_EVENTS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mEventBatchItemsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_EVENT_BATCHES_CNT); + mBufferedGroupsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_CNT); + mBufferedEventsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_CNT); + mBufferedDataSizeByte = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_SIZE_BYTES); return true; } diff --git a/core/pipeline/plugin/instance/FlusherInstance.cpp b/core/pipeline/plugin/instance/FlusherInstance.cpp index 997f75fe4b..003b279022 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.cpp +++ b/core/pipeline/plugin/instance/FlusherInstance.cpp @@ -25,8 +25,8 @@ bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, return false; } - mInEventsCnt = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_IN_EVENTS_CNT); - mInGroupDataSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); + mInEventsCnt = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENT_GROUP_SIZE_BYTES); return true; } diff --git a/core/pipeline/plugin/interface/Plugin.h b/core/pipeline/plugin/interface/Plugin.h index e63d098ae3..1ee9464022 100644 --- a/core/pipeline/plugin/interface/Plugin.h +++ 
b/core/pipeline/plugin/interface/Plugin.h @@ -21,6 +21,7 @@ #include #include "monitor/LogtailMetric.h" +#include "monitor/MetricConstants.h" #include "pipeline/PipelineContext.h" namespace logtail { @@ -39,18 +40,13 @@ class Plugin { const std::string& id, const std::string& nodeID, const std::string& childNodeID) { - std::vector> labels; - WriteMetrics::GetInstance()->PreparePluginCommonLabels(mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion(), - mContext->GetConfigName(), - name, - id, - nodeID, - childNodeID, - labels); - - WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels)); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, + {{METRIC_LABEL_PROJECT, mContext->GetProjectName()}, + {METRIC_LABEL_CONFIG_NAME, mContext->GetConfigName()}, + {METRIC_LABEL_PLUGIN_NAME, name}, + {METRIC_LABEL_PLUGIN_ID, id}, + {METRIC_LABEL_NODE_ID, nodeID}, + {METRIC_LABEL_CHILD_NODE_ID, childNodeID}}); } protected: diff --git a/core/pipeline/queue/BoundedQueueInterface.h b/core/pipeline/queue/BoundedQueueInterface.h index 350e22be37..c8102a5fbd 100644 --- a/core/pipeline/queue/BoundedQueueInterface.h +++ b/core/pipeline/queue/BoundedQueueInterface.h @@ -26,7 +26,7 @@ class BoundedQueueInterface : virtual public QueueInterface { BoundedQueueInterface(QueueKey key, size_t cap, size_t low, size_t high, const PipelineContext& ctx) : QueueInterface(key, cap, ctx), mLowWatermark(low), mHighWatermark(high) { this->mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_QUEUE_TYPE, "bounded"}}); - mValidToPushFlag = this->mMetricsRecordRef.CreateIntGauge("valid_to_push"); + mValidToPushFlag = this->mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_VALID_TO_PUSH_FLAG); } virtual ~BoundedQueueInterface() = default; diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.cpp b/core/pipeline/queue/BoundedSenderQueueInterface.cpp index 03e26ebac3..3e58d48e49 100644 --- 
a/core/pipeline/queue/BoundedSenderQueueInterface.cpp +++ b/core/pipeline/queue/BoundedSenderQueueInterface.cpp @@ -26,8 +26,8 @@ BoundedSenderQueueInterface::BoundedSenderQueueInterface( : QueueInterface(key, cap, ctx), BoundedQueueInterface>(key, cap, low, high, ctx) { mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_COMPONENT_NAME, "sender_queue"}}); mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}}); - mExtraBufferSize = mMetricsRecordRef.CreateIntGauge("extra_buffer_size"); - mExtraBufferDataSizeBytes = mMetricsRecordRef.CreateIntGauge("extra_buffer_data_size_bytes"); + mExtraBufferSize = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE); + mExtraBufferDataSizeBytes = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES); } void BoundedSenderQueueInterface::SetFeedback(FeedbackInterface* feedback) { diff --git a/core/pipeline/queue/CircularProcessQueue.cpp b/core/pipeline/queue/CircularProcessQueue.cpp index c6df833e7f..9723894315 100644 --- a/core/pipeline/queue/CircularProcessQueue.cpp +++ b/core/pipeline/queue/CircularProcessQueue.cpp @@ -24,7 +24,7 @@ namespace logtail { CircularProcessQueue::CircularProcessQueue(size_t cap, int64_t key, uint32_t priority, const PipelineContext& ctx) : QueueInterface>(key, cap, ctx), ProcessQueueInterface(key, cap, priority, ctx) { mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_QUEUE_TYPE, "circular"}}); - mDroppedEventsCnt = mMetricsRecordRef.CreateCounter("dropped_events_cnt"); + mDiscardedEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_CNT); WriteMetrics::GetInstance()->CommitMetricsRecordRef(mMetricsRecordRef); } @@ -37,7 +37,7 @@ bool CircularProcessQueue::Push(unique_ptr&& item) { mQueue.pop_front(); mQueueSize->Set(Size()); mQueueDataSizeByte->Sub(size); - mDroppedEventsCnt->Add(cnt); + mDiscardedEventsCnt->Add(cnt); } if (mEventCnt + newCnt > mCapacity) { return false; diff --git 
a/core/pipeline/queue/CircularProcessQueue.h b/core/pipeline/queue/CircularProcessQueue.h index 813929c799..7f272c3f87 100644 --- a/core/pipeline/queue/CircularProcessQueue.h +++ b/core/pipeline/queue/CircularProcessQueue.h @@ -42,7 +42,7 @@ class CircularProcessQueue : virtual public QueueInterface> mQueue; size_t mEventCnt = 0; - CounterPtr mDroppedEventsCnt; + CounterPtr mDiscardedEventsCnt; #ifdef APSARA_UNIT_TEST_MAIN friend class CircularProcessQueueUnittest; diff --git a/core/pipeline/queue/QueueInterface.h b/core/pipeline/queue/QueueInterface.h index 1c450013ad..7754190000 100644 --- a/core/pipeline/queue/QueueInterface.h +++ b/core/pipeline/queue/QueueInterface.h @@ -33,12 +33,12 @@ class QueueInterface { {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, }); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); - mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); - mQueueSize = mMetricsRecordRef.CreateIntGauge("queue_size"); - mQueueDataSizeByte = mMetricsRecordRef.CreateIntGauge("queue_data_size_bytes"); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT); + mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mQueueSize = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_SIZE_CNT); + mQueueDataSizeByte = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_QUEUE_SIZE_BYTES); } virtual ~QueueInterface() = default; diff --git a/core/pipeline/route/Router.cpp b/core/pipeline/route/Router.cpp index 73de16bcda..a83e1b4225 100644 --- a/core/pipeline/route/Router.cpp +++ b/core/pipeline/route/Router.cpp @@ -39,8 +39,8 @@ bool 
Router::Init(std::vector> configs, const P {{METRIC_LABEL_PROJECT, ctx.GetProjectName()}, {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, {METRIC_LABEL_KEY_COMPONENT_NAME, "router"}}); - mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENTS_CNT); - mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); + mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENT_GROUP_SIZE_BYTES); return true; } diff --git a/core/pipeline/serializer/Serializer.h b/core/pipeline/serializer/Serializer.h index b632124b7a..c4e7c674ae 100644 --- a/core/pipeline/serializer/Serializer.h +++ b/core/pipeline/serializer/Serializer.h @@ -53,13 +53,13 @@ class Serializer { {METRIC_LABEL_CONFIG_NAME, f->GetContext().GetConfigName()}, {METRIC_LABEL_KEY_COMPONENT_NAME, "serializer"}, {METRIC_LABEL_KEY_FLUSHER_NODE_ID, f->GetNodeID()}}); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); - mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); - mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES); - mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt"); - mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_data_size_bytes"); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT); + mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT); + mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + mDiscardedItemsCnt = 
mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT); + mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES); } virtual ~Serializer() = default; diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index a40d16fb43..3d176b75ef 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -43,13 +43,13 @@ bool FlusherRunner::Init() { srand(time(nullptr)); WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, {{METRIC_LABEL_KEY_RUNNER_NAME, "flusher_runner"}}); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); - mInItemRawDataSizeBytes = mMetricsRecordRef.CreateCounter("in_item_raw_data_size_bytes"); - mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); - mWaitingItemsCnt = mMetricsRecordRef.CreateIntGauge("waiting_items_cnt"); - mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_LAST_RUN_TIME); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEMS_CNT); + mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_OUT_ITEMS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_TOTAL_DELAY_MS); + mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); + mInItemRawDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_FLUSHER_IN_ITEM_RAW_SIZE_BYTES); + mWaitingItemsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_FLUSHER_WAITING_ITEMS_CNT); mThreadRes = async(launch::async, &FlusherRunner::Run, this); mLastCheckSendClientTime = time(nullptr); diff --git a/core/runner/LogProcess.cpp b/core/runner/LogProcess.cpp index 0e2ae8d442..dc3cb84ad1 100644 --- a/core/runner/LogProcess.cpp +++ 
b/core/runner/LogProcess.cpp @@ -143,12 +143,13 @@ bool LogProcess::FlushOut(int32_t waitMs) { void* LogProcess::ProcessLoop(int32_t threadNo) { LOG_DEBUG(sLogger, ("LogProcess", "Start")("threadNo", threadNo)); + // thread local metrics should be initialized in each thread WriteMetrics::GetInstance()->PrepareMetricsRecordRef( sMetricsRecordRef, {{METRIC_LABEL_KEY_RUNNER_NAME, "processor_runner"}, {"thread_no", ToString(threadNo)}}); - sInGroupsCnt = sMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUPS_CNT); - sInEventsCnt = sMetricsRecordRef.CreateCounter(METRIC_IN_EVENTS_CNT); - sInGroupDataSizeBytes = sMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); - sLastRunTime = sMetricsRecordRef.CreateIntGauge(METRIC_LAST_RUN_TIME); + sInGroupsCnt = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENT_GROUPS_CNT); + sInEventsCnt = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENTS_CNT); + sInGroupDataSizeBytes = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_EVENT_GROUP_SIZE_BYTES); + sLastRunTime = sMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); static int32_t lastMergeTime = 0; while (true) { diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 6bcf7e92c1..68e276397f 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -37,12 +37,12 @@ bool HttpSink::Init() { WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, {{METRIC_LABEL_KEY_RUNNER_NAME, "http sink"}}); - mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); - mOutSuccessfulItemsCnt = mMetricsRecordRef.CreateCounter("out_successful_items_cnt"); - mOutFailedItemsCnt = mMetricsRecordRef.CreateCounter("out_failed_items_cnt"); - mSendingItemsCnt = mMetricsRecordRef.CreateIntGauge("sending_items_cnt"); - mSendConcurrency = mMetricsRecordRef.CreateIntGauge("send_concurrency"); - mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_LAST_RUN_TIME); + mInItemsCnt = 
mMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_ITEMS_CNT); + mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); + mOutSuccessfulItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_HTTP_SINK_OUT_SUCCESSFUL_ITEMS_CNT); + mOutFailedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_HTTP_SINK_OUT_FAILED_ITEMS_CNT); + mSendingItemsCnt = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_HTTP_SINK_SENDING_ITEMS_CNT); + mSendConcurrency = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_HTTP_SINK_SEND_CONCURRENCY); // TODO: should be dynamic mSendConcurrency->Set(AppConfig::GetInstance()->GetSendRequestConcurrency()); diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index d4d76b5138..4c061b255d 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -2688,10 +2688,10 @@ void PipelineUnittest::TestProcess() const { pipeline.mProcessorLine.emplace_back(std::move(processor)); WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, {}); - pipeline.mProcessorsInEventsCnt = pipeline.mMetricsRecordRef.CreateCounter("processors_in_events_cnt"); - pipeline.mProcessorsInGroupsCnt = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_groups_cnt"); + pipeline.mProcessorsInEventsCnt = pipeline.mMetricsRecordRef.CreateCounter("pipeline_processors_in_events_total"); + pipeline.mProcessorsInGroupsCnt = pipeline.mMetricsRecordRef.CreateCounter("pipeline_processors_in_event_groups_total"); pipeline.mProcessorsInGroupDataSizeBytes - = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_group_data_size_bytes"); + = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_group_size_bytes"); pipeline.mProcessorsTotalDelayMs = pipeline.mMetricsRecordRef.CreateCounter("processors_total_delay_ms"); vector groups; diff --git a/core/unittest/queue/CircularProcessQueueUnittest.cpp 
b/core/unittest/queue/CircularProcessQueueUnittest.cpp index 7b5ef9d0d3..16de133b4a 100644 --- a/core/unittest/queue/CircularProcessQueueUnittest.cpp +++ b/core/unittest/queue/CircularProcessQueueUnittest.cpp @@ -169,7 +169,7 @@ void CircularProcessQueueUnittest::TestMetric() { APSARA_TEST_EQUAL(dataSize1 + dataSize2, mQueue->mInItemDataSizeBytes->GetValue()); APSARA_TEST_EQUAL(1U, mQueue->mQueueSize->GetValue()); APSARA_TEST_EQUAL(dataSize2, mQueue->mQueueDataSizeByte->GetValue()); - APSARA_TEST_EQUAL(2U, mQueue->mDroppedEventsCnt->GetValue()); + APSARA_TEST_EQUAL(2U, mQueue->mDiscardedEventsCnt->GetValue()); mQueue->Pop(item); APSARA_TEST_EQUAL(1U, mQueue->mOutItemsCnt->GetValue());