Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: enable service detection for otlp endoint #14036

Merged
merged 14 commits into from
Sep 4, 2024
25 changes: 20 additions & 5 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRe
return nil, nil, err
}

req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), tracker, stats)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats)
return req, stats, nil
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -111,12 +111,13 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
res := rls.At(i).Resource()
resAttrs := res.Attributes()

if v, ok := resAttrs.Get(attrServiceName); !ok || v.AsString() == "" {
resAttrs.PutStr(attrServiceName, "unknown_service")
}
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size

hasServiceName := false
if v, ok := resAttrs.Get(attrServiceName); ok && v.AsString() != "" {
hasServiceName = true
}
resAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForResourceAttribute(k)
if action == Drop {
Expand All @@ -127,6 +128,16 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)

if !hasServiceName && len(discoverServiceName) > 0 && !stats.IsAggregatedMetric {
for _, serviceLabel := range discoverServiceName {
if lbl.Name == serviceLabel {
streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(lbl.Value)
hasServiceName = true
break
}
}
}
}
} else if action == StructuredMetadata {
resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...)
Expand All @@ -135,6 +146,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten
return true
})

if !hasServiceName && len(discoverServiceName) > 0 && !stats.IsAggregatedMetric {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The len(discoverServiceName) > 0 guard is another way of turning of service detection, I guess? Would it make sense to add a new var somewhere before this line like shouldDisdoverService := len(discoverServiceName) > 0 and use that also here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created the the variable and reused where I could, not sure if the line #s in your links were off, but it I don't think this can replace the hasServiceName logic, I think we need both

streamLabels[model.LabelName(LabelServiceName)] = model.LabelValue(ServiceUnknown)
}

if err := streamLabels.Validate(); err != nil {
stats.Errs = append(stats.Errs, fmt.Errorf("invalid labels: %w", err))
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
stats := newPushStats()
tracker := NewMockTracker()
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tracker, stats)
pushReq := otlpToLokiPushRequest(context.Background(), tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, []string{}, tracker, stats)
require.Equal(t, tc.expectedPushRequest, *pushReq)
require.Equal(t, tc.expectedStats, *stats)

Expand Down
116 changes: 113 additions & 3 deletions pkg/loghttp/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/grafana/dskit/flagext"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)

Expand Down Expand Up @@ -256,7 +259,7 @@
}

tracker := NewMockTracker()
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{test.enableServiceDiscovery}, ParseLokiRequest, tracker)
data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{enabled: test.enableServiceDiscovery}, ParseLokiRequest, tracker)

structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived
Expand Down Expand Up @@ -314,19 +317,122 @@
}
}

func Test_ServiceDetction(t *testing.T) {
trevorwhitney marked this conversation as resolved.
Show resolved Hide resolved
tracker := NewMockTracker()

t.Run("detects service from loki push requests", func(t *testing.T) {
body := `{"streams": [{ "stream": { "foo": "bar" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`
request := httptest.NewRequest(
"POST",
`/loki/api/v1/push`,
strings.NewReader(body),
)
request.Header.Add("Content-Type", "application/json")

limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
data, err := ParseRequest(util_log.Logger, "fake", request, nil, limits, ParseLokiRequest, tracker)

require.NoError(t, err)
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})

t.Run("detects service from OTLP push requests using default indexing", func(t *testing.T) {
now := time.Unix(0, time.Now().UnixNano())

tracker := NewMockTracker()

ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("k8s.job.name", "bar")
ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body")
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano()))

jsonMarshaller := plog.JSONMarshaler{}
body, err := jsonMarshaller.MarshalLogs(ld)

require.NoError(t, err)
request := httptest.NewRequest(
"POST",
`/otlp/v1/logs`,
bytes.NewReader(body),
)
request.Header.Add("Content-Type", "application/json")

limits := &fakeLimits{enabled: true}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})

t.Run("detects service from OTLP push requests using custom indexing", func(t *testing.T) {
now := time.Unix(0, time.Now().UnixNano())

tracker := NewMockTracker()

ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("special", "sauce")
ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body")
ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(now.UnixNano()))

jsonMarshaller := plog.JSONMarshaler{}
body, err := jsonMarshaller.MarshalLogs(ld)

require.NoError(t, err)
request := httptest.NewRequest(
"POST",
`/otlp/v1/logs`,
bytes.NewReader(body),
)
request.Header.Add("Content-Type", "application/json")

limits := &fakeLimits{
enabled: true,
labels: []string{"special"},
indexAttributes: []string{"special"},
}
data, err := ParseRequest(util_log.Logger, "fake", request, limits, limits, ParseOTLPRequest, tracker)
require.NoError(t, err)
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
})
}

type fakeLimits struct {
enabled bool
enabled bool
labels []string
indexAttributes []string
}

func (f *fakeLimits) RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration {

Check warning on line 404 in pkg/loghttp/push/push_test.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'userID' seems to be unused, consider removing or renaming it as _ (revive)
return time.Hour
}

func (l *fakeLimits) OTLPConfig(_ string) OTLPConfig {
return OTLPConfig{}
if len(l.indexAttributes) > 0 {
return OTLPConfig{
ResourceAttributes: ResourceAttributesConfig{
AttributesConfig: []AttributesConfig{
{
Action: IndexLabel,
Attributes: l.indexAttributes,
},
},
},
}
}

defaultGlobalOTLPConfig := GlobalOTLPConfig{}
flagext.DefaultValues(&defaultGlobalOTLPConfig)
return DefaultOTLPConfig(defaultGlobalOTLPConfig)
}

func (l *fakeLimits) DiscoverServiceName(_ string) []string {
if !l.enabled {
return nil
}

if len(l.labels) > 0 {
return l.labels
}

return []string{
"service",
"app",
Expand All @@ -335,12 +441,16 @@
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}
}

// RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration
trevorwhitney marked this conversation as resolved.
Show resolved Hide resolved

type MockCustomTracker struct {
receivedBytes map[string]float64
discardedBytes map[string]float64
Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,11 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
"app_kubernetes_io_name",
"container",
"container_name",
"k8s_container_name",
"component",
"workload",
"job",
"k8s_job_name",
}
f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.")
f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).")
Expand Down
Loading