Skip to content

Commit

Permalink
feat: add ReentrantMetricsRecord and PluginMetricManager for c++ self…
Browse files Browse the repository at this point in the history
… monitor, and support input_file self monitor
  • Loading branch information
Takuka0311 committed Sep 20, 2024
1 parent 8d9556f commit e381210
Show file tree
Hide file tree
Showing 23 changed files with 825 additions and 165 deletions.
3 changes: 3 additions & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ DEFINE_FLAG_INT32(max_holded_data_size,
DEFINE_FLAG_INT32(pub_max_holded_data_size,
"for every id and metric name, the max data size can be holded in memory (default 512KB)",
512 * 1024);
DEFINE_FLAG_STRING(metrics_report_method,
"method to report metrics (default none, means logtail will not report metrics)",
"sls");

namespace logtail {
AppConfig::AppConfig() {
Expand Down
1 change: 1 addition & 0 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ void ModifyHandler::ForceReadLogAndPush(LogFileReaderPtr reader) {
int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* logBuffer) {
int32_t pushRetry = 0;
if (!logBuffer->rawBuffer.empty()) {
reader->ReportMetrics(logBuffer->readLength);
LogFileProfiler::GetInstance()->AddProfilingReadBytes(reader->GetConfigName(),
reader->GetRegion(),
reader->GetProject(),
Expand Down
56 changes: 56 additions & 0 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ void FileServer::Stop() {

// 获取给定名称的文件发现配置
FileDiscoveryConfig FileServer::GetFileDiscoveryConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameFileDiscoveryConfigsMap.find(name);
if (itr != mPipelineNameFileDiscoveryConfigsMap.end()) {
return itr->second;
Expand All @@ -118,16 +119,19 @@ FileDiscoveryConfig FileServer::GetFileDiscoveryConfig(const string& name) const

// 添加文件发现配置
void FileServer::AddFileDiscoveryConfig(const string& name, FileDiscoveryOptions* opts, const PipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileDiscoveryConfigsMap[name] = make_pair(opts, ctx);
}

// 移除给定名称的文件发现配置
void FileServer::RemoveFileDiscoveryConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileDiscoveryConfigsMap.erase(name);
}

// 获取给定名称的文件读取器配置
FileReaderConfig FileServer::GetFileReaderConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameFileReaderConfigsMap.find(name);
if (itr != mPipelineNameFileReaderConfigsMap.end()) {
return itr->second;
Expand All @@ -137,16 +141,19 @@ FileReaderConfig FileServer::GetFileReaderConfig(const string& name) const {

// 添加文件读取器配置
void FileServer::AddFileReaderConfig(const string& name, const FileReaderOptions* opts, const PipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileReaderConfigsMap[name] = make_pair(opts, ctx);
}

// 移除给定名称的文件读取器配置
void FileServer::RemoveFileReaderConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileReaderConfigsMap.erase(name);
}

// 获取给定名称的多行配置
MultilineConfig FileServer::GetMultilineConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameMultilineConfigsMap.find(name);
if (itr != mPipelineNameMultilineConfigsMap.end()) {
return itr->second;
Expand All @@ -156,21 +163,25 @@ MultilineConfig FileServer::GetMultilineConfig(const string& name) const {

// 添加多行配置
void FileServer::AddMultilineConfig(const string& name, const MultilineOptions* opts, const PipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameMultilineConfigsMap[name] = make_pair(opts, ctx);
}

// 移除给定名称的多行配置
void FileServer::RemoveMultilineConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameMultilineConfigsMap.erase(name);
}

// 保存容器信息
void FileServer::SaveContainerInfo(const string& pipeline, const shared_ptr<vector<ContainerInfo>>& info) {
WriteLock lock(mReadWriteLock);
mAllContainerInfoMap[pipeline] = info;
}

// 获取并移除给定管道的容器信息
shared_ptr<vector<ContainerInfo>> FileServer::GetAndRemoveContainerInfo(const string& pipeline) {
WriteLock lock(mReadWriteLock);
auto iter = mAllContainerInfoMap.find(pipeline);
if (iter == mAllContainerInfoMap.end()) {
return make_shared<vector<ContainerInfo>>();
Expand All @@ -182,11 +193,53 @@ shared_ptr<vector<ContainerInfo>> FileServer::GetAndRemoveContainerInfo(const st

// 清除所有容器信息
void FileServer::ClearContainerInfo() {
WriteLock lock(mReadWriteLock);
mAllContainerInfoMap.clear();
}

// 获取插件的指标管理器
PluginMetricManagerPtr FileServer::GetPluginMetricManager(const std::string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNamePluginMetricManagersMap.find(name);
if (itr != mPipelineNamePluginMetricManagersMap.end()) {
return itr->second;
}
return nullptr;
}

// 添加插件的指标管理器
void FileServer::AddPluginMetricManager(const std::string& name, PluginMetricManagerPtr PluginMetricManager) {
WriteLock lock(mReadWriteLock);
mPipelineNamePluginMetricManagersMap[name] = PluginMetricManager;
}

// 移除插件的指标管理器
void FileServer::RemovePluginMetricManager(const std::string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNamePluginMetricManagersMap.erase(name);
}

// 获取“ReentrantMetricsRecordRef”指标记录对象
ReentrantMetricsRecordRef FileServer::GetOrCreateReentrantMetricsRecordRef(const std::string& name,
MetricLabels& labels) {
PluginMetricManagerPtr filePluginMetricManager = GetPluginMetricManager(name);
if (filePluginMetricManager != nullptr) {
return filePluginMetricManager->GetOrCreateReentrantMetricsRecordRef(labels);
}
return nullptr;
}

// 释放“ReentrantMetricsRecordRef”指标记录对象
void FileServer::ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels) {
PluginMetricManagerPtr filePluginMetricManager = GetPluginMetricManager(name);
if (filePluginMetricManager != nullptr) {
filePluginMetricManager->ReleaseReentrantMetricsRecordRef(labels);
}
}

// 获取给定名称的“ExactlyOnce”并发级别
uint32_t FileServer::GetExactlyOnceConcurrency(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameEOConcurrencyMap.find(name);
if (itr != mPipelineNameEOConcurrencyMap.end()) {
return itr->second;
Expand All @@ -196,6 +249,7 @@ uint32_t FileServer::GetExactlyOnceConcurrency(const string& name) const {

// 获取所有配置了“ExactlyOnce”选项的配置名列表
vector<string> FileServer::GetExactlyOnceConfigs() const {
ReadLock lock(mReadWriteLock);
vector<string> res;
for (const auto& item : mPipelineNameEOConcurrencyMap) {
if (item.second > 0) {
Expand All @@ -207,11 +261,13 @@ vector<string> FileServer::GetExactlyOnceConfigs() const {

// 添加“ExactlyOnce”并发配置
void FileServer::AddExactlyOnceConcurrency(const string& name, uint32_t concurrency) {
WriteLock lock(mReadWriteLock);
mPipelineNameEOConcurrencyMap[name] = concurrency;
}

// 移除给定名称的“ExactlyOnce”并发配置
void FileServer::RemoveExactlyOnceConcurrency(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameEOConcurrencyMap.erase(name);
}

Expand Down
17 changes: 17 additions & 0 deletions core/file_server/FileServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <unordered_map>
#include <utility>

#include "common/Lock.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/PipelineContext.h"
#include "reader/FileReaderOptions.h"

Expand All @@ -40,6 +42,7 @@ class FileServer {
void Start();
void Pause(bool isConfigUpdate = true);

// for plugin
FileDiscoveryConfig GetFileDiscoveryConfig(const std::string& name) const;
const std::unordered_map<std::string, FileDiscoveryConfig>& GetAllFileDiscoveryConfigs() const {
return mPipelineNameFileDiscoveryConfigsMap;
Expand All @@ -64,6 +67,17 @@ class FileServer {
void SaveContainerInfo(const std::string& pipeline, const std::shared_ptr<std::vector<ContainerInfo>>& info);
std::shared_ptr<std::vector<ContainerInfo>> GetAndRemoveContainerInfo(const std::string& pipeline);
void ClearContainerInfo();

PluginMetricManagerPtr GetPluginMetricManager(const std::string& name) const;
const std::unordered_map<std::string, PluginMetricManagerPtr>& GetAllMetricRecordSetDefinitions() const {
return mPipelineNamePluginMetricManagersMap;
}
void AddPluginMetricManager(const std::string& name, PluginMetricManagerPtr PluginMetricManager);
void RemovePluginMetricManager(const std::string& name);
// for reader, event_handler ...
ReentrantMetricsRecordRef GetOrCreateReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);
void ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);

// 过渡使用
void Resume(bool isConfigUpdate = true);
void Stop();
Expand All @@ -78,10 +92,13 @@ class FileServer {

void PauseInner();

mutable ReadWriteLock mReadWriteLock;

std::unordered_map<std::string, FileDiscoveryConfig> mPipelineNameFileDiscoveryConfigsMap;
std::unordered_map<std::string, FileReaderConfig> mPipelineNameFileReaderConfigsMap;
std::unordered_map<std::string, MultilineConfig> mPipelineNameMultilineConfigsMap;
std::unordered_map<std::string, std::shared_ptr<std::vector<ContainerInfo>>> mAllContainerInfoMap;
std::unordered_map<std::string, PluginMetricManagerPtr> mPipelineNamePluginMetricManagersMap;
// 过渡使用
std::unordered_map<std::string, uint32_t> mPipelineNameEOConcurrencyMap;
};
Expand Down
17 changes: 17 additions & 0 deletions core/input/InputFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "common/ParamExtractor.h"
#include "config_manager/ConfigManager.h"
#include "file_server/FileServer.h"
#include "monitor/MetricConstants.h"
#include "pipeline/Pipeline.h"
#include "pipeline/PipelineManager.h"

Expand Down Expand Up @@ -144,6 +145,20 @@ bool InputFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline)
mExactlyOnceConcurrency = exactlyOnceConcurrency;
}

mInputFileMonitorTotal = GetMetricsRecordRef().CreateGauge(METRIC_INPUT_FILE_MONITOR_TOTAL);
static const std::unordered_map<std::string, MetricType> inputFileMetricKeys = {
// {METRIC_INPUT_RECORDS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_RECORDS_SIZE_BYTES, MetricType::METRIC_TYPE_COUNTER},
// {METRIC_INPUT_BATCH_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_READ_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_FILE_SIZE_BYTES, MetricType::METRIC_TYPE_GAUGE},
// {METRIC_INPUT_FILE_READ_DELAY_TIME_MS, MetricType::METRIC_TYPE_GAUGE},
{METRIC_INPUT_FILE_OFFSET_BYTES, MetricType::METRIC_TYPE_GAUGE},
};
mPluginMetricManager
= std::make_shared<PluginMetricManager>(GetMetricsRecordRef()->GetLabels(), inputFileMetricKeys);
mPluginMetricManager->RegisterSizeGauge(mInputFileMonitorTotal);

return true;
}

Expand Down Expand Up @@ -200,6 +215,7 @@ bool InputFile::DeduceAndSetContainerBaseDir(ContainerInfo& containerInfo,
}

bool InputFile::Start() {
FileServer::GetInstance()->AddPluginMetricManager(mContext->GetConfigName(), mPluginMetricManager);
if (mEnableContainerDiscovery) {
mFileDiscovery.SetContainerInfo(
FileServer::GetInstance()->GetAndRemoveContainerInfo(mContext->GetPipeline().Name()));
Expand All @@ -219,6 +235,7 @@ bool InputFile::Stop(bool isPipelineRemoving) {
FileServer::GetInstance()->RemoveFileReaderConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveMultilineConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveExactlyOnceConcurrency(mContext->GetConfigName());
FileServer::GetInstance()->RemovePluginMetricManager(mContext->GetConfigName());
return true;
}

Expand Down
3 changes: 3 additions & 0 deletions core/input/InputFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "container_manager/ContainerDiscoveryOptions.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "monitor/PluginMetricManager.h"
#include "plugin/interface/Input.h"
#include "reader/FileReaderOptions.h"

Expand All @@ -47,6 +48,8 @@ class InputFile : public Input {
ContainerDiscoveryOptions mContainerDiscovery;
FileReaderOptions mFileReader;
MultilineOptions mMultiline;
PluginMetricManagerPtr mPluginMetricManager;
GaugePtr mInputFileMonitorTotal;
// others
uint32_t mMaxCheckpointDirSearchDepth = 0;
uint32_t mExactlyOnceConcurrency = 0;
Expand Down
2 changes: 2 additions & 0 deletions core/logtail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ DECLARE_FLAG_STRING(logtail_sys_conf_dir);
DECLARE_FLAG_STRING(check_point_filename);
DECLARE_FLAG_STRING(default_buffer_file_path);
DECLARE_FLAG_STRING(ilogtail_docker_file_path_config);
DECLARE_FLAG_STRING(metrics_report_method);
DECLARE_FLAG_INT32(data_server_port);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);
DECLARE_FLAG_BOOL(enable_sls_metrics_format);
Expand Down Expand Up @@ -76,6 +77,7 @@ static void overwrite_community_edition_flags() {
STRING_FLAG(check_point_filename) = "checkpoint/logtail_check_point";
STRING_FLAG(default_buffer_file_path) = "checkpoint";
STRING_FLAG(ilogtail_docker_file_path_config) = "checkpoint/docker_path_config.json";
STRING_FLAG(metrics_report_method) = "";
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
Expand Down
2 changes: 2 additions & 0 deletions core/logtail_windows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ DECLARE_FLAG_STRING(logtail_sys_conf_dir);
DECLARE_FLAG_STRING(check_point_filename);
DECLARE_FLAG_STRING(default_buffer_file_path);
DECLARE_FLAG_STRING(ilogtail_docker_file_path_config);
DECLARE_FLAG_STRING(metrics_report_method);
DECLARE_FLAG_INT32(data_server_port);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);
DECLARE_FLAG_BOOL(enable_sls_metrics_format);
Expand All @@ -38,6 +39,7 @@ static void overwrite_community_edition_flags() {
STRING_FLAG(check_point_filename) = "checkpoint/logtail_check_point";
STRING_FLAG(default_buffer_file_path) = "checkpoint";
STRING_FLAG(ilogtail_docker_file_path_config) = "checkpoint/docker_path_config.json";
STRING_FLAG(metrics_report_method) = "";
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
Expand Down
Loading

0 comments on commit e381210

Please sign in to comment.