Skip to content

Commit

Permalink
[chore][graph] Separate node types (#11321)
Browse files Browse the repository at this point in the history
Having spent some time on #11311, I think it may be time to start
refactoring this codebase into a more maintainable state. This PR just
moves the various types of nodes into separate files, which I think is a
bit more readable when considering changes.
  • Loading branch information
djaglowski authored Oct 1, 2024
1 parent e69f2f3 commit 1295083
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 245 deletions.
40 changes: 40 additions & 0 deletions service/internal/graph/capabilities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/pipeline"
)

const capabilitiesSeed = "capabilities"

var _ consumerNode = (*capabilitiesNode)(nil)

// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
// There are two purposes for this node:
// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data.
// 2. Present a consistent "first consumer" for each pipeline.
// The nodeID is derived from "pipeline ID".
type capabilitiesNode struct {
nodeID
pipelineID pipeline.ID
baseConsumer
consumer.ConsumeTracesFunc
consumer.ConsumeMetricsFunc
consumer.ConsumeLogsFunc
consumerprofiles.ConsumeProfilesFunc
}

func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
return &capabilitiesNode{
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
pipelineID: pipelineID,
}
}

func (n *capabilitiesNode) getConsumer() baseConsumer {
return n
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,215 +5,20 @@ package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"context"
"fmt"
"hash/fnv"
"strings"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentprofiles"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/connector/connectorprofiles"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/components"
)

const (
receiverSeed = "receiver"
processorSeed = "processor"
exporterSeed = "exporter"
connectorSeed = "connector"
capabilitiesSeed = "capabilities"
fanOutToExporters = "fanout_to_exporters"
)

// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
type baseConsumer interface {
Capabilities() consumer.Capabilities
}

type nodeID int64

func (n nodeID) ID() int64 {
return int64(n)
}

func newNodeID(parts ...string) nodeID {
h := fnv.New64a()
h.Write([]byte(strings.Join(parts, "|")))
return nodeID(h.Sum64())
}

type consumerNode interface {
getConsumer() baseConsumer
}

// A receiver instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type receiverNode struct {
nodeID
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newReceiverNode(pipelineType pipeline.Signal, recvID component.ID) *receiverNode {
return &receiverNode{
nodeID: newNodeID(receiverSeed, pipelineType.String(), recvID.String()),
componentID: recvID,
pipelineType: pipelineType,
}
}

func (n *receiverNode) buildComponent(ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ReceiverBuilder,
nexts []baseConsumer,
) error {
tel.Logger = components.ReceiverLogger(tel.Logger, n.componentID, n.pipelineType)
set := receiver.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineType {
case pipeline.SignalTraces:
var consumers []consumer.Traces
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Traces))
}
n.Component, err = builder.CreateTraces(ctx, set, fanoutconsumer.NewTraces(consumers))
case pipeline.SignalMetrics:
var consumers []consumer.Metrics
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Metrics))
}
n.Component, err = builder.CreateMetrics(ctx, set, fanoutconsumer.NewMetrics(consumers))
case pipeline.SignalLogs:
var consumers []consumer.Logs
for _, next := range nexts {
consumers = append(consumers, next.(consumer.Logs))
}
n.Component, err = builder.CreateLogs(ctx, set, fanoutconsumer.NewLogs(consumers))
case componentprofiles.SignalProfiles:
var consumers []consumerprofiles.Profiles
for _, next := range nexts {
consumers = append(consumers, next.(consumerprofiles.Profiles))
}
n.Component, err = builder.CreateProfiles(ctx, set, fanoutconsumer.NewProfiles(consumers))
default:
return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType)
}
if err != nil {
return fmt.Errorf("failed to create %q receiver for data type %q: %w", set.ID, n.pipelineType, err)
}
return nil
}

var _ consumerNode = (*processorNode)(nil)

// Every processor instance is unique to one pipeline.
// Therefore, nodeID is derived from "pipeline ID" and "component ID".
type processorNode struct {
nodeID
componentID component.ID
pipelineID pipeline.ID
component.Component
}

func newProcessorNode(pipelineID pipeline.ID, procID component.ID) *processorNode {
return &processorNode{
nodeID: newNodeID(processorSeed, pipelineID.String(), procID.String()),
componentID: procID,
pipelineID: pipelineID,
}
}

func (n *processorNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *processorNode) buildComponent(ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ProcessorBuilder,
next baseConsumer,
) error {
tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID)
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineID.Signal() {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTraces(ctx, set, next.(consumer.Traces))
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetrics(ctx, set, next.(consumer.Metrics))
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogs(ctx, set, next.(consumer.Logs))
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfiles(ctx, set, next.(consumerprofiles.Profiles))
default:
return fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", set.ID, n.pipelineID.String(), n.pipelineID.Signal())
}
if err != nil {
return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
}
return nil
}

var _ consumerNode = (*exporterNode)(nil)

// An exporter instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type exporterNode struct {
nodeID
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
return &exporterNode{
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
componentID: exprID,
pipelineType: pipelineType,
}
}

func (n *exporterNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *exporterNode) buildComponent(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ExporterBuilder,
) error {
tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType)
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineType {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTraces(ctx, set)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetrics(ctx, set)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogs(ctx, set)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfiles(ctx, set)
default:
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
}
if err != nil {
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
}
return nil
}
const connectorSeed = "connector"

var _ consumerNode = (*connectorNode)(nil)

Expand Down Expand Up @@ -420,52 +225,3 @@ func (n *connectorNode) buildComponent(
}
return nil
}

var _ consumerNode = (*capabilitiesNode)(nil)

// Every pipeline has a "virtual" capabilities node immediately after the receiver(s).
// There are two purposes for this node:
// 1. Present aggregated capabilities to receivers, such as whether the pipeline mutates data.
// 2. Present a consistent "first consumer" for each pipeline.
// The nodeID is derived from "pipeline ID".
type capabilitiesNode struct {
nodeID
pipelineID pipeline.ID
baseConsumer
consumer.ConsumeTracesFunc
consumer.ConsumeMetricsFunc
consumer.ConsumeLogsFunc
consumerprofiles.ConsumeProfilesFunc
}

func newCapabilitiesNode(pipelineID pipeline.ID) *capabilitiesNode {
return &capabilitiesNode{
nodeID: newNodeID(capabilitiesSeed, pipelineID.String()),
pipelineID: pipelineID,
}
}

func (n *capabilitiesNode) getConsumer() baseConsumer {
return n
}

var _ consumerNode = (*fanOutNode)(nil)

// Each pipeline has one fan-out node before exporters.
// Therefore, nodeID is derived from "pipeline ID".
type fanOutNode struct {
nodeID
pipelineID pipeline.ID
baseConsumer
}

func newFanOutNode(pipelineID pipeline.ID) *fanOutNode {
return &fanOutNode{
nodeID: newNodeID(fanOutToExporters, pipelineID.String()),
pipelineID: pipelineID,
}
}

func (n *fanOutNode) getConsumer() baseConsumer {
return n.baseConsumer
}
17 changes: 17 additions & 0 deletions service/internal/graph/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"go.opentelemetry.io/collector/consumer"
)

// baseConsumer redeclared here since not public in consumer package. May consider to make that public.
type baseConsumer interface {
Capabilities() consumer.Capabilities
}

type consumerNode interface {
getConsumer() baseConsumer
}
68 changes: 68 additions & 0 deletions service/internal/graph/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package graph // import "go.opentelemetry.io/collector/service/internal/graph"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentprofiles"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/components"
)

const exporterSeed = "exporter"

var _ consumerNode = (*exporterNode)(nil)

// An exporter instance can be shared by multiple pipelines of the same type.
// Therefore, nodeID is derived from "pipeline type" and "component ID".
type exporterNode struct {
nodeID
componentID component.ID
pipelineType pipeline.Signal
component.Component
}

func newExporterNode(pipelineType pipeline.Signal, exprID component.ID) *exporterNode {
return &exporterNode{
nodeID: newNodeID(exporterSeed, pipelineType.String(), exprID.String()),
componentID: exprID,
pipelineType: pipelineType,
}
}

func (n *exporterNode) getConsumer() baseConsumer {
return n.Component.(baseConsumer)
}

func (n *exporterNode) buildComponent(
ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ExporterBuilder,
) error {
tel.Logger = components.ExporterLogger(tel.Logger, n.componentID, n.pipelineType)
set := exporter.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
var err error
switch n.pipelineType {
case pipeline.SignalTraces:
n.Component, err = builder.CreateTraces(ctx, set)
case pipeline.SignalMetrics:
n.Component, err = builder.CreateMetrics(ctx, set)
case pipeline.SignalLogs:
n.Component, err = builder.CreateLogs(ctx, set)
case componentprofiles.SignalProfiles:
n.Component, err = builder.CreateProfiles(ctx, set)
default:
return fmt.Errorf("error creating exporter %q for data type %q is not supported", set.ID, n.pipelineType)
}
if err != nil {
return fmt.Errorf("failed to create %q exporter for data type %q: %w", set.ID, n.pipelineType, err)
}
return nil
}
Loading

0 comments on commit 1295083

Please sign in to comment.