Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update: check pipeline queue before scrape, try again after 1 second if is not valid to push #1757

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions core/common/timer/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Timer {

#ifdef APSARA_UNIT_TEST_MAIN
friend class TimerUnittest;
friend class ScrapeSchedulerUnittest;
#endif
};

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputPrometheus.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class InputPrometheus : public Input {
bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override;
bool Start() override;
bool Stop(bool isPipelineRemoving) override;
bool SupportAck() const override { return false; }
bool SupportAck() const override { return true; }

private:
bool CreateInnerProcessors(const Json::Value& inputConfig);
Expand Down
22 changes: 16 additions & 6 deletions core/prometheus/async/PromFuture.cpp
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
#include "prometheus/async/PromFuture.h"

#include "common/Lock.h"
#include "common/http/HttpResponse.h"

namespace logtail {

void PromFuture::Process(const HttpResponse& response, uint64_t timestampMilliSec) {
template <typename... Args>
bool PromFuture<Args...>::Process(Args... args) {
WriteLock lock(mStateRWLock);
if (mState == PromFutureState::New) {
for (auto& callback : mDoneCallbacks) {
callback(response, timestampMilliSec);
if (!callback(std::forward<Args>(args)...)) {
mState = PromFutureState::Done;
return false;
}
}
mState = PromFutureState::Done;
} else {
return;
}

return true;
}

void PromFuture::AddDoneCallback(std::function<void(const HttpResponse&, uint64_t timestampMilliSec)>&& callback) {
template <typename... Args>
void PromFuture<Args...>::AddDoneCallback(CallbackSignature&& callback) {
mDoneCallbacks.emplace_back(std::move(callback));
}

void PromFuture::Cancel() {
template <typename... Args>
void PromFuture<Args...>::Cancel() {
WriteLock lock(mStateRWLock);
mState = PromFutureState::Done;
}

template class PromFuture<const HttpResponse&, uint64_t>;
template class PromFuture<>;

} // namespace logtail
11 changes: 7 additions & 4 deletions core/prometheus/async/PromFuture.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
#pragma once

#include <functional>

#include "common/Lock.h"
#include "common/http/HttpResponse.h"

namespace logtail {

enum class PromFutureState { New, Processing, Done };

template <typename... Args>
class PromFuture {
public:
using CallbackSignature = std::function<bool(Args...)>;
// Process should support oneshot and streaming mode.
void Process(const HttpResponse&, uint64_t timestampMilliSec);
bool Process(Args...);

void AddDoneCallback(std::function<void(const HttpResponse&, uint64_t timestampMilliSec)>&& callback);
void AddDoneCallback(CallbackSignature&&);

void Cancel();

protected:
PromFutureState mState = {PromFutureState::New};
ReadWriteLock mStateRWLock;

std::vector<std::function<void(const HttpResponse&, uint64_t timestampMilliSec)>> mDoneCallbacks;
std::vector<CallbackSignature> mDoneCallbacks;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ScrapeSchedulerUnittest;
Expand Down
15 changes: 11 additions & 4 deletions core/prometheus/async/PromHttpRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@ PromHttpRequest::PromHttpRequest(const std::string& method,
const std::string& body,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture> future)
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> future,
std::shared_ptr<PromFuture<>> isContextValidFuture)
: AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body, timeout, maxTryCnt),
mFuture(std::move(future)) {
mFuture(std::move(future)),
mIsContextValidFuture(std::move(isContextValidFuture)) {
}

void PromHttpRequest::OnSendDone(const HttpResponse& response) {
mFuture->Process(response,
std::chrono::duration_cast<std::chrono::milliseconds>(mLastSendTime.time_since_epoch()).count());
if (mFuture != nullptr) {
mFuture->Process(
response, std::chrono::duration_cast<std::chrono::milliseconds>(mLastSendTime.time_since_epoch()).count());
}
}

[[nodiscard]] bool PromHttpRequest::IsContextValid() const {
if (mIsContextValidFuture != nullptr) {
return mIsContextValidFuture->Process();
}
return true;
}

Expand Down
6 changes: 4 additions & 2 deletions core/prometheus/async/PromHttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class PromHttpRequest : public AsynHttpRequest {
const std::string& body,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture> future);
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> future,
std::shared_ptr<PromFuture<>> isContextValidFuture = nullptr);
PromHttpRequest(const PromHttpRequest&) = default;
~PromHttpRequest() override = default;

Expand All @@ -30,7 +31,8 @@ class PromHttpRequest : public AsynHttpRequest {
private:
void SetNextExecTime(std::chrono::steady_clock::time_point execTime);

std::shared_ptr<PromFuture> mFuture;
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> mFuture;
std::shared_ptr<PromFuture<>> mIsContextValidFuture;
};

} // namespace logtail
8 changes: 7 additions & 1 deletion core/prometheus/schedulers/BaseScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
namespace logtail {
void BaseScheduler::ExecDone() {
mExecCount++;
mLatestExecTime = mFirstExecTime + std::chrono::seconds(mExecCount * mInterval);
}

std::chrono::steady_clock::time_point BaseScheduler::GetNextExecTime() {
return mFirstExecTime + std::chrono::seconds(mExecCount * mInterval);
return mLatestExecTime;
}

void BaseScheduler::SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime) {
mFirstExecTime = firstExecTime;
mLatestExecTime = mFirstExecTime;
}

void BaseScheduler::DelayExecTime(uint64_t delaySeconds) {
mLatestExecTime = mLatestExecTime + std::chrono::seconds(delaySeconds);
}

void BaseScheduler::Cancel() {
Expand Down
6 changes: 5 additions & 1 deletion core/prometheus/schedulers/BaseScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <memory>

#include "common/http/HttpResponse.h"
#include "prometheus/async/PromFuture.h"

namespace logtail {
Expand All @@ -18,18 +19,21 @@ class BaseScheduler {
std::chrono::steady_clock::time_point GetNextExecTime();

void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime);
void DelayExecTime(uint64_t delaySeconds);

virtual void Cancel();

protected:
bool IsCancelled();

std::chrono::steady_clock::time_point mFirstExecTime;
std::chrono::steady_clock::time_point mLatestExecTime;
int64_t mExecCount = 0;
int64_t mInterval = 0;

ReadWriteLock mLock;
bool mValidState = true;
std::shared_ptr<PromFuture> mFuture;
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> mFuture;
std::shared_ptr<PromFuture<>> mIsContextValidFuture;
};
} // namespace logtail
39 changes: 32 additions & 7 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
#include "common/timer/HttpRequestTimerEvent.h"
#include "common/timer/Timer.h"
#include "logger/Logger.h"
#include "prometheus/Constants.h"
#include "prometheus/async/PromHttpRequest.h"
#include "pipeline/queue/ProcessQueueItem.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"

using namespace std;

Expand Down Expand Up @@ -95,40 +96,60 @@ void ScrapeScheduler::PushEventGroup(PipelineEventGroup&& eGroup) {
auto item = make_unique<ProcessQueueItem>(std::move(eGroup), mInputIndex);
#ifdef APSARA_UNIT_TEST_MAIN
mItem.push_back(std::move(item));
return;
#endif
ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item));
while (true) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) == 0) {
break;
}
usleep(10 * 1000);
}
}

string ScrapeScheduler::GetId() const {
return mHash;
}

void ScrapeScheduler::ScheduleNext() {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
auto isContextValidFuture = std::make_shared<PromFuture<>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) {
this->OnMetricResult(response, timestampMilliSec);
this->ExecDone();
this->ScheduleNext();
return true;
});
isContextValidFuture->AddDoneCallback([this]() -> bool {
if (ProcessQueueManager::GetInstance()->IsValidToPush(mQueueKey)) {
return true;
} else {
this->DelayExecTime(1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个加一秒是固定的吗?不需要可配置?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

应该不需要可配置,这部分执行的条件是处理或发送出现背压,延迟调度1s、5s效果都是一样的,进行毫秒级别的延迟可能队列仍旧是满的,所以就固定成1s了。

this->ScheduleNext();
return false;
}
});

if (IsCancelled()) {
mFuture->Cancel();
mIsContextValidFuture->Cancel();
return;
}

{
WriteLock lock(mLock);
mFuture = future;
mIsContextValidFuture = isContextValidFuture;
}

auto event = BuildScrapeTimerEvent(GetNextExecTime());
mTimer->PushEvent(std::move(event));
}

void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime) {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) {
this->OnMetricResult(response, timestampMilliSec);
return true;
});
mFuture = future;
auto event = BuildScrapeTimerEvent(execTime);
Expand All @@ -149,15 +170,19 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mScrapeConfigPtr->mScrapeTimeoutSeconds,
mScrapeConfigPtr->mScrapeIntervalSeconds
/ mScrapeConfigPtr->mScrapeTimeoutSeconds,
this->mFuture);
this->mFuture,
this->mIsContextValidFuture);
auto timerEvent = std::make_unique<HttpRequestTimerEvent>(execTime, std::move(request));
return timerEvent;
}

void ScrapeScheduler::Cancel() {
if (mFuture) {
if (mFuture != nullptr) {
mFuture->Cancel();
}
if (mIsContextValidFuture != nullptr) {
mIsContextValidFuture->Cancel();
}
{
WriteLock lock(mLock);
mValidState = false;
Expand Down
6 changes: 4 additions & 2 deletions core/prometheus/schedulers/TargetSubscriberScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,12 @@ string TargetSubscriberScheduler::GetId() const {
}

void TargetSubscriberScheduler::ScheduleNext() {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) {
this->OnSubscription(response, timestampMilliSec);
this->ExecDone();
this->ScheduleNext();
return true;
});
if (IsCancelled()) {
mFuture->Cancel();
Expand All @@ -265,9 +266,10 @@ void TargetSubscriberScheduler::Cancel() {
}

void TargetSubscriberScheduler::SubscribeOnce(std::chrono::steady_clock::time_point execTime) {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampNanoSec) {
this->OnSubscription(response, timestampNanoSec);
return true;
});
mFuture = future;
auto event = BuildSubscriberTimerEvent(execTime);
Expand Down
3 changes: 2 additions & 1 deletion core/unittest/prometheus/PromAsynUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ class PromAsynUnittest : public testing::Test {
};

void PromAsynUnittest::TestExecTime() {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
auto now = std::chrono::system_clock::now();
bool exec = false;
future->AddDoneCallback([&exec, now](const HttpResponse&, uint64_t timestampMilliSec) {
APSARA_TEST_EQUAL(timestampMilliSec,
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count());

APSARA_TEST_TRUE(exec);
return true;
});
auto request = std::make_shared<PromHttpRequest>(
"http", false, "127.0.0.1", 8080, "/", "", map<string, string>(), "", 10, 3, future);
Expand Down
Loading
Loading