Skip to content

Commit

Permalink
Refactor InstanceID to be Comparable
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Jul 18, 2024
1 parent 18dccc3 commit 5e7d9eb
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 40 deletions.
67 changes: 47 additions & 20 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ package component // import "go.opentelemetry.io/collector/component"
import (
"context"
"errors"
"slices"
"sort"
"strings"
)

var (
Expand Down Expand Up @@ -190,24 +193,25 @@ func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config {
return f()
}

// pipelineDelim is the delimeter for internal representation of pipeline
// component IDs.
const pipelineDelim = byte(0x20)

// InstanceID uniquely identifies a component instance.
type InstanceID struct {
componentID ID
kind Kind
pipelineIDs map[ID]struct{}
pipelineIDs string // IDs encoded as a string so InstanceID is Comparable.
}

// NewInstanceID returns an ID that uniquely identifies a component.
func NewInstanceID(componentID ID, kind Kind, pipelineIDs ...ID) *InstanceID {
instanceID := InstanceID{
instanceID := &InstanceID{
componentID: componentID,
kind: kind,
pipelineIDs: make(map[ID]struct{}, len(pipelineIDs)),
}
for _, pid := range pipelineIDs {
instanceID.pipelineIDs[pid] = struct{}{}
}
return &instanceID
instanceID.addPipelines(pipelineIDs)
return instanceID
}

// ComponentID returns the ComponentID associated with this instance.
Expand All @@ -220,24 +224,47 @@ func (id *InstanceID) Kind() Kind {
return id.kind
}

// PipelineIDs returns a set of PipelineIDs associated with this instance.
func (id *InstanceID) PipelineIDs() map[ID]struct{} {
return id.pipelineIDs
// EachPipelineID calls f for each pipeline this instance is associated with. If
// f returns false it will stop iteration.
func (id *InstanceID) EachPipelineID(f func(ID) bool) {
var bs []byte
for _, b := range []byte(id.pipelineIDs) {
if b != pipelineDelim {
bs = append(bs, b)
continue
}
pipelineID := ID{}
err := pipelineID.UnmarshalText(bs)
bs = bs[:0]
if err != nil {
continue
}
if !f(pipelineID) {
break
}
}
}

// WithPipelines returns a new InstanceID updated to include the given
// pipelineIDs.
func (id *InstanceID) WithPipelines(pipelineIDs ...ID) *InstanceID {
instanceID := InstanceID{
componentID: id.ComponentID(),
kind: id.Kind(),
pipelineIDs: make(map[ID]struct{}, len(id.PipelineIDs())+len(pipelineIDs)),
instanceID := &InstanceID{
componentID: id.componentID,
kind: id.kind,
pipelineIDs: id.pipelineIDs,
}
for pid := range id.PipelineIDs() {
instanceID.pipelineIDs[pid] = struct{}{}
}
for _, pid := range pipelineIDs {
instanceID.pipelineIDs[pid] = struct{}{}
instanceID.addPipelines(pipelineIDs)
return instanceID
}

func (id *InstanceID) addPipelines(pipelineIDs []ID) {
strIDs := strings.Split(id.pipelineIDs, string(pipelineDelim))
for _, pID := range pipelineIDs {
strIDs = append(strIDs, pID.String())
}
return &instanceID
// Normalize order
sort.Strings(strIDs)
// Dedup
strIDs = slices.Compact(strIDs)
id.pipelineIDs = strings.Join(strIDs, string(pipelineDelim)) + string(pipelineDelim)
}
99 changes: 79 additions & 20 deletions component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,83 @@ func TestStabilityLevelString(t *testing.T) {

func TestInstanceID(t *testing.T) {
traces := MustNewID("traces")
metrics := MustNewID("metrics")
logs := MustNewID("logs")
receiver := MustNewID("receiver")

id1 := NewInstanceID(receiver, KindReceiver, traces)
id2 := id1.WithPipelines(metrics, logs)

assert.Equal(t, receiver, id1.ComponentID())
assert.Equal(t, KindReceiver, id1.Kind())
assert.Equal(t, map[ID]struct{}{
traces: {},
}, id1.pipelineIDs)

assert.Equal(t, receiver, id2.ComponentID())
assert.Equal(t, KindReceiver, id2.Kind())
assert.Equal(t, map[ID]struct{}{
traces: {},
metrics: {},
logs: {},
}, id2.pipelineIDs)
tracesA := MustNewIDWithName("traces", "a")
tracesB := MustNewIDWithName("traces", "b")
tracesC := MustNewIDWithName("traces", "c")

idTracesA := NewInstanceID(traces, KindReceiver, tracesA)
idTracesAll := NewInstanceID(traces, KindReceiver, tracesA, tracesB, tracesC)
assert.NotEqual(t, idTracesA, idTracesAll)

assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []ID) {
var pipelineIDs []ID
instanceID.EachPipelineID(func(id ID) bool {
pipelineIDs = append(pipelineIDs, id)
return true
})
assert.Equal(t, expectedPipelineIDs, pipelineIDs)
}

for _, tc := range []struct {
name string
id1 *InstanceID
id2 *InstanceID
pipelineIDs []ID
}{
{
name: "equal instances",
id1: idTracesA,
id2: NewInstanceID(traces, KindReceiver, tracesA),
pipelineIDs: []ID{tracesA},
},
{
name: "equal instances - out of order",
id1: idTracesAll,
id2: NewInstanceID(traces, KindReceiver, tracesC, tracesB, tracesA),
pipelineIDs: []ID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesB, tracesC),
pipelineIDs: []ID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines - out of order",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesC, tracesB),
pipelineIDs: []ID{tracesA, tracesB, tracesC},
},
} {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.id1, tc.id2)
assertHasPipelines(t, tc.id1, tc.pipelineIDs)
assertHasPipelines(t, tc.id2, tc.pipelineIDs)
})
}
}

func TestInstanceIDEachPipelineID(t *testing.T) {
instanceID := NewInstanceID(
MustNewID("traces"),
KindReceiver,
MustNewIDWithName("traces", "a"),
MustNewIDWithName("traces", "b"),
MustNewIDWithName("traces", "c"),
)

count := 0
instanceID.EachPipelineID(func(id ID) bool {
count++
return true
})
assert.Equal(t, 3, count)

count = 0
instanceID.EachPipelineID(func(id ID) bool {
count++
return false
})
assert.Equal(t, 1, count)

}

0 comments on commit 5e7d9eb

Please sign in to comment.