Skip to content

Commit

Permalink
[chore][graph] Decompose buildConnector (#11330)
Browse files Browse the repository at this point in the history
The primary goal here was to delegate from each switch case, rather than
nest logic as deep as necessary.
  • Loading branch information
djaglowski authored Oct 2, 2024
1 parent 69ff46b commit 4ace638
Showing 1 changed file with 203 additions and 157 deletions.
360 changes: 203 additions & 157 deletions service/internal/graph/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,173 +55,219 @@ func (n *connectorNode) buildComponent(
) error {
tel.Logger = components.ConnectorLogger(tel.Logger, n.componentID, n.exprPipelineType, n.rcvrPipelineType)
set := connector.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}

switch n.rcvrPipelineType {
case pipeline.SignalTraces:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewTracesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewTraces(conn, capability)
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
return n.buildTraces(ctx, set, builder, nexts)
case pipeline.SignalMetrics:
return n.buildMetrics(ctx, set, builder, nexts)
case pipeline.SignalLogs:
return n.buildLogs(ctx, set, builder, nexts)
case componentprofiles.SignalProfiles:
return n.buildProfiles(ctx, set, builder, nexts)
}
return nil
}

func (n *connectorNode) buildTraces(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Traces)
}
next := connector.NewTracesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToTraces(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalTraces {
n.baseConsumer = capabilityconsumer.NewTraces(
n.Component.(connector.Traces),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildMetrics(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
}
next := connector.NewMetricsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Metrics)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewMetricsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewMetrics(conn, capability)
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
conn, err := builder.CreateMetricsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connector.NewLogsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewLogs(conn, capability)
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
conn, err := builder.CreateLogsToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
capability := consumer.Capabilities{MutatesData: false}
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
capability.MutatesData = capability.MutatesData || next.Capabilities().MutatesData
}
next := connectorprofiles.NewProfilesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component = conn
// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
capability.MutatesData = capability.MutatesData || conn.Capabilities().MutatesData
n.baseConsumer = capabilityconsumer.NewProfiles(conn, capability)
conn, err := builder.CreateProfilesToMetrics(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalMetrics {
n.baseConsumer = capabilityconsumer.NewMetrics(
n.Component.(connector.Metrics),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildLogs(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumer.Logs)
}
next := connector.NewLogsRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToLogs(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == pipeline.SignalLogs {
n.baseConsumer = capabilityconsumer.NewLogs(
n.Component.(connector.Logs),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

func (n *connectorNode) buildProfiles(
ctx context.Context,
set connector.Settings,
builder *builders.ConnectorBuilder,
nexts []baseConsumer,
) error {
consumers := make(map[pipeline.ID]consumerprofiles.Profiles, len(nexts))
for _, next := range nexts {
consumers[next.(*capabilitiesNode).pipelineID] = next.(consumerprofiles.Profiles)
}
next := connectorprofiles.NewProfilesRouter(consumers)

switch n.exprPipelineType {
case pipeline.SignalTraces:
conn, err := builder.CreateTracesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalMetrics:
conn, err := builder.CreateMetricsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case pipeline.SignalLogs:
conn, err := builder.CreateLogsToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
case componentprofiles.SignalProfiles:
conn, err := builder.CreateProfilesToProfiles(ctx, set, next)
if err != nil {
return err
}
n.Component, n.baseConsumer = conn, conn
}

if n.exprPipelineType == componentprofiles.SignalProfiles {
n.baseConsumer = capabilityconsumer.NewProfiles(
n.Component.(connectorprofiles.Profiles),
aggregateCapabilities(n.baseConsumer, nexts),
)
}
return nil
}

// When connecting pipelines of the same data type, the connector must
// inherit the capabilities of pipelines in which it is acting as a receiver.
// Since the incoming and outgoing data types are the same, we must also consider
// that the connector itself may MutatesData.
func aggregateCapabilities(base baseConsumer, nexts []baseConsumer) consumer.Capabilities {
capabilities := base.Capabilities()
for _, next := range nexts {
capabilities.MutatesData = capabilities.MutatesData || next.Capabilities().MutatesData
}
return capabilities
}

0 comments on commit 4ace638

Please sign in to comment.