Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remotetap extension concept #10963

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pdata/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package pdata // import "go.opentelemetry.io/collector/pdata"

import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

type ComponentID string

// Publisher is used by components to publish data to the RemoteTap extension.
type Publisher interface {
// IsActive returns true when at least one connection is open for the given componentID.
IsActive(ComponentID) bool
// PublishMetrics sends metrics for a given componentID.
PublishMetrics(ComponentID, pmetric.Metrics)
// PublishTraces sends traces for a given componentID.
PublishTraces(ComponentID, ptrace.Traces)
// PublishLogs sends logs for a given componentID.
PublishLogs(ComponentID, plog.Logs)
}
3 changes: 3 additions & 0 deletions processor/internal/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ type Settings struct {

// BuildInfo can be used by components for informational purposes
BuildInfo component.BuildInfo

// Extensions can be used by components to interact with extensions
Extensions func() map[component.ID]component.Component
}
9 changes: 9 additions & 0 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
)
Expand Down Expand Up @@ -41,11 +42,19 @@ func NewLogsProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
remoteTapType, _ := component.NewType("remotetap")
remoteTapId := component.NewID(remoteTapType)
logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
ld, err = logsFunc(ctx, ld)

// Publish logs to the remotetap extension if active.
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
remotetap.(pdata.Publisher).PublishLogs(pdata.ComponentID(set.ID.String()), ld)
}

span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
Expand Down
9 changes: 9 additions & 0 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor"
)
Expand Down Expand Up @@ -41,11 +42,19 @@ func NewMetricsProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
remoteTapType, _ := component.NewType("remotetap")
remoteTapId := component.NewID(remoteTapType)
metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
md, err = metricsFunc(ctx, md)

// Publish metrics to the remotetap extension if active.
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
remotetap.(pdata.Publisher).PublishMetrics(pdata.ComponentID(set.ID.String()), md)
}

span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
Expand Down
9 changes: 9 additions & 0 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
)
Expand Down Expand Up @@ -41,11 +42,19 @@ func NewTracesProcessor(

eventOptions := spanAttributes(set.ID)
bs := fromOptions(options)
remoteTapType, _ := component.NewType("remotetap")
wildum marked this conversation as resolved.
Show resolved Hide resolved
remoteTapId := component.NewID(remoteTapType)
traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
var err error
td, err = tracesFunc(ctx, td)

// Publish traces to the remotetap extension if active.
if remotetap := set.Extensions()[remoteTapId]; remotetap != nil && remotetap.(pdata.Publisher).IsActive(pdata.ComponentID(set.ID.String())) {
remotetap.(pdata.Publisher).PublishTraces(pdata.ComponentID(set.ID.String()), td)
}

span.AddEvent("End processing.", eventOptions)
if err != nil {
if errors.Is(err, ErrSkipProcessingData) {
Expand Down
4 changes: 3 additions & 1 deletion service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Settings struct {
PipelineConfigs pipelines.Config

ReportStatus status.ServiceStatusFunc

Extensions func() map[component.ID]component.Component
wildum marked this conversation as resolved.
Show resolved Hide resolved
}

type Graph struct {
Expand Down Expand Up @@ -287,7 +289,7 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID()))
case *processorNode:
// nextConsumers is guaranteed to be length 1. Either it is the next processor or it is the fanout node for the exporters.
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0])
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, set.Extensions, g.nextConsumers(n.ID())[0])
case *exporterNode:
err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder)
case *connectorNode:
Expand Down
3 changes: 2 additions & 1 deletion service/internal/graph/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ func (n *processorNode) buildComponent(ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder builders.Processor,
extensions func() map[component.ID]component.Component,
next baseConsumer,
) error {
tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID)
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info}
set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, Extensions: extensions}
var err error
switch n.pipelineID.Type() {
case component.DataTypeTraces:
Expand Down
7 changes: 4 additions & 3 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
// ignore other errors as they represent invalid state transitions and are considered benign.
})

if err = srv.initGraph(ctx, cfg); err != nil {
// process the configuration and initialize the pipeline
if err = srv.initExtensions(ctx, cfg.Extensions); err != nil {
err = multierr.Append(err, srv.shutdownTelemetry(ctx))
return nil, err
}

// process the configuration and initialize the pipeline
if err = srv.initExtensions(ctx, cfg.Extensions); err != nil {
if err = srv.initGraph(ctx, cfg); err != nil {
err = multierr.Append(err, srv.shutdownTelemetry(ctx))
return nil, err
}
Expand Down Expand Up @@ -371,6 +371,7 @@ func (srv *Service) initGraph(ctx context.Context, cfg Config) error {
ConnectorBuilder: srv.host.Connectors,
PipelineConfigs: cfg.Pipelines,
ReportStatus: srv.host.Reporter.ReportStatus,
Extensions: srv.host.ServiceExtensions.GetExtensions,
}); err != nil {
return fmt.Errorf("failed to build pipelines: %w", err)
}
Expand Down
Loading