Skip to content

Commit

Permalink
fix plugin id
Browse files Browse the repository at this point in the history
  • Loading branch information
shunjiazhu committed Sep 24, 2024
1 parent 508ba88 commit e76377e
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 25 deletions.
3 changes: 3 additions & 0 deletions pluginmanager/context_imp.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func (p *ContextImp) GetRuntimeContext() context.Context {
return p.ctx
}

// GetExtension returns the extension by name.
// name can be a plugin type with instance name, i.e., ext_basicauth/1, ext_basicauth/shared
// name can be also just a plugin type, i.e., ext_basicauth
func (p *ContextImp) GetExtension(name string, cfg any) (pipeline.Extension, error) {
if p.logstoreC == nil || p.logstoreC.PluginRunner == nil {
return nil, fmt.Errorf("pipeline not initialized")
Expand Down
64 changes: 41 additions & 23 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,14 +853,47 @@ func applyPluginConfig(plugin interface{}, pluginConfig interface{}) error {
return err
}

// Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority.
// Rule: pluginTypeWithID=pluginType/{optional:instance_name}/pluginID#pluginPriority.
func getPluginType(pluginTypeWithID string) string {
if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 {
return pluginTypeWithID[:ids]
}
return pluginTypeWithID
}

func getPluginTypeAndName(pluginTypeWithID string) string {
if idx := strings.LastIndex(pluginTypeWithID, "/"); idx != -1 {
return pluginTypeWithID[:idx]
}
return pluginTypeWithID
}

func getPluginID(pluginTypeWithID string) string {
if lastIdx := strings.LastIndexByte(pluginTypeWithID, '/'); lastIdx != -1 {
return pluginTypeWithID[lastIdx+1:]
}
return ""
}

func isPluginTypeWithID(pluginTypeWithID string) bool {
if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 {
return true
}
return false
}

func GetPluginPriority(pluginTypeWithID string) int {
if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 {
val, err := strconv.Atoi(pluginTypeWithID[idx+1:])
if err != nil {
return 0
}
return val
}
return 0
}

// pluginTypeWithID=pluginType/{optional:instance_name}/pluginID#pluginPriority.
func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool, lastOne bool) *pipeline.PluginMeta {
nodeID := ""
childNodeID := ""
Expand All @@ -869,17 +902,20 @@ func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool,
if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 {
pluginTypeWithID = pluginTypeWithID[:idx]
}
if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 {

if isPluginTypeWithID(pluginTypeWithID) {
if genNodeID {
nodeID, childNodeID = lc.genNodeID(lastOne)
}
if pluginID, err := strconv.ParseInt(pluginTypeWithID[ids+1:], 10, 32); err == nil {

if pluginID, err := strconv.ParseInt(getPluginID(pluginTypeWithID), 10, 32); err == nil {
atomic.StoreInt32(&lc.pluginID, int32(pluginID))
}

return &pipeline.PluginMeta{
PluginTypeWithID: pluginTypeWithID,
PluginType: pluginTypeWithID[:ids],
PluginID: pluginTypeWithID[ids+1:],
PluginType: getPluginType(pluginTypeWithID),
PluginID: getPluginID(pluginTypeWithID),
NodeID: nodeID,
ChildNodeID: childNodeID,
}
Expand All @@ -900,24 +936,6 @@ func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool,
}
}

func isPluginTypeWithID(pluginTypeWithID string) bool {
if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 {
return true
}
return false
}

func GetPluginPriority(pluginTypeWithID string) int {
if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 {
val, err := strconv.Atoi(pluginTypeWithID[idx+1:])
if err != nil {
return 0
}
return val
}
return 0
}

func (lc *LogstoreConfig) genPluginID() string {
return fmt.Sprintf("%v", atomic.AddInt32(&lc.pluginID, 1))
}
Expand Down
25 changes: 25 additions & 0 deletions pluginmanager/logstore_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,28 @@ func Test_genPluginMeta(t *testing.T) {
assert.Regexp(t, `-1`, result.ChildNodeID)
}
}

func Test_getPluginTypeWithID(t *testing.T) {
{
input := "ext_basicauth/123"
assert.Equal(t, "ext_basicauth", getPluginType(input))
assert.Equal(t, "123", getPluginID(input))
assert.Equal(t, "ext_basicauth", getPluginTypeAndName(input))
assert.Equal(t, false, isPluginTypeWithID(input))

}
{
input := "ext_basicauth"
assert.Equal(t, "ext_basicauth", getPluginType(input))
assert.Equal(t, "", getPluginID(input))
assert.Equal(t, "ext_basicauth", getPluginTypeAndName(input))
assert.Equal(t, false, isPluginTypeWithID(input))
}
{
input := "ext_basicauth/name/123"
assert.Equal(t, "ext_basicauth", getPluginType(input))
assert.Equal(t, "123", getPluginID(input))
assert.Equal(t, "ext_basicauth/name", getPluginTypeAndName(input))
assert.Equal(t, true, isPluginTypeWithID(input))
}
}
2 changes: 1 addition & 1 deletion pluginmanager/plugin_runner_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (p *pluginv1Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category plu
}
case pluginExtension:
if extension, ok := plugin.(pipeline.Extension); ok {
return p.addExtension(pluginMeta.PluginTypeWithID, extension)
return p.addExtension(getPluginTypeAndName(pluginMeta.PluginTypeWithID), extension)
}
default:
return pluginCategoryUndefinedError(category)
Expand Down
2 changes: 1 addition & 1 deletion pluginmanager/plugin_runner_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *pluginv2Runner) AddPlugin(pluginMeta *pipeline.PluginMeta, category plu
}
case pluginExtension:
if extension, ok := plugin.(pipeline.Extension); ok {
return p.addExtension(pluginMeta.PluginTypeWithID, extension)
return p.addExtension(getPluginTypeAndName(pluginMeta.PluginTypeWithID), extension)
}
default:
return pluginCategoryUndefinedError(category)
Expand Down

0 comments on commit e76377e

Please sign in to comment.