Skip to content

Commit

Permalink
Add detected_level from label or existing structured metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
shantanualsi committed May 6, 2024
1 parent 4dacb2b commit 3253e68
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
20 changes: 14 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize := 0
prevTs := stream.Entries[0].Timestamp

addLogLevel := validationContext.allowStructuredMetadata && validationContext.discoverLogLevels && !hasAnyLevelLabels(lbs)
shouldDiscoverLevels := validationContext.allowStructuredMetadata && validationContext.discoverLogLevels
levelFromLabel, hasLevelLabel := hasAnyLevelLabels(lbs)
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
d.writeFailuresManager.Log(tenantID, err)
Expand All @@ -396,8 +397,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
if addLogLevel && !hasAnyLevelLabels(structuredMetadata) {
logLevel := detectLogLevelFromLogEntry(entry, structuredMetadata)
if shouldDiscoverLevels {
var logLevel string
if hasLevelLabel {
logLevel = levelFromLabel
} else if levelFromMetadata, ok := hasAnyLevelLabels(structuredMetadata); ok {
logLevel = levelFromMetadata
} else {
logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata)
}
entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{
Name: levelLabel,
Value: logLevel,
Expand Down Expand Up @@ -548,13 +556,13 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

func hasAnyLevelLabels(l labels.Labels) bool {
func hasAnyLevelLabels(l labels.Labels) (string, bool) {
for lbl := range allowedLabelsForLevel {
if l.Has(lbl) {
return true
return l.Get(lbl), true
}
}
return false
return "", false
}

// shardStream shards (divides) the given stream into N smaller streams, where
Expand Down
11 changes: 9 additions & 2 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,7 +1602,10 @@ func Test_DetectLogLevels(t *testing.T) {
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels)
require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 0)
sm := topVal.Streams[0].Entries[0].StructuredMetadata
require.Len(t, sm, 1)
require.Equal(t, sm[0].Name, levelLabel)
require.Equal(t, sm[0].Value, logLevelDebug)
})

t.Run("log level detection enabled but log level already present as structured metadata", func(t *testing.T) {
Expand All @@ -1620,12 +1623,16 @@ func Test_DetectLogLevels(t *testing.T) {
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels)
sm := topVal.Streams[0].Entries[0].StructuredMetadata
require.Equal(t, push.LabelsAdapter{
{
Name: "severity",
Value: logLevelWarn,
}, {
Name: levelLabel,
Value: logLevelWarn,
},
}, topVal.Streams[0].Entries[0].StructuredMetadata)
}, sm)
})
}

Expand Down

0 comments on commit 3253e68

Please sign in to comment.