diff --git a/core/go_pipeline/LogtailPlugin.cpp b/core/go_pipeline/LogtailPlugin.cpp index 235863370a..6def9fd906 100644 --- a/core/go_pipeline/LogtailPlugin.cpp +++ b/core/go_pipeline/LogtailPlugin.cpp @@ -155,10 +155,10 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) { } } -void LogtailPlugin::StopBuiltIn() { +void LogtailPlugin::StopBuiltInModules() { if (mPluginValid && mStopFun != NULL) { LOG_INFO(sLogger, ("Go pipelines stop built-in", "starts")); - mStopBuiltInFun(); + mStopBuiltInModulesFun(); LOG_INFO(sLogger, ("Go pipelines stop built-in", "succeeded")); } } @@ -398,9 +398,9 @@ bool LogtailPlugin::LoadPluginBase() { return mPluginValid; } // 停止内置功能 - mStopBuiltInFun = (StopBuiltInFun)loader.LoadMethod("StopBuiltIn", error); + mStopBuiltInModulesFun = (StopBuiltInModulesFun)loader.LoadMethod("StopBuiltInModules", error); if (!error.empty()) { - LOG_ERROR(sLogger, ("load StopBuiltIn error, Message", error)); + LOG_ERROR(sLogger, ("load StopBuiltInModules error, Message", error)); return mPluginValid; } // 插件恢复 diff --git a/core/go_pipeline/LogtailPlugin.h b/core/go_pipeline/LogtailPlugin.h index eeacd62471..7934a65f9d 100644 --- a/core/go_pipeline/LogtailPlugin.h +++ b/core/go_pipeline/LogtailPlugin.h @@ -136,7 +136,7 @@ typedef GoInt (*LoadPipelineFun)(GoString p, GoString l, GoString c, GoInt64 k, typedef GoInt (*UnloadPipelineFun)(GoString c); typedef void (*StopAllFun)(GoInt); typedef void (*StopFun)(GoString, GoInt); -typedef void (*StopBuiltInFun)(); +typedef void (*StopBuiltInModulesFun)(); typedef void (*StartFun)(GoString); typedef GoInt (*InitPluginBaseFun)(); typedef GoInt (*InitPluginBaseV2Fun)(GoString cfg); @@ -216,7 +216,7 @@ class LogtailPlugin { bool UnloadPipeline(const std::string& pipelineName); void StopAll(bool withInputFlag); void Stop(const std::string& configName, bool removingFlag); - void StopBuiltIn(); + void StopBuiltInModules(); void Start(const std::string& configName); bool IsPluginOpened() { return mPluginValid; } @@ -266,7 +266,7 @@ class LogtailPlugin { UnloadPipelineFun mUnloadPipelineFun; StopAllFun mStopAllFun; StopFun mStopFun; - StopBuiltInFun mStopBuiltInFun; + StopBuiltInModulesFun mStopBuiltInModulesFun; StartFun mStartFun; volatile bool mPluginValid; logtail::FlusherSLS mPluginAlarmConfig; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 2dcefaa438..b45e30cc17 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -22,13 +22,11 @@ #include "prometheus/PrometheusInputRunner.h" #if defined(__linux__) && !defined(__ANDROID__) #include "ebpf/eBPFServer.h" -#include "observer/ObserverManager.h" #endif #include "runner/LogProcess.h" #if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) #include "app_config/AppConfig.h" #include "shennong/ShennongManager.h" -#include "streamlog/StreamLogManager.h" #endif #include "config/feedbacker/ConfigFeedbackReceiver.h" #include "pipeline/queue/ProcessQueueManager.h" @@ -42,9 +40,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { #ifndef APSARA_UNIT_TEST_MAIN // 过渡使用 static bool isFileServerStarted = false, isInputObserverStarted = false; -#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) - static bool isInputStreamStarted = false; -#endif bool isInputObserverChanged = false, isInputFileChanged = false, isInputStreamChanged = false, isInputContainerStdioChanged = false; for (const auto& name : diff.mRemoved) { @@ -73,14 +68,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { if (AppConfig::GetInstance()->ShennongSocketEnabled()) { ShennongManager::GetInstance()->Pause(); } - if (isInputStreamStarted && isInputStreamChanged) { - StreamLogManager::GetInstance()->ShutdownConfigUsage(); - } -#endif -#if defined(__linux__) && !defined(__ANDROID__) - if (isInputObserverStarted && isInputObserverChanged) { - ObserverManager::GetInstance()->HoldOn(false); - } #endif if (isFileServerStarted && (isInputFileChanged || isInputContainerStdioChanged)) { FileServer::GetInstance()->Pause(); @@ -98,7 +85,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { for (auto& config : diff.mModified) { auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue if (!p) { - LOG_WARNING(sLogger, + LOG_WARNING(sLogger, ("failed to build pipeline for existing config", "keep current pipeline running")("config", config.mName)); LogtailAlarm::GetInstance()->SendAlarm( @@ -126,7 +113,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { for (auto& config : diff.mAdded) { auto p = BuildPipeline(std::move(config)); if (!p) { - LOG_WARNING(sLogger, + LOG_WARNING(sLogger, ("failed to build pipeline for new config", "skip current object")("config", config.mName)); LogtailAlarm::GetInstance()->SendAlarm( CATEGORY_CONFIG_ALARM, @@ -159,29 +146,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { } } -#if defined(__linux__) && !defined(__ANDROID__) - if (isInputObserverChanged) { - if (isInputObserverStarted) { - ObserverManager::GetInstance()->Resume(); - } else { - // input_observer_network always relies on PluginBase - LogtailPlugin::GetInstance()->LoadPluginBase(); - ObserverManager::GetInstance()->Reload(); - isInputObserverStarted = true; - } - } -#endif #if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) - if (isInputStreamChanged) { - if (isInputStreamStarted) { - StreamLogManager::GetInstance()->StartupConfigUsage(); - } else { - if (AppConfig::GetInstance()->GetOpenStreamLog()) { - StreamLogManager::GetInstance()->Init(); - isInputStreamStarted = true; - } - } - } if (AppConfig::GetInstance()->ShennongSocketEnabled()) { ShennongManager::GetInstance()->Resume(); } @@ -225,13 +190,12 @@ void PipelineManager::StopAllPipelines() { #endif PrometheusInputRunner::GetInstance()->Stop(); #if defined(__linux__) && !defined(__ANDROID__) - ObserverManager::GetInstance()->HoldOn(true); ebpf::eBPFServer::GetInstance()->Stop(); #endif FileServer::GetInstance()->Stop(); LogtailPlugin::GetInstance()->StopAll(true); - LogtailPlugin::GetInstance()->StopBuiltIn(); + LogtailPlugin::GetInstance()->StopBuiltInModules(); bool logProcessFlushFlag = false; for (int i = 0; !logProcessFlushFlag && i < 500; ++i) { diff --git a/core/pipeline/PipelineManager.h b/core/pipeline/PipelineManager.h index 171a1e7084..4486760403 100644 --- a/core/pipeline/PipelineManager.h +++ b/core/pipeline/PipelineManager.h @@ -76,6 +76,7 @@ class PipelineManager { friend class BoundedProcessQueueUnittest; friend class CircularProcessQueueUnittest; friend class CommonConfigProviderUnittest; + friend class FlusherUnittest; #endif }; diff --git a/core/pipeline/plugin/interface/Flusher.cpp b/core/pipeline/plugin/interface/Flusher.cpp index 1ef65b77fa..5ac469ebd4 100644 --- a/core/pipeline/plugin/interface/Flusher.cpp +++ b/core/pipeline/plugin/interface/Flusher.cpp @@ -37,7 +37,7 @@ bool Flusher::Stop(bool isPipelineRemoving) { void Flusher::SetPipelineForItemsWhenStop() { if (HasContext()) { - auto pipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName()); + const auto& pipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName()); if (!pipeline) { LOG_ERROR(sLogger, ("failed to get pipeline context", "context not found")("action", "not set pipeline")); return; diff --git a/core/pipeline/queue/BoundedProcessQueue.cpp b/core/pipeline/queue/BoundedProcessQueue.cpp index 977d24ec55..9e3149fbc2 100644 --- a/core/pipeline/queue/BoundedProcessQueue.cpp +++ b/core/pipeline/queue/BoundedProcessQueue.cpp @@ -68,7 +68,7 @@ bool BoundedProcessQueue::Pop(unique_ptr& item) { return true; } -void BoundedProcessQueue::SetPipelineForItems(std::shared_ptr& p) const { +void BoundedProcessQueue::SetPipelineForItems(const std::shared_ptr& p) const { for (auto& item : mQueue) { if (!item->mPipeline) { item->mPipeline = p; diff --git a/core/pipeline/queue/BoundedProcessQueue.h b/core/pipeline/queue/BoundedProcessQueue.h index a614b5b923..728abe0bbe 100644 --- a/core/pipeline/queue/BoundedProcessQueue.h +++ b/core/pipeline/queue/BoundedProcessQueue.h @@ -36,7 +36,7 @@ class BoundedProcessQueue : public BoundedQueueInterface&& item) override; bool Pop(std::unique_ptr& item) override; - void SetPipelineForItems(std::shared_ptr& p) const override; + void SetPipelineForItems(const std::shared_ptr& p) const override; void SetUpStreamFeedbacks(std::vector&& feedbacks); diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.cpp b/core/pipeline/queue/BoundedSenderQueueInterface.cpp index 03e26ebac3..0de86c8a24 100644 --- a/core/pipeline/queue/BoundedSenderQueueInterface.cpp +++ b/core/pipeline/queue/BoundedSenderQueueInterface.cpp @@ -61,7 +61,7 @@ void BoundedSenderQueueInterface::GiveFeedback() const { } void BoundedSenderQueueInterface::Reset(size_t cap, size_t low, size_t high) { - queue>().swap(mExtraBuffer); + deque>().swap(mExtraBuffer); mRateLimiter.reset(); mConcurrencyLimiters.clear(); BoundedQueueInterface::Reset(low, high); diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.h b/core/pipeline/queue/BoundedSenderQueueInterface.h index 7ee9a244c7..a2368333df 100644 --- a/core/pipeline/queue/BoundedSenderQueueInterface.h +++ b/core/pipeline/queue/BoundedSenderQueueInterface.h @@ -47,7 +47,7 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface>&& limiters); - virtual void SetPipelineForItems(std::shared_ptr& p) const = 0; + virtual void SetPipelineForItems(const std::shared_ptr& p) const = 0; #ifdef APSARA_UNIT_TEST_MAIN std::optional& GetRateLimiter() { return mRateLimiter; } @@ -63,10 +63,14 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface mRateLimiter; std::vector> mConcurrencyLimiters; - std::queue> mExtraBuffer; + std::deque> mExtraBuffer; IntGaugePtr mExtraBufferSize; IntGaugePtr mExtraBufferDataSizeBytes; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class FlusherUnittest; +#endif }; } // namespace logtail diff --git a/core/pipeline/queue/CircularProcessQueue.cpp b/core/pipeline/queue/CircularProcessQueue.cpp index e3fe927645..3eeb1f4432 100644 --- a/core/pipeline/queue/CircularProcessQueue.cpp +++ b/core/pipeline/queue/CircularProcessQueue.cpp @@ -73,7 +73,7 @@ bool CircularProcessQueue::Pop(unique_ptr& item) { return true; } -void CircularProcessQueue::SetPipelineForItems(std::shared_ptr& p) const { +void CircularProcessQueue::SetPipelineForItems(const std::shared_ptr& p) const { for (auto& item : mQueue) { if (!item->mPipeline) { item->mPipeline = p; diff --git a/core/pipeline/queue/CircularProcessQueue.h b/core/pipeline/queue/CircularProcessQueue.h index 7d31a73e57..dc38a8f4ce 100644 --- a/core/pipeline/queue/CircularProcessQueue.h +++ b/core/pipeline/queue/CircularProcessQueue.h @@ -33,7 +33,7 @@ class CircularProcessQueue : virtual public QueueInterface&& item) override; bool Pop(std::unique_ptr& item) override; - void SetPipelineForItems(std::shared_ptr& p) const override; + void SetPipelineForItems(const std::shared_ptr& p) const override; void Reset(size_t cap); diff --git a/core/pipeline/queue/ExactlyOnceQueueManager.cpp b/core/pipeline/queue/ExactlyOnceQueueManager.cpp index 15fc9bba29..02d7133b22 100644 --- a/core/pipeline/queue/ExactlyOnceQueueManager.cpp +++ b/core/pipeline/queue/ExactlyOnceQueueManager.cpp @@ -154,7 +154,7 @@ void ExactlyOnceQueueManager::DisablePopProcessQueue(const string& configName, b if (iter.second->GetConfigName() == configName) { iter.second->DisablePop(); if (!isPipelineRemoving) { - auto p = PipelineManager::GetInstance()->FindConfigByName(configName); + const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName); if (p) { iter.second->SetPipelineForItems(p); } @@ -279,7 +279,7 @@ uint32_t ExactlyOnceQueueManager::GetProcessQueueCnt() const { return mProcessQueues.size(); } -void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, std::shared_ptr& p) { +void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, const std::shared_ptr& p) { lock_guard lock(mSenderQueueMux); auto iter = mSenderQueues.find(key); if (iter != mSenderQueues.end()) { diff --git a/core/pipeline/queue/ExactlyOnceQueueManager.h b/core/pipeline/queue/ExactlyOnceQueueManager.h index 9d726c9f4a..9bea5afa36 100644 --- a/core/pipeline/queue/ExactlyOnceQueueManager.h +++ b/core/pipeline/queue/ExactlyOnceQueueManager.h @@ -67,7 +67,7 @@ class ExactlyOnceQueueManager { void GetAllAvailableSenderQueueItems(std::vector& item, bool withLimits = true); bool RemoveSenderQueueItem(QueueKey key, SenderQueueItem* item); bool IsAllSenderQueueEmpty() const; - void SetPipelineForSenderItems(QueueKey key, std::shared_ptr& p); + void SetPipelineForSenderItems(QueueKey key, const std::shared_ptr& p); void ClearTimeoutQueues(); diff --git a/core/pipeline/queue/ExactlyOnceSenderQueue.cpp b/core/pipeline/queue/ExactlyOnceSenderQueue.cpp index 2582c0b9c6..1c85c65a85 100644 --- a/core/pipeline/queue/ExactlyOnceSenderQueue.cpp +++ b/core/pipeline/queue/ExactlyOnceSenderQueue.cpp @@ -78,7 +78,7 @@ bool ExactlyOnceSenderQueue::Push(unique_ptr&& item) { } if (!eo->IsComplete()) { item->mEnqueTime = chrono::system_clock::now(); - mExtraBuffer.push(std::move(item)); + mExtraBuffer.push_back(std::move(item)); return true; } } @@ -102,7 +102,7 @@ bool ExactlyOnceSenderQueue::Remove(SenderQueueItem* item) { if (!mExtraBuffer.empty()) { Push(std::move(mExtraBuffer.front())); - mExtraBuffer.pop(); + mExtraBuffer.pop_front(); return true; } if (ChangeStateIfNeededAfterPop()) { @@ -153,10 +153,22 @@ void ExactlyOnceSenderQueue::Reset(const vector& checkpoints mRangeCheckpoints = checkpoints; } -void ExactlyOnceSenderQueue::SetPipelineForItems(std::shared_ptr& p) const { +void ExactlyOnceSenderQueue::SetPipelineForItems(const std::shared_ptr& p) const { + if (Empty()) { + return; + } for (size_t index = 0; index < mCapacity; ++index) { - if (!mQueue[index]) { - mQueue[index]->mPipeline = p; + SenderQueueItem* item = mQueue[index].get(); + if (item == nullptr) { + continue; + } + if (!item->mPipeline) { + item->mPipeline = p; + } + } + for (auto& item : mExtraBuffer) { + if (!item->mPipeline) { + item->mPipeline = p; } } } diff --git a/core/pipeline/queue/ExactlyOnceSenderQueue.h b/core/pipeline/queue/ExactlyOnceSenderQueue.h index 2c13aa5391..dec4dd4887 100644 --- a/core/pipeline/queue/ExactlyOnceSenderQueue.h +++ b/core/pipeline/queue/ExactlyOnceSenderQueue.h @@ -37,7 +37,7 @@ class ExactlyOnceSenderQueue : public BoundedSenderQueueInterface { bool Push(std::unique_ptr&& item) override; bool Remove(SenderQueueItem* item) override; void GetAllAvailableItems(std::vector& items, bool withLimits = true) override; - void SetPipelineForItems(std::shared_ptr& p) const override; + void SetPipelineForItems(const std::shared_ptr& p) const override; void Reset(const std::vector& checkpoints); diff --git a/core/pipeline/queue/ProcessQueueInterface.h b/core/pipeline/queue/ProcessQueueInterface.h index 48a85a9a90..b8664cbdda 100644 --- a/core/pipeline/queue/ProcessQueueInterface.h +++ b/core/pipeline/queue/ProcessQueueInterface.h @@ -45,7 +45,7 @@ class ProcessQueueInterface : virtual public QueueInterface& p) const = 0; + virtual void SetPipelineForItems(const std::shared_ptr& p) const = 0; void Reset() { mDownStreamQueues.clear(); } diff --git a/core/pipeline/queue/ProcessQueueItem.h b/core/pipeline/queue/ProcessQueueItem.h index ae43221bbc..f49ce8e56b 100644 --- a/core/pipeline/queue/ProcessQueueItem.h +++ b/core/pipeline/queue/ProcessQueueItem.h @@ -38,7 +38,7 @@ struct ProcessQueueItem { if (mPipeline) { mPipeline->AddInProcessCnt(); } else { - auto p = PipelineManager::GetInstance()->FindConfigByName(configName); + const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName); if (p) { p->AddInProcessCnt(); } diff --git a/core/pipeline/queue/ProcessQueueManager.cpp b/core/pipeline/queue/ProcessQueueManager.cpp index af88c879b5..a7e32324cc 100644 --- a/core/pipeline/queue/ProcessQueueManager.cpp +++ b/core/pipeline/queue/ProcessQueueManager.cpp @@ -237,7 +237,7 @@ void ProcessQueueManager::DisablePop(const string& configName, bool isPipelineRe if (iter != mQueues.end()) { (*iter->second.first)->DisablePop(); if (!isPipelineRemoving) { - auto p = PipelineManager::GetInstance()->FindConfigByName(configName); + const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName); if (p) { (*iter->second.first)->SetPipelineForItems(p); } diff --git a/core/pipeline/queue/SenderQueue.cpp b/core/pipeline/queue/SenderQueue.cpp index ff0cf0fda9..f9368614f9 100644 --- a/core/pipeline/queue/SenderQueue.cpp +++ b/core/pipeline/queue/SenderQueue.cpp @@ -35,7 +35,7 @@ bool SenderQueue::Push(unique_ptr&& item) { mInItemDataSizeBytes->Add(size); if (Full()) { - mExtraBuffer.push(std::move(item)); + mExtraBuffer.push_back(std::move(item)); mExtraBufferSize->Set(mExtraBuffer.size()); mExtraBufferDataSizeBytes->Add(size); @@ -91,7 +91,7 @@ bool SenderQueue::Remove(SenderQueueItem* item) { if (!mExtraBuffer.empty()) { auto newSize = mExtraBuffer.front()->mData.size(); Push(std::move(mExtraBuffer.front())); - mExtraBuffer.pop(); + mExtraBuffer.pop_front(); mExtraBufferSize->Set(mExtraBuffer.size()); mExtraBufferDataSizeBytes->Sub(newSize); @@ -142,8 +142,20 @@ void SenderQueue::GetAllAvailableItems(vector& items, bool wit } } -void SenderQueue::SetPipelineForItems(std::shared_ptr& p) const { - for (auto& item : mQueue) { +void SenderQueue::SetPipelineForItems(const std::shared_ptr& p) const { + if (Empty()) { + return; + } + for (auto index = mRead; index < mWrite; ++index) { + SenderQueueItem* item = mQueue[index % mCapacity].get(); + if (item == nullptr) { + continue; + } + if (!item->mPipeline) { + item->mPipeline = p; + } + } + for (auto& item : mExtraBuffer) { if (!item->mPipeline) { item->mPipeline = p; } diff --git a/core/pipeline/queue/SenderQueue.h b/core/pipeline/queue/SenderQueue.h index b1ce94a285..a073f048e6 100644 --- a/core/pipeline/queue/SenderQueue.h +++ b/core/pipeline/queue/SenderQueue.h @@ -36,7 +36,7 @@ class SenderQueue : public BoundedSenderQueueInterface { bool Push(std::unique_ptr&& item) override; bool Remove(SenderQueueItem* item) override; void GetAllAvailableItems(std::vector& items, bool withLimits = true) override; - void SetPipelineForItems(std::shared_ptr& p) const override; + void SetPipelineForItems(const std::shared_ptr& p) const override; private: size_t Size() const override { return mSize; } @@ -49,6 +49,7 @@ class SenderQueue : public BoundedSenderQueueInterface { #ifdef APSARA_UNIT_TEST_MAIN friend class SenderQueueUnittest; friend class SenderQueueManagerUnittest; + friend class FlusherUnittest; #endif }; diff --git a/core/pipeline/queue/SenderQueueManager.cpp b/core/pipeline/queue/SenderQueueManager.cpp index dc5b0d9348..2eac71ad2f 100644 --- a/core/pipeline/queue/SenderQueueManager.cpp +++ b/core/pipeline/queue/SenderQueueManager.cpp @@ -196,13 +196,14 @@ void SenderQueueManager::Trigger() { mCond.notify_one(); } -void SenderQueueManager::SetPipelineForItems(QueueKey key, std::shared_ptr& p) { +void SenderQueueManager::SetPipelineForItems(QueueKey key, const std::shared_ptr& p) { lock_guard lock(mQueueMux); auto iter = mQueues.find(key); if (iter != mQueues.end()) { iter->second.SetPipelineForItems(p); + } else { + ExactlyOnceQueueManager::GetInstance()->SetPipelineForSenderItems(key, p); } - ExactlyOnceQueueManager::GetInstance()->SetPipelineForSenderItems(key, p); } #ifdef APSARA_UNIT_TEST_MAIN diff --git a/core/pipeline/queue/SenderQueueManager.h b/core/pipeline/queue/SenderQueueManager.h index 7988bf4cae..82ee254caa 100644 --- a/core/pipeline/queue/SenderQueueManager.h +++ b/core/pipeline/queue/SenderQueueManager.h @@ -62,7 +62,7 @@ class SenderQueueManager : public FeedbackInterface { bool IsAllQueueEmpty() const; void ClearUnusedQueues(); void NotifyPipelineStop(QueueKey key, const std::string& configName); - void SetPipelineForItems(QueueKey key, std::shared_ptr& p); + void SetPipelineForItems(QueueKey key, const std::shared_ptr& p); bool Wait(uint64_t ms); void Trigger(); diff --git a/core/unittest/plugin/CMakeLists.txt b/core/unittest/plugin/CMakeLists.txt index b579277496..4d9850bcfd 100644 --- a/core/unittest/plugin/CMakeLists.txt +++ b/core/unittest/plugin/CMakeLists.txt @@ -33,6 +33,9 @@ target_link_libraries(processor_instance_unittest ${UT_BASE_TARGET}) add_executable(flusher_instance_unittest FlusherInstanceUnittest.cpp) target_link_libraries(flusher_instance_unittest ${UT_BASE_TARGET}) +add_executable(flusher_unittest FlusherUnittest.cpp) +target_link_libraries(flusher_unittest ${UT_BASE_TARGET}) + add_executable(plugin_registry_unittest PluginRegistryUnittest.cpp) target_link_libraries(plugin_registry_unittest ${UT_BASE_TARGET}) @@ -42,5 +45,7 @@ gtest_discover_tests(static_processor_creator_unittest) gtest_discover_tests(static_flusher_creator_unittest) gtest_discover_tests(input_instance_unittest) gtest_discover_tests(processor_instance_unittest) +gtest_discover_tests(flusher_instance_unittest) +gtest_discover_tests(flusher_unittest) gtest_discover_tests(plugin_registry_unittest) diff --git a/core/unittest/plugin/FlusherUnittest.cpp b/core/unittest/plugin/FlusherUnittest.cpp new file mode 100644 index 0000000000..3f4268fad0 --- /dev/null +++ b/core/unittest/plugin/FlusherUnittest.cpp @@ -0,0 +1,86 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "pipeline/PipelineManager.h" +#include "pipeline/plugin/interface/Flusher.h" +#include "pipeline/queue/QueueKeyManager.h" +#include "unittest/Unittest.h" +#include "unittest/plugin/PluginMock.h" + +using namespace std; + +namespace logtail { + +class FlusherUnittest : public testing::Test { +public: + void TestStop() const; + +protected: + void TearDown() override { QueueKeyManager::GetInstance()->Clear(); } +}; + +void FlusherUnittest::TestStop() const { + auto pipeline = make_shared(); + PipelineManager::GetInstance()->mPipelineNameEntityMap["test_config"] = pipeline; + + auto ctx = PipelineContext(); + ctx.SetConfigName("test_config"); + + FlusherMock* mock = new FlusherMock(); + mock->SetContext(ctx); + Json::Value tmp; + mock->Init(Json::Value(), tmp); + + auto q = SenderQueueManager::GetInstance()->GetQueue(mock->GetQueueKey()); + // push items to queue + for (size_t i = 0; i < q->mCapacity; ++i) { + auto item = make_unique("content", 0, nullptr, mock->GetQueueKey()); + q->Push(std::move(item)); + } + // push items to extra buffer + for (size_t i = 0; i < 10; ++i) { + auto item = make_unique("content", 0, nullptr, mock->GetQueueKey()); + q->Push(std::move(item)); + } + + std::vector items1; + q->GetAllAvailableItems(items1, false); + for (auto item : items1) { + APSARA_TEST_EQUAL(item->mPipeline, nullptr); + } + for (size_t i = 0; i < q->mExtraBuffer.size(); ++i) { + auto item = q->mExtraBuffer[i].get(); + APSARA_TEST_EQUAL(item->mPipeline, nullptr); + } + + mock->Stop(false); + + std::vector items2; + q->GetAllAvailableItems(items2, false); + for (auto item : items2) { + APSARA_TEST_EQUAL(item->mPipeline, pipeline); + } + for (size_t i = 0; i < q->mExtraBuffer.size(); ++i) { + auto item = q->mExtraBuffer[i].get(); + APSARA_TEST_EQUAL(item->mPipeline, pipeline); + } +} + +UNIT_TEST_CASE(FlusherUnittest, TestStop) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/plugin_main/plugin_export.go b/plugin_main/plugin_export.go index 73e3060fb8..cadfd619ba 100644 --- a/plugin_main/plugin_export.go +++ b/plugin_main/plugin_export.go @@ -207,9 +207,9 @@ func Stop(configName string, removedFlag int) { } } -//export StopBuiltIn -func StopBuiltIn() { - pluginmanager.StopBuiltInConfig() +//export StopBuiltInModules +func StopBuiltInModules() { + pluginmanager.StopBuiltInModulesConfig() } //export Start diff --git a/pluginmanager/plugin_manager.go b/pluginmanager/plugin_manager.go index 00db263a6d..2dfa14ba0c 100644 --- a/pluginmanager/plugin_manager.go +++ b/pluginmanager/plugin_manager.go @@ -206,8 +206,8 @@ func StopAll(withInput bool) error { return nil } -// StopBuiltInConfig stops built-in services (self monitor, alarm, container and checkpoint manager). -func StopBuiltInConfig() { +// StopBuiltInModulesConfig stops built-in services (self monitor, alarm, container and checkpoint manager). +func StopBuiltInModulesConfig() { if StatisticsConfig != nil { if *flags.ForceSelfCollect { logger.Info(context.Background(), "force collect the static metrics")