Skip to content

Commit

Permalink
pass publishers interface instead of extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Sep 17, 2024
1 parent ad62aab commit 0b694d2
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 23 deletions.
9 changes: 6 additions & 3 deletions processor/internal/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

package internal // import "go.opentelemetry.io/collector/processor/internal"

import "go.opentelemetry.io/collector/component"
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata"
)

// Settings is passed to Create* functions in Factory.
type Settings struct {
Expand All @@ -15,6 +18,6 @@ type Settings struct {
// BuildInfo can be used by components for informational purposes
BuildInfo component.BuildInfo

// Extensions can be used by components to interact with extensions
Extensions func() map[component.ID]component.Component
// Publishers can be used by components to publish data after processing
Publishers []pdata.Publisher
}
10 changes: 5 additions & 5 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ func NewLogsProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
remoteTapType, _ := component.NewType("remotetap")
remoteTapId := component.NewID(remoteTapType)
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
ld, err = logsFunc(ctx, ld)

// Publish logs to the remotetap extension if active.
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
remotetap.(pdata.Publisher).PublishLogs(pdata.ComponentID(set.ID.String()), ld)
// Publish data to active streams.
for _, p := range set.Publishers {
if p.IsActive(pdata.ComponentID(set.ID.String())) {
p.PublishLogs(pdata.ComponentID(set.ID.String()), ld)
}
}

span.AddEvent("End processing.", eventOptions)
Expand Down
10 changes: 5 additions & 5 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ func NewMetricsProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
remoteTapType, _ := component.NewType("remotetap")
remoteTapId := component.NewID(remoteTapType)
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
md, err = metricsFunc(ctx, md)

// Publish metrics to the remotetap extension if active.
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
remotetap.(pdata.Publisher).PublishMetrics(pdata.ComponentID(set.ID.String()), md)
// Publish data to active streams.
for _, p := range set.Publishers {
if p.IsActive(pdata.ComponentID(set.ID.String())) {
p.PublishMetrics(pdata.ComponentID(set.ID.String()), md)
}
}

span.AddEvent("End processing.", eventOptions)
Expand Down
10 changes: 5 additions & 5 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ func NewTracesProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
remoteTapType, _ := component.NewType("remotetap")
remoteTapId := component.NewID(remoteTapType)
traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
td, err = tracesFunc(ctx, td)

// Publish traces to the remotetap extension if active.
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
remotetap.(pdata.Publisher).PublishTraces(pdata.ComponentID(set.ID.String()), td)
// Publish data to active streams.
for _, p := range set.Publishers {
if p.IsActive(pdata.ComponentID(set.ID.String())) {
p.PublishTraces(pdata.ComponentID(set.ID.String()), td)
}
}

span.AddEvent("End processing.", eventOptions)
Expand Down
5 changes: 3 additions & 2 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/pdata"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
"go.opentelemetry.io/collector/service/internal/status"
Expand All @@ -50,7 +51,7 @@ type Settings struct {

ReportStatus status.ServiceStatusFunc

Extensions func() map[component.ID]component.Component
Publishers []pdata.Publisher
}

type Graph struct {
Expand Down Expand Up @@ -289,7 +290,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
case *processorNode:
// nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters.
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, set.Extensions, g.nextConsumers(n.ID())[0])
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, set.Publishers, g.nextConsumers(n.ID())[0])
case *exporterNode:
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
case *connectorNode:
Expand Down
11 changes: 11 additions & 0 deletions service/internal/graph/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/builders"
"go.opentelemetry.io/collector/service/internal/status"
Expand Down Expand Up @@ -64,6 +65,16 @@ func (host *Host) GetExtensions() map[component.ID]component.Component {
return host.ServiceExtensions.GetExtensions()
}

func (host *Host) GetPublishers() []pdata.Publisher {
publishers := make([]pdata.Publisher, 0)
for _, v := range host.ServiceExtensions.GetExtensions() {
if publisher, ok := v.(pdata.Publisher); ok {
publishers = append(publishers, publisher)
}
}
return publishers
}

// Deprecated: [0.79.0] This function will be removed in the future.
// Several components in the contrib repository use this function so it cannot be removed
// before those cases are removed. In most cases, use of this function can be replaced by a
Expand Down
5 changes: 3 additions & 2 deletions service/internal/graph/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/internal/fanoutconsumer"
"go.opentelemetry.io/collector/pdata"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/service/internal/builders"
Expand Down Expand Up @@ -132,11 +133,11 @@ func (n *processorNode) buildComponent(ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder builders.Processor,
extensions func() map[component.ID]component.Component,
publishers []pdata.Publisher,
next baseConsumer,
) error {
tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID)
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, Extensions: extensions}
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, Publishers: publishers}
var err error
switch n.pipelineID.Type() {
case component.DataTypeTraces:
Expand Down
2 changes: 1 addition & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func (srv *Service) initGraph(ctx context.Context, cfg Config) error {
ConnectorBuilder: srv.host.Connectors,
PipelineConfigs: cfg.Pipelines,
ReportStatus: srv.host.Reporter.ReportStatus,
Extensions: srv.host.ServiceExtensions.GetExtensions,
Publishers: srv.host.GetPublishers(),
}); err != nil {
return fmt.Errorf("failed to build pipelines: %w", err)
}
Expand Down

0 comments on commit 0b694d2

Please sign in to comment.