
feat: support Go input to Cpp pipeline #1715

Merged · 4 commits into alibaba:ospp_dev · Oct 31, 2024
Conversation

@Assassin718 (Contributor) commented on Aug 26, 2024

Completed

  • Added an input mode (the type of the Go input plugin) to the return value of loading a Go pipeline; the matching process queue (Bounded/Circular Process Queue) is created according to this input mode.
  • Support forwarding LogEvents from v1 pipelines:
    • Multiple LogEvents are cached, packed into a PipelineEventGroup, and sent asynchronously to the Cpp processor.
    • The cache parameters are configurable: PushNativeMaxCachedSize and PushNativeTimeoutMs in the global section of the pipeline config set the maximum number of logs sent in one batch and the flush timeout, respectively (see the sketch after this list).
    • When the input mode is PUSH, the input can be blocked based on the process queue watermark to lower the input rate.
  • Support forwarding PipelineEventGroups from v2 pipelines:
    • Each PipelineEventGroup is forwarded synchronously.
    • Caching multiple PipelineEventGroups and throttling the input rate are not supported yet.
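For illustration, a minimal Go sketch of how the two batching knobs above could be modeled; only the key names (PushNativeMaxCachedSize, PushNativeTimeoutMs) come from this PR, the struct itself is an assumption:

```go
package sketch

// globalBatchingConfig is a hypothetical view of the batching knobs read
// from the pipeline's "global" section (illustrative only).
type globalBatchingConfig struct {
	// Maximum number of LogEvents packed into one PipelineEventGroup
	// before it is pushed to the Cpp processor.
	PushNativeMaxCachedSize int `json:"PushNativeMaxCachedSize"`
	// Timeout in milliseconds after which a partially filled batch is
	// pushed anyway.
	PushNativeTimeoutMs int `json:"PushNativeTimeoutMs"`
}
```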

To do

  • Unit tests
  • e2e tests

        }
    }
    break;
    case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_METRIC:
Contributor: Will this PR also handle Metric, or does it only provide the definition, with the rest filled in later?

Collaborator: As long as Metric and Trace can be converted into a PipelineEventGroup, the existing pipeline can handle them from there.

Collaborator: Let's complete the conversion in this PR as well.

Contributor Author: The metric event on the Cpp side currently only supports untyped single values; Go's multi-value and typed-value metrics cannot be converted.

core/config/PipelineConfig.cpp (outdated, resolved)
core/go_pipeline/LogtailPlugin.cpp (outdated, resolved)
core/go_pipeline/LogtailPlugin.cpp (outdated, resolved)

pluginmanager/plugin_wrapper_metric_v1.go (outdated, resolved)
pluginmanager/plugin_wrapper_metric_v1.go (outdated, resolved)
pluginmanager/plugin_wrapper_metric_v1.go (outdated, resolved)
pluginmanager/plugin_wrapper_metric_v1.go (resolved)
pluginmanager/plugin_wrapper_service_v1.go (outdated, resolved)
@Abingcbc changed the title from "[WIP] feat: add Go input to Cpp processor" to "feat: add Go input to Cpp processor" on Sep 2, 2024
@Abingcbc changed the title from "feat: add Go input to Cpp processor" to "feat: support Go input to Cpp pipeline" on Sep 2, 2024
@@ -0,0 +1,85 @@
syntax = "proto2";
Contributor: Have the pb serialization and deserialization overheads been benchmarked here? And are there performance comparisons against other protocols, such as FlatBuffers or Arrow?

Collaborator: This was benchmarked earlier: pb is favorable in both serialization speed and encoded size, but deserialization is somewhat slower.

pkg/pipeline/input.go (outdated, resolved)
pkg/pipeline/input.go (outdated, resolved)
-			state:    input,
-			interval: wrapper.Interval * time.Millisecond,
+			state:    &wrapper,
+			interval: wrapper.Interval,
Collaborator: Why is time.Millisecond missing here?

Contributor Author: wrapper.Interval has already been multiplied by time.Millisecond, so multiplying by time.Millisecond again is wrong at runtime; the previous code must have been a mistake.

Contributor: Are you sure this was a mistake? We should have workloads using MetricInputV2, and no one seems to have reported a wrong interval.

Contributor Author: wrapper.Interval is initialized by metricWrapperV2.Init, which takes an int parameter inputInterval and assigns inputInterval * time.Millisecond to wrapper.Interval.

Contributor: OK, this looks like a recently introduced bug; internally we sync with the community main branch with some delay, so the previous release was still fine.
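To make the thread concrete, a minimal runnable sketch of the double-multiplication bug; the type and field names are illustrative stand-ins for the pluginmanager code:

```go
package main

import (
	"fmt"
	"time"
)

type metricWrapperSketch struct {
	Interval time.Duration
}

// Init already converts the raw millisecond count into a time.Duration,
// mirroring what metricWrapperV2.Init is described as doing above.
func (w *metricWrapperSketch) Init(inputInterval int) {
	w.Interval = time.Duration(inputInterval) * time.Millisecond
}

func main() {
	w := &metricWrapperSketch{}
	w.Init(5000) // configured as 5000 ms

	fmt.Println(w.Interval)                    // 5s: already a full Duration
	fmt.Println(w.Interval * time.Millisecond) // multiplying again inflates it a million-fold
}
```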

pluginmanager/plugin_runner_v2.go (outdated, resolved)
pluginmanager/plugin_wrapper_metric_v1.go (outdated, resolved)
pluginmanager/plugin_wrapper_metric_v1.go (outdated, resolved)
pluginmanager/plugin_wrapper_service_v1.go (outdated, resolved)
pluginmanager/plugin_wrapper_service_v1.go (outdated, resolved)
@@ -59,6 +59,11 @@ struct containerMeta{
    char** envsVal;
};

+struct loadGoPipelineResp {
Contributor: Does func GetPipelineMetrics() *C.PluginMetrics also need adjusting, i.e., switching to the pb protocol when passing data to core?

@@ -51,11 +52,11 @@ class Pipeline {
    PipelineContext& GetContext() const { return mContext; }
    const Json::Value& GetConfig() const { return *mConfig; }
    const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
-   bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
+   bool IsFlushingThroughGoPipeline() const;
Contributor Author: This is because, for a pipeline configured as go input -> cpp processor -> go processor/aggregator/flusher, Pipeline::Init() puts the entire Go config into mGoPipelineWithInput, leaving mGoPipelineWithoutInput empty. The old inline implementation would then return false, preventing Cpp from sending data to Go.

core/go_pipeline/LogtailPlugin.cpp (outdated, resolved)
core/go_pipeline/LogtailPlugin.cpp (outdated, resolved)
core/go_pipeline/LogtailPlugin.cpp (resolved)
core/pipeline/Pipeline.cpp (outdated, resolved)
@@ -71,6 +73,7 @@ func (p *pluginv2Runner) Init(inputQueueSize int, flushQueueSize int) error {
	p.ProcessPipeContext = helper.NewGroupedPipelineConext()
	p.AggregatePipeContext = helper.NewObservePipelineConext(flushQueueSize)
	p.FlushPipeContext = helper.NewNoopPipelineConext()
+	p.NativeInputPipeContext = nil
Collaborator: A nil pointer is dangerous; initialize it with a default here.

Collaborator: NativeProcessorPipeContext

Contributor Author (@Assassin718, Oct 29, 2024): The InputMode can only be determined later, when each Input's concrete type is analyzed; Init runs before that, yet this Context must be constructed with the InputMode, so this is hard to change.

pluginmanager/plugin_wrapper_metric_v1.go (outdated, resolved)
pluginmanager/plugin_wrapper_metric_v1.go (outdated, resolved)
	for {
		if isValidToPushNativeProcessQueue {
			select {
			case <-wrapper.timer.C:
Collaborator: Don't use a timer; just use <-time.After directly.

Contributor Author (@Assassin718, Oct 29, 2024): In benchmarks, calling time.After directly incurs noticeable timer-creation overhead.
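A quick sketch of the trade-off: time.After allocates a fresh Timer on every select iteration, while one reused Timer does not (illustrative names, not the PR's code):

```go
package sketch

import "time"

// runLoop batches events and flushes on a timeout, reusing a single Timer
// instead of allocating a new one via time.After on every iteration.
func runLoop(events <-chan string, timeout time.Duration, flush func([]string)) {
	buf := make([]string, 0, 16)
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case e, ok := <-events:
			if !ok {
				flush(buf)
				return
			}
			buf = append(buf, e)
		case <-timer.C:
			flush(buf)
			buf = buf[:0]
			timer.Reset(timeout) // safe: the timer just fired and its channel was drained
		}
	}
}
```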

	defer close(wrapper.ShutdownCachedChan)

	for {
		if isValidToPushNativeProcessQueue {
Collaborator: This logic is a bit tangled. Could pushNativeProcessQueue be blocking and retry indefinitely, so the caller wouldn't need such complex handling?

Contributor Author (@Assassin718, Oct 29, 2024): This calls Cpp's PushQueue interface directly, and Cpp's ProcessQueueManager::PushQueue is not blocking.

Contributor Author: The main logic is: when a high process-queue watermark is detected, stop listening on LogCachedChan to block the Input; only once the watermark drops do we resume listening on LogCachedChan so the Input can run again. (A sketch follows at the end of this thread.)

Collaborator: So isValidToPushNativeProcessQueue really means whether the last push succeeded?

Contributor Author: Yes.
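Putting the thread together, a minimal sketch of the watermark-driven backpressure loop; pushQueue stands in for the non-blocking cgo call into ProcessQueueManager::PushQueue, and every identifier here is illustrative rather than the PR's actual code:

```go
package sketch

import "time"

type cacheLoopSketch struct {
	LogCachedChan chan string   // input goroutines block on send once we stop draining
	cache         []string      // events awaiting the next PipelineEventGroup
	maxCachedSize int           // cf. PushNativeMaxCachedSize
	timeout       time.Duration // cf. PushNativeTimeoutMs
	timer         *time.Timer
}

// pushQueue stands in for the non-blocking push into the Cpp process queue;
// false means the queue is over its watermark.
func (w *cacheLoopSketch) pushQueue() bool { return true }

func (w *cacheLoopSketch) run() {
	pushOK := true // plays the role of isValidToPushNativeProcessQueue
	w.timer = time.NewTimer(w.timeout)
	defer w.timer.Stop()
	for {
		if pushOK {
			select {
			case e := <-w.LogCachedChan: // input keeps running
				w.cache = append(w.cache, e)
				if len(w.cache) >= w.maxCachedSize {
					pushOK = w.pushQueue()
				}
			case <-w.timer.C: // flush a partial batch on timeout
				pushOK = w.pushQueue()
				w.timer.Reset(w.timeout)
			}
		} else {
			// High watermark: stop draining LogCachedChan so the input blocks,
			// then retry after a short backoff until the queue accepts again.
			time.Sleep(10 * time.Millisecond)
			pushOK = w.pushQueue()
		}
	}
}
```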

}

func (wrapper *MetricWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
	if len(wrapper.pbBuffer) == 0 && len(wrapper.eventCached) == 0 {
Collaborator: Why are there two buffers here? I'd expect only an event buffer: convert to pb just before each send, then send immediately.

Contributor Author: pbBuffer holds the pb serialization result; the intent is to reuse memory as much as possible.

Contributor Author: Also, if sending to Cpp fails, the previously serialized result must be kept; otherwise the data would have to be serialized again.

Collaborator: Shouldn't the pb buffer alone suffice? When an event is popped from the channel, serialize it straight into pbBuffer; there seems to be no need for another buffering layer for events, tags, and context.

Contributor Author (@Assassin718, Oct 29, 2024):
  • The serialization unit is a PipelineEventGroup; correctly splicing each event's serialized bytes onto an already-serialized PipelineEventGroup would be complex to implement.
  • Without buffers for event, tag, and context, more edge cases would have to be handled when tags/contexts diverge and a send fails on top of that; the current implementation stays simple (see the sketch below).
  • Optimizing this would save only a small amount of memory; it can be tried later.
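A minimal sketch of the two-buffer design defended above; every name is illustrative:

```go
package sketch

type event struct{ content string }

type wrapperSketch struct {
	eventCached []event // raw events (plus shared tags/context) awaiting send
	pbBuffer    []byte  // serialized PipelineEventGroup from the last attempt
}

// marshalGroup stands in for pb-serializing the cached events as a single
// PipelineEventGroup.
func marshalGroup(events []event) []byte { return nil }

// pushToNativeQueue stands in for the non-blocking cgo push into the Cpp
// process queue; false means the queue rejected the group.
func pushToNativeQueue(b []byte) bool { return true }

func (w *wrapperSketch) push() bool {
	if len(w.pbBuffer) == 0 { // no leftover bytes from a failed push
		w.pbBuffer = marshalGroup(w.eventCached)
	}
	if !pushToNativeQueue(w.pbBuffer) {
		return false // keep both buffers so the retry skips re-serialization
	}
	w.pbBuffer = w.pbBuffer[:0] // reuse both slices' capacity
	w.eventCached = w.eventCached[:0]
	return true
}
```

Keeping eventCached alongside pbBuffer means a failed send never has to re-serialize, at the cost of the extra event-layer buffer the thread discusses.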

@Abingcbc changed the base branch from main to ospp_dev on Oct 30, 2024
@Abingcbc merged commit 9fefc58 into alibaba:ospp_dev on Oct 31, 2024
15 checks passed