Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 24, 2024
1 parent b29b57f commit e770c59
Show file tree
Hide file tree
Showing 12 changed files with 49 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ struct K8sContainerMeta {
// Methods export by plugin.
typedef GoInt (*LoadGlobalConfigFun)(GoString);
typedef GoInt (*LoadPipelineFun)(GoString p, GoString l, GoString c, GoInt64 k, GoString p2);
typedef GoInt (*UnloadPipelineFun)(GoString p, GoString l, GoString c);
typedef GoInt (*UnloadPipelineFun)(GoString c);
typedef void (*StopAllFun)(GoInt);
typedef void (*StopFun)(GoString, GoInt);
typedef void (*StopBuiltInFun)();
Expand Down
1 change: 0 additions & 1 deletion core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class Pipeline {
CounterPtr mProcessorsInGroupsCnt;
CounterPtr mProcessorsInGroupDataSizeBytes;
CounterPtr mProcessorsTotalDelayMs;
CounterPtr mLoadDelayMs;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineMock;
Expand Down
6 changes: 3 additions & 3 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(name, ConfigFeedbackStatus::DELETED);
}
for (auto& config : diff.mModified) {
auto iter = mPipelineNameEntityMap.find(config.mName);
auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue
if (!p) {
LOG_WARNING(sLogger,
LOG_WARNING(sLogger,
("failed to build pipeline for existing config",
"keep current pipeline running")("config", config.mName));
LogtailAlarm::GetInstance()->SendAlarm(
Expand All @@ -115,6 +114,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
LOG_INFO(sLogger,
("pipeline building for existing config succeeded",
"stop the old pipeline and start the new one")("config", config.mName));
auto iter = mPipelineNameEntityMap.find(config.mName);
iter->second->Stop(false);
DecreasePluginUsageCnt(iter->second->GetPluginStatistics());

Expand All @@ -126,7 +126,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
for (auto& config : diff.mAdded) {
auto p = BuildPipeline(std::move(config));
if (!p) {
LOG_WARNING(sLogger,
LOG_WARNING(sLogger,
("failed to build pipeline for new config", "skip current object")("config", config.mName));
LogtailAlarm::GetInstance()->SendAlarm(
CATEGORY_CONFIG_ALARM,
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class Flusher : public Plugin {
#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
#endif
};

Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/queue/BoundedSenderQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface<std::unique_ptr

void SetRateLimiter(uint32_t maxRate);
void SetConcurrencyLimiters(std::vector<std::shared_ptr<ConcurrencyLimiter>>&& limiters);
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const = 0;
virtual void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const = 0;

#ifdef APSARA_UNIT_TEST_MAIN
std::optional<RateLimiter>& GetRateLimiter() { return mRateLimiter; }
Expand Down
8 changes: 8 additions & 0 deletions core/pipeline/queue/ExactlyOnceQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ uint32_t ExactlyOnceQueueManager::GetProcessQueueCnt() const {
return mProcessQueues.size();
}

void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, std::shared_ptr<Pipeline>& p) {
lock_guard<mutex> lock(mSenderQueueMux);
auto iter = mSenderQueues.find(key);
if (iter != mSenderQueues.end()) {
iter->second.SetPipelineForItems(p);
}
}

#ifdef APSARA_UNIT_TEST_MAIN
void ExactlyOnceQueueManager::Clear() {
{
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ExactlyOnceQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class ExactlyOnceQueueManager {
void GetAllAvailableSenderQueueItems(std::vector<SenderQueueItem*>& item, bool withLimits = true);
bool RemoveSenderQueueItem(QueueKey key, SenderQueueItem* item);
bool IsAllSenderQueueEmpty() const;
void SetPipelineForSenderItems(QueueKey key, std::shared_ptr<Pipeline>& p);

void ClearTimeoutQueues();

Expand Down
8 changes: 8 additions & 0 deletions core/pipeline/queue/ExactlyOnceSenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,12 @@ void ExactlyOnceSenderQueue::Reset(const vector<RangeCheckpointPtr>& checkpoints
mRangeCheckpoints = checkpoints;
}

void ExactlyOnceSenderQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
for (size_t index = 0; index < mCapacity; ++index) {
if (!mQueue[index]) {
mQueue[index]->mPipeline = p;
}
}
}

} // namespace logtail
5 changes: 4 additions & 1 deletion core/pipeline/queue/ExactlyOnceSenderQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@ namespace logtail {
// not thread-safe, should be protected explicitly by queue manager
class ExactlyOnceSenderQueue : public BoundedSenderQueueInterface {
public:
ExactlyOnceSenderQueue(const std::vector<RangeCheckpointPtr>& checkpoints, QueueKey key, const PipelineContext& ctx);
ExactlyOnceSenderQueue(const std::vector<RangeCheckpointPtr>& checkpoints,
QueueKey key,
const PipelineContext& ctx);

bool Push(std::unique_ptr<SenderQueueItem>&& item) override;
bool Remove(SenderQueueItem* item) override;
void GetAllAvailableItems(std::vector<SenderQueueItem*>& items, bool withLimits = true) override;
void SetPipelineForItems(std::shared_ptr<Pipeline>& p) const override;

void Reset(const std::vector<RangeCheckpointPtr>& checkpoints);

Expand Down
4 changes: 2 additions & 2 deletions core/pipeline/queue/SenderQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ void SenderQueue::GetAllAvailableItems(vector<SenderQueueItem*>& items, bool wit
}
}

void SenderQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& pipeline) const {
void SenderQueue::SetPipelineForItems(std::shared_ptr<Pipeline>& p) const {
for (auto& item : mQueue) {
if (!item->mPipeline) {
item->mPipeline = pipeline;
item->mPipeline = p;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueueManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ void SenderQueueManager::SetPipelineForItems(QueueKey key, std::shared_ptr<Pipel
if (iter != mQueues.end()) {
iter->second.SetPipelineForItems(p);
}
ExactlyOnceQueueManager::GetInstance()->SetPipelineForSenderItems(key, p);
}

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down
19 changes: 19 additions & 0 deletions core/unittest/pipeline/PipelineUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

#include <json/json.h>

#include <future>
#include <memory>
#include <string>
#include <thread>

#include "app_config/AppConfig.h"
#include "common/JsonUtil.h"
Expand Down Expand Up @@ -48,6 +50,7 @@ class PipelineUnittest : public ::testing::Test {
void TestSend() const;
void TestFlushBatch() const;
void TestInProcessingCount() const;
void TestWaitAllItemsInProcessFinished() const;

protected:
static void SetUpTestCase() {
Expand Down Expand Up @@ -2876,6 +2879,21 @@ void PipelineUnittest::TestInProcessingCount() const {
APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load());
}

void PipelineUnittest::TestWaitAllItemsInProcessFinished() const {
auto pipeline = make_shared<Pipeline>();
pipeline->mPluginID.store(0);
pipeline->mInProcessCnt.store(0);

pipeline->mInProcessCnt.store(1);
std::future<void> future = std::async(std::launch::async, &Pipeline::WaitAllItemsInProcessFinished, pipeline.get());

// block
APSARA_TEST_NOT_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0)));
pipeline->mInProcessCnt.store(0);
// recover
APSARA_TEST_EQUAL(std::future_status::ready, future.wait_for(std::chrono::seconds(0)));
}

UNIT_TEST_CASE(PipelineUnittest, OnSuccessfulInit)
UNIT_TEST_CASE(PipelineUnittest, OnFailedInit)
UNIT_TEST_CASE(PipelineUnittest, TestProcessQueue)
Expand All @@ -2886,6 +2904,7 @@ UNIT_TEST_CASE(PipelineUnittest, TestProcess)
UNIT_TEST_CASE(PipelineUnittest, TestSend)
UNIT_TEST_CASE(PipelineUnittest, TestFlushBatch)
UNIT_TEST_CASE(PipelineUnittest, TestInProcessingCount)
UNIT_TEST_CASE(PipelineUnittest, TestWaitAllItemsInProcessFinished)

} // namespace logtail

Expand Down

0 comments on commit e770c59

Please sign in to comment.