Skip to content

Commit

Permalink
fix(flow): ensure format consistency for all emitted logs (#5992)
Browse files Browse the repository at this point in the history
* fix(flow): ensure format consistency for all emitted logs

Signed-off-by: hainenber <[email protected]>

* fix(flow/log): address reviews from core Agent contributors

Signed-off-by: hainenber <[email protected]>

* fix(flow/log): update initialized logger in tests

Signed-off-by: hainenber <[email protected]>

* fix(flow/log): revert to vanilla mutex

Signed-off-by: hainenber <[email protected]>

* chore(flow/log): reduce duplicate code in test cases

Signed-off-by: hainenber <[email protected]>

* fixflow/log): implement rfratto's approach for a performant logger

Signed-off-by: hainenber <[email protected]>

* fix(flow/log): revert changes to `New()` + usage of `NewDeferred` for main logger

Signed-off-by: hainenber <[email protected]>

* chore(flow/log): refactor func to remove unused param

Signed-off-by: hainenber <[email protected]>

* fix(flow/log): add missing unlock

Signed-off-by: hainenber <[email protected]>

---------

Signed-off-by: hainenber <[email protected]>
  • Loading branch information
hainenber authored Feb 21, 2024
1 parent acdde1c commit 91ea77d
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ Main (unreleased)
- Fix an issue with static integrations-next marshaling where non singletons
would cause `/-/config` to fail to marshal. (@erikbaranowski)

- Fix an issue where agent logs are emitted before the logging format
is correctly determined. (@hainenber)

- Fix divide-by-zero issue when sharding targets. (@hainenber)

- Fix bug where custom headers were not actually being set in loki client. (@captncraig)
Expand Down
3 changes: 2 additions & 1 deletion cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ func (fr *flowRun) Run(configPath string) error {
return fmt.Errorf("path argument not provided")
}

l, err := logging.New(os.Stderr, logging.DefaultOptions)
// Buffer logs until log format has been determined
l, err := logging.NewDeferred(os.Stderr)
if err != nil {
return fmt.Errorf("building logger: %w", err)
}
Expand Down
65 changes: 64 additions & 1 deletion pkg/flow/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type EnabledAware interface {
type Logger struct {
inner io.Writer // Writer passed to New.

bufferMut sync.RWMutex
buffer [][]interface{} // Store logs before correctly determine the log format
hasLogFormat bool // Confirmation whether log format has been determined

level *slog.LevelVar // Current configured level.
format *formatVar // Current configured format.
writer *writerVar // Current configured multiwriter (inner + write_to).
Expand All @@ -47,6 +51,9 @@ func New(w io.Writer, o Options) (*Logger, error) {
l := &Logger{
inner: w,

buffer: [][]interface{}{},
hasLogFormat: false,

level: &leveler,
format: &format,
writer: &writer,
Expand All @@ -60,6 +67,35 @@ func New(w io.Writer, o Options) (*Logger, error) {
if err := l.Update(o); err != nil {
return nil, err
}

return l, nil
}

// NewDeferred creates a new logger with the default log level and format.
// The logger is not updated during initialization.
func NewDeferred(w io.Writer) (*Logger, error) {
var (
leveler slog.LevelVar
format formatVar
writer writerVar
)

l := &Logger{
inner: w,

buffer: [][]interface{}{},
hasLogFormat: false,

level: &leveler,
format: &format,
writer: &writer,
handler: &handler{
w: &writer,
leveler: &leveler,
formatter: &format,
},
}

return l, nil
}

Expand All @@ -69,9 +105,12 @@ func (l *Logger) Handler() slog.Handler { return l.handler }

// Update re-configures the options used for the logger.
func (l *Logger) Update(o Options) error {
l.bufferMut.Lock()
defer l.bufferMut.Unlock()

switch o.Format {
case FormatLogfmt, FormatJSON:
// no-op
l.hasLogFormat = true
default:
return fmt.Errorf("unrecognized log format %q", o.Format)
}
Expand All @@ -85,11 +124,35 @@ func (l *Logger) Update(o Options) error {
}
l.writer.Set(newWriter)

// Print out the buffered logs since we determined the log format already
for _, bufferedLogChunk := range l.buffer {
if err := slogadapter.GoKit(l.handler).Log(bufferedLogChunk...); err != nil {
return err
}
}
l.buffer = nil

return nil
}

// Log implements log.Logger.
func (l *Logger) Log(kvps ...interface{}) error {
// Buffer logs before confirming log format is configured in `logging` block
l.bufferMut.RLock()
if !l.hasLogFormat {
l.bufferMut.RUnlock()
l.bufferMut.Lock()
// Check hasLogFormat again; could have changed since the unlock.
if !l.hasLogFormat {
l.buffer = append(l.buffer, kvps)
l.bufferMut.Unlock()
return nil
}
l.bufferMut.Unlock()
} else {
l.bufferMut.RUnlock()
}

// NOTE(rfratto): this method is a temporary shim while log/slog is still
// being adopted throughout the codebase.
return slogadapter.GoKit(l.handler).Log(kvps...)
Expand Down

0 comments on commit 91ea77d

Please sign in to comment.