Skip to content

Commit

Permalink
support user define PushNativeProcessMaxCachedSize
Browse files Browse the repository at this point in the history
  • Loading branch information
Assassin718 committed Sep 23, 2024
1 parent 59db229 commit b675236
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 9 deletions.
4 changes: 4 additions & 0 deletions pkg/config/global_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type GlobalConfig struct {

// Go Input to Native Processor
GoInputToNativeProcessor bool
PushNativeMaxCachedSize int
PushNativeTimeoutMs int
}

// LogtailGlobalConfig is the singleton instance of GlobalConfig.
Expand All @@ -60,6 +62,8 @@ func newGlobalConfig() (cfg GlobalConfig) {
DefaultLogGroupQueueSize: 4,
LogtailSysConfDir: ".",
DelayStopSec: 300,
PushNativeMaxCachedSize: 100,
PushNativeTimeoutMs: 1000,
}
return
}
10 changes: 4 additions & 6 deletions pluginmanager/plugin_runner_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,8 @@ func (p *pluginv1Runner) addMetricInput(pluginMeta *pipeline.PluginMeta, input p
wrapper.LogsChan = p.LogsChan
wrapper.LatencyMetric = p.LogstoreConfig.Statistics.CollecLatencytMetric
if p.LogstoreConfig.GlobalConfig.GoInputToNativeProcessor {
// TODO : give config for MaxCachedSize and PushNativeTimeout
wrapper.MaxCachedSize = 1000
wrapper.PushNativeTimeout = time.Duration(1000) * time.Millisecond
wrapper.MaxCachedSize = p.LogstoreConfig.GlobalConfig.PushNativeMaxCachedSize
wrapper.PushNativeTimeout = time.Duration(p.LogstoreConfig.GlobalConfig.PushNativeTimeoutMs) * time.Millisecond
}
p.MetricPlugins = append(p.MetricPlugins, &wrapper)
return wrapper.Init(pluginMeta, inputInterval)
Expand All @@ -161,9 +160,8 @@ func (p *pluginv1Runner) addServiceInput(pluginMeta *pipeline.PluginMeta, input
wrapper.Input = input
wrapper.LogsChan = p.LogsChan
if p.LogstoreConfig.GlobalConfig.GoInputToNativeProcessor {
// TODO : give config for MaxCachedSize and PushNativeTimeout
wrapper.MaxCachedSize = 1000
wrapper.PushNativeTimeout = time.Duration(1000) * time.Millisecond
wrapper.MaxCachedSize = p.LogstoreConfig.GlobalConfig.PushNativeMaxCachedSize
wrapper.PushNativeTimeout = time.Duration(p.LogstoreConfig.GlobalConfig.PushNativeTimeoutMs) * time.Millisecond
}
p.ServicePlugins = append(p.ServicePlugins, &wrapper)
return wrapper.Init(pluginMeta)
Expand Down
2 changes: 1 addition & 1 deletion plugins/input/goprofile/golang_profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,5 @@ func init() {
}

func (g *GoProfile) GetMode() pipeline.InputModeType {
return pipeline.PUSH
return pipeline.PULL
}
2 changes: 1 addition & 1 deletion plugins/input/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,5 +465,5 @@ func init() {
}

func (m *Mysql) GetMode() pipeline.InputModeType {
return pipeline.PUSH
return pipeline.PULL
}
2 changes: 1 addition & 1 deletion plugins/input/redis/input_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,5 +275,5 @@ func init() {
}

func (r *InputRedis) GetMode() pipeline.InputModeType {
return pipeline.PUSH
return pipeline.PULL
}

0 comments on commit b675236

Please sign in to comment.