Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename self monitor metrics #1779

Merged
merged 14 commits into from
Sep 29, 2024
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager logger go_pipeline monitor profile_sender models
application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models
config config/watcher
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
Expand Down
22 changes: 11 additions & 11 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include <chrono>

#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"

using namespace std;

Expand All @@ -25,31 +25,31 @@ namespace logtail {
void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES);
mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES);
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS);
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT);
mInItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_TOTAL);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES);
mOutItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_TOTAL);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_SIZE_BYTES);
mTotalProcessMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_PROCESS_TIME_MS);
mDiscardedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_TOTAL);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
if (mMetricsRecordRef != nullptr) {
mInItemsCnt->Add(1);
mInItemsTotal->Add(1);
mInItemSizeBytes->Add(input.size());
}

auto before = chrono::system_clock::now();
auto res = Compress(input, output, errorMsg);

if (mMetricsRecordRef != nullptr) {
mTotalDelayMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - before).count());
mTotalProcessMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - before).count());
if (res) {
mOutItemsCnt->Add(1);
mOutItemsTotal->Add(1);
mOutItemSizeBytes->Add(output.size());
} else {
mDiscardedItemsCnt->Add(1);
mDiscardedItemsTotal->Add(1);
mDiscardedItemSizeBytes->Add(input.size());
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/common/compression/Compressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ class Compressor {

protected:
mutable MetricsRecordRef mMetricsRecordRef;
CounterPtr mInItemsCnt;
CounterPtr mInItemsTotal;
CounterPtr mInItemSizeBytes;
CounterPtr mOutItemsCnt;
CounterPtr mOutItemsTotal;
CounterPtr mOutItemSizeBytes;
CounterPtr mDiscardedItemsCnt;
CounterPtr mDiscardedItemsTotal;
CounterPtr mDiscardedItemSizeBytes;
CounterPtr mTotalDelayMs;
CounterPtr mTotalProcessMs;

private:
virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
Expand Down
8 changes: 4 additions & 4 deletions core/common/compression/CompressorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include "common/compression/CompressorFactory.h"

#include "common/ParamExtractor.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "common/compression/LZ4Compressor.h"
#include "common/compression/ZstdCompressor.h"

Expand Down Expand Up @@ -61,9 +61,9 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
} else {
compressor = Create(defaultType);
}
compressor->SetMetricRecordRef({{METRIC_LABEL_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, "compressor"},
compressor->SetMetricRecordRef({{METRIC_LABEL_KEY_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_KEY_PIPELINE_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_COMPRESSOR},
{METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}});
return compressor;
}
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace logtail {

FileServer::FileServer() {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, "file_server"}});
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER}});
}

// 启动文件服务,包括加载配置、处理检查点、注册事件等
Expand Down
8 changes: 4 additions & 4 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ void LogInput::Start() {

mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mRegisterdHandlersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_CNT);
mActiveReadersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_CNT);
mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

new Thread([this]() { ProcessLoop(); });
Expand Down Expand Up @@ -353,9 +353,9 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {
mAgentOpenFdTotal->Set(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mRegisterdHandlersCnt->Set(handlerCount);
mRegisterdHandlersTotal->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
mActiveReadersCnt->Set(CheckPointManager::Instance()->GetReaderCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
mEventProcessCount = 0;
}
Expand Down
4 changes: 2 additions & 2 deletions core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ class LogInput : public LogRunnable {

IntGaugePtr mLastRunTime;
IntGaugePtr mAgentOpenFdTotal;
IntGaugePtr mRegisterdHandlersCnt;
IntGaugePtr mActiveReadersCnt;
IntGaugePtr mRegisterdHandlersTotal;
IntGaugePtr mActiveReadersTotal;
IntGaugePtr mEnableFileIncludedByMultiConfigs;

std::atomic_int mLastReadEventTime{0};
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
#include "file_server/polling/PollingModify.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"

// Control the check frequency to call ClearUnavailableFileAndDir.
DEFINE_FLAG_INT32(check_not_exist_file_dir_round, "clear not exist file dir cache, round", 20);
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/polling/PollingModify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "file_server/event/Event.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"

using namespace std;

Expand Down
26 changes: 14 additions & 12 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
Expand Down Expand Up @@ -203,20 +203,21 @@ LogFileReader::LogFileReader(const std::string& hostLogPathDir,

void LogFileReader::SetMetrics() {
mMetricInited = false;
mMetricLabels = {{METRIC_LABEL_FILE_NAME, GetConvertedPath()},
{METRIC_LABEL_FILE_DEV, std::to_string(GetDevInode().dev)},
{METRIC_LABEL_FILE_INODE, std::to_string(GetDevInode().inode)}};
mMetricLabels = {{METRIC_LABEL_KEY_FILE_NAME, GetConvertedPath()},
{METRIC_LABEL_KEY_FILE_DEV, std::to_string(GetDevInode().dev)},
{METRIC_LABEL_KEY_FILE_INODE, std::to_string(GetDevInode().inode)}};
mMetricsRecordRef = FileServer::GetInstance()->GetOrCreateReentrantMetricsRecordRef(GetConfigName(), mMetricLabels);
if (mMetricsRecordRef == nullptr) {
LOG_ERROR(sLogger,
("failed to init metrics", "cannot get config's metricRecordRef")("config name", GetConfigName()));
return;
}

mInputRecordsSizeBytesCounter = mMetricsRecordRef->GetCounter(METRIC_INPUT_RECORDS_SIZE_BYTES);
mInputReadTotalCounter = mMetricsRecordRef->GetCounter(METRIC_INPUT_READ_TOTAL);
mInputFileSizeBytesGauge = mMetricsRecordRef->GetIntGauge(METRIC_INPUT_FILE_SIZE_BYTES);
mInputFileOffsetBytesGauge = mMetricsRecordRef->GetIntGauge(METRIC_INPUT_FILE_OFFSET_BYTES);
mOutEventsTotal = mMetricsRecordRef->GetCounter(METRIC_PLUGIN_OUT_EVENTS_TOTAL);
mOutEventGroupsTotal = mMetricsRecordRef->GetCounter(METRIC_PLUGIN_OUT_EVENT_GROUPS_TOTAL);
mOutSizeBytes = mMetricsRecordRef->GetCounter(METRIC_PLUGIN_OUT_SIZE_BYTES);
mSourceSizeBytes = mMetricsRecordRef->GetIntGauge(METRIC_PLUGIN_SOURCE_SIZE_BYTES);
mSourceReadOffsetBytes = mMetricsRecordRef->GetIntGauge(METRIC_PLUGIN_SOURCE_READ_OFFSET_BYTES);
mMetricInited = true;
}

Expand Down Expand Up @@ -2133,10 +2134,11 @@ std::unique_ptr<Event> LogFileReader::CreateFlushTimeoutEvent() {

void LogFileReader::ReportMetrics(uint64_t readSize) {
if (mMetricInited) {
mInputReadTotalCounter->Add(1);
mInputRecordsSizeBytesCounter->Add(readSize);
mInputFileOffsetBytesGauge->Set(GetLastFilePos());
mInputFileSizeBytesGauge->Set(GetFileSize());
mOutEventsTotal->Add(1);
mOutEventGroupsTotal->Add(1);
mOutSizeBytes->Add(readSize);
mSourceReadOffsetBytes->Set(GetLastFilePos());
mSourceSizeBytes->Set(GetFileSize());
}
}

Expand Down
9 changes: 5 additions & 4 deletions core/file_server/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,11 @@ class LogFileReader {
MetricLabels mMetricLabels;
bool mMetricInited;
ReentrantMetricsRecordRef mMetricsRecordRef;
CounterPtr mInputRecordsSizeBytesCounter;
CounterPtr mInputReadTotalCounter;
IntGaugePtr mInputFileSizeBytesGauge;
IntGaugePtr mInputFileOffsetBytesGauge;
CounterPtr mOutEventsTotal;
CounterPtr mOutEventGroupsTotal;
CounterPtr mOutSizeBytes;
IntGaugePtr mSourceSizeBytes;
IntGaugePtr mSourceReadOffsetBytes;

private:
bool mHasReadContainerBom = false;
Expand Down
13 changes: 9 additions & 4 deletions core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ using namespace sls_logs;

namespace logtail {

const std::string LABEL_PREFIX = "label.";
const std::string VALUE_PREFIX = "value.";

MetricsRecord::MetricsRecord(MetricLabelsPtr labels, DynamicMetricLabelsPtr dynamicLabels)
: mLabels(labels), mDynamicLabels(dynamicLabels), mDeleted(false) {
}
Expand Down Expand Up @@ -325,15 +328,17 @@ ReadMetrics::~ReadMetrics() {
Clear();
}

void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const {
void ReadMetrics::ReadAsLogGroup(std::string regionFieldName,
std::string defaultRegion,
std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const {
ReadLock lock(mReadWriteLock);
MetricsRecord* tmp = mHead;
while (tmp) {
Log* logPtr = nullptr;

for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
if (METRIC_FIELD_REGION == pair.first) {
if (regionFieldName == pair.first) {
std::map<std::string, sls_logs::LogGroup*>::iterator iter;
std::string region = pair.second;
iter = logGroupMap.find(region);
Expand All @@ -349,14 +354,14 @@ void ReadMetrics::ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& log
}
if (!logPtr) {
std::map<std::string, sls_logs::LogGroup*>::iterator iter;
iter = logGroupMap.find(METRIC_REGION_DEFAULT);
iter = logGroupMap.find(defaultRegion);
if (iter != logGroupMap.end()) {
sls_logs::LogGroup* logGroup = iter->second;
logPtr = logGroup->add_logs();
} else {
sls_logs::LogGroup* logGroup = new sls_logs::LogGroup();
logPtr = logGroup->add_logs();
logGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(METRIC_REGION_DEFAULT, logGroup));
logGroupMap.insert(std::pair<std::string, sls_logs::LogGroup*>(defaultRegion, logGroup));
}
}
auto now = GetCurrentLogtailTime();
Expand Down
4 changes: 3 additions & 1 deletion core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ class ReadMetrics {
static ReadMetrics* ptr = new ReadMetrics();
return ptr;
}
void ReadAsLogGroup(std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const;
void ReadAsLogGroup(std::string regionFieldName,
std::string defaultRegion,
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const;
void ReadAsFileBuffer(std::string& metricsContent) const;
void UpdateMetrics();

Expand Down
Loading
Loading