feat: support Go input to Cpp pipeline #1715
Conversation
core/go_pipeline/LogtailPlugin.cpp (Outdated)

```cpp
    }
}
break;
case sls_logs::PipelineEventGroup::EventType::PipelineEventGroup_EventType_METRIC:
```

Comment: Does this PR also handle Metric, or does it only add the definition, with the conversion to follow in a later PR?

Reply: As long as Metric and Trace can be converted into a PipelineEventGroup, the existing pipeline can process them from there on.

Comment: Then please complete the conversion in this PR as well.

Reply: For now the metric event on the C++ side only supports untyped single values; Go's multi-value and typed values cannot be converted yet.
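To make that constraint concrete, here is a minimal Go sketch using hypothetical stand-in types (singleValue, multiValues, typedValues are illustrative, not the real Go pipeline model) of which metric shapes could currently be forwarded to the C++ pipeline:

```go
package wrapper

// Hypothetical metric value shapes standing in for the Go pipeline's metric
// model; the type names are illustrative only.
type singleValue struct{ Value float64 }
type multiValues struct{ Values map[string]float64 }
type typedValues struct{ Values map[string]interface{} }

// convertibleToCppMetricEvent reflects the limitation described above: only an
// untyped single value maps onto the current C++ metric event.
func convertibleToCppMetricEvent(v interface{}) bool {
	switch v.(type) {
	case singleValue:
		return true
	case multiValues, typedValues:
		return false // not yet representable on the C++ side
	default:
		return false
	}
}
```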
@@ -0,0 +1,85 @@

```proto
syntax = "proto2";
```

Comment: Has the serialization and deserialization overhead of the pb encoding been measured here? And is there any performance comparison against other protocols, such as FlatBuffers or Arrow?

Reply: This was tested earlier: pb has a clear advantage in serialization speed and encoded size, but deserialization is slightly slower.
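For reference, this is roughly how such a measurement can be written with Go's built-in benchmark support; structpb.Struct is used only to keep the sketch self-contained and stands in for the event-group message defined in this PR:

```go
package pbbench

import (
	"testing"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/structpb"
)

// sampleMessage builds a small protobuf message. structpb.Struct is a stand-in
// for the generated event-group type, chosen only so the sketch compiles as-is.
func sampleMessage() proto.Message {
	s, _ := structpb.NewStruct(map[string]interface{}{
		"content": "2024-01-01 12:00:00 INFO some log line",
		"source":  "stdout",
		"level":   "INFO",
	})
	return s
}

// BenchmarkMarshal measures the serialization cost.
func BenchmarkMarshal(b *testing.B) {
	msg := sampleMessage()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		if _, err := proto.Marshal(msg); err != nil {
			b.Fatal(err)
		}
	}
}

// BenchmarkUnmarshal measures the (typically slower) deserialization path.
func BenchmarkUnmarshal(b *testing.B) {
	data, err := proto.Marshal(sampleMessage())
	if err != nil {
		b.Fatal(err)
	}
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		var out structpb.Struct
		if err := proto.Unmarshal(data, &out); err != nil {
			b.Fatal(err)
		}
	}
}
```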
pluginmanager/plugin_runner_v2.go (Outdated)

```diff
-		state: input,
-		interval: wrapper.Interval * time.Millisecond,
+		state: &wrapper,
+		interval: wrapper.Interval,
```

Comment: Why is time.Millisecond dropped here?

Reply: wrapper.Interval has already been multiplied by time.Millisecond, so multiplying by time.Millisecond again gives the wrong value at runtime; the earlier code looks like a plain mistake.

Comment: Are you sure it was a mistake? We should have workloads using MetricInputV2, and nobody seems to have reported a wrong interval.

Reply: wrapper.Interval is initialized by metricWrapperV2.Init. That Init function takes an int parameter inputInterval and assigns inputInterval * time.Millisecond to wrapper.Interval.

Comment: OK, this seems to be a recently introduced bug. Internally we sync with the community main branch with some delay, so the previous version is still unaffected.
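The arithmetic behind the fix, as a small runnable sketch (the struct below is a stripped-down stand-in for the real wrapper, keeping only the Interval field mentioned in the thread):

```go
package main

import (
	"fmt"
	"time"
)

// metricWrapperV2 is a stripped-down stand-in for the real wrapper type,
// keeping only the Interval field discussed above.
type metricWrapperV2 struct {
	Interval time.Duration
}

// Init mirrors the behaviour described in the thread: the int inputInterval
// (in milliseconds) is converted to a time.Duration once, inside Init.
func (w *metricWrapperV2) Init(inputInterval int) {
	w.Interval = time.Duration(inputInterval) * time.Millisecond
}

func main() {
	var w metricWrapperV2
	w.Init(500) // the plugin asks for a 500 ms collection interval

	fmt.Println(w.Interval)                    // 500ms - correct
	fmt.Println(w.Interval * time.Millisecond) // 138h53m20s - multiplied twice, hence the fix
}
```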
```diff
@@ -59,6 +59,11 @@ struct containerMeta{
     char** envsVal;
 };

+struct loadGoPipelineResp {
```

Comment: Does func GetPipelineMetrics() *C.PluginMetrics also need to be adjusted, i.e. switched to the pb protocol when passing data to core?
```diff
@@ -51,11 +52,11 @@ class Pipeline {
     PipelineContext& GetContext() const { return mContext; }
     const Json::Value& GetConfig() const { return *mConfig; }
     const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
-    bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
+    bool IsFlushingThroughGoPipeline() const;
```

Comment: This change is needed because when a pipeline is configured as go input -> cpp processor -> go processor/aggregator/flusher, Pipeline::Init() puts all of the Go plugin configuration into mGoPipelineWithInput and leaves mGoPipelineWithoutInput empty. The old check would then return false, and the C++ side would never send data to the Go pipeline.
pluginmanager/plugin_runner_v2.go (Outdated)

```diff
@@ -71,6 +73,7 @@ func (p *pluginv2Runner) Init(inputQueueSize int, flushQueueSize int) error {
 	p.ProcessPipeContext = helper.NewGroupedPipelineConext()
 	p.AggregatePipeContext = helper.NewObservePipelineConext(flushQueueSize)
 	p.FlushPipeContext = helper.NewNoopPipelineConext()
+	p.NativeInputPipeContext = nil
```

Comment: A nil pointer is dangerous; better to initialize a default one here.

Comment: NativeProcessorPipeContext

Reply: The InputMode can only be determined later, when each input's concrete type is analyzed, and Init runs before that. Since this context requires the InputMode to be passed in, it is hard to change.
```go
	for {
		if isValidToPushNativeProcessQueue {
			select {
			case <-wrapper.timer.C:
```

Comment: Don't use a timer; just use <-time.After directly.

Reply: In benchmarks, calling time.After directly incurs a fairly large timer-creation overhead.
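The trade-off behind that reply, sketched with illustrative names (neither function is the code in this PR): time.After allocates a new runtime timer on every loop iteration, while a single time.Timer can be reset and reused.

```go
package wrapper

import "time"

// pollWithTimeAfter allocates a fresh timer on every iteration; under a tight
// benchmark loop this shows up as extra allocations and runtime timer churn.
func pollWithTimeAfter(interval time.Duration, stop <-chan struct{}, tick func()) {
	for {
		select {
		case <-time.After(interval):
			tick()
		case <-stop:
			return
		}
	}
}

// pollWithReusedTimer keeps one timer alive and resets it after each fire,
// avoiding the per-iteration timer creation mentioned in the reply above.
func pollWithReusedTimer(interval time.Duration, stop <-chan struct{}, tick func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			tick()
			timer.Reset(interval) // safe here: the timer has fired and its channel is drained
		case <-stop:
			return
		}
	}
}
```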
```go
	defer close(wrapper.ShutdownCachedChan)

	for {
		if isValidToPushNativeProcessQueue {
```

Comment: This logic is a bit messy. Could pushNativeProcessQueue be blocking and retry indefinitely, so the caller would not need this complicated handling?

Reply: This directly calls the C++ PushQueue interface, and ProcessQueueManager::PushQueue on the C++ side is non-blocking.

Reply: The main idea is: when the ProcessQueue is detected to be above its high watermark, stop listening on LogCachedChan so the input is blocked; only when the watermark drops again does the loop resume listening on LogCachedChan so the input can run.

Comment: So isValidToPushNativeProcessQueue actually indicates whether the last push succeeded?

Reply: Yes.
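A minimal sketch of that control flow, with illustrative names (run, recv, and queueValid are stand-ins, not the code in this PR): the LogCachedChan case is disabled by selecting on a nil channel while the C++ process queue rejects pushes, and re-enabled once a retry succeeds.

```go
package wrapper

import "time"

// run is an illustrative reduction of the loop discussed above. queueValid
// plays the role of isValidToPushNativeProcessQueue, i.e. whether the last
// push into the C++ process queue succeeded.
func run(logCachedChan <-chan interface{}, stop <-chan struct{}, push func(interface{}) bool) {
	queueValid := true
	var pending interface{}

	retry := time.NewTicker(10 * time.Millisecond) // retry cadence is illustrative
	defer retry.Stop()

	for {
		// While the last push failed (queue above its high watermark), stop
		// listening on logCachedChan so the upstream input blocks on send.
		recv := logCachedChan
		if !queueValid {
			recv = nil // a nil channel case never fires in a select
		}

		select {
		case ev := <-recv:
			pending = ev
			queueValid = push(pending)
		case <-retry.C:
			if !queueValid {
				// Keep retrying the cached event; once the queue accepts it,
				// listening on logCachedChan resumes on the next iteration.
				queueValid = push(pending)
			}
		case <-stop:
			return
		}
	}
}
```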
```go
}

func (wrapper *MetricWrapperV1) pushNativeProcessQueue(retryCnt int) bool {
	if len(wrapper.pbBuffer) == 0 && len(wrapper.eventCached) == 0 {
```

Comment: Why are there two buffers here? I would expect only an event buffer: convert to pb right before each send and send it immediately.

Reply: pbBuffer holds the result of pb serialization; the intent is to reuse memory as much as possible.

Reply: Also, if sending to the C++ side fails, the previously serialized result needs to be kept; otherwise it would have to be serialized again.

Comment: Shouldn't the pb buffer alone be enough? When an event is popped from the channel, serialize it straight into pbBuffer; there seems to be no need for an extra layer of buffering for events, tags, and context.

Reply:
- Because the object being serialized is a PipelineEventGroup, correctly appending each individually serialized event onto the group's serialized output would be complicated to implement.
- Without the event/tag/context buffer, handling mismatched tags/context combined with a failed send would require checking many more edge cases; the current implementation is deliberately simple.
- This optimization would only save a small amount of memory; it can be tried later.
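A minimal sketch of the retry behaviour described above, with illustrative names and a plain []byte in place of the real pb types: the serialized bytes are kept across failed pushes so they are not re-encoded, and both buffers are cleared only after the C++ queue accepts the data.

```go
package wrapper

// flushState is an illustrative stand-in for the two buffers discussed above:
// eventCached holds the not-yet-flushed events (plus tags/context in the real
// code), pbBuffer holds their serialized form from the last attempt.
type flushState struct {
	eventCached []interface{}
	pbBuffer    []byte
}

// pushNativeProcessQueue mirrors the retry idea: serialize only when there is
// no leftover pbBuffer from a previous failed push, then try the non-blocking
// C++ PushQueue; on success both buffers are reset, on failure they are kept.
func (s *flushState) pushNativeProcessQueue(
	serialize func([]interface{}) []byte,
	pushQueue func([]byte) bool,
) bool {
	if len(s.pbBuffer) == 0 && len(s.eventCached) == 0 {
		return true // nothing to flush
	}
	if len(s.pbBuffer) == 0 {
		// First attempt for this batch: encode the cached events.
		s.pbBuffer = serialize(s.eventCached)
	}
	if !pushQueue(s.pbBuffer) {
		// Queue is full: keep both buffers so the next attempt can reuse the
		// already-serialized bytes instead of encoding again.
		return false
	}
	s.eventCached = s.eventCached[:0]
	s.pbBuffer = s.pbBuffer[:0]
	return true
}
```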
Done
To do