Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry provider PoC #5107

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,15 @@ import (
"runtime"
"syscall"

"go.opentelemetry.io/contrib/zpages"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
)

// State defines Collector's state.
Expand Down Expand Up @@ -80,9 +76,8 @@ type Collector struct {
set CollectorSettings
logger *zap.Logger

tracerProvider trace.TracerProvider
meterProvider metric.MeterProvider
zPagesSpanProcessor *zpages.SpanProcessor
tracerProvider trace.TracerProvider
meterProvider metric.MeterProvider

service *service
state *atomic.Int32
Expand All @@ -103,6 +98,10 @@ func New(set CollectorSettings) (*Collector, error) {
return nil, errors.New("invalid nil config provider")
}

if set.TelemetryProvider == nil {
return nil, errors.New("invalid nil telemetry provider")
}

if set.telemetry == nil {
set.telemetry = collectorTelemetry
}
Expand Down Expand Up @@ -202,25 +201,21 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

if col.logger, err = telemetrylogs.NewLogger(cfg.Service.Telemetry.Logs, col.set.LoggingOptions); err != nil {
return fmt.Errorf("failed to get logger: %w", err)
telemetrySettings, err := col.set.TelemetryProvider.SetupTelemetry(cfg.Service.Telemetry)
if err != nil {
return fmt.Errorf("failed to setup telemetry: %w", err)
}

if !col.set.SkipSettingGRPCLogger {
telemetrylogs.SetColGRPCLogger(col.logger, cfg.Service.Telemetry.Logs.Level)
}
col.logger = telemetrySettings.Logger
col.meterProvider = telemetrySettings.MeterProvider
col.tracerProvider = telemetrySettings.TracerProvider

col.service, err = newService(&svcSettings{
BuildInfo: col.set.BuildInfo,
Factories: col.set.Factories,
Config: cfg,
Telemetry: component.TelemetrySettings{
Logger: col.logger,
TracerProvider: col.tracerProvider,
MeterProvider: col.meterProvider,
MetricsLevel: cfg.Telemetry.Metrics.Level,
},
ZPagesSpanProcessor: col.zPagesSpanProcessor,
BuildInfo: col.set.BuildInfo,
Factories: col.set.Factories,
Config: cfg,
Telemetry: telemetrySettings,
ZPagesSpanProcessor: col.set.TelemetryProvider.ZPages(),
AsyncErrorChannel: col.asyncErrorChannel,
})
if err != nil {
Expand All @@ -244,10 +239,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
// Run starts the collector according to the given configuration given, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
func (col *Collector) Run(ctx context.Context) error {
col.zPagesSpanProcessor = zpages.NewSpanProcessor()
col.tracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(internal.AlwaysRecord()),
sdktrace.WithSpanProcessor(col.zPagesSpanProcessor))
col.asyncErrorChannel = make(chan error)

if err := col.setupConfigurationComponents(ctx); err != nil {
col.setCollectorState(Closed)
Expand Down
77 changes: 37 additions & 40 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -61,10 +59,11 @@ func TestCollectorStartAsGoRoutine(t *testing.T) {
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: newColTelemetry(featuregate.NewRegistry()),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: NewDefaultTelemetryProvider(),
telemetry: newColTelemetry(featuregate.NewRegistry()),
}
col, err := New(set)
require.NoError(t, err)
Expand Down Expand Up @@ -93,10 +92,11 @@ func TestCollectorCancelContext(t *testing.T) {
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: newColTelemetry(featuregate.NewRegistry()),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: NewDefaultTelemetryProvider(),
telemetry: newColTelemetry(featuregate.NewRegistry()),
}
col, err := New(set)
require.NoError(t, err)
Expand Down Expand Up @@ -124,10 +124,11 @@ func TestCollectorReportError(t *testing.T) {
require.NoError(t, err)

col, err := New(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: newColTelemetry(featuregate.NewRegistry()),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: NewDefaultTelemetryProvider(),
telemetry: newColTelemetry(featuregate.NewRegistry()),
})
require.NoError(t, err)

Expand All @@ -154,10 +155,11 @@ func TestCollectorFailedShutdown(t *testing.T) {
require.NoError(t, err)

col, err := New(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: &mockColTelemetry{},
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: NewDefaultTelemetryProvider(),
telemetry: &mockColTelemetry{},
})
require.NoError(t, err)

Expand All @@ -181,14 +183,7 @@ func TestCollectorFailedShutdown(t *testing.T) {
func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter) {
factories, err := testcomponents.NewDefaultFactories()
require.NoError(t, err)
var once sync.Once
loggingHookCalled := false
hook := func(entry zapcore.Entry) error {
once.Do(func() {
loggingHookCalled = true
})
return nil
}
telemetryProvider, observed := NewTestTelemetryProvider()

metricsAddr := testutil.GetAvailableLocalAddress(t)
cfgSet := newDefaultConfigProviderSettings([]string{
Expand All @@ -204,11 +199,11 @@ func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter
require.NoError(t, err)

col, err := New(CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
LoggingOptions: []zap.Option{zap.Hooks(hook)},
telemetry: telemetry,
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: telemetryProvider,
telemetry: telemetry,
})
require.NoError(t, err)

Expand All @@ -218,7 +213,7 @@ func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter
return Running == col.GetState()
}, 2*time.Second, 200*time.Millisecond)
assert.Equal(t, col.logger, col.GetLogger())
assert.True(t, loggingHookCalled)
assert.True(t, observed.Len() > 1)

// All labels added to all collector metrics by default are listed below.
// These labels are hard coded here in order to avoid inadvertent changes:
Expand Down Expand Up @@ -261,10 +256,11 @@ func TestCollectorShutdownBeforeRun(t *testing.T) {
require.NoError(t, err)

set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: newColTelemetry(featuregate.NewRegistry()),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: NewDefaultTelemetryProvider(),
telemetry: newColTelemetry(featuregate.NewRegistry()),
}
col, err := New(set)
require.NoError(t, err)
Expand All @@ -291,10 +287,11 @@ func TestCollectorClosedStateOnStartUpError(t *testing.T) {

// Load a bad config causing startup to fail
set := CollectorSettings{
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
telemetry: newColTelemetry(featuregate.NewRegistry()),
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: NewDefaultTelemetryProvider(),
telemetry: newColTelemetry(featuregate.NewRegistry()),
}
col, err := New(set)
require.NoError(t, err)
Expand Down
76 changes: 3 additions & 73 deletions service/collector_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"syscall"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/eventlog"

Expand Down Expand Up @@ -151,76 +149,8 @@ func newWithWindowsEventLogCore(set CollectorSettings, elog *eventlog.Log) (*Col
return nil, err
}
}
set.LoggingOptions = append(
set.LoggingOptions,
zap.WrapCore(withWindowsCore(elog)),
)
return New(set)
}

var _ zapcore.Core = (*windowsEventLogCore)(nil)

type windowsEventLogCore struct {
core zapcore.Core
elog *eventlog.Log
encoder zapcore.Encoder
}

func (w windowsEventLogCore) Enabled(level zapcore.Level) bool {
return w.core.Enabled(level)
}

func (w windowsEventLogCore) With(fields []zapcore.Field) zapcore.Core {
enc := w.encoder.Clone()
for _, field := range fields {
field.AddTo(enc)
}
return windowsEventLogCore{
core: w.core,
elog: w.elog,
encoder: enc,
}
}

func (w windowsEventLogCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if w.Enabled(ent.Level) {
return ce.AddCore(ent, w)
}
return ce
}

func (w windowsEventLogCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
buf, err := w.encoder.EncodeEntry(ent, fields)
if err != nil {
w.elog.Warning(2, fmt.Sprintf("failed encoding log entry %v\r\n", err))
return err
}
msg := buf.String()
buf.Free()

switch ent.Level {
case zapcore.FatalLevel, zapcore.PanicLevel, zapcore.DPanicLevel:
// golang.org/x/sys/windows/svc/eventlog does not support Critical level event logs
return w.elog.Error(3, msg)
case zapcore.ErrorLevel:
return w.elog.Error(3, msg)
case zapcore.WarnLevel:
return w.elog.Warning(2, msg)
case zapcore.InfoLevel:
return w.elog.Info(1, msg)
}
// We would not be here if debug were disabled so log as info to not drop.
return w.elog.Info(1, msg)
}

func (w windowsEventLogCore) Sync() error {
return w.core.Sync()
}

func withWindowsCore(elog *eventlog.Log) func(zapcore.Core) zapcore.Core {
return func(core zapcore.Core) zapcore.Core {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.LineEnding = "\r\n"
return windowsEventLogCore{core, elog, zapcore.NewConsoleEncoder(encoderConfig)}
if set.TelemetryProvider == nil {
set.TelemetryProvider = newWindowsServiceTelemetryProvider(elog)
}
return New(set)
}
8 changes: 2 additions & 6 deletions service/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"go.opentelemetry.io/contrib/zpages"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -61,11 +60,8 @@ type CollectorSettings struct {
// If the provider watches for configuration change, collector may reload the new configuration upon changes.
ConfigProvider ConfigProvider

// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option

// SkipSettingGRPCLogger avoids setting the grpc logger
SkipSettingGRPCLogger bool
// TelemetryProvider setups and provides the Collector's own telemetry.
TelemetryProvider TelemetryProvider

// For testing purpose only.
telemetry collectorTelemetryExporter
Expand Down
Loading