diff --git a/pkg/config/global_config.go b/pkg/config/global_config.go index 11ebef8197..64b6b74868 100644 --- a/pkg/config/global_config.go +++ b/pkg/config/global_config.go @@ -42,6 +42,8 @@ type GlobalConfig struct { // Go Input to Native Processor GoInputToNativeProcessor bool + PushNativeMaxCachedSize int + PushNativeTimeoutMs int } // LogtailGlobalConfig is the singleton instance of GlobalConfig. @@ -60,6 +62,8 @@ func newGlobalConfig() (cfg GlobalConfig) { DefaultLogGroupQueueSize: 4, LogtailSysConfDir: ".", DelayStopSec: 300, + PushNativeMaxCachedSize: 100, + PushNativeTimeoutMs: 1000, } return } diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index 26037fa5b5..d19dcefe21 100644 --- a/pluginmanager/plugin_runner_v1.go +++ b/pluginmanager/plugin_runner_v1.go @@ -147,9 +147,8 @@ func (p *pluginv1Runner) addMetricInput(pluginMeta *pipeline.PluginMeta, input p wrapper.LogsChan = p.LogsChan wrapper.LatencyMetric = p.LogstoreConfig.Statistics.CollecLatencytMetric if p.LogstoreConfig.GlobalConfig.GoInputToNativeProcessor { - // TODO : give config for MaxCachedSize and PushNativeTimeout - wrapper.MaxCachedSize = 1000 - wrapper.PushNativeTimeout = time.Duration(1000) * time.Millisecond + wrapper.MaxCachedSize = p.LogstoreConfig.GlobalConfig.PushNativeMaxCachedSize + wrapper.PushNativeTimeout = time.Duration(p.LogstoreConfig.GlobalConfig.PushNativeTimeoutMs) * time.Millisecond } p.MetricPlugins = append(p.MetricPlugins, &wrapper) return wrapper.Init(pluginMeta, inputInterval) @@ -161,9 +160,8 @@ func (p *pluginv1Runner) addServiceInput(pluginMeta *pipeline.PluginMeta, input wrapper.Input = input wrapper.LogsChan = p.LogsChan if p.LogstoreConfig.GlobalConfig.GoInputToNativeProcessor { - // TODO : give config for MaxCachedSize and PushNativeTimeout - wrapper.MaxCachedSize = 1000 - wrapper.PushNativeTimeout = time.Duration(1000) * time.Millisecond + wrapper.MaxCachedSize = p.LogstoreConfig.GlobalConfig.PushNativeMaxCachedSize + wrapper.PushNativeTimeout = time.Duration(p.LogstoreConfig.GlobalConfig.PushNativeTimeoutMs) * time.Millisecond } p.ServicePlugins = append(p.ServicePlugins, &wrapper) return wrapper.Init(pluginMeta) diff --git a/plugins/input/goprofile/golang_profile.go b/plugins/input/goprofile/golang_profile.go index 0b2353b62a..818c9d8382 100644 --- a/plugins/input/goprofile/golang_profile.go +++ b/plugins/input/goprofile/golang_profile.go @@ -73,5 +73,5 @@ func init() { } func (g *GoProfile) GetMode() pipeline.InputModeType { - return pipeline.PUSH + return pipeline.PULL } diff --git a/plugins/input/mysql/mysql.go b/plugins/input/mysql/mysql.go index e1a11959a6..07565d4898 100644 --- a/plugins/input/mysql/mysql.go +++ b/plugins/input/mysql/mysql.go @@ -465,5 +465,5 @@ func init() { } func (m *Mysql) GetMode() pipeline.InputModeType { - return pipeline.PUSH + return pipeline.PULL } diff --git a/plugins/input/redis/input_redis.go b/plugins/input/redis/input_redis.go index 518c81a8db..b09b25cedc 100644 --- a/plugins/input/redis/input_redis.go +++ b/plugins/input/redis/input_redis.go @@ -275,5 +275,5 @@ func init() { } func (r *InputRedis) GetMode() pipeline.InputModeType { - return pipeline.PUSH + return pipeline.PULL }