Skip to content

Commit

Permalink
Make componentstatus.InstanceID immutable and comparable
Browse files Browse the repository at this point in the history
  • Loading branch information
mwear committed Aug 16, 2024
1 parent abed3ab commit c8d3033
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 52 deletions.
25 changes: 25 additions & 0 deletions .chloggen/immutable-instance-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: componentstatus

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make componentstatus.InstanceID immutable.

# One or more tracking issues or pull requests related to the change
issues: [10494]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
83 changes: 79 additions & 4 deletions component/componentstatus/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,89 @@

package componentstatus // import "go.opentelemetry.io/collector/component/componentstatus"

import "go.opentelemetry.io/collector/component"
import (
"slices"
"sort"
"strings"

"go.opentelemetry.io/collector/component"
)

// InstanceID uniquely identifies a component instance
//
// TODO: consider moving this struct to a new package/module like `extension/statuswatcher`
// https://github.com/open-telemetry/opentelemetry-collector/issues/10764
// pipelineDelim is the delimeter for internal representation of pipeline
// component IDs.
const pipelineDelim = byte(0x20)

// InstanceID uniquely identifies a component instance.
type InstanceID struct {
ID component.ID
Kind component.Kind
PipelineIDs map[component.ID]struct{}
componentID component.ID
kind component.Kind
pipelineIDs string // IDs encoded as a string so InstanceID is Comparable.
}

// NewInstanceID returns an ID that uniquely identifies a component.
func NewInstanceID(componentID component.ID, kind component.Kind, pipelineIDs ...component.ID) *InstanceID {
instanceID := &InstanceID{
componentID: componentID,
kind: kind,
}
instanceID.addPipelines(pipelineIDs)
return instanceID
}

// ComponentID returns the ComponentID associated with this instance.
func (id *InstanceID) ComponentID() component.ID {
return id.componentID

Check warning on line 41 in component/componentstatus/instance.go

View check run for this annotation

Codecov / codecov/patch

component/componentstatus/instance.go#L40-L41

Added lines #L40 - L41 were not covered by tests
}

// Kind returns the component Kind associated with this instance.
func (id *InstanceID) Kind() component.Kind {
return id.kind

Check warning on line 46 in component/componentstatus/instance.go

View check run for this annotation

Codecov / codecov/patch

component/componentstatus/instance.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}

// AllPipelineIDs calls f for each pipeline this instance is associated with. If
// f returns false it will stop iteration.
func (id *InstanceID) AllPipelineIDs(f func(component.ID) bool) {
var bs []byte
for _, b := range []byte(id.pipelineIDs) {
if b != pipelineDelim {
bs = append(bs, b)
continue
}
pipelineID := component.ID{}
err := pipelineID.UnmarshalText(bs)
bs = bs[:0]
if err != nil {
continue
}
if !f(pipelineID) {
break
}
}
}

// WithPipelines returns a new InstanceID updated to include the given
// pipelineIDs.
func (id *InstanceID) WithPipelines(pipelineIDs ...component.ID) *InstanceID {
instanceID := &InstanceID{
componentID: id.componentID,
kind: id.kind,
pipelineIDs: id.pipelineIDs,
}
instanceID.addPipelines(pipelineIDs)
return instanceID
}

func (id *InstanceID) addPipelines(pipelineIDs []component.ID) {
delim := string(pipelineDelim)
strIDs := strings.Split(id.pipelineIDs, delim)
for _, pID := range pipelineIDs {
strIDs = append(strIDs, pID.String())
}
sort.Strings(strIDs)
strIDs = slices.Compact(strIDs)
id.pipelineIDs = strings.Join(strIDs, delim) + delim
}
94 changes: 94 additions & 0 deletions component/componentstatus/instance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componentstatus

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
)

func TestInstanceID(t *testing.T) {
traces := component.MustNewID("traces")
tracesA := component.MustNewIDWithName("traces", "a")
tracesB := component.MustNewIDWithName("traces", "b")
tracesC := component.MustNewIDWithName("traces", "c")

idTracesA := NewInstanceID(traces, component.KindReceiver, tracesA)
idTracesAll := NewInstanceID(traces, component.KindReceiver, tracesA, tracesB, tracesC)
assert.NotEqual(t, idTracesA, idTracesAll)

assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []component.ID) {
var pipelineIDs []component.ID
instanceID.AllPipelineIDs(func(id component.ID) bool {
pipelineIDs = append(pipelineIDs, id)
return true
})
assert.Equal(t, expectedPipelineIDs, pipelineIDs)
}

for _, tc := range []struct {
name string
id1 *InstanceID
id2 *InstanceID
pipelineIDs []component.ID
}{
{
name: "equal instances",
id1: idTracesA,
id2: NewInstanceID(traces, component.KindReceiver, tracesA),
pipelineIDs: []component.ID{tracesA},
},
{
name: "equal instances - out of order",
id1: idTracesAll,
id2: NewInstanceID(traces, component.KindReceiver, tracesC, tracesB, tracesA),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesB, tracesC),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines - out of order",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesC, tracesB),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
},
} {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.id1, tc.id2)
assertHasPipelines(t, tc.id1, tc.pipelineIDs)
assertHasPipelines(t, tc.id2, tc.pipelineIDs)
})
}
}

func TestAllPipelineIDs(t *testing.T) {
instanceID := NewInstanceID(
component.MustNewID("traces"),
component.KindReceiver,
component.MustNewIDWithName("traces", "a"),
component.MustNewIDWithName("traces", "b"),
component.MustNewIDWithName("traces", "c"),
)

count := 0
instanceID.AllPipelineIDs(func(id component.ID) bool {
count++
return true
})
assert.Equal(t, 3, count)

count = 0
instanceID.AllPipelineIDs(func(id component.ID) bool {
count++
return false
})
assert.Equal(t, 1, count)

}
2 changes: 1 addition & 1 deletion internal/e2e/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) {
assert.Equal(t, 5, len(eventsReceived))

for instanceID, events := range eventsReceived {
if instanceID.ID == component.NewID(component.MustNewType("test")) {
if instanceID.ComponentID() == component.NewID(component.MustNewType("test")) {
for i, e := range events {
if i == 0 {
assert.Equal(t, componentstatus.StatusStarting, e.Status())
Expand Down
4 changes: 2 additions & 2 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestComponentStatusWatcher(t *testing.T) {
changedComponents := map[*componentstatus.InstanceID][]componentstatus.Status{}
var mux sync.Mutex
onStatusChanged := func(source *componentstatus.InstanceID, event *componentstatus.Event) {
if source.ID.Type() != unhealthyProcessorFactory.Type() {
if source.ComponentID().Type() != unhealthyProcessorFactory.Type() {
return
}
mux.Lock()
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestComponentStatusWatcher(t *testing.T) {

for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ComponentID())
// And all must have a valid startup sequence
assert.Equal(t, startupStatuses(v), v)
}
Expand Down
5 changes: 1 addition & 4 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,7 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext
}

for _, extID := range cfg {
instanceID := &componentstatus.InstanceID{
ID: extID,
Kind: component.KindExtension,
}
instanceID := componentstatus.NewInstanceID(extID, component.KindExtension)
extSet := extension.Settings{
ID: extID,
TelemetrySettings: set.Telemetry,
Expand Down
54 changes: 19 additions & 35 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,67 +197,51 @@ func (g *Graph) createNodes(set Settings) error {
func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineID.Type(), recvID)
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{}
instanceID := g.instanceIDs[node.ID()]
g.instanceIDs[node.ID()] = instanceID.WithPipelines(pipelineID)
return node.(*receiverNode)
}
g.componentGraph.AddNode(rcvrNode)
g.instanceIDs[rcvrNode.ID()] = &componentstatus.InstanceID{
ID: recvID,
Kind: component.KindReceiver,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[rcvrNode.ID()] = componentstatus.NewInstanceID(
recvID, component.KindReceiver, pipelineID,
)
return rcvrNode
}

func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode {
procNode := newProcessorNode(pipelineID, procID)
g.componentGraph.AddNode(procNode)
g.instanceIDs[procNode.ID()] = &componentstatus.InstanceID{
ID: procID,
Kind: component.KindProcessor,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[procNode.ID()] = componentstatus.NewInstanceID(
procID, component.KindProcessor, pipelineID,
)
return procNode
}

func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineID.Type(), exprID)
if node := g.componentGraph.Node(expNode.ID()); node != nil {
g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{}
instanceID := g.instanceIDs[expNode.ID()]
g.instanceIDs[expNode.ID()] = instanceID.WithPipelines(pipelineID)
return node.(*exporterNode)
}
g.componentGraph.AddNode(expNode)
g.instanceIDs[expNode.ID()] = &componentstatus.InstanceID{
ID: expNode.componentID,
Kind: component.KindExporter,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[expNode.ID()] = componentstatus.NewInstanceID(
expNode.componentID, component.KindExporter, pipelineID,
)
return expNode
}

func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode {
connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID)
if node := g.componentGraph.Node(connNode.ID()); node != nil {
instanceID := g.instanceIDs[connNode.ID()]
instanceID.PipelineIDs[exprPipelineID] = struct{}{}
instanceID.PipelineIDs[rcvrPipelineID] = struct{}{}
g.instanceIDs[connNode.ID()] = instanceID.WithPipelines(exprPipelineID, rcvrPipelineID)
return node.(*connectorNode)
}
g.componentGraph.AddNode(connNode)
g.instanceIDs[connNode.ID()] = &componentstatus.InstanceID{
ID: connNode.componentID,
Kind: component.KindConnector,
PipelineIDs: map[component.ID]struct{}{
exprPipelineID: {},
rcvrPipelineID: {},
},
}
g.instanceIDs[connNode.ID()] = componentstatus.NewInstanceID(
connNode.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID,
)
return connNode
}

Expand Down Expand Up @@ -429,8 +413,8 @@ func (g *Graph) StartAll(ctx context.Context, host *Host) error {
g.telemetry.Logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).
Error("Failed to start component",
zap.Error(compErr),
zap.String("type", instanceID.Kind.String()),
zap.String("id", instanceID.ID.String()),
zap.String("type", instanceID.Kind().String()),
zap.String("id", instanceID.ComponentID().String()),
)
return compErr
}
Expand Down
12 changes: 6 additions & 6 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2260,12 +2260,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
eSdErr := &testNode{id: component.MustNewIDWithName("e_sd_err", "1"), shutdownErr: assert.AnError}

instanceIDs := map[*testNode]*componentstatus.InstanceID{
rNoErr: {ID: rNoErr.id},
rStErr: {ID: rStErr.id},
rSdErr: {ID: rSdErr.id},
eNoErr: {ID: eNoErr.id},
eStErr: {ID: eStErr.id},
eSdErr: {ID: eSdErr.id},
rNoErr: componentstatus.NewInstanceID(rNoErr.id, component.KindReceiver),
rStErr: componentstatus.NewInstanceID(rStErr.id, component.KindReceiver),
rSdErr: componentstatus.NewInstanceID(rSdErr.id, component.KindReceiver),
eNoErr: componentstatus.NewInstanceID(eNoErr.id, component.KindExporter),
eStErr: componentstatus.NewInstanceID(eStErr.id, component.KindExporter),
eSdErr: componentstatus.NewInstanceID(eSdErr.id, component.KindExporter),
}

// compare two maps of status events ignoring timestamp
Expand Down

0 comments on commit c8d3033

Please sign in to comment.