Skip to content

Commit

Permalink
Telemetry provider PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
mx-psi committed Mar 29, 2022
1 parent 8c54787 commit 1101695
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 53 deletions.
51 changes: 18 additions & 33 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,13 @@ import (
"sync/atomic"
"syscall"

"go.opentelemetry.io/contrib/zpages"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
)

// State defines Collector's state.
Expand Down Expand Up @@ -81,9 +75,8 @@ type Collector struct {
set CollectorSettings
logger *zap.Logger

tracerProvider trace.TracerProvider
meterProvider metric.MeterProvider
zPagesSpanProcessor *zpages.SpanProcessor
tracerProvider trace.TracerProvider
meterProvider metric.MeterProvider

service *service
state int32
Expand All @@ -104,6 +97,10 @@ func New(set CollectorSettings) (*Collector, error) {
return nil, errors.New("invalid nil config provider")
}

if set.TelemetryProvider == nil {
return nil, errors.New("invalid nil telemetry provider")
}

return &Collector{
logger: zap.NewNop(), // Set a Nop logger as a place holder until a logger is created based on configuration

Expand Down Expand Up @@ -193,23 +190,22 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}
if col.logger, err = telemetrylogs.NewLogger(cfg.Service.Telemetry.Logs, col.set.LoggingOptions); err != nil {
return fmt.Errorf("failed to get logger: %w", err)

telemetrySettings, err := col.set.TelemetryProvider.SetupTelemetry(cfg.Service.Telemetry)
if err != nil {
return fmt.Errorf("failed to setup telemetry: %w", err)
}

telemetrylogs.SetColGRPCLogger(col.logger, cfg.Service.Telemetry.Logs.Level)
col.logger = telemetrySettings.Logger
col.meterProvider = telemetrySettings.MeterProvider
col.tracerProvider = telemetrySettings.TracerProvider

col.service, err = newService(&svcSettings{
BuildInfo: col.set.BuildInfo,
Factories: col.set.Factories,
Config: cfg,
Telemetry: component.TelemetrySettings{
Logger: col.logger,
TracerProvider: col.tracerProvider,
MeterProvider: col.meterProvider,
MetricsLevel: cfg.Telemetry.Metrics.Level,
},
ZPagesSpanProcessor: col.zPagesSpanProcessor,
BuildInfo: col.set.BuildInfo,
Factories: col.set.Factories,
Config: cfg,
Telemetry: telemetrySettings,
ZPagesSpanProcessor: col.set.TelemetryProvider.ZPages(),
AsyncErrorChannel: col.asyncErrorChannel,
})
if err != nil {
Expand All @@ -226,17 +222,6 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
// Run starts the collector according to the given configuration given, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
func (col *Collector) Run(ctx context.Context) error {
col.zPagesSpanProcessor = zpages.NewSpanProcessor()
col.tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(internal.AlwaysRecord()),
sdktrace.WithSpanProcessor(col.zPagesSpanProcessor))

// Set the constructed tracer provider as Global, in case any component uses the
// global TracerProvider.
otel.SetTracerProvider(col.tracerProvider)

col.meterProvider = nonrecording.NewNoopMeterProvider()

col.asyncErrorChannel = make(chan error)

if err := col.setupConfigurationComponents(ctx); err != nil {
Expand Down
32 changes: 19 additions & 13 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestCollector_StartAsGoRoutine(t *testing.T) {
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")},
[]string{"service.telemetry.metrics.address=localhost:" + strconv.FormatUint(uint64(testutil.GetAvailablePort(t)), 10)}),
TelemetryProvider: NewDefaultTelemetryProvider(),
}
col, err := New(set)
require.NoError(t, err)
Expand Down Expand Up @@ -104,7 +105,7 @@ func testCollectorStartHelper(t *testing.T) {
ConfigProvider: MustNewDefaultConfigProvider(
[]string{filepath.Join("testdata", "otelcol-config.yaml")},
[]string{"service.telemetry.metrics.address=localhost:" + strconv.FormatUint(uint64(metricsPort), 10)}),
LoggingOptions: []zap.Option{zap.Hooks(hook)},
TelemetryProvider: NewDefaultTelemetryProvider(zap.Hooks(hook)),
})
require.NoError(t, err)

Expand Down Expand Up @@ -170,9 +171,10 @@ func TestCollector_ShutdownNoop(t *testing.T) {
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil),
TelemetryProvider: NewDefaultTelemetryProvider(),
}
col, err := New(set)
require.NoError(t, err)
Expand All @@ -191,9 +193,10 @@ func TestCollector_ShutdownBeforeRun(t *testing.T) {
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil),
TelemetryProvider: NewDefaultTelemetryProvider(),
}
col, err := New(set)
require.NoError(t, err)
Expand Down Expand Up @@ -223,9 +226,10 @@ func TestCollector_ClosedStateOnStartUpError(t *testing.T) {

// Load a bad config causing startup to fail
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}, nil),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-invalid.yaml")}, nil),
TelemetryProvider: NewDefaultTelemetryProvider(),
}
col, err := New(set)
require.NoError(t, err)
Expand Down Expand Up @@ -257,9 +261,10 @@ func TestCollector_ReportError(t *testing.T) {
require.NoError(t, err)

col, err := New(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")}, nil),
TelemetryProvider: NewDefaultTelemetryProvider(),
})
require.NoError(t, err)

Expand Down Expand Up @@ -293,6 +298,7 @@ func TestCollector_ContextCancel(t *testing.T) {
Factories: factories,
ConfigProvider: MustNewDefaultConfigProvider([]string{filepath.Join("testdata", "otelcol-config.yaml")},
[]string{"service.telemetry.metrics.address=localhost:" + strconv.FormatUint(uint64(testutil.GetAvailablePort(t)), 10)}),
TelemetryProvider: NewDefaultTelemetryProvider(),
}
col, err := New(set)
require.NoError(t, err)
Expand Down
7 changes: 3 additions & 4 deletions service/collector_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,9 @@ func newWithWindowsEventLogCore(set CollectorSettings, elog *eventlog.Log) (*Col
if set.ConfigProvider == nil {
set.ConfigProvider = MustNewDefaultConfigProvider(getConfigFlag(), getSetFlag())
}
set.LoggingOptions = append(
set.LoggingOptions,
zap.WrapCore(withWindowsCore(elog)),
)
if set.TelemetryProvider == nil {
set.TelemetryProvider = NewDefaultTelemetryProvider(zap.WrapCore(withWindowsCore(elog)))
}
return New(set)
}

Expand Down
5 changes: 2 additions & 3 deletions service/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"go.opentelemetry.io/contrib/zpages"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -61,6 +60,6 @@ type CollectorSettings struct {
// If the provider watches for configuration change, collector may reload the new configuration upon changes.
ConfigProvider ConfigProvider

// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option
// TelemetryProvider setups and provides the Collector's own telemetry.
TelemetryProvider TelemetryProvider
}
79 changes: 79 additions & 0 deletions service/telemetry_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service // import "go.opentelemetry.io/collector/service"

import (
"fmt"

"go.opentelemetry.io/contrib/zpages"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
)

// TelemetryProvider is the provider of telemetry.
type TelemetryProvider interface {
// SetupTelemetry creates component.TelemetrySettings and possibly handles global telemetry setup.
SetupTelemetry(cfg config.ServiceTelemetry) (component.TelemetrySettings, error)
// ZPages returns the zpages.SpanProcessor associated with telemetry.
ZPages() *zpages.SpanProcessor
}

var _ TelemetryProvider = (*defaultTelemetryFactory)(nil)

type defaultTelemetryFactory struct {
zPagesSpanProcessor *zpages.SpanProcessor
loggingOptions []zap.Option
}

// NewDefaultTelemetryProvider creates the default telemetry provider.
func NewDefaultTelemetryProvider(loggingOptions ...zap.Option) TelemetryProvider {
return &defaultTelemetryFactory{
zPagesSpanProcessor: zpages.NewSpanProcessor(),
loggingOptions: loggingOptions,
}
}

// SetupTelemetry implements the TelemetryProvider interface.
func (t *defaultTelemetryFactory) SetupTelemetry(cfg config.ServiceTelemetry) (set component.TelemetrySettings, err error) {
set.Logger, err = telemetrylogs.NewLogger(cfg.Logs, t.loggingOptions)
if err != nil {
return component.TelemetrySettings{}, fmt.Errorf("failed to get logger: %w", err)
}

telemetrylogs.SetColGRPCLogger(set.Logger, cfg.Logs.Level)

set.TracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(internal.AlwaysRecord()),
sdktrace.WithSpanProcessor(t.zPagesSpanProcessor))

otel.SetTracerProvider(set.TracerProvider)

set.MeterProvider = nonrecording.NewNoopMeterProvider()
set.MetricsLevel = cfg.Metrics.Level

return set, nil
}

// ZPages implements the TelemetryProvider interface.
func (t *defaultTelemetryFactory) ZPages() *zpages.SpanProcessor {
return t.zPagesSpanProcessor
}

0 comments on commit 1101695

Please sign in to comment.