-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PoC][DO NOT MERGE] Pipelines without a service #5149
Closed
Closed
Changes from 1 commit
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package pipeline // import "go.opentelemetry.io/collector/pipeline" | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
"go.uber.org/multierr" | ||
) | ||
|
||
var _ component.Host = (*host)(nil) | ||
|
||
// host is a component.Host for a single pipeline. | ||
// It has no extensions and a single exporter. | ||
type host struct { | ||
// factories available for components. | ||
factories component.Factories | ||
// datatype is the telemetry signal type for the pipeline. | ||
datatype config.DataType | ||
// exporterID is the component ID for the pipeline's exporter. | ||
exporterID config.ComponentID | ||
// exporter is the exporter on this pipeline. | ||
exporter component.Exporter | ||
// asyncErrorChannel is used to signal a fatal error from any component. | ||
asyncErrorChannel chan error | ||
} | ||
|
||
// ReportFatalError implements the component.Host interface. | ||
func (h *host) ReportFatalError(err error) { | ||
h.asyncErrorChannel <- err | ||
} | ||
|
||
// GetFactory implements the component.Host interface. | ||
func (h *host) GetFactory(kind component.Kind, componentType config.Type) component.Factory { | ||
switch kind { | ||
case component.KindReceiver: | ||
return h.factories.Receivers[componentType] | ||
case component.KindExtension: | ||
return h.factories.Extensions[componentType] | ||
case component.KindExporter: | ||
return h.factories.Exporters[componentType] | ||
case component.KindProcessor: | ||
return h.factories.Processors[componentType] | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// GetExtensions implements the component.Host interface. | ||
func (*host) GetExtensions() map[config.ComponentID]component.Extension { | ||
// TODO: A pipeline may want to have extensions for authentication; | ||
// this would not be hard to add here but it is not implemented in this PoC. | ||
return map[config.ComponentID]component.Extension{} | ||
} | ||
|
||
// GetExporters implements the component.Host interface. | ||
func (h *host) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter { | ||
// Return the only exporter available on the pipeline. | ||
return map[config.DataType]map[config.ComponentID]component.Exporter{ | ||
h.datatype: {h.exporterID: h.exporter}, | ||
} | ||
} | ||
|
||
// Pipeline is a telemetry pipeline. | ||
type Pipeline struct { | ||
// components is the list of components in the pipeline: | ||
// - components[i+1] sends data to components[i] | ||
// - components[0] is the exporter | ||
// - components[-1] is the receiver. | ||
components []component.Component | ||
|
||
// host is the pipeline's component.Host | ||
host *host | ||
|
||
shutdownChan chan struct{} | ||
} | ||
|
||
// Run the pipeline until one component errors out. | ||
func (p *Pipeline) Run(ctx context.Context) error { | ||
for _, component := range p.components { | ||
err := component.Start(ctx, p.host) | ||
if err != nil { | ||
// TODO: Components up to this point should be shutdown. | ||
return err | ||
} | ||
} | ||
|
||
for { | ||
select { | ||
case err := <-p.host.asyncErrorChannel: | ||
shutdownErr := p.shutdown(ctx) | ||
return multierr.Append(err, shutdownErr) | ||
case <-ctx.Done(): | ||
return p.shutdown(ctx) | ||
case <-p.shutdownChan: | ||
return p.shutdown(ctx) | ||
} | ||
} | ||
|
||
} | ||
|
||
// shutdown the pipeline components. | ||
func (p *Pipeline) shutdown(ctx context.Context) (err error) { | ||
for _, component := range p.components { | ||
multierr.Append(err, component.Shutdown(ctx)) | ||
} | ||
return | ||
} | ||
|
||
// Shutdown the pipeline. | ||
func (p *Pipeline) Shutdown() { | ||
close(p.shutdownChan) | ||
} | ||
|
||
// Builder builds a pipeline. | ||
type Builder struct { | ||
set component.TelemetrySettings | ||
buildInfo component.BuildInfo | ||
factories component.Factories | ||
} | ||
|
||
// NewBuilder creates a pipeline builder. | ||
func NewBuilder( | ||
set component.TelemetrySettings, | ||
buildInfo component.BuildInfo, | ||
factories component.Factories, | ||
) *Builder { | ||
|
||
// TODO: Should the Exporter|Receiver|ProcessorCreateSettings be passed instead? | ||
return &Builder{ | ||
set: set, | ||
buildInfo: buildInfo, | ||
factories: factories, | ||
} | ||
} | ||
|
||
// BuildMetricsPipeline creates a metrics pipeline based on the configuration of components. | ||
// Components' configuration need to have a valid component ID. | ||
// | ||
// TODO: Some missing functionality here: | ||
// - Factories can't be reused. | ||
// - Extensions are not suppported. | ||
// - Exactly one receiver and exporter is allowed. | ||
func (b *Builder) BuildMetricsPipeline( | ||
ctx context.Context, | ||
receiver config.Receiver, | ||
processors []config.Processor, | ||
exporter config.Exporter, | ||
) (*Pipeline, error) { | ||
|
||
// TODO: The logger is passed as-is; in a real implementation, | ||
// we would want to add fields for context as done in service.Service. | ||
pipeline := &Pipeline{ | ||
shutdownChan: make(chan struct{}), | ||
} | ||
|
||
eFactory, ok := b.factories.Exporters[exporter.ID().Type()] | ||
if !ok { | ||
return nil, fmt.Errorf("factory not found for %q exporter", exporter.ID()) | ||
} | ||
|
||
c, err := eFactory.CreateMetricsExporter( | ||
ctx, | ||
component.ExporterCreateSettings{ | ||
BuildInfo: b.buildInfo, | ||
TelemetrySettings: b.set, | ||
}, | ||
exporter, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
pipeline.components = append(pipeline.components, c) | ||
pipeline.host = &host{ | ||
factories: b.factories, | ||
datatype: config.MetricsDataType, | ||
exporterID: exporter.ID(), | ||
exporter: c, | ||
asyncErrorChannel: make(chan error), | ||
} | ||
|
||
for i := len(processors) - 1; i >= 0; i-- { | ||
processor := processors[i] | ||
pFactory, ok := b.factories.Processors[processor.ID().Type()] | ||
if !ok { | ||
return nil, fmt.Errorf("factory not found for %q processor", processor.ID()) | ||
} | ||
c, err = pFactory.CreateMetricsProcessor( | ||
ctx, | ||
component.ProcessorCreateSettings{ | ||
BuildInfo: b.buildInfo, | ||
TelemetrySettings: b.set, | ||
}, | ||
processor, c) | ||
if err != nil { | ||
return nil, err | ||
} | ||
pipeline.components = append(pipeline.components, c) | ||
} | ||
|
||
rFactory, ok := b.factories.Receivers[receiver.ID().Type()] | ||
if !ok { | ||
return nil, fmt.Errorf("factory not found for %q receiver", receiver.ID()) | ||
} | ||
r, err := rFactory.CreateMetricsReceiver(ctx, | ||
component.ReceiverCreateSettings{ | ||
BuildInfo: b.buildInfo, | ||
TelemetrySettings: b.set, | ||
}, | ||
receiver, c, | ||
) | ||
pipeline.components = append(pipeline.components, r) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return pipeline, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package pipeline // import "go.opentelemetry.io/collector/pipeline" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/config/configgrpc" | ||
"go.opentelemetry.io/collector/config/confignet" | ||
"go.opentelemetry.io/collector/exporter/loggingexporter" | ||
"go.opentelemetry.io/collector/exporter/otlpexporter" | ||
"go.opentelemetry.io/collector/processor/batchprocessor" | ||
"go.opentelemetry.io/collector/receiver/otlpreceiver" | ||
"go.opentelemetry.io/otel/metric/nonrecording" | ||
"go.opentelemetry.io/otel/trace" | ||
"go.uber.org/zap" | ||
) | ||
|
||
func components() (component.Factories, error) { | ||
var err error | ||
factories := component.Factories{} | ||
|
||
factories.Receivers, err = component.MakeReceiverFactoryMap( | ||
otlpreceiver.NewFactory(), | ||
) | ||
if err != nil { | ||
return component.Factories{}, err | ||
} | ||
|
||
factories.Exporters, err = component.MakeExporterFactoryMap( | ||
loggingexporter.NewFactory(), | ||
otlpexporter.NewFactory(), | ||
) | ||
if err != nil { | ||
return component.Factories{}, err | ||
} | ||
|
||
factories.Processors, err = component.MakeProcessorFactoryMap( | ||
batchprocessor.NewFactory(), | ||
) | ||
if err != nil { | ||
return component.Factories{}, err | ||
} | ||
|
||
return factories, nil | ||
} | ||
|
||
func ExampleNewBuilder() { | ||
|
||
receiverFactory := otlpreceiver.NewFactory() | ||
receiverCfg := receiverFactory.CreateDefaultConfig().(*otlpreceiver.Config) | ||
receiverCfg.HTTP = nil // I need to explicitly nil HTTP, since this is done in Unmarshal usually | ||
receiverCfg.GRPC = &configgrpc.GRPCServerSettings{ | ||
NetAddr: confignet.NetAddr{ | ||
Transport: "tcp", | ||
// I can't really express 'use the default gRPC settings here' as one can do by setting 'grpc:' on the YAML | ||
Endpoint: "0.0.0.0:4317", | ||
}, | ||
// I only know this by reading the code | ||
ReadBufferSize: 512 * 1024, | ||
} | ||
|
||
processorFactory := batchprocessor.NewFactory() | ||
processorCfg := processorFactory.CreateDefaultConfig() | ||
|
||
exporterFactory := loggingexporter.NewFactory() | ||
exporterCfg := exporterFactory.CreateDefaultConfig().(*loggingexporter.Config) | ||
|
||
components, err := components() | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
logger, err := zap.NewDevelopment() | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
builder := NewBuilder( | ||
component.TelemetrySettings{ | ||
Logger: logger, | ||
MeterProvider: nonrecording.NewNoopMeterProvider(), | ||
TracerProvider: trace.NewNoopTracerProvider(), | ||
}, | ||
component.NewDefaultBuildInfo(), | ||
components, | ||
) | ||
|
||
pipeline, err := builder.BuildMetricsPipeline( | ||
context.TODO(), | ||
receiverCfg, | ||
[]config.Processor{processorCfg}, | ||
exporterCfg, | ||
) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
err = pipeline.Run(context.TODO()) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This, to me, is the hardest problem to solve: declaring configuration for components is very brittle and has a big number of gotchas. The ones that I have thought about are:
Problem 1: Can't express default configuration (nil but present sections)
On the OTLP receiver and other components that have a custom unmarshaling function, we rely on having empty sections for configuration:
We can't express that on the Go API easily (setting to nil disables them), and we need to explicitly declare the endpoint. In fact, for the OTLP receiver, we need to explicitly disable an endpoint by setting to nil instead of the other way around.
Problem 2: Users can't rely on zero values
Configuration structs on the different components don't usually have the zero value of the fields as the default. For example, one may be tempted to set the default configuration for gRPC connection as:
while disabling the read buffer is not necessarily catastrophic, in other situations it can lead to more serious unexpected side effects. This can be partially mitigated by components if they assign meaning to zero values, but this is not always possible (e.g. what about booleans?)
Problem 3: Can't express overrides
Imagine a component with a config like the following:
A component can express this right now via a custom unmarshaling function, by checking if
foo::timeout
or bar::timeout
are set and overriding if so. In the Go API, we can't really do this:
I think all of these are solvable by adding a configuration builder for each component like
otlpreceiver.NewConfig(options ...otlpreceiver.ConfigOption)
and nudging users toward using those, but it needs individual component support. Furthermore, since the configs of components are meant to have all fields public and behave like PODs, users can be tempted to use the configuration struct directly regardless. This is not very worrisome (nobody does zap.Logger{}
, even if you can), but it's yet another point of friction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mx-psi I understand all these possible problems, but I am still not sure I understand what you are trying to solve. In your case, if I understood correctly, you have a "hardcoded" pipeline, and you don't need to let users define the config. Do you care that you have to do some "hardcoded" config? What problem are you trying to solve with this proposal?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me try to describe my use case in more detail. We use the Collector on the Datadog Agent, an observability daemon like the Collector. We have fixed pipelines for different telemetry signals with
Parts of the configuration for the pipelines are hardcoded (e.g. what components the pipeline has), while other parts are customizable by end-users via YAML (but not necessarily in the format of the Collector).
I am okay with hardcoding things. We currently use a custom ConfigProvider that assembles a
config.Map
from the YAML configuration and the fixed settings, and we run a service.Collector
instead of an individual pipeline.
However, right now I either have to hardcode things in a config.Map or deal with the problems described in my message above, neither option being ideal.
The overarching problem I am trying to solve is to run a pipeline within another existing application and integrating it with this application (e.g. by integrating the logging, metrics, CLI or configuration). In particular, this PoC stemmed from my need for overriding telemetry providers so that we can eventually access the different components' telemetry.
I hoped to solve this with #5107, but I understand that
service.Collector
may not fit my use case and I want to take your concerns seriously, thus I am exploring the second option of getting rid of it entirely and just having a lightweight pipeline builder.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry my comment about
What problem are you trying to solve with this proposal?
was not clear; I was referring to the config part, not the initial problem you are trying to solve.
So for configuration let me see if I understood the problem:
Datadog Agent wants to provide some configuration for some of the components in this hardcoded pipeline that they need, in a YAML format that is not the Collector's YAML
Some solutions that I see:
receiver
,service
, etc.). In this case you should parse that as a map[string]interface{}
->config.Map
-> Maybe do any conversion to the Map -> Unmarshal into the component Config
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like I misunderstood you but I think I managed to answer the question anyway :)
I think this is a good summary, thanks for taking the time to work with me on this. Right now indeed we are using solution (1), which works even if a bit cumbersome to use. Solution (2) has the pitfalls I described above and I don't feel like it's safe to use with the current API design.
In any case, I still don't feel like this solves the original 'overriding telemetry providers' problem, in that I wouldn't want to use code building pipelines like the one on this PR if it did not live on the Collector repo: things today are mostly designed to work only with the 'Collector distro' use case in mind, and I would worry that I can't reliably keep using the latest version of upstream. Do you think it makes sense to have pipeline-building code on
go.opentelemetry.io/collector
? Is this better supported by eventually exposing service.Service
or do we need this different thing?