Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(flow): ensure format consistency for all emitted logs #5992

Merged
merged 10 commits into from
Feb 21, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ Main (unreleased)
- Fix an issue with static integrations-next marshaling where non singletons
would cause `/-/config` to fail to marshal. (@erikbaranowski)

- Fix an issue where agent logs are emitted before the logging format
- is correctly determined. (@hainenber)

### Other changes

- Removed support for Windows 2012 in line with Microsoft end of life. (@mattdurham)
Expand Down
9 changes: 7 additions & 2 deletions pkg/flow/logging/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,15 @@ func newTestRecord(msg string) slog.Record {
func getTestHandler(t *testing.T, w io.Writer) slog.Handler {
t.Helper()

l, err := New(w, Options{
o := Options{
Level: LevelDebug,
Format: FormatLogfmt,
})
}

l, err := New(w, o)
require.NoError(t, err)

err = l.Update(o)
require.NoError(t, err)

return l.handler
Expand Down
31 changes: 27 additions & 4 deletions pkg/flow/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type EnabledAware interface {
type Logger struct {
inner io.Writer // Writer passed to New.

bufferMut sync.Mutex
buffer [][]interface{} // Store logs before correctly determine the log format
hasLogFormat bool // Confirmation whether log format has been determined

level *slog.LevelVar // Current configured level.
format *formatVar // Current configured format.
writer *writerVar // Current configured multiwriter (inner + write_to).
Expand All @@ -47,6 +51,9 @@ func New(w io.Writer, o Options) (*Logger, error) {
l := &Logger{
inner: w,

buffer: [][]interface{}{},
hasLogFormat: false,

level: &leveler,
format: &format,
writer: &writer,
Expand All @@ -57,9 +64,6 @@ func New(w io.Writer, o Options) (*Logger, error) {
},
}

if err := l.Update(o); err != nil {
return nil, err
}
wildum marked this conversation as resolved.
Show resolved Hide resolved
return l, nil
}

Expand All @@ -69,9 +73,12 @@ func (l *Logger) Handler() slog.Handler { return l.handler }

// Update re-configures the options used for the logger.
func (l *Logger) Update(o Options) error {
l.bufferMut.Lock()
defer l.bufferMut.Unlock()

switch o.Format {
case FormatLogfmt, FormatJSON:
// no-op
l.hasLogFormat = true
default:
return fmt.Errorf("unrecognized log format %q", o.Format)
}
Expand All @@ -85,11 +92,27 @@ func (l *Logger) Update(o Options) error {
}
l.writer.Set(newWriter)

// Print out the buffered logs since we determined the log format already
for _, bufferedLogChunk := range l.buffer {
if err := slogadapter.GoKit(l.handler).Log(bufferedLogChunk...); err != nil {
return err
}
}
wildum marked this conversation as resolved.
Show resolved Hide resolved
l.buffer = nil

return nil
}

// Log implements log.Logger.
func (l *Logger) Log(kvps ...interface{}) error {
// Buffer logs before confirming log format is configured in `logging` block
l.bufferMut.Lock()
defer l.bufferMut.Unlock()
wildum marked this conversation as resolved.
Show resolved Hide resolved
if !l.hasLogFormat {
l.buffer = append(l.buffer, kvps)
return nil
}

// NOTE(rfratto): this method is a temporary shim while log/slog is still
// being adopted throughout the codebase.
return slogadapter.GoKit(l.handler).Log(kvps...)
Expand Down
118 changes: 104 additions & 14 deletions pkg/flow/logging/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,48 @@ func TestLevels(t *testing.T) {

var testCases = []testCase{
{
name: "no level - prints",
logger: func(w io.Writer) (log.Logger, error) { return logging.New(w, debugLevel()) },
name: "no level - prints",
logger: func(w io.Writer) (log.Logger, error) {
o := debugLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return logger, err
},
message: "hello",
expected: "level=info msg=hello\n",
},
{
name: "no level - drops",
logger: func(w io.Writer) (log.Logger, error) { return logging.New(w, warnLevel()) },
name: "no level - drops",
logger: func(w io.Writer) (log.Logger, error) {
o := warnLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return logger, err
},
message: "hello",
expected: "",
},
{
name: "flow info level - drops",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, warnLevel())
o := warnLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return flowlevel.Info(logger), err
},
message: "hello",
Expand All @@ -67,7 +94,14 @@ func TestLevels(t *testing.T) {
{
name: "flow debug level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, debugLevel())
o := debugLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return flowlevel.Debug(logger), err
},
message: "hello",
Expand All @@ -76,7 +110,14 @@ func TestLevels(t *testing.T) {
{
name: "flow info level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, infoLevel())
o := infoLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return flowlevel.Info(logger), err
},
message: "hello",
Expand All @@ -85,7 +126,14 @@ func TestLevels(t *testing.T) {
{
name: "flow warn level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, debugLevel())
o := debugLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return flowlevel.Warn(logger), err
},
message: "hello",
Expand All @@ -94,7 +142,14 @@ func TestLevels(t *testing.T) {
{
name: "flow error level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, debugLevel())
o := debugLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return flowlevel.Error(logger), err
},
message: "hello",
Expand All @@ -103,7 +158,14 @@ func TestLevels(t *testing.T) {
{
name: "gokit info level - drops",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, warnLevel())
o := warnLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return gokitlevel.Info(logger), err
},
message: "hello",
Expand All @@ -112,7 +174,14 @@ func TestLevels(t *testing.T) {
{
name: "gokit debug level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, debugLevel())
o := debugLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return gokitlevel.Debug(logger), err
},
message: "hello",
Expand All @@ -121,7 +190,14 @@ func TestLevels(t *testing.T) {
{
name: "gokit info level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, infoLevel())
o := infoLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return gokitlevel.Info(logger), err
},
message: "hello",
Expand All @@ -130,7 +206,14 @@ func TestLevels(t *testing.T) {
{
name: "gokit warn level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, debugLevel())
o := debugLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
return gokitlevel.Warn(logger), err
},
message: "hello",
Expand All @@ -139,7 +222,14 @@ func TestLevels(t *testing.T) {
{
name: "gokit error level - prints",
logger: func(w io.Writer) (log.Logger, error) {
logger, err := logging.New(w, debugLevel())
o := debugLevel()
logger, err := logging.New(w, o)
if err != nil {
return nil, err
}
if err := logger.Update(o); err != nil {
return nil, err
}
wildum marked this conversation as resolved.
Show resolved Hide resolved
return gokitlevel.Error(logger), err
},
message: "hello",
Expand Down
Loading