Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pick self monitor: 2.x base cpp #1775

Merged
merged 2 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ DEFINE_FLAG_INT32(max_holded_data_size,
DEFINE_FLAG_INT32(pub_max_holded_data_size,
"for every id and metric name, the max data size can be holded in memory (default 512KB)",
512 * 1024);
DEFINE_FLAG_STRING(metrics_report_method,
"method to report metrics (default none, means logtail will not report metrics)",
"sls");

namespace logtail {
AppConfig::AppConfig() {
Expand Down
2 changes: 2 additions & 0 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ void Application::Start() {
#endif

LogtailAlarm::GetInstance()->Init();
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();

PluginRegistry::GetInstance()->LoadPlugins();
Expand Down Expand Up @@ -322,6 +323,7 @@ void Application::Exit() {
#endif

LogtailMonitor::GetInstance()->Stop();
LoongCollectorMonitor::GetInstance()->Stop();
LogtailAlarm::GetInstance()->Stop();
// from now on, alarm should not be used.

Expand Down
1 change: 1 addition & 0 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,7 @@ void ModifyHandler::ForceReadLogAndPush(LogFileReaderPtr reader) {
int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* logBuffer) {
int32_t pushRetry = 0;
if (!logBuffer->rawBuffer.empty()) {
reader->ReportMetrics(logBuffer->readLength);
LogFileProfiler::GetInstance()->AddProfilingReadBytes(reader->GetConfigName(),
reader->GetRegion(),
reader->GetProject(),
Expand Down
13 changes: 10 additions & 3 deletions core/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ void LogInput::Start() {
initialized = true;

mInteruptFlag = false;

mGlobalOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_GLOBAL_OPEN_FD_TOTAL);
mGlobalRegisterHandlerTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_GLOBAL_REGISTER_HANDLER_TOTAL);

new Thread([this]() { ProcessLoop(); });
}

Expand Down Expand Up @@ -342,9 +346,12 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {

LogtailMonitor::GetInstance()->UpdateMetric("event_tps",
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
LogtailMonitor::GetInstance()->UpdateMetric("open_fd",
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", EventDispatcher::GetInstance()->GetHandlerCount());
int32_t openFdTotal = GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize();
LogtailMonitor::GetInstance()->UpdateMetric("open_fd", openFdTotal);
mGlobalOpenFdTotal->Set(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mGlobalRegisterHandlerTotal->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
mEventProcessCount = 0;
Expand Down
3 changes: 3 additions & 0 deletions core/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "common/Lock.h"
#include "common/LogRunnable.h"
#include "monitor/Monitor.h"

namespace logtail {

Expand Down Expand Up @@ -78,6 +79,8 @@ class LogInput : public LogRunnable {
volatile bool mIdleFlag;
int32_t mEventProcessCount;
int32_t mLastUpdateMetricTime;
IntGaugePtr mGlobalOpenFdTotal;
IntGaugePtr mGlobalRegisterHandlerTotal;

std::atomic_int mLastReadEventTime{0};
mutable std::mutex mThreadRunningMux;
Expand Down
56 changes: 56 additions & 0 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ void FileServer::Stop() {

// 获取给定名称的文件发现配置
FileDiscoveryConfig FileServer::GetFileDiscoveryConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameFileDiscoveryConfigsMap.find(name);
if (itr != mPipelineNameFileDiscoveryConfigsMap.end()) {
return itr->second;
Expand All @@ -118,16 +119,19 @@ FileDiscoveryConfig FileServer::GetFileDiscoveryConfig(const string& name) const

// 添加文件发现配置
void FileServer::AddFileDiscoveryConfig(const string& name, FileDiscoveryOptions* opts, const PipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileDiscoveryConfigsMap[name] = make_pair(opts, ctx);
}

// 移除给定名称的文件发现配置
void FileServer::RemoveFileDiscoveryConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileDiscoveryConfigsMap.erase(name);
}

// 获取给定名称的文件读取器配置
FileReaderConfig FileServer::GetFileReaderConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameFileReaderConfigsMap.find(name);
if (itr != mPipelineNameFileReaderConfigsMap.end()) {
return itr->second;
Expand All @@ -137,16 +141,19 @@ FileReaderConfig FileServer::GetFileReaderConfig(const string& name) const {

// 添加文件读取器配置
void FileServer::AddFileReaderConfig(const string& name, const FileReaderOptions* opts, const PipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileReaderConfigsMap[name] = make_pair(opts, ctx);
}

// 移除给定名称的文件读取器配置
void FileServer::RemoveFileReaderConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameFileReaderConfigsMap.erase(name);
}

// 获取给定名称的多行配置
MultilineConfig FileServer::GetMultilineConfig(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameMultilineConfigsMap.find(name);
if (itr != mPipelineNameMultilineConfigsMap.end()) {
return itr->second;
Expand All @@ -156,21 +163,25 @@ MultilineConfig FileServer::GetMultilineConfig(const string& name) const {

// 添加多行配置
void FileServer::AddMultilineConfig(const string& name, const MultilineOptions* opts, const PipelineContext* ctx) {
WriteLock lock(mReadWriteLock);
mPipelineNameMultilineConfigsMap[name] = make_pair(opts, ctx);
}

// 移除给定名称的多行配置
void FileServer::RemoveMultilineConfig(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameMultilineConfigsMap.erase(name);
}

// 保存容器信息
void FileServer::SaveContainerInfo(const string& pipeline, const shared_ptr<vector<ContainerInfo>>& info) {
WriteLock lock(mReadWriteLock);
mAllContainerInfoMap[pipeline] = info;
}

// 获取并移除给定管道的容器信息
shared_ptr<vector<ContainerInfo>> FileServer::GetAndRemoveContainerInfo(const string& pipeline) {
WriteLock lock(mReadWriteLock);
auto iter = mAllContainerInfoMap.find(pipeline);
if (iter == mAllContainerInfoMap.end()) {
return make_shared<vector<ContainerInfo>>();
Expand All @@ -182,11 +193,53 @@ shared_ptr<vector<ContainerInfo>> FileServer::GetAndRemoveContainerInfo(const st

// 清除所有容器信息
void FileServer::ClearContainerInfo() {
WriteLock lock(mReadWriteLock);
mAllContainerInfoMap.clear();
}

// 获取插件的指标管理器
PluginMetricManagerPtr FileServer::GetPluginMetricManager(const std::string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNamePluginMetricManagersMap.find(name);
if (itr != mPipelineNamePluginMetricManagersMap.end()) {
return itr->second;
}
return nullptr;
}

// 添加插件的指标管理器
void FileServer::AddPluginMetricManager(const std::string& name, PluginMetricManagerPtr PluginMetricManager) {
WriteLock lock(mReadWriteLock);
mPipelineNamePluginMetricManagersMap[name] = PluginMetricManager;
}

// 移除插件的指标管理器
void FileServer::RemovePluginMetricManager(const std::string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNamePluginMetricManagersMap.erase(name);
}

// 获取“ReentrantMetricsRecordRef”指标记录对象
ReentrantMetricsRecordRef FileServer::GetOrCreateReentrantMetricsRecordRef(const std::string& name,
MetricLabels& labels) {
PluginMetricManagerPtr filePluginMetricManager = GetPluginMetricManager(name);
if (filePluginMetricManager != nullptr) {
return filePluginMetricManager->GetOrCreateReentrantMetricsRecordRef(labels);
}
return nullptr;
}

// 释放“ReentrantMetricsRecordRef”指标记录对象
void FileServer::ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels) {
PluginMetricManagerPtr filePluginMetricManager = GetPluginMetricManager(name);
if (filePluginMetricManager != nullptr) {
filePluginMetricManager->ReleaseReentrantMetricsRecordRef(labels);
}
}

// 获取给定名称的“ExactlyOnce”并发级别
uint32_t FileServer::GetExactlyOnceConcurrency(const string& name) const {
ReadLock lock(mReadWriteLock);
auto itr = mPipelineNameEOConcurrencyMap.find(name);
if (itr != mPipelineNameEOConcurrencyMap.end()) {
return itr->second;
Expand All @@ -196,6 +249,7 @@ uint32_t FileServer::GetExactlyOnceConcurrency(const string& name) const {

// 获取所有配置了“ExactlyOnce”选项的配置名列表
vector<string> FileServer::GetExactlyOnceConfigs() const {
ReadLock lock(mReadWriteLock);
vector<string> res;
for (const auto& item : mPipelineNameEOConcurrencyMap) {
if (item.second > 0) {
Expand All @@ -207,11 +261,13 @@ vector<string> FileServer::GetExactlyOnceConfigs() const {

// 添加“ExactlyOnce”并发配置
void FileServer::AddExactlyOnceConcurrency(const string& name, uint32_t concurrency) {
WriteLock lock(mReadWriteLock);
mPipelineNameEOConcurrencyMap[name] = concurrency;
}

// 移除给定名称的“ExactlyOnce”并发配置
void FileServer::RemoveExactlyOnceConcurrency(const string& name) {
WriteLock lock(mReadWriteLock);
mPipelineNameEOConcurrencyMap.erase(name);
}

Expand Down
17 changes: 17 additions & 0 deletions core/file_server/FileServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#include <unordered_map>
#include <utility>

#include "common/Lock.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/PipelineContext.h"
#include "reader/FileReaderOptions.h"

Expand All @@ -40,6 +42,7 @@ class FileServer {
void Start();
void Pause(bool isConfigUpdate = true);

// for plugin
FileDiscoveryConfig GetFileDiscoveryConfig(const std::string& name) const;
const std::unordered_map<std::string, FileDiscoveryConfig>& GetAllFileDiscoveryConfigs() const {
return mPipelineNameFileDiscoveryConfigsMap;
Expand All @@ -64,6 +67,17 @@ class FileServer {
void SaveContainerInfo(const std::string& pipeline, const std::shared_ptr<std::vector<ContainerInfo>>& info);
std::shared_ptr<std::vector<ContainerInfo>> GetAndRemoveContainerInfo(const std::string& pipeline);
void ClearContainerInfo();

PluginMetricManagerPtr GetPluginMetricManager(const std::string& name) const;
const std::unordered_map<std::string, PluginMetricManagerPtr>& GetAllMetricRecordSetDefinitions() const {
return mPipelineNamePluginMetricManagersMap;
}
void AddPluginMetricManager(const std::string& name, PluginMetricManagerPtr PluginMetricManager);
void RemovePluginMetricManager(const std::string& name);
// for reader, event_handler ...
ReentrantMetricsRecordRef GetOrCreateReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);
void ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);

// 过渡使用
void Resume(bool isConfigUpdate = true);
void Stop();
Expand All @@ -78,10 +92,13 @@ class FileServer {

void PauseInner();

mutable ReadWriteLock mReadWriteLock;

std::unordered_map<std::string, FileDiscoveryConfig> mPipelineNameFileDiscoveryConfigsMap;
std::unordered_map<std::string, FileReaderConfig> mPipelineNameFileReaderConfigsMap;
std::unordered_map<std::string, MultilineConfig> mPipelineNameMultilineConfigsMap;
std::unordered_map<std::string, std::shared_ptr<std::vector<ContainerInfo>>> mAllContainerInfoMap;
std::unordered_map<std::string, PluginMetricManagerPtr> mPipelineNamePluginMetricManagersMap;
// 过渡使用
std::unordered_map<std::string, uint32_t> mPipelineNameEOConcurrencyMap;
};
Expand Down
17 changes: 17 additions & 0 deletions core/input/InputFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "common/ParamExtractor.h"
#include "config_manager/ConfigManager.h"
#include "file_server/FileServer.h"
#include "monitor/MetricConstants.h"
#include "pipeline/Pipeline.h"
#include "pipeline/PipelineManager.h"

Expand Down Expand Up @@ -144,6 +145,20 @@ bool InputFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline)
mExactlyOnceConcurrency = exactlyOnceConcurrency;
}

mInputFileMonitorTotal = GetMetricsRecordRef().CreateIntGauge(METRIC_INPUT_FILE_MONITOR_TOTAL);
static const std::unordered_map<std::string, MetricType> inputFileMetricKeys = {
// {METRIC_INPUT_RECORDS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_RECORDS_SIZE_BYTES, MetricType::METRIC_TYPE_COUNTER},
// {METRIC_INPUT_BATCH_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_READ_TOTAL, MetricType::METRIC_TYPE_COUNTER},
{METRIC_INPUT_FILE_SIZE_BYTES, MetricType::METRIC_TYPE_INT_GAUGE},
// {METRIC_INPUT_FILE_READ_DELAY_TIME_MS, MetricType::METRIC_TYPE_INT_GAUGE},
{METRIC_INPUT_FILE_OFFSET_BYTES, MetricType::METRIC_TYPE_INT_GAUGE},
};
mPluginMetricManager
= std::make_shared<PluginMetricManager>(GetMetricsRecordRef()->GetLabels(), inputFileMetricKeys);
mPluginMetricManager->RegisterSizeGauge(mInputFileMonitorTotal);

return true;
}

Expand Down Expand Up @@ -200,6 +215,7 @@ bool InputFile::DeduceAndSetContainerBaseDir(ContainerInfo& containerInfo,
}

bool InputFile::Start() {
FileServer::GetInstance()->AddPluginMetricManager(mContext->GetConfigName(), mPluginMetricManager);
if (mEnableContainerDiscovery) {
mFileDiscovery.SetContainerInfo(
FileServer::GetInstance()->GetAndRemoveContainerInfo(mContext->GetPipeline().Name()));
Expand All @@ -219,6 +235,7 @@ bool InputFile::Stop(bool isPipelineRemoving) {
FileServer::GetInstance()->RemoveFileReaderConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveMultilineConfig(mContext->GetConfigName());
FileServer::GetInstance()->RemoveExactlyOnceConcurrency(mContext->GetConfigName());
FileServer::GetInstance()->RemovePluginMetricManager(mContext->GetConfigName());
return true;
}

Expand Down
3 changes: 3 additions & 0 deletions core/input/InputFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "container_manager/ContainerDiscoveryOptions.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "monitor/PluginMetricManager.h"
#include "plugin/interface/Input.h"
#include "reader/FileReaderOptions.h"

Expand All @@ -47,6 +48,8 @@ class InputFile : public Input {
ContainerDiscoveryOptions mContainerDiscovery;
FileReaderOptions mFileReader;
MultilineOptions mMultiline;
PluginMetricManagerPtr mPluginMetricManager;
IntGaugePtr mInputFileMonitorTotal;
// others
uint32_t mMaxCheckpointDirSearchDepth = 0;
uint32_t mExactlyOnceConcurrency = 0;
Expand Down
2 changes: 2 additions & 0 deletions core/logtail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ DECLARE_FLAG_STRING(logtail_sys_conf_dir);
DECLARE_FLAG_STRING(check_point_filename);
DECLARE_FLAG_STRING(default_buffer_file_path);
DECLARE_FLAG_STRING(ilogtail_docker_file_path_config);
DECLARE_FLAG_STRING(metrics_report_method);
DECLARE_FLAG_INT32(data_server_port);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);
DECLARE_FLAG_BOOL(enable_sls_metrics_format);
Expand Down Expand Up @@ -76,6 +77,7 @@ static void overwrite_community_edition_flags() {
STRING_FLAG(check_point_filename) = "checkpoint/logtail_check_point";
STRING_FLAG(default_buffer_file_path) = "checkpoint";
STRING_FLAG(ilogtail_docker_file_path_config) = "checkpoint/docker_path_config.json";
STRING_FLAG(metrics_report_method) = "";
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
Expand Down
2 changes: 2 additions & 0 deletions core/logtail_windows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ DECLARE_FLAG_STRING(logtail_sys_conf_dir);
DECLARE_FLAG_STRING(check_point_filename);
DECLARE_FLAG_STRING(default_buffer_file_path);
DECLARE_FLAG_STRING(ilogtail_docker_file_path_config);
DECLARE_FLAG_STRING(metrics_report_method);
DECLARE_FLAG_INT32(data_server_port);
DECLARE_FLAG_BOOL(enable_env_ref_in_config);
DECLARE_FLAG_BOOL(enable_sls_metrics_format);
Expand All @@ -38,6 +39,7 @@ static void overwrite_community_edition_flags() {
STRING_FLAG(check_point_filename) = "checkpoint/logtail_check_point";
STRING_FLAG(default_buffer_file_path) = "checkpoint";
STRING_FLAG(ilogtail_docker_file_path_config) = "checkpoint/docker_path_config.json";
STRING_FLAG(metrics_report_method) = "";
INT32_FLAG(data_server_port) = 443;
BOOL_FLAG(enable_env_ref_in_config) = true;
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
Expand Down
Loading
Loading