Skip to content

Commit

Permalink
feat: add support for discovering and adding log levels as structured…
Browse files Browse the repository at this point in the history
… metadata (#12428)
  • Loading branch information
sandeepsukhani authored Apr 2, 2024
1 parent 71602eb commit b66c343
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 1 deletion.
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -validation.discover-service-name
[discover_service_name: <list of strings> | default = [service app application name app_kubernetes_io_name container container_name component workload job]]

# Discover and add log levels during ingestion, if not present already. Levels
# would be added to Structured Metadata with name 'level' and one of the values
# from 'debug', 'info', 'warn', 'error', 'critical', 'fatal'.
# CLI flag: -validation.discover-log-levels
[discover_log_levels: <boolean> | default = false]

# Maximum number of active streams per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-streams-per-user
[max_streams_per_user: <int> | default = 0]
Expand Down
70 changes: 70 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/grpc/codes"

"github.com/grafana/dskit/httpgrpc"
Expand Down Expand Up @@ -57,6 +58,13 @@ const (

labelServiceName = "service_name"
serviceUnknown = "unknown_service"
labelLevel = "level"
logLevelDebug = "debug"
logLevelInfo = "info"
logLevelWarn = "warn"
logLevelError = "error"
logLevelFatal = "fatal"
logLevelCritical = "critical"
)

var (
Expand Down Expand Up @@ -367,13 +375,22 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
n := 0
pushSize := 0
prevTs := stream.Entries[0].Timestamp
addLogLevel := validationContext.allowStructuredMetadata && validationContext.discoverLogLevels && !lbs.Has(labelLevel)
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
continue
}

structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
if addLogLevel && !structuredMetadata.Has(labelLevel) {
logLevel := detectLogLevelFromLogEntry(entry, structuredMetadata)
entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{
Name: labelLevel,
Value: logLevel,
})
}
stream.Entries[n] = entry

// If configured for this tenant, increment duplicate timestamps. Note, this is imperfect
Expand Down Expand Up @@ -838,3 +855,56 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l
func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}

func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string {
// otlp logs have a severity number, using which we are defining the log levels.
// Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" {
otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt)
if err != nil {
return logLevelInfo
}
if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) {
return logLevelDebug
} else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) {
return logLevelInfo
} else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) {
return logLevelWarn
} else if otlpSeverityNumber <= int(plog.SeverityNumberError4) {
return logLevelError
} else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) {
return logLevelFatal
}
return logLevelInfo
}

return extractLogLevelFromLogLine(entry.Line)
}

func extractLogLevelFromLogLine(log string) string {
if strings.Contains(log, `:"err"`) || strings.Contains(log, `:"ERR"`) ||
strings.Contains(log, "=err") || strings.Contains(log, "=ERR") ||
strings.Contains(log, "err:") || strings.Contains(log, "ERR:") ||
strings.Contains(log, "error") || strings.Contains(log, "ERROR") {
return logLevelError
}
if strings.Contains(log, `:"warn"`) || strings.Contains(log, `:"WARN"`) ||
strings.Contains(log, "=warn") || strings.Contains(log, "=WARN") ||
strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") ||
strings.Contains(log, "warning") || strings.Contains(log, "WARNING") {
return logLevelWarn
}
if strings.Contains(log, `:"critical"`) || strings.Contains(log, `:"CRITICAL"`) ||
strings.Contains(log, "=critical") || strings.Contains(log, "=CRITICAL") ||
strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") {
return logLevelCritical
}
if strings.Contains(log, `:"debug"`) || strings.Contains(log, `:"DEBUG"`) ||
strings.Contains(log, "=debug") || strings.Contains(log, "=DEBUG") ||
strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") {
return logLevelDebug
}

// Default to info if no specific level is found
return logLevelInfo
}
146 changes: 146 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/plog"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/ingester/client"
loghttp_push "github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/pkg/runtime"
"github.com/grafana/loki/pkg/util/constants"
fe "github.com/grafana/loki/pkg/util/flagext"
Expand Down Expand Up @@ -1491,3 +1494,146 @@ func TestDistributorTee(t *testing.T) {
require.Equal(t, "test", tee.tenant)
}
}

func Test_DetectLogLevels(t *testing.T) {
setup := func(discoverLogLevels bool) (*validation.Limits, *mockIngester) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)

limits.DiscoverLogLevels = discoverLogLevels
limits.DiscoverServiceName = nil
limits.AllowStructuredMetadata = true
return limits, &mockIngester{}
}

t.Run("log level detection disabled", func(t *testing.T) {
limits, ingester := setup(false)
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
_, err := distributors[0].Push(ctx, writeReq)
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels)
require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 0)
})

t.Run("log level detection enabled", func(t *testing.T) {
limits, ingester := setup(true)
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
_, err := distributors[0].Push(ctx, writeReq)
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels)
require.Equal(t, push.LabelsAdapter{
{
Name: labelLevel,
Value: logLevelInfo,
},
}, topVal.Streams[0].Entries[0].StructuredMetadata)
})

t.Run("log level detection enabled but log level already present in stream", func(t *testing.T) {
limits, ingester := setup(true)
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`})
_, err := distributors[0].Push(ctx, writeReq)
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels)
require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 0)
})

t.Run("log level detection enabled but log level already present as structured metadata", func(t *testing.T) {
limits, ingester := setup(true)
distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })

writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`})
writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{
{
Name: labelLevel,
Value: logLevelWarn,
},
}
_, err := distributors[0].Push(ctx, writeReq)
require.NoError(t, err)
topVal := ingester.Peek()
require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels)
require.Equal(t, push.LabelsAdapter{
{
Name: labelLevel,
Value: logLevelWarn,
},
}, topVal.Streams[0].Entries[0].StructuredMetadata)
})
}

func Test_detectLogLevelFromLogEntry(t *testing.T) {
for _, tc := range []struct {
name string
entry logproto.Entry
expectedLogLevel string
}{
{
name: "use severity number from otlp logs",
entry: logproto.Entry{
Line: "error",
StructuredMetadata: push.LabelsAdapter{
{
Name: loghttp_push.OTLPSeverityNumber,
Value: fmt.Sprintf("%d", plog.SeverityNumberDebug3),
},
},
},
expectedLogLevel: logLevelDebug,
},
{
name: "invalid severity number should not cause any issues",
entry: logproto.Entry{
StructuredMetadata: push.LabelsAdapter{
{
Name: loghttp_push.OTLPSeverityNumber,
Value: "foo",
},
},
},
expectedLogLevel: logLevelInfo,
},
{
name: "non otlp without any of the log level keywords in log line",
entry: logproto.Entry{
Line: "foo",
},
expectedLogLevel: logLevelInfo,
},
{
name: "non otlp with log level keywords in log line",
entry: logproto.Entry{
Line: "this is a warning log",
},
expectedLogLevel: logLevelWarn,
},
{
name: "json log line with an error",
entry: logproto.Entry{
Line: `{"foo":"bar","level":"error"}`,
},
expectedLogLevel: logLevelError,
},
{
name: "logfmt log line with a warn",
entry: logproto.Entry{
Line: `foo=bar level=warn`,
},
expectedLogLevel: logLevelWarn,
},
} {
t.Run(tc.name, func(t *testing.T) {
detectedLogLevel := detectLogLevelFromLogEntry(tc.entry, logproto.FromLabelAdaptersToLabels(tc.entry.StructuredMetadata))
require.Equal(t, tc.expectedLogLevel, detectedLogLevel)
})
}
}
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Limits interface {

IncrementDuplicateTimestamps(userID string) bool
DiscoverServiceName(userID string) []string
DiscoverLogLevels(userID string) bool

ShardStreams(userID string) *shardstreams.Config
IngestionRateStrategy() string
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type validationContext struct {

incrementDuplicateTimestamps bool
discoverServiceName []string
discoverLogLevels bool

allowStructuredMetadata bool
maxStructuredMetadataSize int
Expand All @@ -65,6 +66,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
maxLabelValueLength: v.MaxLabelValueLength(userID),
incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID),
discoverServiceName: v.DiscoverServiceName(userID),
discoverLogLevels: v.DiscoverLogLevels(userID),
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
Expand Down
4 changes: 3 additions & 1 deletion pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
pbContentType = "application/x-protobuf"
gzipContentEncoding = "gzip"
attrServiceName = "service.name"

OTLPSeverityNumber = "severity_number"
)

func newPushStats() *Stats {
Expand Down Expand Up @@ -287,7 +289,7 @@ func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig) push.Entry {

if severityNum := log.SeverityNumber(); severityNum != plog.SeverityNumberUnspecified {
structuredMetadata = append(structuredMetadata, push.LabelAdapter{
Name: "severity_number",
Name: OTLPSeverityNumber,
Value: fmt.Sprintf("%d", severityNum),
})
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Limits struct {
MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"`
IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"`
DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"`
DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"`

// Ingester enforced limits.
MaxLocalStreamsPerUser int `yaml:"max_streams_per_user" json:"max_streams_per_user"`
Expand Down Expand Up @@ -254,6 +255,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
"job",
}
f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.")
f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", false, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name 'level' and one of the values from 'debug', 'info', 'warn', 'error', 'critical', 'fatal'.")

_ = l.RejectOldSamplesMaxAge.Set("7d")
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.")
Expand Down Expand Up @@ -915,6 +917,10 @@ func (o *Overrides) DiscoverServiceName(userID string) []string {
return o.getOverridesForUser(userID).DiscoverServiceName
}

func (o *Overrides) DiscoverLogLevels(userID string) bool {
return o.getOverridesForUser(userID).DiscoverLogLevels
}

// VolumeEnabled returns whether volume endpoints are enabled for a user.
func (o *Overrides) VolumeEnabled(userID string) bool {
return o.getOverridesForUser(userID).VolumeEnabled
Expand Down

0 comments on commit b66c343

Please sign in to comment.