Skip to content

Commit

Permalink
Merge remote-tracking branch 'alibaba/main' into go-config
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 24, 2024
2 parents 5d46f61 + 7cd66f4 commit 17d47d0
Show file tree
Hide file tree
Showing 34 changed files with 372 additions and 228 deletions.
14 changes: 7 additions & 7 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ namespace logtail {
void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES);
mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES);
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt");
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_size_bytes");
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS);
mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES);
mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES);
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS);
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
Expand Down
9 changes: 7 additions & 2 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/ConfigManager.h"
#include "plugin/input/InputFile.h"
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingModify.h"
#include "plugin/input/InputFile.h"

DEFINE_FLAG_BOOL(enable_polling_discovery, "", true);

using namespace std;

namespace logtail {

FileServer::FileServer() {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, "file_server"}});
}

// 启动文件服务,包括加载配置、处理检查点、注册事件等
void FileServer::Start() {
ConfigManager::GetInstance()->LoadDockerConfig();
Expand Down
9 changes: 7 additions & 2 deletions core/file_server/FileServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#include "common/Lock.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/LogtailMetric.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/PipelineContext.h"
#include "file_server/reader/FileReaderOptions.h"


namespace logtail {

Expand Down Expand Up @@ -78,6 +80,7 @@ class FileServer {
// for reader, event_handler ...
ReentrantMetricsRecordRef GetOrCreateReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);
void ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);
MetricsRecordRef& GetMetricsRecordRef() { return mMetricsRecordRef; }

// 过渡使用
void Resume(bool isConfigUpdate = true);
Expand All @@ -88,7 +91,7 @@ class FileServer {
void RemoveExactlyOnceConcurrency(const std::string& name);

private:
FileServer() = default;
FileServer();
~FileServer() = default;

void PauseInner();
Expand All @@ -102,6 +105,8 @@ class FileServer {
std::unordered_map<std::string, PluginMetricManagerPtr> mPipelineNamePluginMetricManagersMap;
// 过渡使用
std::unordered_map<std::string, uint32_t> mPipelineNameEOConcurrencyMap;

mutable MetricsRecordRef mMetricsRecordRef;
};

} // namespace logtail
24 changes: 14 additions & 10 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include <time.h>

#include "file_server/event_handler/EventHandler.h"
#include "file_server/event_handler/HistoryFileImporter.h"
#include "app_config/AppConfig.h"
#include "application/Application.h"
#include "checkpoint/CheckPointManager.h"
Expand All @@ -27,18 +25,20 @@
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/ConfigManager.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/Monitor.h"
#include "file_server/event_handler/EventHandler.h"
#include "file_server/event_handler/HistoryFileImporter.h"
#include "file_server/polling/PollingCache.h"
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "file_server/reader/GloablFileDescriptorManager.h"
#include "file_server/reader/LogFileReader.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/Monitor.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
Expand Down Expand Up @@ -88,9 +88,11 @@ void LogInput::Start() {

mInteruptFlag = false;

mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mAgentRegisterHandlerTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_REGISTER_HANDLER_TOTAL);
mRegisterdHandlersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_CNT);
mActiveReadersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_CNT);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

new Thread([this]() { ProcessLoop(); });
}
Expand All @@ -104,7 +106,7 @@ void LogInput::Resume() {

void LogInput::HoldOn() {
LOG_INFO(sLogger, ("event handle daemon pause", "starts"));
if (BOOL_FLAG(enable_full_drain_mode)) {
if (BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()) {
unique_lock<mutex> lock(mThreadRunningMux);
mStopCV.wait(lock, [this]() { return mInteruptFlag; });
} else {
Expand Down Expand Up @@ -342,6 +344,7 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {
void LogInput::UpdateCriticalMetric(int32_t curTime) {
LogtailMonitor::GetInstance()->UpdateMetric("last_read_event_time",
GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S"));
mLastRunTime->Set(mLastReadEventTime.load());

LogtailMonitor::GetInstance()->UpdateMetric("event_tps",
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
Expand All @@ -350,8 +353,9 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {
mAgentOpenFdTotal->Set(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mAgentRegisterHandlerTotal->Set(handlerCount);
mRegisterdHandlersCnt->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
mActiveReadersCnt->Set(CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
mEventProcessCount = 0;
}
Expand Down
6 changes: 5 additions & 1 deletion core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ class LogInput : public LogRunnable {
volatile bool mIdleFlag;
int32_t mEventProcessCount;
int32_t mLastUpdateMetricTime;

IntGaugePtr mLastRunTime;
IntGaugePtr mAgentOpenFdTotal;
IntGaugePtr mAgentRegisterHandlerTotal;
IntGaugePtr mRegisterdHandlersCnt;
IntGaugePtr mActiveReadersCnt;
IntGaugePtr mEnableFileIncludedByMultiConfigs;

std::atomic_int mLastReadEventTime{0};
mutable std::mutex mThreadRunningMux;
Expand Down
19 changes: 10 additions & 9 deletions core/file_server/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@
#endif
#include <sys/stat.h>

#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "app_config/AppConfig.h"
#include "common/ErrorUtil.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/event/Event.h"
#include "file_server/FileServer.h"
#include "file_server/event/Event.h"
#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"

// Control the check frequency to call ClearUnavailableFileAndDir.
DEFINE_FLAG_INT32(check_not_exist_file_dir_round, "clear not exist file dir cache, round", 20);
Expand Down Expand Up @@ -69,10 +70,10 @@ static const int64_t NANO_CONVERTING = 1000000000;
void PollingDirFile::Start() {
ClearCache();
mAgentConfigTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL);
mAgentPollingDirCacheSizeTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL);
mAgentPollingFileCacheSizeTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL);
mPollingDirCacheSize
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_DIR_CACHE_SIZE);
mPollingFileCacheSize
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_FILE_CACHE_SIZE);
mRuningFlag = true;
mThreadPtr = CreateThread([this]() { Polling(); });
}
Expand Down Expand Up @@ -157,10 +158,10 @@ void PollingDirFile::Polling() {
ScopedSpinLock lock(mCacheLock);
size_t pollingDirCacheSize = mDirCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize);
mAgentPollingDirCacheSizeTotal->Set(pollingDirCacheSize);
mPollingDirCacheSize->Set(pollingDirCacheSize);
size_t pollingFileCacheSize = mFileCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize);
mAgentPollingFileCacheSizeTotal->Set(pollingFileCacheSize);
mPollingFileCacheSize->Set(pollingFileCacheSize);
}

// Iterate all normal configs, make sure stat count will not exceed limit.
Expand Down
4 changes: 2 additions & 2 deletions core/file_server/polling/PollingDirFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ class PollingDirFile : public LogRunnable {
uint64_t mCurrentRound;

IntGaugePtr mAgentConfigTotal;
IntGaugePtr mAgentPollingDirCacheSizeTotal;
IntGaugePtr mAgentPollingFileCacheSizeTotal;
IntGaugePtr mPollingDirCacheSize;
IntGaugePtr mPollingFileCacheSize;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PollingUnittest;
Expand Down
7 changes: 5 additions & 2 deletions core/file_server/polling/PollingModify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/FileServer.h"
#include "file_server/event/Event.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"

using namespace std;

Expand All @@ -47,7 +49,8 @@ PollingModify::~PollingModify() {

void PollingModify::Start() {
ClearCache();
mAgentPollingModifySizeTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_MODIFY_SIZE_TOTAL);
mPollingModifySize
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_MODIFY_CACHE_SIZE);

mRuningFlag = true;
mThreadPtr = CreateThread([this]() { Polling(); });
Expand Down Expand Up @@ -251,7 +254,7 @@ void PollingModify::Polling() {
int32_t statCount = 0;
size_t pollingModifySizeTotal = mModifyCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal);
mAgentPollingModifySizeTotal->Set(pollingModifySizeTotal);
mPollingModifySize->Set(pollingModifySizeTotal);
for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) {
if (!mRuningFlag || mHoldOnFlag)
break;
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/polling/PollingModify.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class PollingModify : public LogRunnable {

ModifyCheckCacheMap mModifyCacheMap;

IntGaugePtr mAgentPollingModifySizeTotal;
IntGaugePtr mPollingModifySize;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PollingUnittest;
Expand Down
27 changes: 2 additions & 25 deletions core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,34 +205,11 @@ WriteMetrics::~WriteMetrics() {
Clear();
}

void WriteMetrics::PreparePluginCommonLabels(const std::string& projectName,
const std::string& logstoreName,
const std::string& region,
const std::string& configName,
const std::string& pluginType,
const std::string& pluginID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels) {
labels.emplace_back(std::make_pair(METRIC_LABEL_PROJECT, projectName));
labels.emplace_back(std::make_pair(METRIC_LABEL_LOGSTORE, logstoreName));
labels.emplace_back(std::make_pair(METRIC_LABEL_REGION, region));
labels.emplace_back(std::make_pair(METRIC_LABEL_CONFIG_NAME, configName));
labels.emplace_back(std::make_pair(METRIC_LABEL_PLUGIN_NAME, pluginType));
labels.emplace_back(std::make_pair(METRIC_LABEL_PLUGIN_ID, pluginID));
labels.emplace_back(std::make_pair(METRIC_LABEL_NODE_ID, nodeID));
labels.emplace_back(std::make_pair(METRIC_LABEL_CHILD_NODE_ID, childNodeID));
}

void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref,
MetricLabels&& labels,
DynamicMetricLabels&& dynamicLabels) {
MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(labels),
std::make_shared<DynamicMetricLabels>(dynamicLabels));
ref.SetMetricsRecord(cur);
std::lock_guard<std::mutex> lock(mMutex);
cur->SetNext(mHead);
mHead = cur;
CreateMetricsRecordRef(ref, std::move(labels), std::move(dynamicLabels));
CommitMetricsRecordRef(ref);
}

void WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref,
Expand Down
10 changes: 1 addition & 9 deletions core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,7 @@ class WriteMetrics {
static WriteMetrics* ptr = new WriteMetrics();
return ptr;
}
void PreparePluginCommonLabels(const std::string& projectName,
const std::string& logstoreName,
const std::string& region,
const std::string& configName,
const std::string& pluginType,
const std::string& pluginID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels);

void
PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void CreateMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
Expand Down
Loading

0 comments on commit 17d47d0

Please sign in to comment.