Skip to content

Commit

Permalink
Merge branch 'main' into feat/appconfig
Browse files Browse the repository at this point in the history
  • Loading branch information
quzard committed Sep 26, 2024
2 parents b6df927 + d413393 commit ba71c25
Show file tree
Hide file tree
Showing 62 changed files with 820 additions and 446 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-core-ut.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ jobs:
run: make unittest_core

- name: Unit Test Coverage
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --root . --lcov coverage.lcov --txt coverage.txt -e \".*sdk.*\" -e \".*observer.*\" -e \".*protobuf.*\" -e \".*unittest.*\" -e \".*config_server.*\" -e \".*fuse.*\" -e \".*go_pipeline.*\""
run: docker build -t unittest_coverage -f ./docker/Dockerfile_coverage . && docker run -v $(pwd):$(pwd) unittest_coverage bash -c "cd $(pwd)/core && gcovr --root . --json coverage.json --json-summary-pretty --json-summary summary.json -e \".*sdk.*\" -e \".*observer.*\" -e \".*logger.*\" -e \".*unittest.*\" -e \".*config_server.*\" -e \".*go_pipeline.*\" -e \".*application.*\" -e \".*protobuf.*\" -e \".*runner.*\""

- name: Setup Python3.10
uses: actions/setup-python@v5
with:
python-version: "3.10"

- name: Report code coverage
run: python3 tools/coverage-diff/main.py core/coverage.txt
run: python3 tools/coverage-diff/main.py --path core/coverage.json --summary core/summary.json

result:
runs-on: arc-runner-set-ilogtail
Expand Down
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ set(SUB_DIRECTORIES_LIST
)
if (LINUX)
if (ENABLE_ENTERPRISE)
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} shennong shennong/sdk streamlog aggregator)
set(SUB_DIRECTORIES_LIST ${SUB_DIRECTORIES_LIST} shennong shennong/sdk)
endif()
elseif(MSVC)
if (ENABLE_ENTERPRISE)
Expand Down
9 changes: 0 additions & 9 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -980,15 +980,6 @@ bool AppConfig::CheckAndResetProxyAddress(const char* envKey, string& address) {
* 函数会根据配置项的存在与否来决定是否更新相应的成员变量或全局标志。
*/
void AppConfig::LoadOtherConf(const Json::Value& confJson) {
// if (confJson.isMember("mapping_conf_path") && confJson["mapping_conf_path"].isString())
// mMappingConfigPath = confJson["mapping_conf_path"].asString();
// else
// mMappingConfigPath = STRING_FLAG(default_mapping_config_path);

if (confJson.isMember("streamlog_open") && confJson["streamlog_open"].isBool()) {
mOpenStreamLog = confJson["streamlog_open"].asBool();
}

{
int32_t oasConnectTimeout = 0;
if (LoadInt32Parameter(
Expand Down
4 changes: 2 additions & 2 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class AppConfig {
// uint32_t mStreamLogTcpPort;
// uint32_t mStreamLogPoolSizeInMb;
// uint32_t mStreamLogRcvLenPerCall;
bool mOpenStreamLog;
// bool mOpenStreamLog;

// performance
float mCpuUsageUpLimit;
Expand Down Expand Up @@ -373,7 +373,7 @@ class AppConfig {

// uint32_t GetStreamLogRcvLenPerCall() const { return mStreamLogRcvLenPerCall; }

bool GetOpenStreamLog() const { return mOpenStreamLog; }
// bool GetOpenStreamLog() const { return mOpenStreamLog; }

std::string GetIlogtailConfigJson() {
ScopedSpinLock lock(mAppConfigLock);
Expand Down
1 change: 0 additions & 1 deletion core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
#if defined(__linux__) && !defined(__ANDROID__)
#include "common/LinuxDaemonUtil.h"
#include "shennong/ShennongManager.h"
#include "streamlog/StreamLogManager.h"
#endif
#else
#include "provider/Provider.h"
Expand Down
14 changes: 7 additions & 7 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ namespace logtail {
void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES);
mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES);
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt");
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_size_bytes");
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS);
mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_CNT);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEM_SIZE_BYTES);
mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_CNT);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEM_SIZE_BYTES);
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS);
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_CNT);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
Expand Down
40 changes: 0 additions & 40 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,6 @@ bool PipelineConfig::Parse() {
// extensions module parsing will rely on their results.
bool hasObserverInput = false;
bool hasFileInput = false;
#ifdef __ENTERPRISE__
bool hasStreamInput = false;
#endif
key = "inputs";
itr = mDetail->find(key.c_str(), key.c_str() + key.size());
if (!itr) {
Expand Down Expand Up @@ -244,21 +241,10 @@ bool PipelineConfig::Parse() {
hasObserverInput = true;
} else if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
#ifdef __ENTERPRISE__
} else if (pluginType == "input_stream") {
if (!AppConfig::GetInstance()->GetOpenStreamLog()) {
PARAM_ERROR_RETURN(
sLogger, alarm, "stream log is not enabled", noModule, mName, mProject, mLogstore, mRegion);
}
hasStreamInput = true;
#endif
}
}
// TODO: remove these special restrictions
bool hasSpecialInput = hasObserverInput || hasFileInput;
#ifdef __ENTERPRISE__
hasSpecialInput = hasSpecialInput || hasStreamInput;
#endif
if (hasSpecialInput && (*mDetail)["inputs"].size() > 1) {
PARAM_ERROR_RETURN(sLogger,
alarm,
Expand All @@ -283,19 +269,6 @@ bool PipelineConfig::Parse() {
mLogstore,
mRegion);
}
#ifdef __ENTERPRISE__
// TODO: remove these special restrictions
if (hasStreamInput && !itr->empty()) {
PARAM_ERROR_RETURN(sLogger,
alarm,
"processor plugins coexist with input_stream",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
#endif
bool isCurrentPluginNative = true;
for (Json::Value::ArrayIndex i = 0; i < itr->size(); ++i) {
const Json::Value& plugin = (*itr)[i];
Expand Down Expand Up @@ -520,19 +493,6 @@ bool PipelineConfig::Parse() {
PARAM_ERROR_RETURN(
sLogger, alarm, "unsupported flusher plugin", pluginType, mName, mProject, mLogstore, mRegion);
}
#ifdef __ENTERPRISE__
// TODO: remove these special restrictions
if (hasStreamInput && pluginType != "flusher_sls") {
PARAM_ERROR_RETURN(sLogger,
alarm,
"flusher plugins other than flusher_sls coexist with input_stream",
noModule,
mName,
mProject,
mLogstore,
mRegion);
}
#endif
mFlushers.push_back(&plugin);
}
// TODO: remove these special restrictions
Expand Down
9 changes: 7 additions & 2 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/ConfigManager.h"
#include "plugin/input/InputFile.h"
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingModify.h"
#include "plugin/input/InputFile.h"

DEFINE_FLAG_BOOL(enable_polling_discovery, "", true);

using namespace std;

namespace logtail {

FileServer::FileServer() {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, "file_server"}});
}

// 启动文件服务,包括加载配置、处理检查点、注册事件等
void FileServer::Start() {
ConfigManager::GetInstance()->LoadDockerConfig();
Expand Down
9 changes: 7 additions & 2 deletions core/file_server/FileServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#include "common/Lock.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/LogtailMetric.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/PipelineContext.h"
#include "file_server/reader/FileReaderOptions.h"


namespace logtail {

Expand Down Expand Up @@ -78,6 +80,7 @@ class FileServer {
// for reader, event_handler ...
ReentrantMetricsRecordRef GetOrCreateReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);
void ReleaseReentrantMetricsRecordRef(const std::string& name, MetricLabels& labels);
MetricsRecordRef& GetMetricsRecordRef() { return mMetricsRecordRef; }

// 过渡使用
void Resume(bool isConfigUpdate = true);
Expand All @@ -88,7 +91,7 @@ class FileServer {
void RemoveExactlyOnceConcurrency(const std::string& name);

private:
FileServer() = default;
FileServer();
~FileServer() = default;

void PauseInner();
Expand All @@ -102,6 +105,8 @@ class FileServer {
std::unordered_map<std::string, PluginMetricManagerPtr> mPipelineNamePluginMetricManagersMap;
// 过渡使用
std::unordered_map<std::string, uint32_t> mPipelineNameEOConcurrencyMap;

mutable MetricsRecordRef mMetricsRecordRef;
};

} // namespace logtail
24 changes: 14 additions & 10 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

#include <time.h>

#include "file_server/event_handler/EventHandler.h"
#include "file_server/event_handler/HistoryFileImporter.h"
#include "app_config/AppConfig.h"
#include "application/Application.h"
#include "checkpoint/CheckPointManager.h"
Expand All @@ -27,18 +25,20 @@
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/ConfigManager.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/Monitor.h"
#include "file_server/event_handler/EventHandler.h"
#include "file_server/event_handler/HistoryFileImporter.h"
#include "file_server/polling/PollingCache.h"
#include "file_server/polling/PollingDirFile.h"
#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "file_server/reader/GloablFileDescriptorManager.h"
#include "file_server/reader/LogFileReader.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/Monitor.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
Expand Down Expand Up @@ -88,9 +88,11 @@ void LogInput::Start() {

mInteruptFlag = false;

mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
mAgentOpenFdTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mAgentRegisterHandlerTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_REGISTER_HANDLER_TOTAL);
mRegisterdHandlersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_CNT);
mActiveReadersCnt = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_CNT);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

new Thread([this]() { ProcessLoop(); });
}
Expand All @@ -104,7 +106,7 @@ void LogInput::Resume() {

void LogInput::HoldOn() {
LOG_INFO(sLogger, ("event handle daemon pause", "starts"));
if (BOOL_FLAG(enable_full_drain_mode)) {
if (BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()) {
unique_lock<mutex> lock(mThreadRunningMux);
mStopCV.wait(lock, [this]() { return mInteruptFlag; });
} else {
Expand Down Expand Up @@ -342,6 +344,7 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {
void LogInput::UpdateCriticalMetric(int32_t curTime) {
LogtailMonitor::GetInstance()->UpdateMetric("last_read_event_time",
GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S"));
mLastRunTime->Set(mLastReadEventTime.load());

LogtailMonitor::GetInstance()->UpdateMetric("event_tps",
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
Expand All @@ -350,8 +353,9 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) {
mAgentOpenFdTotal->Set(openFdTotal);
size_t handlerCount = EventDispatcher::GetInstance()->GetHandlerCount();
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", handlerCount);
mAgentRegisterHandlerTotal->Set(handlerCount);
mRegisterdHandlersCnt->Set(handlerCount);
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
mActiveReadersCnt->Set(CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
mEventProcessCount = 0;
}
Expand Down
6 changes: 5 additions & 1 deletion core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,12 @@ class LogInput : public LogRunnable {
volatile bool mIdleFlag;
int32_t mEventProcessCount;
int32_t mLastUpdateMetricTime;

IntGaugePtr mLastRunTime;
IntGaugePtr mAgentOpenFdTotal;
IntGaugePtr mAgentRegisterHandlerTotal;
IntGaugePtr mRegisterdHandlersCnt;
IntGaugePtr mActiveReadersCnt;
IntGaugePtr mEnableFileIncludedByMultiConfigs;

std::atomic_int mLastReadEventTime{0};
mutable std::mutex mThreadRunningMux;
Expand Down
19 changes: 10 additions & 9 deletions core/file_server/polling/PollingDirFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@
#endif
#include <sys/stat.h>

#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "app_config/AppConfig.h"
#include "common/ErrorUtil.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/event/Event.h"
#include "file_server/FileServer.h"
#include "file_server/event/Event.h"
#include "file_server/polling/PollingEventQueue.h"
#include "file_server/polling/PollingModify.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"

// Control the check frequency to call ClearUnavailableFileAndDir.
DEFINE_FLAG_INT32(check_not_exist_file_dir_round, "clear not exist file dir cache, round", 20);
Expand Down Expand Up @@ -69,10 +70,10 @@ static const int64_t NANO_CONVERTING = 1000000000;
void PollingDirFile::Start() {
ClearCache();
mAgentConfigTotal = LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL);
mAgentPollingDirCacheSizeTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_DIR_CACHE_SIZE_TOTAL);
mAgentPollingFileCacheSizeTotal
= LoongCollectorMonitor::GetInstance()->GetIntGauge(METRIC_AGENT_POLLING_FILE_CACHE_SIZE_TOTAL);
mPollingDirCacheSize
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_DIR_CACHE_SIZE);
mPollingFileCacheSize
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_POLLING_FILE_CACHE_SIZE);
mRuningFlag = true;
mThreadPtr = CreateThread([this]() { Polling(); });
}
Expand Down Expand Up @@ -157,10 +158,10 @@ void PollingDirFile::Polling() {
ScopedSpinLock lock(mCacheLock);
size_t pollingDirCacheSize = mDirCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize);
mAgentPollingDirCacheSizeTotal->Set(pollingDirCacheSize);
mPollingDirCacheSize->Set(pollingDirCacheSize);
size_t pollingFileCacheSize = mFileCacheMap.size();
LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize);
mAgentPollingFileCacheSizeTotal->Set(pollingFileCacheSize);
mPollingFileCacheSize->Set(pollingFileCacheSize);
}

// Iterate all normal configs, make sure stat count will not exceed limit.
Expand Down
4 changes: 2 additions & 2 deletions core/file_server/polling/PollingDirFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ class PollingDirFile : public LogRunnable {
uint64_t mCurrentRound;

IntGaugePtr mAgentConfigTotal;
IntGaugePtr mAgentPollingDirCacheSizeTotal;
IntGaugePtr mAgentPollingFileCacheSizeTotal;
IntGaugePtr mPollingDirCacheSize;
IntGaugePtr mPollingFileCacheSize;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PollingUnittest;
Expand Down
Loading

0 comments on commit ba71c25

Please sign in to comment.