diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 1962bf9e49f..6c4dc61307a 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -5,15 +5,12 @@ package graph import ( "context" - "errors" "fmt" "strings" - "sync" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gonum.org/v1/gonum/graph/simple" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentprofiles" @@ -22,19 +19,13 @@ import ( "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/connector/connectorprofiles" "go.opentelemetry.io/collector/connector/connectortest" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerprofiles" - "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterprofiles" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receiverprofiles" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/service/internal/builders" "go.opentelemetry.io/collector/service/internal/status" @@ -43,190 +34,6 @@ import ( "go.opentelemetry.io/collector/service/pipelines" ) -var _ component.Component = (*testNode)(nil) - -type testNode struct { - id component.ID - startErr error - shutdownErr error -} - -// ID satisfies the graph.Node interface, allowing -// testNode to be used in a simple.DirectedGraph -func (n *testNode) ID() int64 { - return int64(newNodeID(n.id.String())) -} - -func (n *testNode) Start(ctx context.Context, _ component.Host) error { - if n.startErr != nil { - return n.startErr - } - if cwo, ok := ctx.(*contextWithOrder); ok { - cwo.record(n.id) - } - return nil -} - -func (n *testNode) Shutdown(ctx context.Context) error { - if n.shutdownErr != nil { - return n.shutdownErr - } - if cwo, ok := ctx.(*contextWithOrder); ok { - cwo.record(n.id) - } - return nil -} - -type contextWithOrder struct { - context.Context - sync.Mutex - next int - order map[component.ID]int -} - -func (c *contextWithOrder) record(id component.ID) { - c.Lock() - c.order[id] = c.next - c.next++ - c.Unlock() -} - -func TestGraphStartStop(t *testing.T) { - testCases := []struct { - name string - edges [][2]component.ID - }{ - { - name: "single", - edges: [][2]component.ID{ - {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "1")}, - {component.MustNewIDWithName("r", "2"), component.MustNewIDWithName("p", "1")}, - {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("p", "2")}, - {component.MustNewIDWithName("p", "2"), component.MustNewIDWithName("e", "1")}, - {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("e", "2")}, - }, - }, - { - name: "multi", - edges: [][2]component.ID{ - // Pipeline 1 - {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "1")}, - {component.MustNewIDWithName("r", "2"), component.MustNewIDWithName("p", "1")}, - {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("p", "2")}, - {component.MustNewIDWithName("p", "2"), component.MustNewIDWithName("e", "1")}, - {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("e", "2")}, - - // Pipeline 2, shares r1 and e2 - {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "3")}, - {component.MustNewIDWithName("p", "3"), component.MustNewIDWithName("e", "2")}, - }, - }, - { - name: "connected", - edges: [][2]component.ID{ - // Pipeline 1 - {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "1")}, - {component.MustNewIDWithName("r", "2"), component.MustNewIDWithName("p", "1")}, - {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("p", "2")}, - {component.MustNewIDWithName("p", "2"), component.MustNewIDWithName("e", "1")}, - {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("c", "1")}, - - // Pipeline 2, shares r1 and c1 - {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "3")}, - {component.MustNewIDWithName("p", "3"), component.MustNewIDWithName("c", "1")}, - - // Pipeline 3, emits to e2 and c2 - {component.MustNewIDWithName("c", "1"), component.MustNewIDWithName("e", "2")}, - {component.MustNewIDWithName("c", "1"), component.MustNewIDWithName("c", "2")}, - - // Pipeline 4, also emits to e2 - {component.MustNewIDWithName("c", "2"), component.MustNewIDWithName("e", "2")}, - }, - }, - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - ctx := &contextWithOrder{ - Context: context.Background(), - order: map[component.ID]int{}, - } - - pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = componenttest.NewNopTelemetrySettings() - pg.instanceIDs = make(map[int64]*componentstatus.InstanceID) - - for _, edge := range tt.edges { - f, t := &testNode{id: edge[0]}, &testNode{id: edge[1]} - pg.instanceIDs[f.ID()] = &componentstatus.InstanceID{} - pg.instanceIDs[t.ID()] = &componentstatus.InstanceID{} - pg.componentGraph.SetEdge(simple.Edge{F: f, T: t}) - } - - require.NoError(t, pg.StartAll(ctx, &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) - for _, edge := range tt.edges { - assert.Greater(t, ctx.order[edge[0]], ctx.order[edge[1]]) - } - - ctx.order = map[component.ID]int{} - require.NoError(t, pg.ShutdownAll(ctx, statustest.NewNopStatusReporter())) - for _, edge := range tt.edges { - assert.Less(t, ctx.order[edge[0]], ctx.order[edge[1]]) - } - }) - } -} - -func TestGraphStartStopCycle(t *testing.T) { - pg := &Graph{componentGraph: simple.NewDirectedGraph()} - - r1 := &testNode{id: component.MustNewIDWithName("r", "1")} - p1 := &testNode{id: component.MustNewIDWithName("p", "1")} - c1 := &testNode{id: component.MustNewIDWithName("c", "1")} - e1 := &testNode{id: component.MustNewIDWithName("e", "1")} - - pg.instanceIDs = map[int64]*componentstatus.InstanceID{ - r1.ID(): {}, - p1.ID(): {}, - c1.ID(): {}, - e1.ID(): {}, - } - - pg.componentGraph.SetEdge(simple.Edge{F: r1, T: p1}) - pg.componentGraph.SetEdge(simple.Edge{F: p1, T: c1}) - pg.componentGraph.SetEdge(simple.Edge{F: c1, T: e1}) - pg.componentGraph.SetEdge(simple.Edge{F: c1, T: p1}) // loop back - - err := pg.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})}) - require.ErrorContains(t, err, `topo: no topological ordering: cyclic components`) - - err = pg.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()) - assert.ErrorContains(t, err, `topo: no topological ordering: cyclic components`) -} - -func TestGraphStartStopComponentError(t *testing.T) { - pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = componenttest.NewNopTelemetrySettings() - r1 := &testNode{ - id: component.MustNewIDWithName("r", "1"), - startErr: errors.New("foo"), - } - e1 := &testNode{ - id: component.MustNewIDWithName("e", "1"), - shutdownErr: errors.New("bar"), - } - pg.instanceIDs = map[int64]*componentstatus.InstanceID{ - r1.ID(): {}, - e1.ID(): {}, - } - pg.componentGraph.SetEdge(simple.Edge{ - F: r1, - T: e1, - }) - require.EqualError(t, pg.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})}), "foo") - assert.EqualError(t, pg.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()), "bar") -} - func TestConnectorPipelinesGraph(t *testing.T) { tests := []struct { name string @@ -2664,501 +2471,3 @@ func TestGraphBuildErrors(t *testing.T) { }) } } - -// This includes all tests from the previous implementation, plus a new one -// relevant only to the new graph-based implementation. -func TestGraphFailToStartAndShutdown(t *testing.T) { - errReceiverFactory := newErrReceiverFactory() - errProcessorFactory := newErrProcessorFactory() - errExporterFactory := newErrExporterFactory() - errConnectorFactory := newErrConnectorFactory() - nopReceiverFactory := receivertest.NewNopFactory() - nopProcessorFactory := processortest.NewNopFactory() - nopExporterFactory := exportertest.NewNopFactory() - nopConnectorFactory := connectortest.NewNopFactory() - - set := Settings{ - Telemetry: componenttest.NewNopTelemetrySettings(), - BuildInfo: component.NewDefaultBuildInfo(), - ReceiverBuilder: builders.NewReceiver( - map[component.ID]component.Config{ - component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), - component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), - }, - map[component.Type]receiver.Factory{ - nopReceiverFactory.Type(): nopReceiverFactory, - errReceiverFactory.Type(): errReceiverFactory, - }), - ProcessorBuilder: builders.NewProcessor( - map[component.ID]component.Config{ - component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), - component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), - }, - map[component.Type]processor.Factory{ - nopProcessorFactory.Type(): nopProcessorFactory, - errProcessorFactory.Type(): errProcessorFactory, - }), - ExporterBuilder: builders.NewExporter( - map[component.ID]component.Config{ - component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), - component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), - }, - map[component.Type]exporter.Factory{ - nopExporterFactory.Type(): nopExporterFactory, - errExporterFactory.Type(): errExporterFactory, - }), - ConnectorBuilder: builders.NewConnector( - map[component.ID]component.Config{ - component.NewIDWithName(nopConnectorFactory.Type(), "conn"): nopConnectorFactory.CreateDefaultConfig(), - component.NewIDWithName(errConnectorFactory.Type(), "conn"): errConnectorFactory.CreateDefaultConfig(), - }, - map[component.Type]connector.Factory{ - nopConnectorFactory.Type(): nopConnectorFactory, - errConnectorFactory.Type(): errConnectorFactory, - }), - } - - dataTypes := []pipeline.Signal{pipeline.SignalTraces, pipeline.SignalMetrics, pipeline.SignalLogs} - for _, dt := range dataTypes { - t.Run(dt.String()+"/receiver", func(t *testing.T) { - set.PipelineConfigs = pipelines.Config{ - pipeline.NewID(dt): { - Receivers: []component.ID{component.MustNewID("nop"), component.MustNewID("err")}, - Processors: []component.ID{component.MustNewID("nop")}, - Exporters: []component.ID{component.MustNewID("nop")}, - }, - } - pipelines, err := Build(context.Background(), set) - require.NoError(t, err) - require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) - assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) - }) - - t.Run(dt.String()+"/processor", func(t *testing.T) { - set.PipelineConfigs = pipelines.Config{ - pipeline.NewID(dt): { - Receivers: []component.ID{component.MustNewID("nop")}, - Processors: []component.ID{component.MustNewID("nop"), component.MustNewID("err")}, - Exporters: []component.ID{component.MustNewID("nop")}, - }, - } - pipelines, err := Build(context.Background(), set) - require.NoError(t, err) - require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) - assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) - }) - - t.Run(dt.String()+"/exporter", func(t *testing.T) { - set.PipelineConfigs = pipelines.Config{ - pipeline.NewID(dt): { - Receivers: []component.ID{component.MustNewID("nop")}, - Processors: []component.ID{component.MustNewID("nop")}, - Exporters: []component.ID{component.MustNewID("nop"), component.MustNewID("err")}, - }, - } - pipelines, err := Build(context.Background(), set) - require.NoError(t, err) - require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) - assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) - }) - - for _, dt2 := range dataTypes { - t.Run(dt.String()+"/"+dt2.String()+"/connector", func(t *testing.T) { - set.PipelineConfigs = pipelines.Config{ - pipeline.NewIDWithName(dt, "in"): { - Receivers: []component.ID{component.MustNewID("nop")}, - Processors: []component.ID{component.MustNewID("nop")}, - Exporters: []component.ID{component.MustNewID("nop"), component.MustNewIDWithName("err", "conn")}, - }, - pipeline.NewIDWithName(dt2, "out"): { - Receivers: []component.ID{component.MustNewID("nop"), component.MustNewIDWithName("err", "conn")}, - Processors: []component.ID{component.MustNewID("nop")}, - Exporters: []component.ID{component.MustNewID("nop")}, - }, - } - pipelines, err := Build(context.Background(), set) - require.NoError(t, err) - require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) - assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) - }) - } - } -} - -func TestStatusReportedOnStartupShutdown(t *testing.T) { - - rNoErr := &testNode{id: component.MustNewIDWithName("r_no_err", "1")} - rStErr := &testNode{id: component.MustNewIDWithName("r_st_err", "1"), startErr: assert.AnError} - rSdErr := &testNode{id: component.MustNewIDWithName("r_sd_err", "1"), shutdownErr: assert.AnError} - - eNoErr := &testNode{id: component.MustNewIDWithName("e_no_err", "1")} - eStErr := &testNode{id: component.MustNewIDWithName("e_st_err", "1"), startErr: assert.AnError} - eSdErr := &testNode{id: component.MustNewIDWithName("e_sd_err", "1"), shutdownErr: assert.AnError} - - instanceIDs := map[*testNode]*componentstatus.InstanceID{ - rNoErr: componentstatus.NewInstanceID(rNoErr.id, component.KindReceiver), - rStErr: componentstatus.NewInstanceID(rStErr.id, component.KindReceiver), - rSdErr: componentstatus.NewInstanceID(rSdErr.id, component.KindReceiver), - eNoErr: componentstatus.NewInstanceID(eNoErr.id, component.KindExporter), - eStErr: componentstatus.NewInstanceID(eStErr.id, component.KindExporter), - eSdErr: componentstatus.NewInstanceID(eSdErr.id, component.KindExporter), - } - - // compare two maps of status events ignoring timestamp - assertEqualStatuses := func(t *testing.T, evMap1, evMap2 map[*componentstatus.InstanceID][]*componentstatus.Event) { - assert.Equal(t, len(evMap1), len(evMap2)) - for id, evts1 := range evMap1 { - evts2 := evMap2[id] - assert.Equal(t, len(evts1), len(evts2)) - for i := 0; i < len(evts1); i++ { - ev1 := evts1[i] - ev2 := evts2[i] - assert.Equal(t, ev1.Status(), ev2.Status()) - assert.Equal(t, ev1.Err(), ev2.Err()) - } - } - - } - - for _, tt := range []struct { - name string - edge [2]*testNode - expectedStatuses map[*componentstatus.InstanceID][]*componentstatus.Event - startupErr error - shutdownErr error - }{ - { - name: "successful startup/shutdown", - edge: [2]*testNode{rNoErr, eNoErr}, - expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ - instanceIDs[rNoErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewEvent(componentstatus.StatusOK), - componentstatus.NewEvent(componentstatus.StatusStopping), - componentstatus.NewEvent(componentstatus.StatusStopped), - }, - instanceIDs[eNoErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewEvent(componentstatus.StatusOK), - componentstatus.NewEvent(componentstatus.StatusStopping), - componentstatus.NewEvent(componentstatus.StatusStopped), - }, - }, - }, - { - name: "early startup error", - edge: [2]*testNode{rNoErr, eStErr}, - expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ - instanceIDs[eStErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewPermanentErrorEvent(assert.AnError), - }, - }, - startupErr: assert.AnError, - }, - { - name: "late startup error", - edge: [2]*testNode{rStErr, eNoErr}, - expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ - instanceIDs[rStErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewPermanentErrorEvent(assert.AnError), - }, - instanceIDs[eNoErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewEvent(componentstatus.StatusOK), - componentstatus.NewEvent(componentstatus.StatusStopping), - componentstatus.NewEvent(componentstatus.StatusStopped), - }, - }, - startupErr: assert.AnError, - }, - { - name: "early shutdown error", - edge: [2]*testNode{rSdErr, eNoErr}, - expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ - instanceIDs[rSdErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewEvent(componentstatus.StatusOK), - componentstatus.NewEvent(componentstatus.StatusStopping), - componentstatus.NewPermanentErrorEvent(assert.AnError), - }, - instanceIDs[eNoErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewEvent(componentstatus.StatusOK), - componentstatus.NewEvent(componentstatus.StatusStopping), - componentstatus.NewEvent(componentstatus.StatusStopped), - }, - }, - shutdownErr: assert.AnError, - }, - { - name: "late shutdown error", - edge: [2]*testNode{rNoErr, eSdErr}, - expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ - instanceIDs[rNoErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewEvent(componentstatus.StatusOK), - componentstatus.NewEvent(componentstatus.StatusStopping), - componentstatus.NewEvent(componentstatus.StatusStopped), - }, - instanceIDs[eSdErr]: { - componentstatus.NewEvent(componentstatus.StatusStarting), - componentstatus.NewEvent(componentstatus.StatusOK), - componentstatus.NewEvent(componentstatus.StatusStopping), - componentstatus.NewPermanentErrorEvent(assert.AnError), - }, - }, - shutdownErr: assert.AnError, - }, - } { - t.Run(tt.name, func(t *testing.T) { - pg := &Graph{componentGraph: simple.NewDirectedGraph()} - pg.telemetry = componenttest.NewNopTelemetrySettings() - - actualStatuses := make(map[*componentstatus.InstanceID][]*componentstatus.Event) - rep := status.NewReporter(func(id *componentstatus.InstanceID, ev *componentstatus.Event) { - actualStatuses[id] = append(actualStatuses[id], ev) - }, func(error) { - }) - - rep.Ready() - - e0, e1 := tt.edge[0], tt.edge[1] - pg.instanceIDs = map[int64]*componentstatus.InstanceID{ - e0.ID(): instanceIDs[e0], - e1.ID(): instanceIDs[e1], - } - pg.componentGraph.SetEdge(simple.Edge{F: e0, T: e1}) - - assert.Equal(t, tt.startupErr, pg.StartAll(context.Background(), &Host{Reporter: rep})) - assert.Equal(t, tt.shutdownErr, pg.ShutdownAll(context.Background(), rep)) - assertEqualStatuses(t, tt.expectedStatuses, actualStatuses) - }) - } -} - -func (g *Graph) getReceivers() map[pipeline.Signal]map[component.ID]component.Component { - receiversMap := make(map[pipeline.Signal]map[component.ID]component.Component) - receiversMap[pipeline.SignalTraces] = make(map[component.ID]component.Component) - receiversMap[pipeline.SignalMetrics] = make(map[component.ID]component.Component) - receiversMap[pipeline.SignalLogs] = make(map[component.ID]component.Component) - receiversMap[componentprofiles.SignalProfiles] = make(map[component.ID]component.Component) - - for _, pg := range g.pipelines { - for _, rcvrNode := range pg.receivers { - rcvrOrConnNode := g.componentGraph.Node(rcvrNode.ID()) - rcvrNode, ok := rcvrOrConnNode.(*receiverNode) - if !ok { - continue - } - receiversMap[rcvrNode.pipelineType][rcvrNode.componentID] = rcvrNode.Component - } - } - return receiversMap -} - -// Calculates the expected number of receiver and exporter instances in the specified pipeline. -// -// Expect one instance of each receiver and exporter, unless it is a connector. -// -// For Connectors: -// - Let E equal the number of pipeline types in which the connector is used as an exporter. -// - Let R equal the number of pipeline types in which the connector is used as a receiver. -// -// Within the graph as a whole, we expect E*R instances, i.e. one per combination of data types. -// -// However, within an individual pipeline, we expect: -// - E instances of the connector as a receiver. -// - R instances of the connector as an exporter. -func expectedInstances(m pipelines.Config, pID pipeline.ID) (int, int) { - exConnectorType := component.MustNewType("exampleconnector") - var r, e int - for _, rID := range m[pID].Receivers { - if rID.Type() != exConnectorType { - r++ - continue - } - - // This is a connector. Count the pipeline types where it is an exporter. - typeMap := map[pipeline.Signal]bool{} - for pID, pCfg := range m { - for _, eID := range pCfg.Exporters { - if eID == rID { - typeMap[pID.Signal()] = true - } - } - } - r += len(typeMap) - } - for _, eID := range m[pID].Exporters { - if eID.Type() != exConnectorType { - e++ - continue - } - - // This is a connector. Count the pipeline types where it is a receiver. - typeMap := map[pipeline.Signal]bool{} - for pID, pCfg := range m { - for _, rID := range pCfg.Receivers { - if rID == eID { - typeMap[pID.Signal()] = true - } - } - } - e += len(typeMap) - } - return r, e -} - -func newBadReceiverFactory() receiver.Factory { - return receiver.NewFactory(component.MustNewType("bf"), func() component.Config { - return &struct{}{} - }) -} - -func newBadProcessorFactory() processor.Factory { - return processor.NewFactory(component.MustNewType("bf"), func() component.Config { - return &struct{}{} - }) -} - -func newBadExporterFactory() exporter.Factory { - return exporter.NewFactory(component.MustNewType("bf"), func() component.Config { - return &struct{}{} - }) -} - -func newBadConnectorFactory() connector.Factory { - return connector.NewFactory(component.MustNewType("bf"), func() component.Config { - return &struct{}{} - }) -} - -func newErrReceiverFactory() receiver.Factory { - return receiverprofiles.NewFactory(component.MustNewType("err"), - func() component.Config { return &struct{}{} }, - receiverprofiles.WithTraces(func(context.Context, receiver.Settings, component.Config, consumer.Traces) (receiver.Traces, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - receiverprofiles.WithLogs(func(context.Context, receiver.Settings, component.Config, consumer.Logs) (receiver.Logs, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - receiverprofiles.WithMetrics(func(context.Context, receiver.Settings, component.Config, consumer.Metrics) (receiver.Metrics, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - receiverprofiles.WithProfiles(func(context.Context, receiver.Settings, component.Config, consumerprofiles.Profiles) (receiverprofiles.Profiles, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - ) -} - -func newErrProcessorFactory() processor.Factory { - return processorprofiles.NewFactory(component.MustNewType("err"), - func() component.Config { return &struct{}{} }, - processorprofiles.WithTraces(func(context.Context, processor.Settings, component.Config, consumer.Traces) (processor.Traces, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - processorprofiles.WithLogs(func(context.Context, processor.Settings, component.Config, consumer.Logs) (processor.Logs, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - processorprofiles.WithMetrics(func(context.Context, processor.Settings, component.Config, consumer.Metrics) (processor.Metrics, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - processorprofiles.WithProfiles(func(context.Context, processor.Settings, component.Config, consumerprofiles.Profiles) (processorprofiles.Profiles, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - ) -} - -func newErrExporterFactory() exporter.Factory { - return exporterprofiles.NewFactory(component.MustNewType("err"), - func() component.Config { return &struct{}{} }, - exporterprofiles.WithTraces(func(context.Context, exporter.Settings, component.Config) (exporter.Traces, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - exporterprofiles.WithLogs(func(context.Context, exporter.Settings, component.Config) (exporter.Logs, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - exporterprofiles.WithMetrics(func(context.Context, exporter.Settings, component.Config) (exporter.Metrics, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - exporterprofiles.WithProfiles(func(context.Context, exporter.Settings, component.Config) (exporterprofiles.Profiles, error) { - return &errComponent{}, nil - }, component.StabilityLevelUndefined), - ) -} - -func newErrConnectorFactory() connector.Factory { - return connectorprofiles.NewFactory(component.MustNewType("err"), func() component.Config { - return &struct{}{} - }, - connectorprofiles.WithTracesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Traces, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithTracesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Traces, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithTracesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Traces, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithTracesToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connector.Traces, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - - connectorprofiles.WithMetricsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Metrics, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithMetricsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Metrics, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithMetricsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Metrics, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithMetricsToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connector.Metrics, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - - connectorprofiles.WithLogsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Logs, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithLogsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Logs, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithLogsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Logs, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithLogsToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connector.Logs, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - - connectorprofiles.WithProfilesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connectorprofiles.Profiles, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithProfilesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connectorprofiles.Profiles, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithProfilesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connectorprofiles.Profiles, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - connectorprofiles.WithProfilesToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connectorprofiles.Profiles, error) { - return &errComponent{}, nil - }, component.StabilityLevelUnmaintained), - ) -} - -type errComponent struct { - consumertest.Consumer -} - -func (e errComponent) Capabilities() consumer.Capabilities { - return consumer.Capabilities{MutatesData: false} -} - -func (e errComponent) Start(context.Context, component.Host) error { - return errors.New("my error") -} - -func (e errComponent) Shutdown(context.Context) error { - return errors.New("my error") -} diff --git a/service/internal/graph/lifecycle_test.go b/service/internal/graph/lifecycle_test.go new file mode 100644 index 00000000000..b2c9c195533 --- /dev/null +++ b/service/internal/graph/lifecycle_test.go @@ -0,0 +1,440 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gonum.org/v1/gonum/graph/simple" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/service/internal/builders" + "go.opentelemetry.io/collector/service/internal/status" + "go.opentelemetry.io/collector/service/internal/status/statustest" + "go.opentelemetry.io/collector/service/pipelines" +) + +func TestGraphStartStop(t *testing.T) { + testCases := []struct { + name string + edges [][2]component.ID + }{ + { + name: "single", + edges: [][2]component.ID{ + {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "1")}, + {component.MustNewIDWithName("r", "2"), component.MustNewIDWithName("p", "1")}, + {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("p", "2")}, + {component.MustNewIDWithName("p", "2"), component.MustNewIDWithName("e", "1")}, + {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("e", "2")}, + }, + }, + { + name: "multi", + edges: [][2]component.ID{ + // Pipeline 1 + {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "1")}, + {component.MustNewIDWithName("r", "2"), component.MustNewIDWithName("p", "1")}, + {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("p", "2")}, + {component.MustNewIDWithName("p", "2"), component.MustNewIDWithName("e", "1")}, + {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("e", "2")}, + + // Pipeline 2, shares r1 and e2 + {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "3")}, + {component.MustNewIDWithName("p", "3"), component.MustNewIDWithName("e", "2")}, + }, + }, + { + name: "connected", + edges: [][2]component.ID{ + // Pipeline 1 + {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "1")}, + {component.MustNewIDWithName("r", "2"), component.MustNewIDWithName("p", "1")}, + {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("p", "2")}, + {component.MustNewIDWithName("p", "2"), component.MustNewIDWithName("e", "1")}, + {component.MustNewIDWithName("p", "1"), component.MustNewIDWithName("c", "1")}, + + // Pipeline 2, shares r1 and c1 + {component.MustNewIDWithName("r", "1"), component.MustNewIDWithName("p", "3")}, + {component.MustNewIDWithName("p", "3"), component.MustNewIDWithName("c", "1")}, + + // Pipeline 3, emits to e2 and c2 + {component.MustNewIDWithName("c", "1"), component.MustNewIDWithName("e", "2")}, + {component.MustNewIDWithName("c", "1"), component.MustNewIDWithName("c", "2")}, + + // Pipeline 4, also emits to e2 + {component.MustNewIDWithName("c", "2"), component.MustNewIDWithName("e", "2")}, + }, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + ctx := &contextWithOrder{ + Context: context.Background(), + order: map[component.ID]int{}, + } + + pg := &Graph{componentGraph: simple.NewDirectedGraph()} + pg.telemetry = componenttest.NewNopTelemetrySettings() + pg.instanceIDs = make(map[int64]*componentstatus.InstanceID) + + for _, edge := range tt.edges { + f, t := &testNode{id: edge[0]}, &testNode{id: edge[1]} + pg.instanceIDs[f.ID()] = &componentstatus.InstanceID{} + pg.instanceIDs[t.ID()] = &componentstatus.InstanceID{} + pg.componentGraph.SetEdge(simple.Edge{F: f, T: t}) + } + + require.NoError(t, pg.StartAll(ctx, &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) + for _, edge := range tt.edges { + assert.Greater(t, ctx.order[edge[0]], ctx.order[edge[1]]) + } + + ctx.order = map[component.ID]int{} + require.NoError(t, pg.ShutdownAll(ctx, statustest.NewNopStatusReporter())) + for _, edge := range tt.edges { + assert.Less(t, ctx.order[edge[0]], ctx.order[edge[1]]) + } + }) + } +} + +func TestGraphStartStopCycle(t *testing.T) { + pg := &Graph{componentGraph: simple.NewDirectedGraph()} + + r1 := &testNode{id: component.MustNewIDWithName("r", "1")} + p1 := &testNode{id: component.MustNewIDWithName("p", "1")} + c1 := &testNode{id: component.MustNewIDWithName("c", "1")} + e1 := &testNode{id: component.MustNewIDWithName("e", "1")} + + pg.instanceIDs = map[int64]*componentstatus.InstanceID{ + r1.ID(): {}, + p1.ID(): {}, + c1.ID(): {}, + e1.ID(): {}, + } + + pg.componentGraph.SetEdge(simple.Edge{F: r1, T: p1}) + pg.componentGraph.SetEdge(simple.Edge{F: p1, T: c1}) + pg.componentGraph.SetEdge(simple.Edge{F: c1, T: e1}) + pg.componentGraph.SetEdge(simple.Edge{F: c1, T: p1}) // loop back + + err := pg.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})}) + require.ErrorContains(t, err, `topo: no topological ordering: cyclic components`) + + err = pg.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()) + assert.ErrorContains(t, err, `topo: no topological ordering: cyclic components`) +} + +func TestGraphStartStopComponentError(t *testing.T) { + pg := &Graph{componentGraph: simple.NewDirectedGraph()} + pg.telemetry = componenttest.NewNopTelemetrySettings() + r1 := &testNode{ + id: component.MustNewIDWithName("r", "1"), + startErr: errors.New("foo"), + } + e1 := &testNode{ + id: component.MustNewIDWithName("e", "1"), + shutdownErr: errors.New("bar"), + } + pg.instanceIDs = map[int64]*componentstatus.InstanceID{ + r1.ID(): {}, + e1.ID(): {}, + } + pg.componentGraph.SetEdge(simple.Edge{ + F: r1, + T: e1, + }) + require.EqualError(t, pg.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})}), "foo") + assert.EqualError(t, pg.ShutdownAll(context.Background(), statustest.NewNopStatusReporter()), "bar") +} + +// This includes all tests from the previous implementation, plus a new one +// relevant only to the new graph-based implementation. +func TestGraphFailToStartAndShutdown(t *testing.T) { + errReceiverFactory := newErrReceiverFactory() + errProcessorFactory := newErrProcessorFactory() + errExporterFactory := newErrExporterFactory() + errConnectorFactory := newErrConnectorFactory() + nopReceiverFactory := receivertest.NewNopFactory() + nopProcessorFactory := processortest.NewNopFactory() + nopExporterFactory := exportertest.NewNopFactory() + nopConnectorFactory := connectortest.NewNopFactory() + + set := Settings{ + Telemetry: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + ReceiverBuilder: builders.NewReceiver( + map[component.ID]component.Config{ + component.NewID(nopReceiverFactory.Type()): nopReceiverFactory.CreateDefaultConfig(), + component.NewID(errReceiverFactory.Type()): errReceiverFactory.CreateDefaultConfig(), + }, + map[component.Type]receiver.Factory{ + nopReceiverFactory.Type(): nopReceiverFactory, + errReceiverFactory.Type(): errReceiverFactory, + }), + ProcessorBuilder: builders.NewProcessor( + map[component.ID]component.Config{ + component.NewID(nopProcessorFactory.Type()): nopProcessorFactory.CreateDefaultConfig(), + component.NewID(errProcessorFactory.Type()): errProcessorFactory.CreateDefaultConfig(), + }, + map[component.Type]processor.Factory{ + nopProcessorFactory.Type(): nopProcessorFactory, + errProcessorFactory.Type(): errProcessorFactory, + }), + ExporterBuilder: builders.NewExporter( + map[component.ID]component.Config{ + component.NewID(nopExporterFactory.Type()): nopExporterFactory.CreateDefaultConfig(), + component.NewID(errExporterFactory.Type()): errExporterFactory.CreateDefaultConfig(), + }, + map[component.Type]exporter.Factory{ + nopExporterFactory.Type(): nopExporterFactory, + errExporterFactory.Type(): errExporterFactory, + }), + ConnectorBuilder: builders.NewConnector( + map[component.ID]component.Config{ + component.NewIDWithName(nopConnectorFactory.Type(), "conn"): nopConnectorFactory.CreateDefaultConfig(), + component.NewIDWithName(errConnectorFactory.Type(), "conn"): errConnectorFactory.CreateDefaultConfig(), + }, + map[component.Type]connector.Factory{ + nopConnectorFactory.Type(): nopConnectorFactory, + errConnectorFactory.Type(): errConnectorFactory, + }), + } + + dataTypes := []pipeline.Signal{pipeline.SignalTraces, pipeline.SignalMetrics, pipeline.SignalLogs} + for _, dt := range dataTypes { + t.Run(dt.String()+"/receiver", func(t *testing.T) { + set.PipelineConfigs = pipelines.Config{ + pipeline.NewID(dt): { + Receivers: []component.ID{component.MustNewID("nop"), component.MustNewID("err")}, + Processors: []component.ID{component.MustNewID("nop")}, + Exporters: []component.ID{component.MustNewID("nop")}, + }, + } + pipelines, err := Build(context.Background(), set) + require.NoError(t, err) + require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) + assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) + }) + + t.Run(dt.String()+"/processor", func(t *testing.T) { + set.PipelineConfigs = pipelines.Config{ + pipeline.NewID(dt): { + Receivers: []component.ID{component.MustNewID("nop")}, + Processors: []component.ID{component.MustNewID("nop"), component.MustNewID("err")}, + Exporters: []component.ID{component.MustNewID("nop")}, + }, + } + pipelines, err := Build(context.Background(), set) + require.NoError(t, err) + require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) + assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) + }) + + t.Run(dt.String()+"/exporter", func(t *testing.T) { + set.PipelineConfigs = pipelines.Config{ + pipeline.NewID(dt): { + Receivers: []component.ID{component.MustNewID("nop")}, + Processors: []component.ID{component.MustNewID("nop")}, + Exporters: []component.ID{component.MustNewID("nop"), component.MustNewID("err")}, + }, + } + pipelines, err := Build(context.Background(), set) + require.NoError(t, err) + require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) + assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) + }) + + for _, dt2 := range dataTypes { + t.Run(dt.String()+"/"+dt2.String()+"/connector", func(t *testing.T) { + set.PipelineConfigs = pipelines.Config{ + pipeline.NewIDWithName(dt, "in"): { + Receivers: []component.ID{component.MustNewID("nop")}, + Processors: []component.ID{component.MustNewID("nop")}, + Exporters: []component.ID{component.MustNewID("nop"), component.MustNewIDWithName("err", "conn")}, + }, + pipeline.NewIDWithName(dt2, "out"): { + Receivers: []component.ID{component.MustNewID("nop"), component.MustNewIDWithName("err", "conn")}, + Processors: []component.ID{component.MustNewID("nop")}, + Exporters: []component.ID{component.MustNewID("nop")}, + }, + } + pipelines, err := Build(context.Background(), set) + require.NoError(t, err) + require.Error(t, pipelines.StartAll(context.Background(), &Host{Reporter: status.NewReporter(func(*componentstatus.InstanceID, *componentstatus.Event) {}, func(error) {})})) + assert.Error(t, pipelines.ShutdownAll(context.Background(), statustest.NewNopStatusReporter())) + }) + } + } +} + +func TestStatusReportedOnStartupShutdown(t *testing.T) { + + rNoErr := &testNode{id: component.MustNewIDWithName("r_no_err", "1")} + rStErr := &testNode{id: component.MustNewIDWithName("r_st_err", "1"), startErr: assert.AnError} + rSdErr := &testNode{id: component.MustNewIDWithName("r_sd_err", "1"), shutdownErr: assert.AnError} + + eNoErr := &testNode{id: component.MustNewIDWithName("e_no_err", "1")} + eStErr := &testNode{id: component.MustNewIDWithName("e_st_err", "1"), startErr: assert.AnError} + eSdErr := &testNode{id: component.MustNewIDWithName("e_sd_err", "1"), shutdownErr: assert.AnError} + + instanceIDs := map[*testNode]*componentstatus.InstanceID{ + rNoErr: componentstatus.NewInstanceID(rNoErr.id, component.KindReceiver), + rStErr: componentstatus.NewInstanceID(rStErr.id, component.KindReceiver), + rSdErr: componentstatus.NewInstanceID(rSdErr.id, component.KindReceiver), + eNoErr: componentstatus.NewInstanceID(eNoErr.id, component.KindExporter), + eStErr: componentstatus.NewInstanceID(eStErr.id, component.KindExporter), + eSdErr: componentstatus.NewInstanceID(eSdErr.id, component.KindExporter), + } + + // compare two maps of status events ignoring timestamp + assertEqualStatuses := func(t *testing.T, evMap1, evMap2 map[*componentstatus.InstanceID][]*componentstatus.Event) { + assert.Equal(t, len(evMap1), len(evMap2)) + for id, evts1 := range evMap1 { + evts2 := evMap2[id] + assert.Equal(t, len(evts1), len(evts2)) + for i := 0; i < len(evts1); i++ { + ev1 := evts1[i] + ev2 := evts2[i] + assert.Equal(t, ev1.Status(), ev2.Status()) + assert.Equal(t, ev1.Err(), ev2.Err()) + } + } + + } + + for _, tt := range []struct { + name string + edge [2]*testNode + expectedStatuses map[*componentstatus.InstanceID][]*componentstatus.Event + startupErr error + shutdownErr error + }{ + { + name: "successful startup/shutdown", + edge: [2]*testNode{rNoErr, eNoErr}, + expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ + instanceIDs[rNoErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewEvent(componentstatus.StatusOK), + componentstatus.NewEvent(componentstatus.StatusStopping), + componentstatus.NewEvent(componentstatus.StatusStopped), + }, + instanceIDs[eNoErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewEvent(componentstatus.StatusOK), + componentstatus.NewEvent(componentstatus.StatusStopping), + componentstatus.NewEvent(componentstatus.StatusStopped), + }, + }, + }, + { + name: "early startup error", + edge: [2]*testNode{rNoErr, eStErr}, + expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ + instanceIDs[eStErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewPermanentErrorEvent(assert.AnError), + }, + }, + startupErr: assert.AnError, + }, + { + name: "late startup error", + edge: [2]*testNode{rStErr, eNoErr}, + expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ + instanceIDs[rStErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewPermanentErrorEvent(assert.AnError), + }, + instanceIDs[eNoErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewEvent(componentstatus.StatusOK), + componentstatus.NewEvent(componentstatus.StatusStopping), + componentstatus.NewEvent(componentstatus.StatusStopped), + }, + }, + startupErr: assert.AnError, + }, + { + name: "early shutdown error", + edge: [2]*testNode{rSdErr, eNoErr}, + expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ + instanceIDs[rSdErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewEvent(componentstatus.StatusOK), + componentstatus.NewEvent(componentstatus.StatusStopping), + componentstatus.NewPermanentErrorEvent(assert.AnError), + }, + instanceIDs[eNoErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewEvent(componentstatus.StatusOK), + componentstatus.NewEvent(componentstatus.StatusStopping), + componentstatus.NewEvent(componentstatus.StatusStopped), + }, + }, + shutdownErr: assert.AnError, + }, + { + name: "late shutdown error", + edge: [2]*testNode{rNoErr, eSdErr}, + expectedStatuses: map[*componentstatus.InstanceID][]*componentstatus.Event{ + instanceIDs[rNoErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewEvent(componentstatus.StatusOK), + componentstatus.NewEvent(componentstatus.StatusStopping), + componentstatus.NewEvent(componentstatus.StatusStopped), + }, + instanceIDs[eSdErr]: { + componentstatus.NewEvent(componentstatus.StatusStarting), + componentstatus.NewEvent(componentstatus.StatusOK), + componentstatus.NewEvent(componentstatus.StatusStopping), + componentstatus.NewPermanentErrorEvent(assert.AnError), + }, + }, + shutdownErr: assert.AnError, + }, + } { + t.Run(tt.name, func(t *testing.T) { + pg := &Graph{componentGraph: simple.NewDirectedGraph()} + pg.telemetry = componenttest.NewNopTelemetrySettings() + + actualStatuses := make(map[*componentstatus.InstanceID][]*componentstatus.Event) + rep := status.NewReporter(func(id *componentstatus.InstanceID, ev *componentstatus.Event) { + actualStatuses[id] = append(actualStatuses[id], ev) + }, func(error) { + }) + + rep.Ready() + + e0, e1 := tt.edge[0], tt.edge[1] + pg.instanceIDs = map[int64]*componentstatus.InstanceID{ + e0.ID(): instanceIDs[e0], + e1.ID(): instanceIDs[e1], + } + pg.componentGraph.SetEdge(simple.Edge{F: e0, T: e1}) + + assert.Equal(t, tt.startupErr, pg.StartAll(context.Background(), &Host{Reporter: rep})) + assert.Equal(t, tt.shutdownErr, pg.ShutdownAll(context.Background(), rep)) + assertEqualStatuses(t, tt.expectedStatuses, actualStatuses) + }) + } +} diff --git a/service/internal/graph/util_test.go b/service/internal/graph/util_test.go new file mode 100644 index 00000000000..43f1e64bb6c --- /dev/null +++ b/service/internal/graph/util_test.go @@ -0,0 +1,299 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package graph + +import ( + "context" + "errors" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/connectorprofiles" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterprofiles" + "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorprofiles" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverprofiles" + "go.opentelemetry.io/collector/service/pipelines" +) + +var _ component.Component = (*testNode)(nil) + +type testNode struct { + id component.ID + startErr error + shutdownErr error +} + +// ID satisfies the graph.Node interface, allowing +// testNode to be used in a simple.DirectedGraph +func (n *testNode) ID() int64 { + return int64(newNodeID(n.id.String())) +} + +func (n *testNode) Start(ctx context.Context, _ component.Host) error { + if n.startErr != nil { + return n.startErr + } + if cwo, ok := ctx.(*contextWithOrder); ok { + cwo.record(n.id) + } + return nil +} + +func (n *testNode) Shutdown(ctx context.Context) error { + if n.shutdownErr != nil { + return n.shutdownErr + } + if cwo, ok := ctx.(*contextWithOrder); ok { + cwo.record(n.id) + } + return nil +} + +type contextWithOrder struct { + context.Context + sync.Mutex + next int + order map[component.ID]int +} + +func (c *contextWithOrder) record(id component.ID) { + c.Lock() + c.order[id] = c.next + c.next++ + c.Unlock() +} + +func (g *Graph) getReceivers() map[pipeline.Signal]map[component.ID]component.Component { + receiversMap := make(map[pipeline.Signal]map[component.ID]component.Component) + receiversMap[pipeline.SignalTraces] = make(map[component.ID]component.Component) + receiversMap[pipeline.SignalMetrics] = make(map[component.ID]component.Component) + receiversMap[pipeline.SignalLogs] = make(map[component.ID]component.Component) + receiversMap[componentprofiles.SignalProfiles] = make(map[component.ID]component.Component) + + for _, pg := range g.pipelines { + for _, rcvrNode := range pg.receivers { + rcvrOrConnNode := g.componentGraph.Node(rcvrNode.ID()) + rcvrNode, ok := rcvrOrConnNode.(*receiverNode) + if !ok { + continue + } + receiversMap[rcvrNode.pipelineType][rcvrNode.componentID] = rcvrNode.Component + } + } + return receiversMap +} + +// Calculates the expected number of receiver and exporter instances in the specified pipeline. +// +// Expect one instance of each receiver and exporter, unless it is a connector. +// +// For Connectors: +// - Let E equal the number of pipeline types in which the connector is used as an exporter. +// - Let R equal the number of pipeline types in which the connector is used as a receiver. +// +// Within the graph as a whole, we expect E*R instances, i.e. one per combination of data types. +// +// However, within an individual pipeline, we expect: +// - E instances of the connector as a receiver. +// - R instances of the connector as an exporter. +func expectedInstances(m pipelines.Config, pID pipeline.ID) (int, int) { + exConnectorType := component.MustNewType("exampleconnector") + var r, e int + for _, rID := range m[pID].Receivers { + if rID.Type() != exConnectorType { + r++ + continue + } + + // This is a connector. Count the pipeline types where it is an exporter. + typeMap := map[pipeline.Signal]bool{} + for pID, pCfg := range m { + for _, eID := range pCfg.Exporters { + if eID == rID { + typeMap[pID.Signal()] = true + } + } + } + r += len(typeMap) + } + for _, eID := range m[pID].Exporters { + if eID.Type() != exConnectorType { + e++ + continue + } + + // This is a connector. Count the pipeline types where it is a receiver. + typeMap := map[pipeline.Signal]bool{} + for pID, pCfg := range m { + for _, rID := range pCfg.Receivers { + if rID == eID { + typeMap[pID.Signal()] = true + } + } + } + e += len(typeMap) + } + return r, e +} + +func newBadReceiverFactory() receiver.Factory { + return receiver.NewFactory(component.MustNewType("bf"), func() component.Config { + return &struct{}{} + }) +} + +func newBadProcessorFactory() processor.Factory { + return processor.NewFactory(component.MustNewType("bf"), func() component.Config { + return &struct{}{} + }) +} + +func newBadExporterFactory() exporter.Factory { + return exporter.NewFactory(component.MustNewType("bf"), func() component.Config { + return &struct{}{} + }) +} + +func newBadConnectorFactory() connector.Factory { + return connector.NewFactory(component.MustNewType("bf"), func() component.Config { + return &struct{}{} + }) +} + +func newErrReceiverFactory() receiver.Factory { + return receiverprofiles.NewFactory(component.MustNewType("err"), + func() component.Config { return &struct{}{} }, + receiverprofiles.WithTraces(func(context.Context, receiver.Settings, component.Config, consumer.Traces) (receiver.Traces, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + receiverprofiles.WithLogs(func(context.Context, receiver.Settings, component.Config, consumer.Logs) (receiver.Logs, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + receiverprofiles.WithMetrics(func(context.Context, receiver.Settings, component.Config, consumer.Metrics) (receiver.Metrics, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + receiverprofiles.WithProfiles(func(context.Context, receiver.Settings, component.Config, consumerprofiles.Profiles) (receiverprofiles.Profiles, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + ) +} + +func newErrProcessorFactory() processor.Factory { + return processorprofiles.NewFactory(component.MustNewType("err"), + func() component.Config { return &struct{}{} }, + processorprofiles.WithTraces(func(context.Context, processor.Settings, component.Config, consumer.Traces) (processor.Traces, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + processorprofiles.WithLogs(func(context.Context, processor.Settings, component.Config, consumer.Logs) (processor.Logs, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + processorprofiles.WithMetrics(func(context.Context, processor.Settings, component.Config, consumer.Metrics) (processor.Metrics, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + processorprofiles.WithProfiles(func(context.Context, processor.Settings, component.Config, consumerprofiles.Profiles) (processorprofiles.Profiles, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + ) +} + +func newErrExporterFactory() exporter.Factory { + return exporterprofiles.NewFactory(component.MustNewType("err"), + func() component.Config { return &struct{}{} }, + exporterprofiles.WithTraces(func(context.Context, exporter.Settings, component.Config) (exporter.Traces, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + exporterprofiles.WithLogs(func(context.Context, exporter.Settings, component.Config) (exporter.Logs, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + exporterprofiles.WithMetrics(func(context.Context, exporter.Settings, component.Config) (exporter.Metrics, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + exporterprofiles.WithProfiles(func(context.Context, exporter.Settings, component.Config) (exporterprofiles.Profiles, error) { + return &errComponent{}, nil + }, component.StabilityLevelUndefined), + ) +} + +func newErrConnectorFactory() connector.Factory { + return connectorprofiles.NewFactory(component.MustNewType("err"), func() component.Config { + return &struct{}{} + }, + connectorprofiles.WithTracesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Traces, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithTracesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Traces, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithTracesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Traces, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithTracesToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connector.Traces, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + + connectorprofiles.WithMetricsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Metrics, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithMetricsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Metrics, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithMetricsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Metrics, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithMetricsToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connector.Metrics, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + + connectorprofiles.WithLogsToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connector.Logs, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithLogsToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connector.Logs, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithLogsToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connector.Logs, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithLogsToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connector.Logs, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + + connectorprofiles.WithProfilesToTraces(func(context.Context, connector.Settings, component.Config, consumer.Traces) (connectorprofiles.Profiles, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithProfilesToMetrics(func(context.Context, connector.Settings, component.Config, consumer.Metrics) (connectorprofiles.Profiles, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithProfilesToLogs(func(context.Context, connector.Settings, component.Config, consumer.Logs) (connectorprofiles.Profiles, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + connectorprofiles.WithProfilesToProfiles(func(context.Context, connector.Settings, component.Config, consumerprofiles.Profiles) (connectorprofiles.Profiles, error) { + return &errComponent{}, nil + }, component.StabilityLevelUnmaintained), + ) +} + +type errComponent struct { + consumertest.Consumer +} + +func (e errComponent) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (e errComponent) Start(context.Context, component.Host) error { + return errors.New("my error") +} + +func (e errComponent) Shutdown(context.Context) error { + return errors.New("my error") +}