Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
笃敏 committed Dec 11, 2023
1 parent 456f718 commit 2e5733e
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 66 deletions.
20 changes: 16 additions & 4 deletions core/monitor/LogtailAlarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,17 @@ LogtailAlarm::LogtailAlarm() {
}

void LogtailAlarm::Init() {
mIsThreadRunning = true;
mThreadRes = async(launch::async, &LogtailAlarm::SendAlarmLoop, this);
LOG_INFO(sLogger, ("alarm gathering", "started"));
}

void LogtailAlarm::Stop() {
mIsThreadRunning = false;
ForceToSend();
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
}
mStopCV.notify_one();
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("alarm gathering", "stopped successfully"));
Expand All @@ -119,9 +123,17 @@ void LogtailAlarm::Stop() {
}

bool LogtailAlarm::SendAlarmLoop() {
while (mIsThreadRunning.load()) {
SendAllRegionAlarm();
{
unique_lock<mutex> lock(mThreadRunningMux);
mIsThreadRunning = true;
while (mIsThreadRunning) {
SendAllRegionAlarm();
if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}
SendAllRegionAlarm();
return true;
}

Expand Down
6 changes: 5 additions & 1 deletion core/monitor/LogtailAlarm.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <stdio.h>

#include <atomic>
#include <condition_variable>
#include <future>
#include <map>
#include <memory>
Expand Down Expand Up @@ -145,7 +146,10 @@ class LogtailAlarm {
void SendAllRegionAlarm();

std::future<bool> mThreadRes;
std::atomic_bool mIsThreadRunning = false;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = false;
std::condition_variable mStopCV;


std::vector<std::string> mMessageType;
std::map<std::string, std::pair<std::shared_ptr<LogtailAlarmVector>, std::vector<int32_t> > > mAllAlarmMap;
Expand Down
129 changes: 70 additions & 59 deletions core/monitor/Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,16 @@ bool LogtailMonitor::Init() {
#endif

// Initialize monitor thread.
mIsThreadRunning = true;
mThreadRes = async(launch::async, &LogtailMonitor::Monitor, this);
LOG_INFO(sLogger, ("profiling", "started"));
return true;
}

void LogtailMonitor::Stop() {
mIsThreadRunning = false;
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("profiling", "stopped successfully"));
Expand All @@ -131,67 +133,76 @@ void LogtailMonitor::Stop() {
void LogtailMonitor::Monitor() {
int32_t lastMonitorTime = time(NULL);
CpuStat curCpuStat;
while (mIsThreadRunning.load()) {
sleep(1);
GetCpuStat(curCpuStat);

// Update mRealtimeCpuStat for InputFlowControl.
if (AppConfig::GetInstance()->IsInputFlowControl()) {
CalCpuStat(curCpuStat, mRealtimeCpuStat);
}

int32_t monitorTime = time(NULL);
{
unique_lock<mutex> lock(mThreadRunningMux);
mIsThreadRunning = true;
while (mIsThreadRunning) {
GetCpuStat(curCpuStat);

// Update mRealtimeCpuStat for InputFlowControl.
if (AppConfig::GetInstance()->IsInputFlowControl()) {
CalCpuStat(curCpuStat, mRealtimeCpuStat);
}

int32_t monitorTime = time(NULL);
#if defined(__linux__) // TODO: Add auto scale support for Windows.
// Update related CPU statistics for controlling resource auto scale (Linux only).
if (AppConfig::GetInstance()->IsResourceAutoScale()) {
CalCpuStat(curCpuStat, mCpuStatForScale);
CalOsCpuStat();
mCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE] = mCpuStatForScale.mCpuUsage;
mOsCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE] = mOsCpuStatForScale.mOsCpuUsage;
++mCpuArrayForScaleIdx;
CheckScaledCpuUsageUpLimit();
LOG_DEBUG(
sLogger,
("mCpuStatForScale", mCpuStatForScale.mCpuUsage)("mOsCpuStatForScale", mOsCpuStatForScale.mOsCpuUsage));
}
// Update related CPU statistics for controlling resource auto scale (Linux only).
if (AppConfig::GetInstance()->IsResourceAutoScale()) {
CalCpuStat(curCpuStat, mCpuStatForScale);
CalOsCpuStat();
mCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE] = mCpuStatForScale.mCpuUsage;
mOsCpuArrayForScale[mCpuArrayForScaleIdx % CPU_STAT_FOR_SCALE_ARRAY_SIZE]
= mOsCpuStatForScale.mOsCpuUsage;
++mCpuArrayForScaleIdx;
CheckScaledCpuUsageUpLimit();
LOG_DEBUG(sLogger,
("mCpuStatForScale", mCpuStatForScale.mCpuUsage)("mOsCpuStatForScale",
mOsCpuStatForScale.mOsCpuUsage));
}
#endif

// Update statistics and send to logtail_status_profile regularly.
// If CPU or memory limit triggered, send to logtail_suicide_profile.
if ((monitorTime - lastMonitorTime) < INT32_FLAG(monitor_interval))
continue;
lastMonitorTime = monitorTime;

// Memory usage has exceeded limit, try to free some timeout objects.
if (1 == mMemStat.mViolateNum) {
LOG_DEBUG(sLogger, ("Memory is upper limit", "run gabbage collection."));
LogInput::GetInstance()->SetForceClearFlag(true);
}
GetMemStat();
CalCpuStat(curCpuStat, mCpuStat);
// CalCpuLimit and CalMemLimit will check if the number of violation (CPU
// or memory exceeds limit) // is greater or equal than limits (
// flag(cpu_limit_num) and flag(mem_limit_num)).
// Returning true means too much violations, so we have to prepare to restart
// logtail to release resource.
// Mainly for controlling memory because we have no idea to descrease memory usage.
if (CheckCpuLimit() || CheckMemLimit()) {
LOG_ERROR(sLogger,
("Resource used by program exceeds upper limit",
"prepare restart Logtail")("cpu_usage", mCpuStat.mCpuUsage)("mem_rss", mMemStat.mRss));
Suicide();
}

if (IsHostIpChanged()) {
Suicide();
}

SendStatusProfile(false);
if (BOOL_FLAG(logtail_dump_monitor_info)) {
if (!DumpMonitorInfo(monitorTime))
LOG_ERROR(sLogger, ("Fail to dump monitor info", ""));
// Update statistics and send to logtail_status_profile regularly.
// If CPU or memory limit triggered, send to logtail_suicide_profile.
if ((monitorTime - lastMonitorTime) < INT32_FLAG(monitor_interval))
continue;
lastMonitorTime = monitorTime;

// Memory usage has exceeded limit, try to free some timeout objects.
if (1 == mMemStat.mViolateNum) {
LOG_DEBUG(sLogger, ("Memory is upper limit", "run gabbage collection."));
LogInput::GetInstance()->SetForceClearFlag(true);
}
GetMemStat();
CalCpuStat(curCpuStat, mCpuStat);
// CalCpuLimit and CalMemLimit will check if the number of violation (CPU
// or memory exceeds limit) // is greater or equal than limits (
// flag(cpu_limit_num) and flag(mem_limit_num)).
// Returning true means too much violations, so we have to prepare to restart
// logtail to release resource.
// Mainly for controlling memory because we have no idea to descrease memory usage.
if (CheckCpuLimit() || CheckMemLimit()) {
LOG_ERROR(sLogger,
("Resource used by program exceeds upper limit",
"prepare restart Logtail")("cpu_usage", mCpuStat.mCpuUsage)("mem_rss", mMemStat.mRss));
Suicide();
}

if (IsHostIpChanged()) {
Suicide();
}

SendStatusProfile(false);
if (BOOL_FLAG(logtail_dump_monitor_info)) {
if (!DumpMonitorInfo(monitorTime))
LOG_ERROR(sLogger, ("Fail to dump monitor info", ""));
}

if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}
SendStatusProfile(true);
}

template <typename T>
Expand Down Expand Up @@ -293,7 +304,7 @@ bool LogtailMonitor::SendStatusProfile(bool suicide) {
AddLogContent(logPtr, "ecs_regioon_id", LogFileProfiler::mECSRegionID);
ClearMetric();

if (!mIsThreadRunning.load())
if (!mIsThreadRunning)
return false;

// Dump to local and send to enabled regions.
Expand Down
7 changes: 5 additions & 2 deletions core/monitor/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

#pragma once

#include <atomic>
#include <condition_variable>>
#include <future>
#include <mutex>
#include <string>

#include "MetricStore.h"
Expand Down Expand Up @@ -144,7 +145,9 @@ class LogtailMonitor : public MetricStore {
void Suicide();

std::future<void> mThreadRes;
std::atomic_bool mIsThreadRunning = false;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = false;
std::condition_variable mStopCV;

// Control report status profile frequency.
int32_t mStatusCount;
Expand Down

0 comments on commit 2e5733e

Please sign in to comment.