
[PoC][DO NOT MERGE] Pipelines without a service #5149

Closed · wants to merge 3 commits
233 changes: 233 additions & 0 deletions pipeline/pipeline.go
@@ -0,0 +1,233 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline // import "go.opentelemetry.io/collector/pipeline"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.uber.org/multierr"
)

var _ component.Host = (*host)(nil)

// host is a component.Host for a single pipeline.
// It has no extensions and a single exporter.
type host struct {
// factories available for components.
factories component.Factories
// datatype is the telemetry signal type for the pipeline.
datatype config.DataType
// exporterID is the component ID for the pipeline's exporter.
exporterID config.ComponentID
// exporter is the exporter on this pipeline.
exporter component.Exporter
// asyncErrorChannel is used to signal a fatal error from any component.
asyncErrorChannel chan error
}

// ReportFatalError implements the component.Host interface.
func (h *host) ReportFatalError(err error) {
h.asyncErrorChannel <- err
}

// GetFactory implements the component.Host interface.
func (h *host) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
switch kind {
case component.KindReceiver:
return h.factories.Receivers[componentType]
case component.KindExtension:
return h.factories.Extensions[componentType]
case component.KindExporter:
return h.factories.Exporters[componentType]
case component.KindProcessor:
return h.factories.Processors[componentType]
}

return nil
}

// GetExtensions implements the component.Host interface.
func (*host) GetExtensions() map[config.ComponentID]component.Extension {
// TODO: A pipeline may want to have extensions for authentication;
// this would not be hard to add here but it is not implemented in this PoC.
return map[config.ComponentID]component.Extension{}
}

// GetExporters implements the component.Host interface.
func (h *host) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
// Return the only exporter available on the pipeline.
return map[config.DataType]map[config.ComponentID]component.Exporter{
h.datatype: {h.exporterID: h.exporter},
}
}

// Pipeline is a telemetry pipeline.
type Pipeline struct {
// components is the list of components in the pipeline:
// - components[i+1] sends data to components[i]
// - components[0] is the exporter
// - components[-1] is the receiver.
components []component.Component

// host is the pipeline's component.Host
host *host

shutdownChan chan struct{}
}

// Run the pipeline until one component errors out.
func (p *Pipeline) Run(ctx context.Context) error {
for _, c := range p.components {
if err := c.Start(ctx, p.host); err != nil {
// TODO: Components started up to this point should be shut down.
return err
}
}

select {
case err := <-p.host.asyncErrorChannel:
shutdownErr := p.shutdown(ctx)
return multierr.Append(err, shutdownErr)
case <-ctx.Done():
return p.shutdown(ctx)
case <-p.shutdownChan:
return p.shutdown(ctx)
}
}

// shutdown the pipeline components.
func (p *Pipeline) shutdown(ctx context.Context) (err error) {
for _, c := range p.components {
err = multierr.Append(err, c.Shutdown(ctx))
}
return err
}

// Shutdown the pipeline.
func (p *Pipeline) Shutdown() {
close(p.shutdownChan)
}

// Builder builds a pipeline.
type Builder struct {
set component.TelemetrySettings
buildInfo component.BuildInfo
factories component.Factories
}

// NewBuilder creates a pipeline builder.
func NewBuilder(
set component.TelemetrySettings,
buildInfo component.BuildInfo,
factories component.Factories,
) *Builder {

// TODO: Should the Exporter|Receiver|ProcessorCreateSettings be passed instead?
return &Builder{
set: set,
buildInfo: buildInfo,
factories: factories,
}
}

// BuildMetricsPipeline creates a metrics pipeline based on the configuration of components.
// Each component's configuration needs to have a valid component ID.
//
// TODO: Some functionality is missing here:
// - Factories can't be reused.
// - Extensions are not supported.
// - Exactly one receiver and one exporter are allowed.
func (b *Builder) BuildMetricsPipeline(
ctx context.Context,
receiver config.Receiver,
processors []config.Processor,
exporter config.Exporter,
) (*Pipeline, error) {

// TODO: The logger is passed as-is; in a real implementation,
// we would want to add fields for context as done in service.Service.
pipeline := &Pipeline{
shutdownChan: make(chan struct{}),
}

eFactory, ok := b.factories.Exporters[exporter.ID().Type()]
if !ok {
return nil, fmt.Errorf("factory not found for %q exporter", exporter.ID())
}

c, err := eFactory.CreateMetricsExporter(
ctx,
component.ExporterCreateSettings{
BuildInfo: b.buildInfo,
TelemetrySettings: b.set,
},
exporter,
)
if err != nil {
return nil, err
}
pipeline.components = append(pipeline.components, c)
pipeline.host = &host{
factories: b.factories,
datatype: config.MetricsDataType,
exporterID: exporter.ID(),
exporter: c,
asyncErrorChannel: make(chan error),
}

for i := len(processors) - 1; i >= 0; i-- {
processor := processors[i]
pFactory, ok := b.factories.Processors[processor.ID().Type()]
if !ok {
return nil, fmt.Errorf("factory not found for %q processor", processor.ID())
}
c, err = pFactory.CreateMetricsProcessor(
ctx,
component.ProcessorCreateSettings{
BuildInfo: b.buildInfo,
TelemetrySettings: b.set,
},
processor, c)
if err != nil {
return nil, err
}
pipeline.components = append(pipeline.components, c)
}

rFactory, ok := b.factories.Receivers[receiver.ID().Type()]
if !ok {
return nil, fmt.Errorf("factory not found for %q receiver", receiver.ID())
}
r, err := rFactory.CreateMetricsReceiver(ctx,
component.ReceiverCreateSettings{
BuildInfo: b.buildInfo,
TelemetrySettings: b.set,
},
receiver, c,
)
if err != nil {
return nil, err
}
pipeline.components = append(pipeline.components, r)

return pipeline, nil
}
117 changes: 117 additions & 0 deletions pipeline/pipeline_test.go
@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline // import "go.opentelemetry.io/collector/pipeline"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/otel/metric/nonrecording"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

func components() (component.Factories, error) {
var err error
factories := component.Factories{}

factories.Receivers, err = component.MakeReceiverFactoryMap(
otlpreceiver.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}

factories.Exporters, err = component.MakeExporterFactoryMap(
loggingexporter.NewFactory(),
otlpexporter.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}

factories.Processors, err = component.MakeProcessorFactoryMap(
batchprocessor.NewFactory(),
)
if err != nil {
return component.Factories{}, err
}

return factories, nil
}

func ExampleNewBuilder() {

receiverFactory := otlpreceiver.NewFactory()
receiverCfg := receiverFactory.CreateDefaultConfig().(*otlpreceiver.Config)
receiverCfg.HTTP = nil // I need to set HTTP to nil explicitly, since this is usually done in Unmarshal
receiverCfg.GRPC = &configgrpc.GRPCServerSettings{
NetAddr: confignet.NetAddr{
Transport: "tcp",
// I can't really express 'use the default gRPC settings' here, as one can by setting 'grpc:' in the YAML
Endpoint: "0.0.0.0:4317",
},
// I only know this by reading the code
ReadBufferSize: 512 * 1024,
}
Comment on lines +68 to +79 (Member Author):
This, to me, is the hardest problem to solve: declaring configuration for components is very brittle and has a number of gotchas. The ones I have thought about are:

Problem 1: Can't express default configuration (nil but present sections)

On the OTLP receiver and other components that have a custom unmarshaling function, we rely on having empty sections for configuration:

# Example: enable only HTTP with default endpoint
otlp:
  protocols:
    http:

We can't express that easily in the Go API (setting a section to nil disables it), and we need to declare the endpoint explicitly. In fact, for the OTLP receiver, we need to explicitly disable an endpoint by setting it to nil, rather than the other way around.

Problem 2: Users can't rely on zero values

Configuration structs on the different components don't usually have the zero value of the fields as the default. For example, one may be tempted to set the default configuration for gRPC connection as:

// WRONG: ReadBufferSize is 0, which disables read buffering (not the default)
receiverCfg.GRPC = &configgrpc.GRPCServerSettings{
		NetAddr: confignet.NetAddr{
			Transport: "tcp",
			Endpoint: "0.0.0.0:4317",
		},
	}

While disabling the read buffer is not necessarily catastrophic, in other situations this can lead to more serious unexpected side effects. Components can partially mitigate this by assigning meaning to zero values, but that is not always possible (e.g. what about booleans?).

Problem 3: Can't express overrides

Imagine a component with a config like the following:

timeout: <time.Duration, 0 disables timeout>

foo:
  timeout: <time.Duration, 0 disables timeout, overrides timeout for foos>
bar:
  timeout: <time.Duration, 0 disables timeout, overrides timeout for bars>

A component can express this right now via a custom unmarshaling function, by checking if foo::timeout or bar::timeout are set and overriding if so. In the Go API, we can't really do this:

cfg := component.Config{
   Timeout: 10*time.Second,
   Foo: Foo{ Timeout: 30 *time.Second },
   // Pipeline doesn't know that Bar was unset, so it doesn't know that it should ignore Bar 0 value and use top-level timeout instead
}

I think all of these are solvable by adding a configuration builder for each component, like otlpreceiver.NewConfig(options ...otlpreceiver.ConfigOption), and nudging users toward using those, but it needs individual component support. Furthermore, since component configs are meant to have all fields public and behave like PODs, users may be tempted to use the configuration struct directly regardless. This is not very worrisome (nobody writes zap.Logger{}, even though you can), but it is yet another point of friction.

Member:
@mx-psi I understand all these possible problems, but I am still not sure I understand what you are trying to solve. In your case, if I understood correctly, you have a "hardcoded" pipeline, and you don't need to let users define the config. Do you care that you have to do some "hardcoded" config? What problem are you trying to solve with this proposal?

Member Author:

I am still not sure I understand what you are trying to solve. In your case, if I understood correctly, you have a "hardcoded" pipeline, and you don't need to let users define the config

Let me try to describe my use case in more detail. We use the Collector in the Datadog Agent, an observability daemon similar to the Collector. We have fixed pipelines for different telemetry signals with

  • an OTLP receiver,
  • some processing (e.g. batching) and
  • a custom exporter that converts to the Datadog format and forwards the telemetry data internally to existing components of the Datadog Agent.

Parts of the configuration for the pipelines are hardcoded (e.g. what components the pipeline has), while other parts are customizable by end-users via YAML (but not necessarily in the format of the Collector).

Do you care that you have to do some "hardcoded" config?

I am okay with hardcoding things. We currently use a custom ConfigProvider that assembles a config.Map from the YAML configuration and the fixed settings, and we run a service.Collector instead of an individual pipeline.

However, right now I either have to hardcode things in a config.Map or deal with the problems described in my message above, neither option being ideal.

What problem are you trying to solve with this proposal?

The overarching problem I am trying to solve is to run a pipeline within another existing application and to integrate it with that application (e.g. by integrating the logging, metrics, CLI, or configuration). In particular, this PoC stemmed from my need to override telemetry providers so that we can eventually access the different components' telemetry.

I hoped to solve this with #5107, but I understand that service.Collector may not fit my use case, and I want to take your concerns seriously, so I am exploring the second option: getting rid of it entirely and just having a lightweight pipeline builder.

Member:

Sorry, my comment about What problem are you trying to solve with this proposal? was not clear; I was referring to the config part, not the initial problem you are trying to solve.

So for configuration, let me see if I understood the problem:
the Datadog Agent wants to provide configuration for some of the components in this hardcoded pipeline, in a YAML format that is not the Collector's YAML.

Some solutions that I see:

  1. You translate Datadog's YAML into the Collector's YAML (what you do right now). Here you do indeed need to take care of the default config, etc., because the "Unmarshal" function on the component config may make assumptions about the default configuration.
  2. You translate Datadog's YAML into the Collector's component Config (the unmarshaled config). Here you don't care about the "default config", etc.; you define your own rules on how to configure components.
  3. You want the same component YAML config for components that you use in the hardcoded pipeline, but in a different high-level YAML structure (no top-level fields like receiver, service, etc.). In this case you should parse that as a map[string]interface{} -> config.Map -> maybe do some conversion on the Map -> unmarshal into the component Config.

Member Author:

Sorry, my comment about What problem are you trying to solve with this proposal? was not clear; I was referring to the config part, not the initial problem you are trying to solve.

Looks like I misunderstood you but I think I managed to answer the question anyway :)

So for configuration, let me see if I understood the problem:
the Datadog Agent wants to provide configuration for some of the components in this hardcoded pipeline, in a YAML format that is not the Collector's YAML

I think this is a good summary; thanks for taking the time to work with me on this. Right now we are indeed using solution (1), which works, even if it is a bit cumbersome. Solution (2) has the pitfalls I described above, and I don't feel it is safe to use with the current API design.

In any case, I still don't feel this solves the original 'overriding telemetry providers' problem: I wouldn't want to use pipeline-building code like the one in this PR if it did not live in the Collector repo. Things today are designed mostly with the 'Collector distro' use case in mind, and I would worry about not being able to reliably keep up with the latest upstream version. Do you think it makes sense to have pipeline-building code in go.opentelemetry.io/collector? Is this better served by eventually exposing service.Service, or do we need this separate thing?


processorFactory := batchprocessor.NewFactory()
processorCfg := processorFactory.CreateDefaultConfig()

exporterFactory := loggingexporter.NewFactory()
exporterCfg := exporterFactory.CreateDefaultConfig().(*loggingexporter.Config)

components, err := components()
if err != nil {
panic(err)
}

logger, err := zap.NewDevelopment()
if err != nil {
panic(err)
}

builder := NewBuilder(
component.TelemetrySettings{
Logger: logger,
MeterProvider: nonrecording.NewNoopMeterProvider(),
TracerProvider: trace.NewNoopTracerProvider(),
},
component.NewDefaultBuildInfo(),
components,
)

pipeline, err := builder.BuildMetricsPipeline(
context.TODO(),
receiverCfg,
[]config.Processor{processorCfg},
exporterCfg,
)
if err != nil {
panic(err)
}

err = pipeline.Run(context.TODO())
if err != nil {
panic(err)
}
}