Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 25, 2024
1 parent f5c4bf4 commit 2bf161e
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 254 deletions.
12 changes: 6 additions & 6 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/watcher/ConfigWatcher.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
#include "monitor/Monitor.h"
#include "pipeline/InstanceConfigManager.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "runner/FlusherRunner.h"
#include "runner/ProcessorRunner.h"
#include "runner/sink/http/HttpSink.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
Expand Down Expand Up @@ -270,7 +270,7 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

LogProcess::GetInstance()->Start();
ProcessorRunner::GetInstance()->Init();

time_t curTime = 0, lastProfilingCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
lastCheckTagsTime = 0, lastQueueGCTime = 0;
Expand Down
10 changes: 5 additions & 5 deletions core/file_server/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
#include <string>
#include <vector>

#include "file_server/event_handler/LogInput.h"
#include "app_config/AppConfig.h"
#include "common/FileSystemUtil.h"
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "file_server/event/BlockEventManager.h"
#include "file_server/event_handler/LogInput.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "runner/ProcessorRunner.h"

using namespace std;
using namespace sls_logs;
Expand Down Expand Up @@ -1090,7 +1090,7 @@ int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* lo
time(NULL));
PipelineEventGroup group = LogFileReader::GenerateEventGroup(reader, logBuffer);

while (!LogProcess::GetInstance()->PushBuffer(reader->GetQueueKey(), 0, std::move(group))) // 10ms
while (!ProcessorRunner::GetInstance()->PushQueue(reader->GetQueueKey(), 0, std::move(group))) // 10ms
{
++pushRetry;
if (pushRetry % 10 == 0)
Expand Down
8 changes: 4 additions & 4 deletions core/file_server/event_handler/HistoryFileImporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
#include "common/Thread.h"
#include "common/TimeUtil.h"
#include "file_server/ConfigManager.h"
#include "file_server/reader/LogFileReader.h"
#include "logger/Logger.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "file_server/reader/LogFileReader.h"
#include "runner/ProcessorRunner.h"

namespace logtail {

Expand Down Expand Up @@ -59,7 +59,7 @@ void HistoryFileImporter::LoadCheckPoint() {
}

void HistoryFileImporter::ProcessEvent(const HistoryFileEvent& event, const std::vector<std::string>& fileNames) {
static LogProcess* logProcess = LogProcess::GetInstance();
static ProcessorRunner* logProcess = ProcessorRunner::GetInstance();

LOG_INFO(sLogger, ("begin load history files, count", fileNames.size())("file list", ToString(fileNames)));
for (size_t i = 0; i < fileNames.size(); ++i) {
Expand Down Expand Up @@ -116,7 +116,7 @@ void HistoryFileImporter::ProcessEvent(const HistoryFileEvent& event, const std:

// TODO: currently only 1 input is allowed, so we assume 0 here. It should be the actual input seq after
// refactorization.
logProcess->PushBuffer(readerSharePtr->GetQueueKey(), 0, std::move(group), 100000000);
logProcess->PushQueue(readerSharePtr->GetQueueKey(), 0, std::move(group), 100000000);
} else {
// when ReadLog return false, retry once
if (doneFlag) {
Expand Down
31 changes: 16 additions & 15 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#if defined(__linux__) && !defined(__ANDROID__)
#include "ebpf/eBPFServer.h"
#endif
#include "runner/LogProcess.h"
#include "runner/ProcessorRunner.h"
#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
#include "app_config/AppConfig.h"
#include "shennong/ShennongManager.h"
Expand All @@ -36,6 +36,15 @@ using namespace std;

namespace logtail {

PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
#endif
}) {
}

void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用
Expand Down Expand Up @@ -175,25 +184,17 @@ void PipelineManager::StopAllPipelines() {
StreamLogManager::GetInstance()->Shutdown();
}
#endif
PrometheusInputRunner::GetInstance()->Stop();
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance()->Stop();
#endif
for (auto& item : mInputRunners) {
if (item->HasRegisteredPlugins()) {
item->Stop();
}
}
FileServer::GetInstance()->Stop();

LogtailPlugin::GetInstance()->StopAll(true);
LogtailPlugin::GetInstance()->StopBuiltInModules();

bool logProcessFlushFlag = false;
for (int i = 0; !logProcessFlushFlag && i < 500; ++i) {
logProcessFlushFlag = LogProcess::GetInstance()->FlushOut(10);
}
if (!logProcessFlushFlag) {
LOG_WARNING(sLogger, ("flush process daemon queue", "failed"));
} else {
LOG_INFO(sLogger, ("flush process daemon queue", "succeeded"));
}
LogProcess::GetInstance()->HoldOn();
ProcessorRunner::GetInstance()->Stop();

FlushAllBatch();

Expand Down
5 changes: 4 additions & 1 deletion core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "common/Lock.h"
#include "config/ConfigDiff.h"
#include "pipeline/Pipeline.h"
#include "runner/InputRunner.h"

namespace logtail {

Expand All @@ -48,7 +49,7 @@ class PipelineManager {
void StopAllPipelines();

private:
PipelineManager() = default;
PipelineManager();
~PipelineManager() = default;

virtual std::shared_ptr<Pipeline> BuildPipeline(PipelineConfig&& config); // virtual for ut
Expand All @@ -63,6 +64,8 @@ class PipelineManager {
mutable SpinLock mPluginCntMapLock;
std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>> mPluginCntMap;

std::vector<InputRunner*> mInputRunners;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineManagerMock;
friend class PipelineManagerUnittest;
Expand Down
71 changes: 0 additions & 71 deletions core/runner/LogProcess.h

This file was deleted.

Loading

0 comments on commit 2bf161e

Please sign in to comment.