From 2e5733e154d718c606d6f5c0176af006e7eb92c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=AC=83=E6=95=8F?= Date: Mon, 11 Dec 2023 03:47:56 +0000 Subject: [PATCH] polish --- core/monitor/LogtailAlarm.cpp | 20 ++++-- core/monitor/LogtailAlarm.h | 6 +- core/monitor/Monitor.cpp | 129 ++++++++++++++++++---------------- core/monitor/Monitor.h | 7 +- 4 files changed, 96 insertions(+), 66 deletions(-) diff --git a/core/monitor/LogtailAlarm.cpp b/core/monitor/LogtailAlarm.cpp index dd0df4b7e0..b6a77f796e 100644 --- a/core/monitor/LogtailAlarm.cpp +++ b/core/monitor/LogtailAlarm.cpp @@ -103,13 +103,17 @@ LogtailAlarm::LogtailAlarm() { } void LogtailAlarm::Init() { - mIsThreadRunning = true; mThreadRes = async(launch::async, &LogtailAlarm::SendAlarmLoop, this); LOG_INFO(sLogger, ("alarm gathering", "started")); } void LogtailAlarm::Stop() { - mIsThreadRunning = false; + ForceToSend(); + { + lock_guard lock(mThreadRunningMux); + mIsThreadRunning = false; + } + mStopCV.notify_one(); future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("alarm gathering", "stopped successfully")); @@ -119,9 +123,17 @@ void LogtailAlarm::Stop() { } bool LogtailAlarm::SendAlarmLoop() { - while (mIsThreadRunning.load()) { - SendAllRegionAlarm(); + { + unique_lock lock(mThreadRunningMux); + mIsThreadRunning = true; + while (mIsThreadRunning) { + SendAllRegionAlarm(); + if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) { + break; + } + } } + SendAllRegionAlarm(); return true; } diff --git a/core/monitor/LogtailAlarm.h b/core/monitor/LogtailAlarm.h index 68fcf4e71c..1c21b6ac82 100644 --- a/core/monitor/LogtailAlarm.h +++ b/core/monitor/LogtailAlarm.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -145,7 +146,10 @@ class LogtailAlarm { void SendAllRegionAlarm(); std::future mThreadRes; - std::atomic_bool mIsThreadRunning = false; + std::mutex mThreadRunningMux; + bool mIsThreadRunning = false; + std::condition_variable mStopCV; + std::vector mMessageType; std::map, std::vector > > mAllAlarmMap; diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index f181390885..2ceb6992d2 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -112,14 +112,16 @@ bool LogtailMonitor::Init() { #endif // Initialize monitor thread. - mIsThreadRunning = true; mThreadRes = async(launch::async, &LogtailMonitor::Monitor, this); LOG_INFO(sLogger, ("profiling", "started")); return true; } void LogtailMonitor::Stop() { - mIsThreadRunning = false; + { + lock_guard lock(mThreadRunningMux); + mIsThreadRunning = false; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("profiling", "stopped successfully")); @@ -131,67 +133,76 @@ void LogtailMonitor::Stop() { void LogtailMonitor::Monitor() { int32_t lastMonitorTime = time(NULL); CpuStat curCpuStat; - while (mIsThreadRunning.load()) { - sleep(1); - GetCpuStat(curCpuStat); - - // Update mRealtimeCpuStat for InputFlowControl. - if (AppConfig::GetInstance()->IsInputFlowControl()) { - CalCpuStat(curCpuStat, mRealtimeCpuStat); - } - - int32_t monitorTime = time(NULL); + { + unique_lock lock(mThreadRunningMux); + mIsThreadRunning = true; + while (mIsThreadRunning) { + GetCpuStat(curCpuStat); + + // Update mRealtimeCpuStat for InputFlowControl. + if (AppConfig::GetInstance()->IsInputFlowControl()) { + CalCpuStat(curCpuStat, mRealtimeCpuStat); + } + + int32_t monitorTime = time(NULL); #if defined(__linux__) // TODO: Add auto scale support for Windows. - // Update related CPU statistics for controlling resource auto scale (Linux only). - if (AppConfig::GetInstance()->IsResourceAutoScale()) { - CalCpuStat(curCpuStat, mCpuStatForScale); - CalOsCpuStat(); - mCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE] = mCpuStatForScale.mCpuUsage; - mOsCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE] = mOsCpuStatForScale.mOsCpuUsage; - ++mCpuArrayForScaleIdx; - CheckScaledCpuUsageUpLimit(); - LOG_DEBUG( - sLogger, - ("mCpuStatForScale", mCpuStatForScale.mCpuUsage)("mOsCpuStatForScale", mOsCpuStatForScale.mOsCpuUsage)); - } + // Update related CPU statistics for controlling resource auto scale (Linux only). + if (AppConfig::GetInstance()->IsResourceAutoScale()) { + CalCpuStat(curCpuStat, mCpuStatForScale); + CalOsCpuStat(); + mCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE] = mCpuStatForScale.mCpuUsage; + mOsCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE] + = mOsCpuStatForScale.mOsCpuUsage; + ++mCpuArrayForScaleIdx; + CheckScaledCpuUsageUpLimit(); + LOG_DEBUG(sLogger, + ("mCpuStatForScale", mCpuStatForScale.mCpuUsage)("mOsCpuStatForScale", + mOsCpuStatForScale.mOsCpuUsage)); + } #endif - // Update statistics and send to logtail_status_profile regularly. - // If CPU or memory limit triggered, send to logtail_suicide_profile. - if ((monitorTime - lastMonitorTime) < INT32_FLAG(monitor_interval)) - continue; - lastMonitorTime = monitorTime; - - // Memory usage has exceeded limit, try to free some timeout objects. - if (1 == mMemStat.mViolateNum) { - LOG_DEBUG(sLogger, ("Memory is upper limit", "run gabbage collection.")); - LogInput::GetInstance()->SetForceClearFlag(true); - } - GetMemStat(); - CalCpuStat(curCpuStat, mCpuStat); - // CalCpuLimit and CalMemLimit will check if the number of violation (CPU - // or memory exceeds limit) // is greater or equal than limits ( - // flag(cpu_limit_num) and flag(mem_limit_num)). - // Returning true means too much violations, so we have to prepare to restart - // logtail to release resource. - // Mainly for controlling memory because we have no idea to descrease memory usage. - if (CheckCpuLimit() || CheckMemLimit()) { - LOG_ERROR(sLogger, - ("Resource used by program exceeds upper limit", - "prepare restart Logtail")("cpu_usage", mCpuStat.mCpuUsage)("mem_rss", mMemStat.mRss)); - Suicide(); - } - - if (IsHostIpChanged()) { - Suicide(); - } - - SendStatusProfile(false); - if (BOOL_FLAG(logtail_dump_monitor_info)) { - if (!DumpMonitorInfo(monitorTime)) - LOG_ERROR(sLogger, ("Fail to dump monitor info", "")); + // Update statistics and send to logtail_status_profile regularly. + // If CPU or memory limit triggered, send to logtail_suicide_profile. + if ((monitorTime - lastMonitorTime) < INT32_FLAG(monitor_interval)) + continue; + lastMonitorTime = monitorTime; + + // Memory usage has exceeded limit, try to free some timeout objects. + if (1 == mMemStat.mViolateNum) { + LOG_DEBUG(sLogger, ("Memory is upper limit", "run gabbage collection.")); + LogInput::GetInstance()->SetForceClearFlag(true); + } + GetMemStat(); + CalCpuStat(curCpuStat, mCpuStat); + // CalCpuLimit and CalMemLimit will check if the number of violation (CPU + // or memory exceeds limit) // is greater or equal than limits ( + // flag(cpu_limit_num) and flag(mem_limit_num)). + // Returning true means too much violations, so we have to prepare to restart + // logtail to release resource. + // Mainly for controlling memory because we have no idea to descrease memory usage. + if (CheckCpuLimit() || CheckMemLimit()) { + LOG_ERROR(sLogger, + ("Resource used by program exceeds upper limit", + "prepare restart Logtail")("cpu_usage", mCpuStat.mCpuUsage)("mem_rss", mMemStat.mRss)); + Suicide(); + } + + if (IsHostIpChanged()) { + Suicide(); + } + + SendStatusProfile(false); + if (BOOL_FLAG(logtail_dump_monitor_info)) { + if (!DumpMonitorInfo(monitorTime)) + LOG_ERROR(sLogger, ("Fail to dump monitor info", "")); + } + + if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) { + break; + } } } + SendStatusProfile(true); } template @@ -293,7 +304,7 @@ bool LogtailMonitor::SendStatusProfile(bool suicide) { AddLogContent(logPtr, "ecs_regioon_id", LogFileProfiler::mECSRegionID); ClearMetric(); - if (!mIsThreadRunning.load()) + if (!mIsThreadRunning) return false; // Dump to local and send to enabled regions. diff --git a/core/monitor/Monitor.h b/core/monitor/Monitor.h index 6632c90032..9a2150d3dd 100644 --- a/core/monitor/Monitor.h +++ b/core/monitor/Monitor.h @@ -16,8 +16,9 @@ #pragma once -#include +#include > #include +#include #include #include "MetricStore.h" @@ -144,7 +145,9 @@ class LogtailMonitor : public MetricStore { void Suicide(); std::future mThreadRes; - std::atomic_bool mIsThreadRunning = false; + std::mutex mThreadRunningMux; + bool mIsThreadRunning = false; + std::condition_variable mStopCV; // Control report status profile frequency. int32_t mStatusCount;