Skip to content

Commit

Permalink
Refactor to show how to use with skipGRPCLogger
Browse files Browse the repository at this point in the history
  • Loading branch information
mx-psi committed Apr 29, 2022
1 parent 1eaafda commit ac30277
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 86 deletions.
15 changes: 3 additions & 12 deletions service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -185,14 +183,7 @@ func TestCollectorFailedShutdown(t *testing.T) {
func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter) {
factories, err := testcomponents.NewDefaultFactories()
require.NoError(t, err)
var once sync.Once
loggingHookCalled := false
hook := func(entry zapcore.Entry) error {
once.Do(func() {
loggingHookCalled = true
})
return nil
}
telemetryProvider, observed := NewTestTelemetryProvider()

metricsAddr := testutil.GetAvailableLocalAddress(t)
cfgSet := newDefaultConfigProviderSettings([]string{
Expand All @@ -211,7 +202,7 @@ func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter
BuildInfo: component.NewDefaultBuildInfo(),
Factories: factories,
ConfigProvider: cfgProvider,
TelemetryProvider: NewDefaultTelemetryProvider(zap.Hooks(hook)),
TelemetryProvider: telemetryProvider,
telemetry: telemetry,
})
require.NoError(t, err)
Expand All @@ -222,7 +213,7 @@ func testCollectorStartHelper(t *testing.T, telemetry collectorTelemetryExporter
return Running == col.GetState()
}, 2*time.Second, 200*time.Millisecond)
assert.Equal(t, col.logger, col.GetLogger())
assert.True(t, loggingHookCalled)
assert.True(t, observed.Len() > 1)

// All labels added to all collector metrics by default are listed below.
// These labels are hard coded here in order to avoid inadvertent changes:
Expand Down
71 changes: 1 addition & 70 deletions service/collector_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"syscall"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sys/windows/svc"
"golang.org/x/sys/windows/svc/eventlog"

Expand Down Expand Up @@ -152,74 +150,7 @@ func newWithWindowsEventLogCore(set CollectorSettings, elog *eventlog.Log) (*Col
}
}
if set.TelemetryProvider == nil {
set.TelemetryProvider = NewDefaultTelemetryProvider(zap.WrapCore(withWindowsCore(elog)))
set.TelemetryProvider = newWindowsServiceTelemetryProvider(elog)
}
return New(set)
}

var _ zapcore.Core = (*windowsEventLogCore)(nil)

type windowsEventLogCore struct {
core zapcore.Core
elog *eventlog.Log
encoder zapcore.Encoder
}

func (w windowsEventLogCore) Enabled(level zapcore.Level) bool {
return w.core.Enabled(level)
}

func (w windowsEventLogCore) With(fields []zapcore.Field) zapcore.Core {
enc := w.encoder.Clone()
for _, field := range fields {
field.AddTo(enc)
}
return windowsEventLogCore{
core: w.core,
elog: w.elog,
encoder: enc,
}
}

func (w windowsEventLogCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if w.Enabled(ent.Level) {
return ce.AddCore(ent, w)
}
return ce
}

func (w windowsEventLogCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
buf, err := w.encoder.EncodeEntry(ent, fields)
if err != nil {
w.elog.Warning(2, fmt.Sprintf("failed encoding log entry %v\r\n", err))
return err
}
msg := buf.String()
buf.Free()

switch ent.Level {
case zapcore.FatalLevel, zapcore.PanicLevel, zapcore.DPanicLevel:
// golang.org/x/sys/windows/svc/eventlog does not support Critical level event logs
return w.elog.Error(3, msg)
case zapcore.ErrorLevel:
return w.elog.Error(3, msg)
case zapcore.WarnLevel:
return w.elog.Warning(2, msg)
case zapcore.InfoLevel:
return w.elog.Info(1, msg)
}
// We would not be here if debug were disabled so log as info to not drop.
return w.elog.Info(1, msg)
}

func (w windowsEventLogCore) Sync() error {
return w.core.Sync()
}

func withWindowsCore(elog *eventlog.Log) func(zapcore.Core) zapcore.Core {
return func(core zapcore.Core) zapcore.Core {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.LineEnding = "\r\n"
return windowsEventLogCore{core, elog, zapcore.NewConsoleEncoder(encoderConfig)}
}
}
26 changes: 22 additions & 4 deletions service/telemetry_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
Expand All @@ -40,14 +42,14 @@ var _ TelemetryProvider = (*defaultTelemetryFactory)(nil)

type defaultTelemetryFactory struct {
zPagesSpanProcessor *zpages.SpanProcessor
skipGRPCLogger bool
loggingOptions []zap.Option
}

// NewDefaultTelemetryProvider creates the default telemetry provider.
func NewDefaultTelemetryProvider(loggingOptions ...zap.Option) TelemetryProvider {
// NewDefaultTelemetryProvider creates the default telemetry provider
func NewDefaultTelemetryProvider() TelemetryProvider {
return &defaultTelemetryFactory{
zPagesSpanProcessor: zpages.NewSpanProcessor(),
loggingOptions: loggingOptions,
}
}

Expand All @@ -58,7 +60,9 @@ func (t *defaultTelemetryFactory) SetupTelemetry(cfg config.ServiceTelemetry) (s
return component.TelemetrySettings{}, fmt.Errorf("failed to get logger: %w", err)
}

telemetrylogs.SetColGRPCLogger(set.Logger, cfg.Logs.Level)
if !t.skipGRPCLogger {
telemetrylogs.SetColGRPCLogger(set.Logger, cfg.Logs.Level)
}

set.TracerProvider = sdktrace.NewTracerProvider(
sdktrace.WithSampler(internal.AlwaysRecord()),
Expand All @@ -74,3 +78,17 @@ func (t *defaultTelemetryFactory) SetupTelemetry(cfg config.ServiceTelemetry) (s
func (t *defaultTelemetryFactory) ZPages() *zpages.SpanProcessor {
return t.zPagesSpanProcessor
}

// NewTestTelemetryProvider creates a telemetry provider for use in tests.
// It provides an observer.ObservedLogs to test logging behavior.
func NewTestTelemetryProvider() (TelemetryProvider, *observer.ObservedLogs) {
testCore, observed := observer.New(zapcore.DebugLevel)
return &defaultTelemetryFactory{
skipGRPCLogger: true,
loggingOptions: []zap.Option{zap.WrapCore(func(core zapcore.Core) zapcore.Core {
// tee output both to usual core and to a test core that allows for observing logs.
return zapcore.NewTee(core, testCore)
})},
zPagesSpanProcessor: zpages.NewSpanProcessor(),
}, observed
}
101 changes: 101 additions & 0 deletions service/telemetry_provider_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build windows
// +build windows

package service // import "go.opentelemetry.io/collector/service"

import (
"fmt"

"go.opentelemetry.io/contrib/zpages"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sys/windows/svc/eventlog"
)

var _ zapcore.Core = (*windowsEventLogCore)(nil)

type windowsEventLogCore struct {
core zapcore.Core
elog *eventlog.Log
encoder zapcore.Encoder
}

func (w windowsEventLogCore) Enabled(level zapcore.Level) bool {
return w.core.Enabled(level)
}

func (w windowsEventLogCore) With(fields []zapcore.Field) zapcore.Core {
enc := w.encoder.Clone()
for _, field := range fields {
field.AddTo(enc)
}
return windowsEventLogCore{
core: w.core,
elog: w.elog,
encoder: enc,
}
}

func (w windowsEventLogCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if w.Enabled(ent.Level) {
return ce.AddCore(ent, w)
}
return ce
}

func (w windowsEventLogCore) Write(ent zapcore.Entry, fields []zapcore.Field) error {
buf, err := w.encoder.EncodeEntry(ent, fields)
if err != nil {
w.elog.Warning(2, fmt.Sprintf("failed encoding log entry %v\r\n", err))
return err
}
msg := buf.String()
buf.Free()

switch ent.Level {
case zapcore.FatalLevel, zapcore.PanicLevel, zapcore.DPanicLevel:
// golang.org/x/sys/windows/svc/eventlog does not support Critical level event logs
return w.elog.Error(3, msg)
case zapcore.ErrorLevel:
return w.elog.Error(3, msg)
case zapcore.WarnLevel:
return w.elog.Warning(2, msg)
case zapcore.InfoLevel:
return w.elog.Info(1, msg)
}
// We would not be here if debug were disabled so log as info to not drop.
return w.elog.Info(1, msg)
}

func (w windowsEventLogCore) Sync() error {
return w.core.Sync()
}

func withWindowsCore(elog *eventlog.Log) func(zapcore.Core) zapcore.Core {
return func(core zapcore.Core) zapcore.Core {
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.LineEnding = "\r\n"
return windowsEventLogCore{core, elog, zapcore.NewConsoleEncoder(encoderConfig)}
}
}

func newWindowsServiceTelemetryProvider(elog *eventlog.Log) TelemetryProvider {
return &defaultTelemetryFactory{
zPagesSpanProcessor: zpages.NewSpanProcessor(),
loggingOptions: []zap.Option{zap.WrapCore(withWindowsCore(elog))},
}
}

0 comments on commit ac30277

Please sign in to comment.