From e76377e657e7d59a975f0b25401711da4c123f90 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Tue, 24 Sep 2024 19:58:35 +0800 Subject: [PATCH] fix plugin id --- pluginmanager/context_imp.go | 3 ++ pluginmanager/logstore_config.go | 64 +++++++++++++++++---------- pluginmanager/logstore_config_test.go | 25 +++++++++++ pluginmanager/plugin_runner_v1.go | 2 +- pluginmanager/plugin_runner_v2.go | 2 +- 5 files changed, 71 insertions(+), 25 deletions(-) diff --git a/pluginmanager/context_imp.go b/pluginmanager/context_imp.go index 24eb046759..45f9f6bdc0 100644 --- a/pluginmanager/context_imp.go +++ b/pluginmanager/context_imp.go @@ -43,6 +43,9 @@ func (p *ContextImp) GetRuntimeContext() context.Context { return p.ctx } +// GetExtension returns the extension by name. +// name can be a plugin type with instance name, i.e., ext_basicauth/1, ext_basicauth/shared +// name can be also just a plugin type, i.e., ext_basicauth func (p *ContextImp) GetExtension(name string, cfg any) (pipeline.Extension, error) { if p.logstoreC == nil || p.logstoreC.PluginRunner == nil { return nil, fmt.Errorf("pipeline not initialized") diff --git a/pluginmanager/logstore_config.go b/pluginmanager/logstore_config.go index 3afbad7bf6..e847f5daa9 100644 --- a/pluginmanager/logstore_config.go +++ b/pluginmanager/logstore_config.go @@ -853,7 +853,7 @@ func applyPluginConfig(plugin interface{}, pluginConfig interface{}) error { return err } -// Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority. +// Rule: pluginTypeWithID=pluginType/{optional:instance_name}/pluginID#pluginPriority. func getPluginType(pluginTypeWithID string) string { if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 { return pluginTypeWithID[:ids] @@ -861,6 +861,39 @@ func getPluginType(pluginTypeWithID string) string { return pluginTypeWithID } +func getPluginTypeAndName(pluginTypeWithID string) string { + if idx := strings.LastIndex(pluginTypeWithID, "/"); idx != -1 { + return pluginTypeWithID[:idx] + } + return pluginTypeWithID +} + +func getPluginID(pluginTypeWithID string) string { + if lastIdx := strings.LastIndexByte(pluginTypeWithID, '/'); lastIdx != -1 { + return pluginTypeWithID[lastIdx+1:] + } + return "" +} + +func isPluginTypeWithID(pluginTypeWithID string) bool { + if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 { + return true + } + return false +} + +func GetPluginPriority(pluginTypeWithID string) int { + if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 { + val, err := strconv.Atoi(pluginTypeWithID[idx+1:]) + if err != nil { + return 0 + } + return val + } + return 0 +} + +// pluginTypeWithID=pluginType/{optional:instance_name}/pluginID#pluginPriority. func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool, lastOne bool) *pipeline.PluginMeta { nodeID := "" childNodeID := "" @@ -869,17 +902,20 @@ func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool, if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 { pluginTypeWithID = pluginTypeWithID[:idx] } - if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 { + + if isPluginTypeWithID(pluginTypeWithID) { if genNodeID { nodeID, childNodeID = lc.genNodeID(lastOne) } - if pluginID, err := strconv.ParseInt(pluginTypeWithID[ids+1:], 10, 32); err == nil { + + if pluginID, err := strconv.ParseInt(getPluginID(pluginTypeWithID), 10, 32); err == nil { atomic.StoreInt32(&lc.pluginID, int32(pluginID)) } + return &pipeline.PluginMeta{ PluginTypeWithID: pluginTypeWithID, - PluginType: pluginTypeWithID[:ids], - PluginID: pluginTypeWithID[ids+1:], + PluginType: getPluginType(pluginTypeWithID), + PluginID: getPluginID(pluginTypeWithID), NodeID: nodeID, ChildNodeID: childNodeID, } @@ -900,24 +936,6 @@ func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool, } } -func isPluginTypeWithID(pluginTypeWithID string) bool { - if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 { - return true - } - return false -} - -func GetPluginPriority(pluginTypeWithID string) int { - if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 { - val, err := strconv.Atoi(pluginTypeWithID[idx+1:]) - if err != nil { - return 0 - } - return val - } - return 0 -} - func (lc *LogstoreConfig) genPluginID() string { return fmt.Sprintf("%v", atomic.AddInt32(&lc.pluginID, 1)) } diff --git a/pluginmanager/logstore_config_test.go b/pluginmanager/logstore_config_test.go index 01675e3fc7..56c2bf8d79 100644 --- a/pluginmanager/logstore_config_test.go +++ b/pluginmanager/logstore_config_test.go @@ -575,3 +575,28 @@ func Test_genPluginMeta(t *testing.T) { assert.Regexp(t, `-1`, result.ChildNodeID) } } + +func Test_getPluginTypeWithID(t *testing.T) { + { + input := "ext_basicauth/123" + assert.Equal(t, "ext_basicauth", getPluginType(input)) + assert.Equal(t, "123", getPluginID(input)) + assert.Equal(t, "ext_basicauth", getPluginTypeAndName(input)) + assert.Equal(t, false, isPluginTypeWithID(input)) + + } + { + input := "ext_basicauth" + assert.Equal(t, "ext_basicauth", getPluginType(input)) + assert.Equal(t, "", getPluginID(input)) + assert.Equal(t, "ext_basicauth", getPluginTypeAndName(input)) + assert.Equal(t, false, isPluginTypeWithID(input)) + } + { + input := "ext_basicauth/name/123" + assert.Equal(t, "ext_basicauth", getPluginType(input)) + assert.Equal(t, "123", getPluginID(input)) + assert.Equal(t, "ext_basicauth/name", getPluginTypeAndName(input)) + assert.Equal(t, true, isPluginTypeWithID(input)) + } +} diff --git a/pluginmanager/plugin_runner_v1.go b/pluginmanager/plugin_runner_v1.go index b2d3af518b..912b076ed6 100644 --- a/pluginmanager/plugin_runner_v1.go +++ b/pluginmanager/plugin_runner_v1.go @@ -110,7 +110,7 @@ func (p *pluginv1Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category plu } case pluginExtension: if extension, ok := plugin.(pipeline.Extension); ok { - return p.addExtension(pluginMeta.PluginTypeWithID, extension) + return p.addExtension(getPluginTypeAndName(pluginMeta.PluginTypeWithID), extension) } default: return pluginCategoryUndefinedError(category) diff --git a/pluginmanager/plugin_runner_v2.go b/pluginmanager/plugin_runner_v2.go index 39d834bf29..ae7de95c26 100644 --- a/pluginmanager/plugin_runner_v2.go +++ b/pluginmanager/plugin_runner_v2.go @@ -114,7 +114,7 @@ func (p *pluginv2Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category plu } case pluginExtension: if extension, ok := plugin.(pipeline.Extension); ok { - return p.addExtension(pluginMeta.PluginTypeWithID, extension) + return p.addExtension(getPluginTypeAndName(pluginMeta.PluginTypeWithID), extension) } default: return pluginCategoryUndefinedError(category)