Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otel: add support for per tenant configuration for mapping otlp data to loki format #11143

Merged
merged 7 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, push.ParseLokiRequest)
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, pushRequestParser)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand Down
2 changes: 2 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/grafana/loki/pkg/compactor/retention"
"github.com/grafana/loki/pkg/distributor/shardstreams"
"github.com/grafana/loki/pkg/loghttp/push"
)

// Limits is an interface for distributor limits/related configs
Expand All @@ -30,4 +31,5 @@ type Limits interface {
AllowStructuredMetadata(userID string) bool
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
}
142 changes: 82 additions & 60 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,9 @@ import (
const (
pbContentType = "application/x-protobuf"
gzipContentEncoding = "gzip"
attrServiceName = "service.name"
)

var blessedAttributes = []string{
"service.name",
"service.namespace",
"service.instance.id",
"deployment.environment",
"cloud.region",
"cloud.availability_zone",
"k8s.cluster.name",
"k8s.namespace.name",
"k8s.pod.name",
"k8s.container.name",
"container.name",
"k8s.replicaset.name",
"k8s.deployment.name",
"k8s.statefulset.name",
"k8s.daemonset.name",
"k8s.cronjob.name",
"k8s.job.name",
}

var blessedAttributesNormalized = make([]string, len(blessedAttributes))

func init() {
Expand All @@ -62,14 +43,14 @@ func newPushStats() *Stats {
}
}

func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention) (*logproto.PushRequest, *Stats, error) {
func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) {
stats := newPushStats()
otlpLogs, err := extractLogs(r, stats)
if err != nil {
return nil, nil, err
}

req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, stats)
req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), stats)
return req, stats, nil
}

Expand Down Expand Up @@ -120,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) {
return req.Logs(), nil
}

func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, stats *Stats) *logproto.PushRequest {
func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, stats *Stats) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
Expand All @@ -131,39 +112,38 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for i := 0; i < rls.Len(); i++ {
sls := rls.At(i).ScopeLogs()
res := rls.At(i).Resource()
resAttrs := res.Attributes()

flattenedResourceAttributes := labels.NewBuilder(logproto.FromLabelAdaptersToLabels(attributesToLabels(res.Attributes(), "")))
// service.name is a required Resource Attribute. If it is not present, we will set it to "unknown_service".
if flattenedResourceAttributes.Get("service_name") == "" {
flattenedResourceAttributes = flattenedResourceAttributes.Set("service_name", "unknown_service")
if v, _ := resAttrs.Get(attrServiceName); v.AsString() == "" {
resAttrs.PutStr(attrServiceName, "unknown_service")
}
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized))

if dac := res.DroppedAttributesCount(); dac != 0 {
flattenedResourceAttributes = flattenedResourceAttributes.Set("resource_dropped_attributes_count", fmt.Sprintf("%d", dac))
}
resAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForResourceAttribute(k)
if action == Drop {
return true
}

// copy blessed attributes to stream labels
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized))
for _, ba := range blessedAttributesNormalized {
v := flattenedResourceAttributes.Get(ba)
if v == "" {
continue
attributeAsLabels := attributeToLabels(k, v, "")
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
} else if action == StructuredMetadata {
resourceAttributesAsStructuredMetadata = append(resourceAttributesAsStructuredMetadata, attributeAsLabels...)
}
streamLabels[model.LabelName(ba)] = model.LabelValue(v)

// remove the blessed attributes copied to stream labels
flattenedResourceAttributes.Del(ba)
}
return true
})

if err := streamLabels.Validate(); err != nil {
stats.errs = append(stats.errs, fmt.Errorf("invalid labels: %w", err))
continue
}
labelsStr := streamLabels.String()

// convert the remaining resource attributes to structured metadata
resourceAttributesAsStructuredMetadata := logproto.FromLabelsToLabelAdapters(flattenedResourceAttributes.Labels())

lbs := modelLabelsSetToLabelsList(streamLabels)
if _, ok := pushRequestsByStream[labelsStr]; !ok {
pushRequestsByStream[labelsStr] = logproto.Stream{
Expand All @@ -178,6 +158,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for j := 0; j < sls.Len(); j++ {
scope := sls.At(j).Scope()
logs := sls.At(j).LogRecords()
scopeAttrs := scope.Attributes()

// it would be rare to have multiple scopes so if the entries slice is empty, pre-allocate it for the number of log entries
if cap(pushRequestsByStream[labelsStr].Entries) == 0 {
Expand All @@ -187,7 +168,20 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
}

// use fields and attributes from scope as structured metadata
scopeAttributesAsStructuredMetadata := attributesToLabels(scope.Attributes(), "")
scopeAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, scopeAttrs.Len()+3)
scopeAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForScopeAttribute(k)
if action == Drop {
return true
}

attributeAsLabels := attributeToLabels(k, v, "")
if action == StructuredMetadata {
scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, attributeAsLabels...)
}

return true
})

if scopeName := scope.Name(); scopeName != "" {
scopeAttributesAsStructuredMetadata = append(scopeAttributesAsStructuredMetadata, push.LabelAdapter{
Expand All @@ -213,7 +207,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)

entry := otlpLogToPushEntry(log)
entry := otlpLogToPushEntry(log, otlpConfig)

// if entry.StructuredMetadata doesn't have capacity to add resource and scope attributes, make a new slice with enough capacity
attributesAsStructuredMetadataLen := len(resourceAttributesAsStructuredMetadata) + len(scopeAttributesAsStructuredMetadata)
Expand Down Expand Up @@ -251,9 +245,23 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
}

// otlpLogToPushEntry converts an OTLP log record to a Loki push.Entry.
func otlpLogToPushEntry(log plog.LogRecord) push.Entry {
func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig) push.Entry {
// copy log attributes and all the fields from log(except log.Body) to structured metadata
structuredMetadata := attributesToLabels(log.Attributes(), "")
logAttrs := log.Attributes()
structuredMetadata := make(push.LabelsAdapter, 0, logAttrs.Len()+7)
logAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForLogAttribute(k)
if action == Drop {
return true
}

attributeAsLabels := attributeToLabels(k, v, "")
if action == StructuredMetadata {
structuredMetadata = append(structuredMetadata, attributeAsLabels...)
}

return true
})

// if log.Timestamp() is 0, we would have already stored log.ObservedTimestamp as log timestamp so no need to store again in structured metadata
if log.Timestamp() != 0 && log.ObservedTimestamp() != 0 {
Expand Down Expand Up @@ -316,25 +324,39 @@ func attributesToLabels(attrs pcommon.Map, prefix string) push.LabelsAdapter {
}

attrs.Range(func(k string, v pcommon.Value) bool {
keyWithPrefix := k
if prefix != "" {
keyWithPrefix = prefix + "_" + k
}
keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix)

typ := v.Type()
if typ == pcommon.ValueTypeMap {
labelsAdapter = append(labelsAdapter, attributesToLabels(v.Map(), keyWithPrefix)...)
} else {
labelsAdapter = append(labelsAdapter, push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()})
}

labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, prefix)...)
return true
})

return labelsAdapter
}

func attributeToLabels(k string, v pcommon.Value, prefix string) push.LabelsAdapter {
var labelsAdapter push.LabelsAdapter

keyWithPrefix := k
if prefix != "" {
keyWithPrefix = prefix + "_" + k
}
keyWithPrefix = prometheustranslator.NormalizeLabel(keyWithPrefix)

typ := v.Type()
if typ == pcommon.ValueTypeMap {
mv := v.Map()
labelsAdapter = make(push.LabelsAdapter, 0, mv.Len())
mv.Range(func(k string, v pcommon.Value) bool {
labelsAdapter = append(labelsAdapter, attributeToLabels(k, v, keyWithPrefix)...)
return true
})
} else {
labelsAdapter = push.LabelsAdapter{
push.LabelAdapter{Name: keyWithPrefix, Value: v.AsString()},
}
}

return labelsAdapter
}

func timestampFromLogRecord(lr plog.LogRecord) time.Time {
if lr.Timestamp() != 0 {
return time.Unix(0, int64(lr.Timestamp()))
Expand Down
Loading
Loading