Skip to content

Commit

Permalink
polish code
Browse files Browse the repository at this point in the history
  • Loading branch information
笃敏 committed Dec 11, 2023
1 parent 94ff42a commit 2a01ca6
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 35 deletions.
3 changes: 1 addition & 2 deletions core/common/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/

#pragma once

#include <boost/thread.hpp>

#include <functional>
#include <memory>
#include <thread>
#include <utility>

#include "boost/thread.hpp"

namespace logtail {

class Thread {
Expand Down
36 changes: 23 additions & 13 deletions core/config/provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

#include "config/provider/CommonConfigProvider.h"

#include <json/json.h>

#include <filesystem>
#include <iostream>
#include <random>

#include <json/json.h>

#include "app_config/AppConfig.h"
#include "application/Application.h"
#include "common/LogtailCommonFlags.h"
Expand All @@ -28,8 +28,8 @@
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "sdk/Common.h"
#include "sdk/Exception.h"
#include "sdk/CurlImp.h"
#include "sdk/Exception.h"

using namespace std;

Expand All @@ -38,7 +38,17 @@ DEFINE_FLAG_INT32(config_update_interval, "second", 10);
namespace logtail {

CommonConfigProvider::~CommonConfigProvider() {
mThreadIsRunning = false;
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
}
mStopCV.notify_one();
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("common config provider", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("common config provider", "forced to stopped"));
}
}

void CommonConfigProvider::Init(const string& dir) {
Expand Down Expand Up @@ -82,23 +92,23 @@ void CommonConfigProvider::Init(const string& dir) {
LOG_INFO(sLogger, ("ilogtail_configserver_tags", confJson["ilogtail_tags"].toStyledString()));
}

mCheckUpdateThread = thread([this] { CheckUpdateThread(); });
mThreadRes = async(launch::async, &CommonConfigProvider::CheckUpdateThread, this);
}

void CommonConfigProvider::CheckUpdateThread() {
LOG_INFO(sLogger, ("common config provider", "started"));
usleep((rand() % 10) * 100 * 1000);
int32_t lastCheckTime = 0;
mThreadIsRunning = true;
while (mThreadIsRunning.load()) {
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
int32_t curTime = time(NULL);
if (curTime - lastCheckTime >= INT32_FLAG(config_update_interval)) {
GetConfigUpdate();
lastCheckTime = curTime;
}
if (mThreadIsRunning.load())
sleep(1);
else
if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}

Expand Down Expand Up @@ -313,9 +323,9 @@ void CommonConfigProvider::UpdateRemoteConfig(
error_code ec;
filesystem::rename(tmpFilePath, filePath, ec);
if (ec) {
LOG_WARNING(
sLogger,
("failed to dump config file", filePath.string())("error code", ec.value())("error msg", ec.message()));
LOG_WARNING(sLogger,
("failed to dump config file",
filePath.string())("error code", ec.value())("error msg", ec.message()));
filesystem::remove(tmpFilePath, ec);
}
break;
Expand Down
9 changes: 6 additions & 3 deletions core/config/provider/CommonConfigProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

#pragma once

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <future>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -73,8 +74,10 @@ class CommonConfigProvider : public ConfigProvider {
int mConfigServerAddressId = 0;
std::vector<std::string> mConfigServerTags;

JThread mCheckUpdateThread;
std::atomic_bool mThreadIsRunning = false;
std::future<void> mThreadRes;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = true;
std::condition_variable mStopCV;
std::unordered_map<std::string, int64_t> mConfigNameVersionMap;
bool mConfigServerAvailable = false;
};
Expand Down
3 changes: 1 addition & 2 deletions core/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ void LogInput::Start() {

mInteruptFlag = false;
new Thread([this]() { ProcessLoop(); });
LOG_INFO(sLogger, ("event handle daemon", "started"));
}

void LogInput::Resume() {
Expand Down Expand Up @@ -344,7 +343,7 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {
}

void* LogInput::ProcessLoop() {
LOG_DEBUG(sLogger, ("LogInputThread", "Start"));
LOG_INFO(sLogger, ("event handle daemon", "started"));
EventDispatcher* dispatcher = EventDispatcher::GetInstance();
dispatcher->StartTimeCount();
int32_t prevTime = time(NULL);
Expand Down
3 changes: 1 addition & 2 deletions core/monitor/LogtailAlarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ LogtailAlarm::LogtailAlarm() {

void LogtailAlarm::Init() {
mThreadRes = async(launch::async, &LogtailAlarm::SendAlarmLoop, this);
LOG_INFO(sLogger, ("alarm gathering", "started"));
}

void LogtailAlarm::Stop() {
Expand All @@ -123,9 +122,9 @@ void LogtailAlarm::Stop() {
}

bool LogtailAlarm::SendAlarmLoop() {
LOG_INFO(sLogger, ("alarm gathering", "started"));
{
unique_lock<mutex> lock(mThreadRunningMux);
mIsThreadRunning = true;
while (mIsThreadRunning) {
SendAllRegionAlarm();
if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
Expand Down
6 changes: 3 additions & 3 deletions core/monitor/LogtailAlarm.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ struct LogtailAlarmMessage {
class LogtailAlarm {
public:
static LogtailAlarm* GetInstance() {
static LogtailAlarm ptr;
return &ptr;
static LogtailAlarm instance;
return &instance;
}

void Init();
Expand Down Expand Up @@ -147,7 +147,7 @@ class LogtailAlarm {

std::future<bool> mThreadRes;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = false;
bool mIsThreadRunning = true;
std::condition_variable mStopCV;


Expand Down
10 changes: 4 additions & 6 deletions core/monitor/Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ bool LogtailMonitor::Init() {

// Initialize monitor thread.
mThreadRes = async(launch::async, &LogtailMonitor::Monitor, this);
LOG_INFO(sLogger, ("profiling", "started"));
return true;
}

Expand All @@ -131,12 +130,15 @@ void LogtailMonitor::Stop() {
}

void LogtailMonitor::Monitor() {
LOG_INFO(sLogger, ("profiling", "started"));
int32_t lastMonitorTime = time(NULL);
CpuStat curCpuStat;
{
unique_lock<mutex> lock(mThreadRunningMux);
mIsThreadRunning = true;
while (mIsThreadRunning) {
if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) {
break;
}
GetCpuStat(curCpuStat);

// Update mRealtimeCpuStat for InputFlowControl.
Expand Down Expand Up @@ -196,10 +198,6 @@ void LogtailMonitor::Monitor() {
if (!DumpMonitorInfo(monitorTime))
LOG_ERROR(sLogger, ("Fail to dump monitor info", ""));
}

if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}
SendStatusProfile(true);
Expand Down
2 changes: 1 addition & 1 deletion core/monitor/Monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class LogtailMonitor : public MetricStore {

std::future<void> mThreadRes;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = false;
bool mIsThreadRunning = true;
std::condition_variable mStopCV;

// Control report status profile frequency.
Expand Down
2 changes: 1 addition & 1 deletion core/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ void PollingDirFile::Start() {
ClearCache();
mRuningFlag = true;
mThreadPtr = CreateThread([this]() { Polling(); });
LOG_INFO(sLogger, ("polling discovery", "started"));
}

void PollingDirFile::Stop() {
Expand Down Expand Up @@ -123,6 +122,7 @@ void PollingDirFile::CheckConfigPollingStatCount(const int32_t lastStatCount,
}

void PollingDirFile::Polling() {
LOG_INFO(sLogger, ("polling discovery", "started"));
mHoldOnFlag = false;
while (mRuningFlag) {
LOG_DEBUG(sLogger, ("start dir file polling, mCurrentRound", mCurrentRound));
Expand Down
3 changes: 1 addition & 2 deletions core/polling/PollingModify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ void PollingModify::Start() {
ClearCache();
mRuningFlag = true;
mThreadPtr = CreateThread([this]() { Polling(); });
LOG_INFO(sLogger, ("polling modify", "started"));
}

void PollingModify::Stop() {
Expand Down Expand Up @@ -237,7 +236,7 @@ bool PollingModify::UpdateDeletedFile(const SplitedFilePath& filePath,
}

void PollingModify::Polling() {
LOG_INFO(sLogger, ("PollingModify::Polling", "start"));
LOG_INFO(sLogger, ("polling modify", "started"));
mHoldOnFlag = false;
while (mRuningFlag) {
{
Expand Down

0 comments on commit 2a01ca6

Please sign in to comment.