From 91ea77d3311a6f64c8f690fd89916ad4bb783826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C4=90=E1=BB=97=20Tr=E1=BB=8Dng=20H=E1=BA=A3i?= <41283691+hainenber@users.noreply.github.com> Date: Wed, 21 Feb 2024 15:08:30 +0700 Subject: [PATCH] fix(flow): ensure format consistency for all emitted logs (#5992) * fix(flow): ensure format consistency for all emitted logs Signed-off-by: hainenber * fix(flow/log): address reviews from core Agent contributors Signed-off-by: hainenber * fix(flow/log): update initialized logger in tests Signed-off-by: hainenber * fix(flow/log): revert to vanilla mutex Signed-off-by: hainenber * chore(flow/log): reduce duplicate code in test cases Signed-off-by: hainenber * fixflow/log): implement rfratto's approach for a performant logger Signed-off-by: hainenber * fix(flow/log): revert changes to `New()` + usage of `NewDeferred` for main logger Signed-off-by: hainenber * chore(flow/log): refactor func to remove unused param Signed-off-by: hainenber * fix(flow/log): add missing unlock Signed-off-by: hainenber --------- Signed-off-by: hainenber --- CHANGELOG.md | 3 ++ cmd/internal/flowmode/cmd_run.go | 3 +- pkg/flow/logging/logger.go | 65 +++++++++++++++++++++++++++++++- 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ce9161b127d..945e13e139a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,9 @@ Main (unreleased) - Fix an issue with static integrations-next marshaling where non singletons would cause `/-/config` to fail to marshal. (@erikbaranowski) +- Fix an issue where agent logs are emitted before the logging format + is correctly determined. (@hainenber) + - Fix divide-by-zero issue when sharding targets. (@hainenber) - Fix bug where custom headers were not actually being set in loki client. (@captncraig) diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index fb3c5a235f52..263ed5ecdc88 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -162,7 +162,8 @@ func (fr *flowRun) Run(configPath string) error { return fmt.Errorf("path argument not provided") } - l, err := logging.New(os.Stderr, logging.DefaultOptions) + // Buffer logs until log format has been determined + l, err := logging.NewDeferred(os.Stderr) if err != nil { return fmt.Errorf("building logger: %w", err) } diff --git a/pkg/flow/logging/logger.go b/pkg/flow/logging/logger.go index 0b80e88336ef..da0046a281af 100644 --- a/pkg/flow/logging/logger.go +++ b/pkg/flow/logging/logger.go @@ -23,6 +23,10 @@ type EnabledAware interface { type Logger struct { inner io.Writer // Writer passed to New. + bufferMut sync.RWMutex + buffer [][]interface{} // Store logs before correctly determine the log format + hasLogFormat bool // Confirmation whether log format has been determined + level *slog.LevelVar // Current configured level. format *formatVar // Current configured format. writer *writerVar // Current configured multiwriter (inner + write_to). @@ -47,6 +51,9 @@ func New(w io.Writer, o Options) (*Logger, error) { l := &Logger{ inner: w, + buffer: [][]interface{}{}, + hasLogFormat: false, + level: &leveler, format: &format, writer: &writer, @@ -60,6 +67,35 @@ func New(w io.Writer, o Options) (*Logger, error) { if err := l.Update(o); err != nil { return nil, err } + + return l, nil +} + +// NewDeferred creates a new logger with the default log level and format. +// The logger is not updated during initialization. +func NewDeferred(w io.Writer) (*Logger, error) { + var ( + leveler slog.LevelVar + format formatVar + writer writerVar + ) + + l := &Logger{ + inner: w, + + buffer: [][]interface{}{}, + hasLogFormat: false, + + level: &leveler, + format: &format, + writer: &writer, + handler: &handler{ + w: &writer, + leveler: &leveler, + formatter: &format, + }, + } + return l, nil } @@ -69,9 +105,12 @@ func (l *Logger) Handler() slog.Handler { return l.handler } // Update re-configures the options used for the logger. func (l *Logger) Update(o Options) error { + l.bufferMut.Lock() + defer l.bufferMut.Unlock() + switch o.Format { case FormatLogfmt, FormatJSON: - // no-op + l.hasLogFormat = true default: return fmt.Errorf("unrecognized log format %q", o.Format) } @@ -85,11 +124,35 @@ func (l *Logger) Update(o Options) error { } l.writer.Set(newWriter) + // Print out the buffered logs since we determined the log format already + for _, bufferedLogChunk := range l.buffer { + if err := slogadapter.GoKit(l.handler).Log(bufferedLogChunk...); err != nil { + return err + } + } + l.buffer = nil + return nil } // Log implements log.Logger. func (l *Logger) Log(kvps ...interface{}) error { + // Buffer logs before confirming log format is configured in `logging` block + l.bufferMut.RLock() + if !l.hasLogFormat { + l.bufferMut.RUnlock() + l.bufferMut.Lock() + // Check hasLogFormat again; could have changed since the unlock. + if !l.hasLogFormat { + l.buffer = append(l.buffer, kvps) + l.bufferMut.Unlock() + return nil + } + l.bufferMut.Unlock() + } else { + l.bufferMut.RUnlock() + } + // NOTE(rfratto): this method is a temporary shim while log/slog is still // being adopted throughout the codebase. return slogadapter.GoKit(l.handler).Log(kvps...)