Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metric for queue #1756

Merged
merged 14 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ void LogFileReader::initExactlyOnce(uint32_t concurrency) {
mEOOption->fbKey = QueueKeyManager::GetInstance()->GetKey(GetProject() + "-" + mEOOption->primaryCheckpointKey
+ mEOOption->rangeCheckpointPtrs[0]->data.hash_key());
ExactlyOnceQueueManager::GetInstance()->CreateOrUpdateQueue(
mEOOption->fbKey, ProcessQueueManager::sMaxPriority, mConfigName, mEOOption->rangeCheckpointPtrs);
mEOOption->fbKey, ProcessQueueManager::sMaxPriority, *mReaderConfig.second, mEOOption->rangeCheckpointPtrs);
for (auto& cpt : mEOOption->rangeCheckpointPtrs) {
cpt->fbKey = mEOOption->fbKey;
}
Expand Down
31 changes: 30 additions & 1 deletion core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ CounterPtr MetricsRecord::CreateCounter(const std::string& name) {
}

IntGaugePtr MetricsRecord::CreateIntGauge(const std::string& name) {
IntGaugePtr gaugePtr = std::make_shared<Gauge<uint64_t>>(name);
IntGaugePtr gaugePtr = std::make_shared<IntGauge>(name);
mIntGauges.emplace_back(gaugePtr);
return gaugePtr;
}
Expand Down Expand Up @@ -134,6 +134,21 @@ const MetricsRecord* MetricsRecordRef::operator->() const {
return mMetrics;
}

void MetricsRecordRef::AddLabels(MetricLabels&& labels) {
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
mMetrics->GetLabels()->insert(mMetrics->GetLabels()->end(), labels.begin(), labels.end());
}

#ifdef APSARA_UNIT_TEST_MAIN
bool MetricsRecordRef::HasLabel(const std::string& key, const std::string& value) const {
for (auto item : *(mMetrics->GetLabels())) {
if (item.first == key && item.second == value) {
return true;
}
}
return false;
}
#endif

// ReentrantMetricsRecord相关操作可以无锁,因为mCounters、mGauges只在初始化时会添加内容,后续只允许Get操作
void ReentrantMetricsRecord::Init(MetricLabels& labels, std::unordered_map<std::string, MetricType>& metricKeys) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels));
Expand Down Expand Up @@ -220,6 +235,20 @@ void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref,
mHead = cur;
}

void WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这一对能在PrepareMetricsRecordRef中复用吗?让他们建立点联系,一看就知道API选用哪个

MetricLabels&& labels,
DynamicMetricLabels&& dynamicLabels) {
MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(labels),
std::make_shared<DynamicMetricLabels>(dynamicLabels));
ref.SetMetricsRecord(cur);
}

void WriteMetrics::CommitMetricsRecordRef(MetricsRecordRef& ref) {
std::lock_guard<std::mutex> lock(mMutex);
ref.mMetrics->SetNext(mHead);
mHead = ref.mMetrics;
}

MetricsRecord* WriteMetrics::GetHead() {
std::lock_guard<std::mutex> lock(mMutex);
return mHead;
Expand Down
21 changes: 18 additions & 3 deletions core/monitor/LogtailMetric.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/

#pragma once

#include <atomic>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved

#include "LoongCollectorMetricTypes.h"
#include "common/Lock.h"
#include "monitor/LoongCollectorMetricTypes.h"
#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {
Expand Down Expand Up @@ -53,6 +57,8 @@ class MetricsRecord {
};

class MetricsRecordRef {
friend class WriteMetrics;

private:
MetricsRecord* mMetrics = nullptr;

Expand All @@ -70,6 +76,11 @@ class MetricsRecordRef {
IntGaugePtr CreateIntGauge(const std::string& name);
DoubleGaugePtr CreateDoubleGauge(const std::string& name);
const MetricsRecord* operator->() const;
// this is not thread-safe, and should be only used before WriteMetrics::CommitMetricsRecordRef
void AddLabels(MetricLabels&& labels);
#ifdef APSARA_UNIT_TEST_MAIN
bool HasLabel(const std::string& key, const std::string& value) const;
#endif
};

class ReentrantMetricsRecord {
Expand Down Expand Up @@ -110,10 +121,13 @@ class WriteMetrics {
const std::string& configName,
const std::string& pluginType,
const std::string& pluginID,
const std::string& nodeID,
const std::string& nodeID,
const std::string& childNodeID,
MetricLabels& labels);
void PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void
PrepareMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void CreateMetricsRecordRef(MetricsRecordRef& ref, MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});
void CommitMetricsRecordRef(MetricsRecordRef& ref);
MetricsRecord* DoSnapshot();


Expand Down Expand Up @@ -144,4 +158,5 @@ class ReadMetrics {
friend class ILogtailMetricUnittest;
#endif
};

} // namespace logtail
31 changes: 23 additions & 8 deletions core/monitor/LoongCollectorMetricTypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
*/

#pragma once

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "common/Lock.h"
#include "protobuf/sls/sls_logs.pb.h"


namespace logtail {

enum class MetricType {
Expand All @@ -45,25 +48,37 @@ class Counter {

template <typename T>
class Gauge {
private:
std::string mName;
std::atomic<T> mVal;

public:
Gauge(const std::string& name, T val = 0) : mName(name), mVal(val) {}
~Gauge() = default;

T GetValue() const { return mVal.load(); }
const std::string& GetName() const { return mName; }
void Set(T val) { mVal.store(val); }
Gauge* Collect() { return new Gauge<T>(mName, mVal.load()); }

protected:
std::string mName;
std::atomic<T> mVal;
};

class IntGauge : public Gauge<uint64_t> {
public:
IntGauge(const std::string& name, uint64_t val = 0) : Gauge<uint64_t>(name, val) {}
~IntGauge() = default;

IntGauge* Collect() { return new IntGauge(mName, mVal.load()); }
void Add(uint64_t val) { mVal.fetch_add(val); }
void Sub(uint64_t val) { mVal.fetch_sub(val); }
};

using CounterPtr = std::shared_ptr<Counter>;
using IntGaugePtr = std::shared_ptr<Gauge<uint64_t>>;
using IntGaugePtr = std::shared_ptr<IntGauge>;
using DoubleGaugePtr = std::shared_ptr<Gauge<double>>;

using MetricLabels = std::vector<std::pair<std::string, std::string>>;
using MetricLabelsPtr = std::shared_ptr<MetricLabels>;
using DynamicMetricLabels = std::vector<std::pair<std::string, std::function<std::string()>>>;
using DynamicMetricLabelsPtr = std::shared_ptr<DynamicMetricLabels>;

} // namespace logtail
} // namespace logtail
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ const std::string METRIC_LABEL_PLUGIN_ID = "plugin_id";
const std::string METRIC_LABEL_NODE_ID = "node_id";
const std::string METRIC_LABEL_CHILD_NODE_ID = "child_node_id";

const std::string METRIC_LABEL_KEY_COMPONENT_NAME = "component_name";
const std::string METRIC_LABEL_KEY_QUEUE_TYPE = "queue_type";
const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG = "is_exactly_once";
const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID = "flusher_node_id";

// input file plugin labels
const std::string METRIC_LABEL_FILE_DEV = "file_dev";
const std::string METRIC_LABEL_FILE_INODE = "file_inode";
Expand Down
5 changes: 5 additions & 0 deletions core/monitor/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ extern const std::string METRIC_LABEL_FILE_DEV;
extern const std::string METRIC_LABEL_FILE_INODE;
extern const std::string METRIC_LABEL_FILE_NAME;

extern const std::string METRIC_LABEL_KEY_COMPONENT_NAME;
extern const std::string METRIC_LABEL_KEY_QUEUE_TYPE;
extern const std::string METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG;
extern const std::string METRIC_LABEL_KEY_FLUSHER_NODE_ID;

// input file metrics
extern const std::string METRIC_INPUT_RECORDS_TOTAL;
extern const std::string METRIC_INPUT_RECORDS_SIZE_BYTES;
Expand Down
13 changes: 7 additions & 6 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
#include <cstdint>
#include <utility>

#include "pipeline/batch/TimeoutFlushManager.h"
#include "common/Flags.h"
#include "common/ParamExtractor.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "pipeline/batch/TimeoutFlushManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

Expand Down Expand Up @@ -288,10 +288,11 @@ bool Pipeline::Init(PipelineConfig&& config) {
? ProcessQueueManager::sMaxPriority
: mContext.GetGlobalConfig().mProcessPriority - 1;
if (isInputSupportAck) {
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(mContext.GetProcessQueueKey(), priority);
ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(
mContext.GetProcessQueueKey(), priority, mContext);
} else {
ProcessQueueManager::GetInstance()->CreateOrUpdateCircularQueue(
mContext.GetProcessQueueKey(), priority, 1024);
mContext.GetProcessQueueKey(), priority, 1024, mContext);
}


Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/FlusherInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
namespace logtail {
bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline) {
mPlugin->SetContext(context);
mPlugin->SetNodeID(NodeID());
mPlugin->SetMetricsRecordRef(Name(), PluginID(), NodeID(), ChildNodeID());
if (!mPlugin->Init(config, optionalGoPipeline)) {
return false;
Expand Down
3 changes: 3 additions & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ class Flusher : public Plugin {
virtual SinkType GetSinkType() { return SinkType::NONE; }

QueueKey GetQueueKey() const { return mQueueKey; }
void SetNodeID(const std::string& nodeID) { mNodeID = nodeID; }
const std::string& GetNodeID() const { return mNodeID; }

protected:
void GenerateQueueKey(const std::string& target);
bool PushToQueue(std::unique_ptr<SenderQueueItem>&& item, uint32_t retryTimes = 500);
void DealSenderQueueItemAfterSend(SenderQueueItem* item, bool keep);

QueueKey mQueueKey;
std::string mNodeID;

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherInstanceUnittest;
Expand Down
28 changes: 27 additions & 1 deletion core/pipeline/queue/BoundedProcessQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,31 @@ using namespace std;

namespace logtail {

BoundedProcessQueue::BoundedProcessQueue(
size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const PipelineContext& ctx)
: QueueInterface(key, cap, ctx),
BoundedQueueInterface(key, cap, low, high, ctx),
ProcessQueueInterface(key, cap, priority, ctx) {
if (ctx.IsExactlyOnceEnabled()) {
mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_EXACTLY_ONCE_FLAG, "true"}});
}
WriteMetrics::GetInstance()->CommitMetricsRecordRef(mMetricsRecordRef);
}

bool BoundedProcessQueue::Push(unique_ptr<ProcessQueueItem>&& item) {
if (!IsValidToPush()) {
return false;
}
item->mEnqueTime = chrono::system_clock::now();
auto size = item->mEventGroup.DataSize();
mQueue.push(std::move(item));
ChangeStateIfNeededAfterPush();

mInItemsCnt->Add(1);
mInItemDataSizeBytes->Add(size);
mQueueSize->Set(Size());
mQueueDataSizeByte->Add(size);
mValidToPushFlag->Set(IsValidToPush());
return true;
}

Expand All @@ -36,10 +55,17 @@ bool BoundedProcessQueue::Pop(unique_ptr<ProcessQueueItem>& item) {
if (ChangeStateIfNeededAfterPop()) {
GiveFeedback();
}

mOutItemsCnt->Add(1);
mTotalDelayMs->Add(
chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - item->mEnqueTime).count());
mQueueSize->Set(Size());
mQueueDataSizeByte->Sub(item->mEventGroup.DataSize());
mValidToPushFlag->Set(IsValidToPush());
return true;
}

void BoundedProcessQueue::SetUpStreamFeedbacks(std::vector<FeedbackInterface*>&& feedbacks) {
void BoundedProcessQueue::SetUpStreamFeedbacks(vector<FeedbackInterface*>&& feedbacks) {
mUpStreamFeedbacks.clear();
for (auto& item : feedbacks) {
if (item == nullptr) {
Expand Down
5 changes: 1 addition & 4 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ namespace logtail {
class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<ProcessQueueItem>>,
public ProcessQueueInterface {
public:
BoundedProcessQueue(size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const std::string& config)
: QueueInterface(key, cap),
BoundedQueueInterface(key, cap, low, high),
ProcessQueueInterface(key, cap, priority, config) {}
BoundedProcessQueue(size_t cap, size_t low, size_t high, int64_t key, uint32_t priority, const PipelineContext& ctx);

bool Push(std::unique_ptr<ProcessQueueItem>&& item) override;
bool Pop(std::unique_ptr<ProcessQueueItem>& item) override;
Expand Down
9 changes: 7 additions & 2 deletions core/pipeline/queue/BoundedQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ namespace logtail {
template <typename T>
class BoundedQueueInterface : virtual public QueueInterface<T> {
public:
BoundedQueueInterface(QueueKey key, size_t cap, size_t low, size_t high)
: QueueInterface<T>(key, cap), mLowWatermark(low), mHighWatermark(high) {}
BoundedQueueInterface(QueueKey key, size_t cap, size_t low, size_t high, const PipelineContext& ctx)
: QueueInterface<T>(key, cap, ctx), mLowWatermark(low), mHighWatermark(high) {
this->mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_QUEUE_TYPE, "bounded"}});
mValidToPushFlag = this->mMetricsRecordRef.CreateIntGauge("valid_to_push");
}
virtual ~BoundedQueueInterface() = default;

BoundedQueueInterface(const BoundedQueueInterface& que) = delete;
Expand Down Expand Up @@ -57,6 +60,8 @@ class BoundedQueueInterface : virtual public QueueInterface<T> {
mValidToPush = true;
}

IntGaugePtr mValidToPushFlag;

private:
virtual void GiveFeedback() const = 0;
virtual size_t Size() const = 0;
Expand Down
9 changes: 9 additions & 0 deletions core/pipeline/queue/BoundedSenderQueueInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ namespace logtail {

FeedbackInterface* BoundedSenderQueueInterface::sFeedback = nullptr;

BoundedSenderQueueInterface::BoundedSenderQueueInterface(
size_t cap, size_t low, size_t high, QueueKey key, const string& flusherId, const PipelineContext& ctx)
: QueueInterface(key, cap, ctx), BoundedQueueInterface<std::unique_ptr<SenderQueueItem>>(key, cap, low, high, ctx) {
mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_COMPONENT_NAME, "sender_queue"}});
mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}});
mExtraBufferSize = mMetricsRecordRef.CreateIntGauge("extra_buffer_size");
mExtraBufferDataSizeBytes = mMetricsRecordRef.CreateIntGauge("extra_buffer_data_size_bytes");
}

void BoundedSenderQueueInterface::SetFeedback(FeedbackInterface* feedback) {
if (feedback == nullptr) {
// should not happen
Expand Down
Loading
Loading