diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ce9161b127d..945e13e139a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,9 @@ Main (unreleased) - Fix an issue with static integrations-next marshaling where non singletons would cause `/-/config` to fail to marshal. (@erikbaranowski) +- Fix an issue where agent logs are emitted before the logging format + is correctly determined. (@hainenber) + - Fix divide-by-zero issue when sharding targets. (@hainenber) - Fix bug where custom headers were not actually being set in loki client. (@captncraig) diff --git a/cmd/internal/flowmode/cmd_run.go b/cmd/internal/flowmode/cmd_run.go index fb3c5a235f52..263ed5ecdc88 100644 --- a/cmd/internal/flowmode/cmd_run.go +++ b/cmd/internal/flowmode/cmd_run.go @@ -162,7 +162,8 @@ func (fr *flowRun) Run(configPath string) error { return fmt.Errorf("path argument not provided") } - l, err := logging.New(os.Stderr, logging.DefaultOptions) + // Buffer logs until log format has been determined + l, err := logging.NewDeferred(os.Stderr) if err != nil { return fmt.Errorf("building logger: %w", err) } diff --git a/pkg/flow/logging/logger.go b/pkg/flow/logging/logger.go index 0b80e88336ef..da0046a281af 100644 --- a/pkg/flow/logging/logger.go +++ b/pkg/flow/logging/logger.go @@ -23,6 +23,10 @@ type EnabledAware interface { type Logger struct { inner io.Writer // Writer passed to New. + bufferMut sync.RWMutex + buffer [][]interface{} // Store logs before correctly determine the log format + hasLogFormat bool // Confirmation whether log format has been determined + level *slog.LevelVar // Current configured level. format *formatVar // Current configured format. writer *writerVar // Current configured multiwriter (inner + write_to). @@ -47,6 +51,9 @@ func New(w io.Writer, o Options) (*Logger, error) { l := &Logger{ inner: w, + buffer: [][]interface{}{}, + hasLogFormat: false, + level: &leveler, format: &format, writer: &writer, @@ -60,6 +67,35 @@ func New(w io.Writer, o Options) (*Logger, error) { if err := l.Update(o); err != nil { return nil, err } + + return l, nil +} + +// NewDeferred creates a new logger with the default log level and format. +// The logger is not updated during initialization. +func NewDeferred(w io.Writer) (*Logger, error) { + var ( + leveler slog.LevelVar + format formatVar + writer writerVar + ) + + l := &Logger{ + inner: w, + + buffer: [][]interface{}{}, + hasLogFormat: false, + + level: &leveler, + format: &format, + writer: &writer, + handler: &handler{ + w: &writer, + leveler: &leveler, + formatter: &format, + }, + } + return l, nil } @@ -69,9 +105,12 @@ func (l *Logger) Handler() slog.Handler { return l.handler } // Update re-configures the options used for the logger. func (l *Logger) Update(o Options) error { + l.bufferMut.Lock() + defer l.bufferMut.Unlock() + switch o.Format { case FormatLogfmt, FormatJSON: - // no-op + l.hasLogFormat = true default: return fmt.Errorf("unrecognized log format %q", o.Format) } @@ -85,11 +124,35 @@ func (l *Logger) Update(o Options) error { } l.writer.Set(newWriter) + // Print out the buffered logs since we determined the log format already + for _, bufferedLogChunk := range l.buffer { + if err := slogadapter.GoKit(l.handler).Log(bufferedLogChunk...); err != nil { + return err + } + } + l.buffer = nil + return nil } // Log implements log.Logger. func (l *Logger) Log(kvps ...interface{}) error { + // Buffer logs before confirming log format is configured in `logging` block + l.bufferMut.RLock() + if !l.hasLogFormat { + l.bufferMut.RUnlock() + l.bufferMut.Lock() + // Check hasLogFormat again; could have changed since the unlock. + if !l.hasLogFormat { + l.buffer = append(l.buffer, kvps) + l.bufferMut.Unlock() + return nil + } + l.bufferMut.Unlock() + } else { + l.bufferMut.RUnlock() + } + // NOTE(rfratto): this method is a temporary shim while log/slog is still // being adopted throughout the codebase. return slogadapter.GoKit(l.handler).Log(kvps...)