diff --git a/.chloggen/immutable-instance-id.yaml b/.chloggen/immutable-instance-id.yaml new file mode 100644 index 00000000000..c23aa60c8b5 --- /dev/null +++ b/.chloggen/immutable-instance-id.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: componentstatus + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make componentstatus.InstanceID immutable. + +# One or more tracking issues or pull requests related to the change +issues: [10494] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/component/componentstatus/instance.go b/component/componentstatus/instance.go index 22ea4c8003e..8d4d3181f27 100644 --- a/component/componentstatus/instance.go +++ b/component/componentstatus/instance.go @@ -3,14 +3,89 @@ package componentstatus // import "go.opentelemetry.io/collector/component/componentstatus" -import "go.opentelemetry.io/collector/component" +import ( + "slices" + "sort" + "strings" + + "go.opentelemetry.io/collector/component" +) // InstanceID uniquely identifies a component instance // // TODO: consider moving this struct to a new package/module like `extension/statuswatcher` // https://github.com/open-telemetry/opentelemetry-collector/issues/10764 +// pipelineDelim is the delimeter for internal representation of pipeline +// component IDs. +const pipelineDelim = byte(0x20) + +// InstanceID uniquely identifies a component instance. type InstanceID struct { - ID component.ID - Kind component.Kind - PipelineIDs map[component.ID]struct{} + componentID component.ID + kind component.Kind + pipelineIDs string // IDs encoded as a string so InstanceID is Comparable. +} + +// NewInstanceID returns an ID that uniquely identifies a component. +func NewInstanceID(componentID component.ID, kind component.Kind, pipelineIDs ...component.ID) *InstanceID { + instanceID := &InstanceID{ + componentID: componentID, + kind: kind, + } + instanceID.addPipelines(pipelineIDs) + return instanceID +} + +// ComponentID returns the ComponentID associated with this instance. +func (id *InstanceID) ComponentID() component.ID { + return id.componentID +} + +// Kind returns the component Kind associated with this instance. +func (id *InstanceID) Kind() component.Kind { + return id.kind +} + +// AllPipelineIDs calls f for each pipeline this instance is associated with. If +// f returns false it will stop iteration. +func (id *InstanceID) AllPipelineIDs(f func(component.ID) bool) { + var bs []byte + for _, b := range []byte(id.pipelineIDs) { + if b != pipelineDelim { + bs = append(bs, b) + continue + } + pipelineID := component.ID{} + err := pipelineID.UnmarshalText(bs) + bs = bs[:0] + if err != nil { + continue + } + if !f(pipelineID) { + break + } + } +} + +// WithPipelines returns a new InstanceID updated to include the given +// pipelineIDs. +func (id *InstanceID) WithPipelines(pipelineIDs ...component.ID) *InstanceID { + instanceID := &InstanceID{ + componentID: id.componentID, + kind: id.kind, + pipelineIDs: id.pipelineIDs, + } + instanceID.addPipelines(pipelineIDs) + return instanceID +} + +func (id *InstanceID) addPipelines(pipelineIDs []component.ID) { + delim := string(pipelineDelim) + strIDs := strings.Split(id.pipelineIDs, delim) + for _, pID := range pipelineIDs { + strIDs = append(strIDs, pID.String()) + } + sort.Strings(strIDs) + strIDs = slices.Compact(strIDs) + id.pipelineIDs = strings.Join(strIDs, delim) + delim } diff --git a/component/componentstatus/instance_test.go b/component/componentstatus/instance_test.go new file mode 100644 index 00000000000..88e74ac4a7f --- /dev/null +++ b/component/componentstatus/instance_test.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package componentstatus + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" +) + +func TestInstanceID(t *testing.T) { + traces := component.MustNewID("traces") + tracesA := component.MustNewIDWithName("traces", "a") + tracesB := component.MustNewIDWithName("traces", "b") + tracesC := component.MustNewIDWithName("traces", "c") + + idTracesA := NewInstanceID(traces, component.KindReceiver, tracesA) + idTracesAll := NewInstanceID(traces, component.KindReceiver, tracesA, tracesB, tracesC) + assert.NotEqual(t, idTracesA, idTracesAll) + + assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []component.ID) { + var pipelineIDs []component.ID + instanceID.AllPipelineIDs(func(id component.ID) bool { + pipelineIDs = append(pipelineIDs, id) + return true + }) + assert.Equal(t, expectedPipelineIDs, pipelineIDs) + } + + for _, tc := range []struct { + name string + id1 *InstanceID + id2 *InstanceID + pipelineIDs []component.ID + }{ + { + name: "equal instances", + id1: idTracesA, + id2: NewInstanceID(traces, component.KindReceiver, tracesA), + pipelineIDs: []component.ID{tracesA}, + }, + { + name: "equal instances - out of order", + id1: idTracesAll, + id2: NewInstanceID(traces, component.KindReceiver, tracesC, tracesB, tracesA), + pipelineIDs: []component.ID{tracesA, tracesB, tracesC}, + }, + { + name: "with pipelines", + id1: idTracesAll, + id2: idTracesA.WithPipelines(tracesB, tracesC), + pipelineIDs: []component.ID{tracesA, tracesB, tracesC}, + }, + { + name: "with pipelines - out of order", + id1: idTracesAll, + id2: idTracesA.WithPipelines(tracesC, tracesB), + pipelineIDs: []component.ID{tracesA, tracesB, tracesC}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.id1, tc.id2) + assertHasPipelines(t, tc.id1, tc.pipelineIDs) + assertHasPipelines(t, tc.id2, tc.pipelineIDs) + }) + } +} + +func TestAllPipelineIDs(t *testing.T) { + instanceID := NewInstanceID( + component.MustNewID("traces"), + component.KindReceiver, + component.MustNewIDWithName("traces", "a"), + component.MustNewIDWithName("traces", "b"), + component.MustNewIDWithName("traces", "c"), + ) + + count := 0 + instanceID.AllPipelineIDs(func(id component.ID) bool { + count++ + return true + }) + assert.Equal(t, 3, count) + + count = 0 + instanceID.AllPipelineIDs(func(id component.ID) bool { + count++ + return false + }) + assert.Equal(t, 1, count) + +} diff --git a/internal/e2e/status_test.go b/internal/e2e/status_test.go index b49d7bea600..daf90cb76ed 100644 --- a/internal/e2e/status_test.go +++ b/internal/e2e/status_test.go @@ -105,7 +105,7 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) { assert.Equal(t, 5, len(eventsReceived)) for instanceID, events := range eventsReceived { - if instanceID.ID == component.NewID(component.MustNewType("test")) { + if instanceID.ComponentID() == component.NewID(component.MustNewType("test")) { for i, e := range events { if i == 0 { assert.Equal(t, componentstatus.StatusStarting, e.Status()) diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index ea3578e47b4..14465dff826 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -149,7 +149,7 @@ func TestComponentStatusWatcher(t *testing.T) { changedComponents := map[*componentstatus.InstanceID][]componentstatus.Status{} var mux sync.Mutex onStatusChanged := func(source *componentstatus.InstanceID, event *componentstatus.Event) { - if source.ID.Type() != unhealthyProcessorFactory.Type() { + if source.ComponentID().Type() != unhealthyProcessorFactory.Type() { return } mux.Lock() @@ -200,7 +200,7 @@ func TestComponentStatusWatcher(t *testing.T) { for k, v := range changedComponents { // All processors must report a status change with the same ID - assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID) + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ComponentID()) // And all must have a valid startup sequence assert.Equal(t, startupStatuses(v), v) } diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index e87de41b6e3..c7d3a5cfa6c 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -198,10 +198,7 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext } for _, extID := range cfg { - instanceID := &componentstatus.InstanceID{ - ID: extID, - Kind: component.KindExtension, - } + instanceID := componentstatus.NewInstanceID(extID, component.KindExtension) extSet := extension.Settings{ ID: extID, TelemetrySettings: set.Telemetry, diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 9839c193022..1916247ca0c 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -197,47 +197,37 @@ func (g *Graph) createNodes(set Settings) error { func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode { rcvrNode := newReceiverNode(pipelineID.Type(), recvID) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { - g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{} + instanceID := g.instanceIDs[node.ID()] + g.instanceIDs[node.ID()] = instanceID.WithPipelines(pipelineID) return node.(*receiverNode) } g.componentGraph.AddNode(rcvrNode) - g.instanceIDs[rcvrNode.ID()] = &componentstatus.InstanceID{ - ID: recvID, - Kind: component.KindReceiver, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } + g.instanceIDs[rcvrNode.ID()] = componentstatus.NewInstanceID( + recvID, component.KindReceiver, pipelineID, + ) return rcvrNode } func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode { procNode := newProcessorNode(pipelineID, procID) g.componentGraph.AddNode(procNode) - g.instanceIDs[procNode.ID()] = &componentstatus.InstanceID{ - ID: procID, - Kind: component.KindProcessor, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } + g.instanceIDs[procNode.ID()] = componentstatus.NewInstanceID( + procID, component.KindProcessor, pipelineID, + ) return procNode } func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode { expNode := newExporterNode(pipelineID.Type(), exprID) if node := g.componentGraph.Node(expNode.ID()); node != nil { - g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{} + instanceID := g.instanceIDs[expNode.ID()] + g.instanceIDs[expNode.ID()] = instanceID.WithPipelines(pipelineID) return node.(*exporterNode) } g.componentGraph.AddNode(expNode) - g.instanceIDs[expNode.ID()] = &componentstatus.InstanceID{ - ID: expNode.componentID, - Kind: component.KindExporter, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } + g.instanceIDs[expNode.ID()] = componentstatus.NewInstanceID( + expNode.componentID, component.KindExporter, pipelineID, + ) return expNode } @@ -245,19 +235,13 @@ func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID) if node := g.componentGraph.Node(connNode.ID()); node != nil { instanceID := g.instanceIDs[connNode.ID()] - instanceID.PipelineIDs[exprPipelineID] = struct{}{} - instanceID.PipelineIDs[rcvrPipelineID] = struct{}{} + g.instanceIDs[connNode.ID()] = instanceID.WithPipelines(exprPipelineID, rcvrPipelineID) return node.(*connectorNode) } g.componentGraph.AddNode(connNode) - g.instanceIDs[connNode.ID()] = &componentstatus.InstanceID{ - ID: connNode.componentID, - Kind: component.KindConnector, - PipelineIDs: map[component.ID]struct{}{ - exprPipelineID: {}, - rcvrPipelineID: {}, - }, - } + g.instanceIDs[connNode.ID()] = componentstatus.NewInstanceID( + connNode.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID, + ) return connNode } @@ -429,8 +413,8 @@ func (g *Graph) StartAll(ctx context.Context, host *Host) error { g.telemetry.Logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)). Error("Failed to start component", zap.Error(compErr), - zap.String("type", instanceID.Kind.String()), - zap.String("id", instanceID.ID.String()), + zap.String("type", instanceID.Kind().String()), + zap.String("id", instanceID.ComponentID().String()), ) return compErr } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 8b98d020fd5..3593d4533fc 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -2260,12 +2260,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) { eSdErr := &testNode{id: component.MustNewIDWithName("e_sd_err", "1"), shutdownErr: assert.AnError} instanceIDs := map[*testNode]*componentstatus.InstanceID{ - rNoErr: {ID: rNoErr.id}, - rStErr: {ID: rStErr.id}, - rSdErr: {ID: rSdErr.id}, - eNoErr: {ID: eNoErr.id}, - eStErr: {ID: eStErr.id}, - eSdErr: {ID: eSdErr.id}, + rNoErr: componentstatus.NewInstanceID(rNoErr.id, component.KindReceiver), + rStErr: componentstatus.NewInstanceID(rStErr.id, component.KindReceiver), + rSdErr: componentstatus.NewInstanceID(rSdErr.id, component.KindReceiver), + eNoErr: componentstatus.NewInstanceID(eNoErr.id, component.KindExporter), + eStErr: componentstatus.NewInstanceID(eStErr.id, component.KindExporter), + eSdErr: componentstatus.NewInstanceID(eSdErr.id, component.KindExporter), } // compare two maps of status events ignoring timestamp