Skip to content

Commit

Permalink
self telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 18, 2024
1 parent 9ddb08d commit 409f575
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 15 deletions.
15 changes: 13 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/Flags.h"
#include "common/ParamExtractor.h"
#include "go_pipeline/LogtailPlugin.h"
#include "monitor/MetricConstants.h"
#include "pipeline/batch/TimeoutFlushManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "pipeline/queue/ProcessQueueManager.h"
Expand Down Expand Up @@ -66,13 +67,24 @@ void AddExtendedGlobalParamToGoPipeline(const Json::Value& extendedParams, Json:
}

bool Pipeline::Init(PipelineConfig&& config) {
mInitTime = std::chrono::system_clock::now();
mName = config.mName;
mConfig = std::move(config.mDetail);
mContext.SetConfigName(mName);
mContext.SetCreateTime(config.mCreateTime);
mContext.SetPipeline(*this);
mContext.SetIsFirstProcessorJsonFlag(config.mIsFirstProcessorJson);


WriteMetrics::GetInstance()->CreateMetricsRecordRef(mMetricsRecordRef,
{
{METRIC_LABEL_PROJECT, config.mProject},
{METRIC_LABEL_CONFIG_NAME, config.mName},
});
mMetricsRecordRef.AddLabels({{METRIC_LABEL_CONFIG_NAME, mName}});
WriteMetrics::GetInstance()->CommitMetricsRecordRef(mMetricsRecordRef);
mLoadDelayMs = mMetricsRecordRef.CreateCounter("config_load_delay_ms");

// for special treatment below
const InputFile* inputFile = nullptr;
const InputContainerStdio* inputContainerStdio = nullptr;
Expand Down Expand Up @@ -339,6 +351,7 @@ void Pipeline::Start() {
}

LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName));
mLoadDelayMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - mInitTime).count());
}

void Pipeline::Process(vector<PipelineEventGroup>& logGroupList, size_t inputIndex) {
Expand Down Expand Up @@ -477,7 +490,6 @@ bool Pipeline::LoadGoPipelines() const {
// 目前按照从后往前顺序加载,即便without成功with失败导致without残留在插件系统中,也不会有太大的问题,但最好改成原子的。
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
LOG_INFO(sLogger, ("load go pipeline", "without input")("content", content)("config", mName));
string goConfigName = GetConfigNameOfGoPipelineWithoutInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
content,
Expand All @@ -498,7 +510,6 @@ bool Pipeline::LoadGoPipelines() const {
}
if (!mGoPipelineWithInput.isNull()) {
string content = mGoPipelineWithInput.toStyledString();
LOG_INFO(sLogger, ("load go pipeline", "with input")("content", content)("config", mName));
string goConfigName = GetConfigNameOfGoPipelineWithInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
content,
Expand Down
5 changes: 5 additions & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ class Pipeline {
std::atomic_uint16_t mPluginID;
std::atomic_uint16_t mProcessingCnt;

// self-observation
mutable MetricsRecordRef mMetricsRecordRef;
std::chrono::system_clock::time_point mInitTime;
CounterPtr mLoadDelayMs;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineMock;
friend class PipelineUnittest;
Expand Down
2 changes: 1 addition & 1 deletion pluginmanager/container_config_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func isCollectContainers() bool {
for _, logstoreConfig := range LogtailConfig {
if logstoreConfig.CollectingContainersMeta {
found = true
return false // exit range iteration
break
}
}
LogtailConfigLock.RUnlock()
Expand Down
19 changes: 9 additions & 10 deletions pluginmanager/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,17 +278,16 @@ func Start(configName string) error {
LogtailConfigLock.Unlock()
ToStartLogtailConfigWithoutInput = nil
return nil
} else {
// should never happen
var loadedConfigName string
if ToStartLogtailConfigWithInput != nil {
loadedConfigName = ToStartLogtailConfigWithInput.ConfigNameWithSuffix
}
if ToStartLogtailConfigWithoutInput != nil {
loadedConfigName += " " + ToStartLogtailConfigWithoutInput.ConfigNameWithSuffix
}
return fmt.Errorf("config unmatch with the loaded pipeline: given %s, expect %s", configName, loadedConfigName)
}
// should never happen
var loadedConfigName string
if ToStartLogtailConfigWithInput != nil {
loadedConfigName = ToStartLogtailConfigWithInput.ConfigNameWithSuffix
}
if ToStartLogtailConfigWithoutInput != nil {
loadedConfigName += " " + ToStartLogtailConfigWithoutInput.ConfigNameWithSuffix
}
return fmt.Errorf("config unmatch with the loaded pipeline: given %s, expect %s", configName, loadedConfigName)
}

func init() {
Expand Down
6 changes: 4 additions & 2 deletions plugins/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ func LoadAndStartMockConfig(project, logstore, configName, jsonStr string) *plug
if err != nil {
panic(err)
}
pluginmanager.Start(configName)
object, _ := pluginmanager.LogtailConfig[configName]
if err := pluginmanager.Start(configName); err != nil {
panic(err)
}
object := pluginmanager.LogtailConfig[configName]
return object
}

Expand Down

0 comments on commit 409f575

Please sign in to comment.