diff --git a/service/collector.go b/service/collector.go index b5aa8fa69d9..9771b134680 100644 --- a/service/collector.go +++ b/service/collector.go @@ -26,19 +26,13 @@ import ( "sync/atomic" "syscall" - "go.opentelemetry.io/contrib/zpages" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/nonrecording" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/ballastextension" - "go.opentelemetry.io/collector/service/internal" - "go.opentelemetry.io/collector/service/internal/telemetrylogs" ) // State defines Collector's state. @@ -81,9 +75,8 @@ type Collector struct { set CollectorSettings logger *zap.Logger - tracerProvider trace.TracerProvider - meterProvider metric.MeterProvider - zPagesSpanProcessor *zpages.SpanProcessor + tracerProvider trace.TracerProvider + meterProvider metric.MeterProvider service *service state int32 @@ -104,6 +97,10 @@ func New(set CollectorSettings) (*Collector, error) { return nil, errors.New("invalid nil config provider") } + if set.TelemetryProvider == nil { + return nil, errors.New("invalid nil telemetry provider") + } + return &Collector{ logger: zap.NewNop(), // Set a Nop logger as a place holder until a logger is created based on configuration @@ -193,23 +190,22 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to get config: %w", err) } - if col.logger, err = telemetrylogs.NewLogger(cfg.Service.Telemetry.Logs, col.set.LoggingOptions); err != nil { - return fmt.Errorf("failed to get logger: %w", err) + + telemetrySettings, err := col.set.TelemetryProvider.SetupTelemetry(cfg.Service.Telemetry) + if err != nil { + return fmt.Errorf("failed to setup telemetry: %w", err) } - telemetrylogs.SetColGRPCLogger(col.logger, cfg.Service.Telemetry.Logs.Level) + col.logger = telemetrySettings.Logger + col.meterProvider = telemetrySettings.MeterProvider + col.tracerProvider = telemetrySettings.TracerProvider col.service, err = newService(&svcSettings{ - BuildInfo: col.set.BuildInfo, - Factories: col.set.Factories, - Config: cfg, - Telemetry: component.TelemetrySettings{ - Logger: col.logger, - TracerProvider: col.tracerProvider, - MeterProvider: col.meterProvider, - MetricsLevel: cfg.Telemetry.Metrics.Level, - }, - ZPagesSpanProcessor: col.zPagesSpanProcessor, + BuildInfo: col.set.BuildInfo, + Factories: col.set.Factories, + Config: cfg, + Telemetry: telemetrySettings, + ZPagesSpanProcessor: col.set.TelemetryProvider.ZPages(), AsyncErrorChannel: col.asyncErrorChannel, }) if err != nil { @@ -226,17 +222,6 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { // Run starts the collector according to the given configuration given, and waits for it to complete. // Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down. func (col *Collector) Run(ctx context.Context) error { - col.zPagesSpanProcessor = zpages.NewSpanProcessor() - col.tracerProvider = sdktrace.NewTracerProvider( - sdktrace.WithSampler(internal.AlwaysRecord()), - sdktrace.WithSpanProcessor(col.zPagesSpanProcessor)) - - // Set the constructed tracer provider as Global, in case any component uses the - // global TracerProvider. - otel.SetTracerProvider(col.tracerProvider) - - col.meterProvider = nonrecording.NewNoopMeterProvider() - col.asyncErrorChannel = make(chan error) if err := col.setupConfigurationComponents(ctx); err != nil { diff --git a/service/collector_test.go b/service/collector_test.go index 507a3fc6ee6..438073a1ba1 100644 --- a/service/collector_test.go +++ b/service/collector_test.go @@ -65,6 +65,7 @@ func TestCollector_StartAsGoRoutine(t *testing.T) { Factories: factories, ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, []string{"service.telemetry.metrics.address=localhost:" + strconv.FormatUint(uint64(testutil.GetAvailablePort(t)), 10)}), + TelemetryProvider: NewDefaultTelemetryProvider(), } col, err := New(set) require.NoError(t, err) @@ -104,7 +105,7 @@ func testCollectorStartHelper(t *testing.T) { ConfigProvider: MustNewDefaultConfigProvider( []string{filepath.Join("testdata", "otelcol-config.yaml")}, []string{"service.telemetry.metrics.address=localhost:" + strconv.FormatUint(uint64(metricsPort), 10)}), - LoggingOptions: []zap.Option{zap.Hooks(hook)}, + TelemetryProvider: NewDefaultTelemetryProvider(zap.Hooks(hook)), }) require.NoError(t, err) @@ -170,9 +171,10 @@ func TestCollector_ShutdownNoop(t *testing.T) { require.NoError(t, err) set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: factories, - ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil), + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil), + TelemetryProvider: NewDefaultTelemetryProvider(), } col, err := New(set) require.NoError(t, err) @@ -191,9 +193,10 @@ func TestCollector_ShutdownBeforeRun(t *testing.T) { require.NoError(t, err) set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: factories, - ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil), + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil), + TelemetryProvider: NewDefaultTelemetryProvider(), } col, err := New(set) require.NoError(t, err) @@ -223,9 +226,10 @@ func TestCollector_ClosedStateOnStartUpError(t *testing.T) { // Load a bad config causing startup to fail set := CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: factories, - ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}, nil), + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}, nil), + TelemetryProvider: NewDefaultTelemetryProvider(), } col, err := New(set) require.NoError(t, err) @@ -257,9 +261,10 @@ func TestCollector_ReportError(t *testing.T) { require.NoError(t, err) col, err := New(CollectorSettings{ - BuildInfo: component.NewDefaultBuildInfo(), - Factories: factories, - ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil), + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil), + TelemetryProvider: NewDefaultTelemetryProvider(), }) require.NoError(t, err) @@ -293,6 +298,7 @@ func TestCollector_ContextCancel(t *testing.T) { Factories: factories, ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, []string{"service.telemetry.metrics.address=localhost:" + strconv.FormatUint(uint64(testutil.GetAvailablePort(t)), 10)}), + TelemetryProvider: NewDefaultTelemetryProvider(), } col, err := New(set) require.NoError(t, err) diff --git a/service/collector_windows.go b/service/collector_windows.go index a9c74c1c55d..c518c318bb4 100644 --- a/service/collector_windows.go +++ b/service/collector_windows.go @@ -146,10 +146,9 @@ func newWithWindowsEventLogCore(set CollectorSettings, elog *eventlog.Log) (*Col if set.ConfigProvider == nil { set.ConfigProvider = MustNewDefaultConfigProvider(getConfigFlag(), getSetFlag()) } - set.LoggingOptions = append( - set.LoggingOptions, - zap.WrapCore(withWindowsCore(elog)), - ) + if set.TelemetryProvider == nil { + set.TelemetryProvider = NewDefaultTelemetryProvider(zap.WrapCore(withWindowsCore(elog))) + } return New(set) } diff --git a/service/settings.go b/service/settings.go index 9a8f6ff8be8..24f58e98730 100644 --- a/service/settings.go +++ b/service/settings.go @@ -16,7 +16,6 @@ package service // import "go.opentelemetry.io/collector/service" import ( "go.opentelemetry.io/contrib/zpages" - "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" @@ -61,6 +60,6 @@ type CollectorSettings struct { // If the provider watches for configuration change, collector may reload the new configuration upon changes. ConfigProvider ConfigProvider - // LoggingOptions provides a way to change behavior of zap logging. - LoggingOptions []zap.Option + // TelemetryProvider setups and provides the Collector's own telemetry. + TelemetryProvider TelemetryProvider } diff --git a/service/telemetry_provider.go b/service/telemetry_provider.go new file mode 100644 index 00000000000..49aa11c3a2d --- /dev/null +++ b/service/telemetry_provider.go @@ -0,0 +1,79 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service // import "go.opentelemetry.io/collector/service" + +import ( + "fmt" + + "go.opentelemetry.io/contrib/zpages" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric/nonrecording" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/service/internal" + "go.opentelemetry.io/collector/service/internal/telemetrylogs" +) + +// TelemetryProvider is the provider of telemetry. +type TelemetryProvider interface { + // SetupTelemetry creates component.TelemetrySettings and possibly handles global telemetry setup. + SetupTelemetry(cfg config.ServiceTelemetry) (component.TelemetrySettings, error) + // ZPages returns the zpages.SpanProcessor associated with telemetry. + ZPages() *zpages.SpanProcessor +} + +var _ TelemetryProvider = (*defaultTelemetryFactory)(nil) + +type defaultTelemetryFactory struct { + zPagesSpanProcessor *zpages.SpanProcessor + loggingOptions []zap.Option +} + +// NewDefaultTelemetryProvider creates the default telemetry provider. +func NewDefaultTelemetryProvider(loggingOptions ...zap.Option) TelemetryProvider { + return &defaultTelemetryFactory{ + zPagesSpanProcessor: zpages.NewSpanProcessor(), + loggingOptions: loggingOptions, + } +} + +// SetupTelemetry implements the TelemetryProvider interface. +func (t *defaultTelemetryFactory) SetupTelemetry(cfg config.ServiceTelemetry) (set component.TelemetrySettings, err error) { + set.Logger, err = telemetrylogs.NewLogger(cfg.Logs, t.loggingOptions) + if err != nil { + return component.TelemetrySettings{}, fmt.Errorf("failed to get logger: %w", err) + } + + telemetrylogs.SetColGRPCLogger(set.Logger, cfg.Logs.Level) + + set.TracerProvider = sdktrace.NewTracerProvider( + sdktrace.WithSampler(internal.AlwaysRecord()), + sdktrace.WithSpanProcessor(t.zPagesSpanProcessor)) + + otel.SetTracerProvider(set.TracerProvider) + + set.MeterProvider = nonrecording.NewNoopMeterProvider() + set.MetricsLevel = cfg.Metrics.Level + + return set, nil +} + +// ZPages implements the TelemetryProvider interface. +func (t *defaultTelemetryFactory) ZPages() *zpages.SpanProcessor { + return t.zPagesSpanProcessor +}