Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(flow): ensure format consistency for all emitted logs #5992

Merged
merged 10 commits into from
Feb 21, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ Main (unreleased)
- Fix an issue with static integrations-next marshaling where non singletons
would cause `/-/config` to fail to marshal. (@erikbaranowski)

- Fix an issue where agent logs are emitted before the logging format
is correctly determined. (@hainenber)

- Fix divide-by-zero issue when sharding targets. (@hainenber)

- Fix bug where custom headers were not actually being set in loki client. (@captncraig)
Expand Down
3 changes: 2 additions & 1 deletion cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (fr *flowRun) Run(configPath string) error {
return fmt.Errorf("path argument not provided")
}

l, err := logging.New(os.Stderr, logging.DefaultOptions)
// Buffer logs until log format has been determined
l, err := logging.NewDeferred(os.Stderr)
if err != nil {
return fmt.Errorf("building logger: %w", err)
}
Expand Down
65 changes: 64 additions & 1 deletion pkg/flow/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type EnabledAware interface {
type Logger struct {
inner io.Writer // Writer passed to New.

bufferMut sync.RWMutex
buffer [][]interface{} // Store logs before correctly determine the log format
hasLogFormat bool // Confirmation whether log format has been determined

level *slog.LevelVar // Current configured level.
format *formatVar // Current configured format.
writer *writerVar // Current configured multiwriter (inner + write_to).
Expand All @@ -47,6 +51,9 @@ func New(w io.Writer, o Options) (*Logger, error) {
l := &Logger{
inner: w,

buffer: [][]interface{}{},
hasLogFormat: false,

level: &leveler,
format: &format,
writer: &writer,
Expand All @@ -60,6 +67,35 @@ func New(w io.Writer, o Options) (*Logger, error) {
if err := l.Update(o); err != nil {
return nil, err
}
wildum marked this conversation as resolved.
Show resolved Hide resolved

return l, nil
}

// NewDeferred creates a new logger with the default log level and format.
// The logger is not updated during initialization.
func NewDeferred(w io.Writer) (*Logger, error) {
var (
leveler slog.LevelVar
format formatVar
writer writerVar
)

l := &Logger{
inner: w,

buffer: [][]interface{}{},
hasLogFormat: false,

level: &leveler,
format: &format,
writer: &writer,
handler: &handler{
w: &writer,
leveler: &leveler,
formatter: &format,
},
}

return l, nil
}

Expand All @@ -69,9 +105,12 @@ func (l *Logger) Handler() slog.Handler { return l.handler }

// Update re-configures the options used for the logger.
func (l *Logger) Update(o Options) error {
l.bufferMut.Lock()
defer l.bufferMut.Unlock()

switch o.Format {
case FormatLogfmt, FormatJSON:
// no-op
l.hasLogFormat = true
default:
return fmt.Errorf("unrecognized log format %q", o.Format)
}
Expand All @@ -85,11 +124,35 @@ func (l *Logger) Update(o Options) error {
}
l.writer.Set(newWriter)

// Print out the buffered logs since we determined the log format already
for _, bufferedLogChunk := range l.buffer {
if err := slogadapter.GoKit(l.handler).Log(bufferedLogChunk...); err != nil {
return err
}
}
wildum marked this conversation as resolved.
Show resolved Hide resolved
l.buffer = nil

return nil
}

// Log implements log.Logger.
func (l *Logger) Log(kvps ...interface{}) error {
// Buffer logs before confirming log format is configured in `logging` block
l.bufferMut.RLock()
if !l.hasLogFormat {
l.bufferMut.RUnlock()
l.bufferMut.Lock()
// Check hasLogFormat again; could have changed since the unlock.
if !l.hasLogFormat {
l.buffer = append(l.buffer, kvps)
l.bufferMut.Unlock()
return nil
}
l.bufferMut.Unlock()
} else {
l.bufferMut.RUnlock()
}

// NOTE(rfratto): this method is a temporary shim while log/slog is still
// being adopted throughout the codebase.
return slogadapter.GoKit(l.handler).Log(kvps...)
Expand Down
Loading