Skip to content

Commit

Permalink
feat: Make list of otel blessed attributes configurable (#12180)
Browse files Browse the repository at this point in the history
(cherry picked from commit 8b9d8b0)
  • Loading branch information
sandeepsukhani committed Mar 13, 2024
1 parent a96cadd commit 86a1527
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 80 deletions.
2 changes: 2 additions & 0 deletions cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func main() {
os.Exit(1)
}

// Set the global OTLP config which is needed in per tenant otlp config
config.LimitsConfig.SetGlobalOTLPConfig(config.Distributor.OTLPConfig)
// This global is set to the config passed into the last call to `NewOverrides`. If we don't
// call it atleast once, the defaults are set to an empty struct.
// We call it with the flag values so that the config file unmarshalling only overrides the values set in the config.
Expand Down
8 changes: 7 additions & 1 deletion docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,11 @@ write_failures_logging:
# logged or not. Default: false.
# CLI flag: -distributor.write-failures-logging.add-insights-label
[add_insights_label: <boolean> | default = false]
otlp_config:
# List of default otlp resource attributes to be picked as index labels
# CLI flag: -distributor.otlp.default_resource_attributes_as_index_labels
[default_resource_attributes_as_index_labels: <list of strings> | default = [service.name service.namespace service.instance.id deployment.environment cloud.region cloud.availability_zone k8s.cluster.name k8s.namespace.name k8s.pod.name k8s.container.name container.name k8s.replicaset.name k8s.deployment.name k8s.statefulset.name k8s.daemonset.name k8s.cronjob.name k8s.job.name]]
```

### querier
Expand Down Expand Up @@ -3235,7 +3240,8 @@ otlp_config:
# Configuration for resource attributes to store them as index labels or
# Structured Metadata or drop them altogether
resource_attributes:
# Configure whether to ignore the default list of resource attributes to be
# Configure whether to ignore the default list of resource attributes set in
# 'distributor.otlp.default_resource_attributes_as_index_labels' to be
# stored as index labels and only use the given resource attributes config
[ignore_defaults: <boolean> | default = false]

Expand Down
5 changes: 3 additions & 2 deletions docs/sources/send-data/otel/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ otlp_config:
# Configuration for Resource Attributes to store them as index labels or
# Structured Metadata or drop them altogether
resource_attributes:
# Configure whether to ignore the default list of Resource Attributes to be
# stored as Index Labels and only use the given Resource Attributes config
# Configure whether to ignore the default list of resource attributes set in
# 'distributor.otlp.default_resource_attributes_as_index_labels' to be
# stored as index labels and only use the given resource attributes config
[ignore_defaults: <boolean>]
[attributes_config: <list of attributes_configs>]
Expand Down
3 changes: 3 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ type Config struct {

// WriteFailuresLoggingCfg customizes write failures logging behavior.
WriteFailuresLogging writefailures.Cfg `yaml:"write_failures_logging" doc:"description=Experimental. Customize the logging of write failures."`

OTLPConfig push.GlobalOTLPConfig `yaml:"otlp_config"`
}

// RegisterFlags registers distributor-related flags.
func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
cfg.OTLPConfig.RegisterFlags(fs)
cfg.DistributorRing.RegisterFlags(fs)
cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs)
cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs)
Expand Down
10 changes: 1 addition & 9 deletions pkg/loghttp/push/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ const (
attrServiceName = "service.name"
)

var blessedAttributesNormalized = make([]string, len(blessedAttributes))

func init() {
for i := range blessedAttributes {
blessedAttributesNormalized[i] = prometheustranslator.NormalizeLabel(blessedAttributes[i])
}
}

func newPushStats() *Stats {
return &Stats{
LogLinesBytes: map[time.Duration]int64{},
Expand Down Expand Up @@ -118,7 +110,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants
resAttrs.PutStr(attrServiceName, "unknown_service")
}
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, len(blessedAttributesNormalized))
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size

resAttrs.Range(func(k string, v pcommon.Value) bool {
action := otlpConfig.ActionForResourceAttribute(k)
Expand Down
97 changes: 48 additions & 49 deletions pkg/loghttp/push/otlp_config.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,13 @@
package push

import (
"flag"
"fmt"

"github.com/grafana/dskit/flagext"
"github.com/prometheus/prometheus/model/relabel"
)

var blessedAttributes = []string{
"service.name",
"service.namespace",
"service.instance.id",
"deployment.environment",
"cloud.region",
"cloud.availability_zone",
"k8s.cluster.name",
"k8s.namespace.name",
"k8s.pod.name",
"k8s.container.name",
"container.name",
"k8s.replicaset.name",
"k8s.deployment.name",
"k8s.statefulset.name",
"k8s.daemonset.name",
"k8s.cronjob.name",
"k8s.job.name",
}

// Action is the action to be performed on OTLP Resource Attribute.
type Action string

Expand All @@ -44,15 +26,17 @@ var (
errAttributesAndRegexBothSet = fmt.Errorf("only one of attributes or regex must be set")
)

var DefaultOTLPConfig = OTLPConfig{
ResourceAttributes: ResourceAttributesConfig{
AttributesConfig: []AttributesConfig{
{
Action: IndexLabel,
Attributes: blessedAttributes,
func DefaultOTLPConfig(cfg GlobalOTLPConfig) OTLPConfig {
return OTLPConfig{
ResourceAttributes: ResourceAttributesConfig{
AttributesConfig: []AttributesConfig{
{
Action: IndexLabel,
Attributes: cfg.DefaultOTLPResourceAttributesAsIndexLabels,
},
},
},
},
}
}

type OTLPConfig struct {
Expand All @@ -61,14 +45,44 @@ type OTLPConfig struct {
LogAttributes []AttributesConfig `yaml:"log_attributes,omitempty" doc:"description=Configuration for log attributes to store them as Structured Metadata or drop them altogether"`
}

func (c *OTLPConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultOTLPConfig
type plain OTLPConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
type GlobalOTLPConfig struct {
DefaultOTLPResourceAttributesAsIndexLabels []string `yaml:"default_resource_attributes_as_index_labels"`
}

// RegisterFlags registers distributor-related flags.
func (cfg *GlobalOTLPConfig) RegisterFlags(fs *flag.FlagSet) {
cfg.DefaultOTLPResourceAttributesAsIndexLabels = []string{
"service.name",
"service.namespace",
"service.instance.id",
"deployment.environment",
"cloud.region",
"cloud.availability_zone",
"k8s.cluster.name",
"k8s.namespace.name",
"k8s.pod.name",
"k8s.container.name",
"container.name",
"k8s.replicaset.name",
"k8s.deployment.name",
"k8s.statefulset.name",
"k8s.daemonset.name",
"k8s.cronjob.name",
"k8s.job.name",
}
fs.Var((*flagext.StringSlice)(&cfg.DefaultOTLPResourceAttributesAsIndexLabels), "distributor.otlp.default_resource_attributes_as_index_labels", "List of default otlp resource attributes to be picked as index labels")
}

return nil
// ApplyGlobalOTLPConfig applies global otlp config, specifically DefaultOTLPResourceAttributesAsIndexLabels for the start.
func (c *OTLPConfig) ApplyGlobalOTLPConfig(config GlobalOTLPConfig) {
if !c.ResourceAttributes.IgnoreDefaults && len(config.DefaultOTLPResourceAttributesAsIndexLabels) != 0 {
c.ResourceAttributes.AttributesConfig = append([]AttributesConfig{
{
Action: IndexLabel,
Attributes: config.DefaultOTLPResourceAttributesAsIndexLabels,
},
}, c.ResourceAttributes.AttributesConfig...)
}
}

func (c *OTLPConfig) actionForAttribute(attribute string, cfgs []AttributesConfig) Action {
Expand Down Expand Up @@ -146,21 +160,6 @@ func (c *AttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) erro
}

type ResourceAttributesConfig struct {
IgnoreDefaults bool `yaml:"ignore_defaults,omitempty" doc:"default=false|description=Configure whether to ignore the default list of resource attributes to be stored as index labels and only use the given resource attributes config"`
IgnoreDefaults bool `yaml:"ignore_defaults,omitempty" doc:"default=false|description=Configure whether to ignore the default list of resource attributes set in 'distributor.otlp.default_resource_attributes_as_index_labels' to be stored as index labels and only use the given resource attributes config"`
AttributesConfig []AttributesConfig `yaml:"attributes_config,omitempty"`
}

func (c *ResourceAttributesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type plain ResourceAttributesConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}

if !c.IgnoreDefaults {
c.AttributesConfig = append([]AttributesConfig{
DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0],
}, c.AttributesConfig...)
}

return nil
}
16 changes: 12 additions & 4 deletions pkg/loghttp/push/otlp_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@ package push
import (
"testing"

"github.com/grafana/dskit/flagext"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)

var defaultGlobalOTLPConfig = GlobalOTLPConfig{}

func init() {
flagext.DefaultValues(&defaultGlobalOTLPConfig)
}

func TestUnmarshalOTLPConfig(t *testing.T) {
for _, tc := range []struct {
name string
Expand All @@ -25,7 +32,7 @@ resource_attributes:
expectedCfg: OTLPConfig{
ResourceAttributes: ResourceAttributesConfig{
AttributesConfig: []AttributesConfig{
DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0],
DefaultOTLPConfig(defaultGlobalOTLPConfig).ResourceAttributes.AttributesConfig[0],
{
Action: IndexLabel,
Regex: relabel.MustNewRegexp("foo"),
Expand Down Expand Up @@ -66,7 +73,7 @@ scope_attributes:
AttributesConfig: []AttributesConfig{
{
Action: IndexLabel,
Attributes: blessedAttributes,
Attributes: defaultGlobalOTLPConfig.DefaultOTLPResourceAttributesAsIndexLabels,
},
},
},
Expand Down Expand Up @@ -96,7 +103,7 @@ log_attributes:
expectedCfg: OTLPConfig{
ResourceAttributes: ResourceAttributesConfig{
AttributesConfig: []AttributesConfig{
DefaultOTLPConfig.ResourceAttributes.AttributesConfig[0],
DefaultOTLPConfig(defaultGlobalOTLPConfig).ResourceAttributes.AttributesConfig[0],
{
Action: IndexLabel,
Regex: relabel.MustNewRegexp("foo"),
Expand Down Expand Up @@ -151,6 +158,7 @@ log_attributes:
require.ErrorIs(t, err, tc.expectedErr)
return
}
cfg.ApplyGlobalOTLPConfig(defaultGlobalOTLPConfig)
require.Equal(t, tc.expectedCfg, cfg)
})
}
Expand All @@ -171,7 +179,7 @@ func TestOTLPConfig(t *testing.T) {
}{
{
name: "default OTLPConfig",
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
resAttrs: []attrAndExpAction{
{
attr: attrServiceName,
Expand Down
16 changes: 8 additions & 8 deletions pkg/loghttp/push/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedPushRequest: logproto.PushRequest{},
expectedStats: *newPushStats(),
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
},
{
name: "resource with no logs",
Expand All @@ -45,11 +45,11 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
expectedPushRequest: logproto.PushRequest{},
expectedStats: *newPushStats(),
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
},
{
name: "resource with a log entry",
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
generateLogs: func() plog.Logs {
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("service.name", "service-1")
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
{
name: "no resource attributes defined",
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
generateLogs: func() plog.Logs {
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty()
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
{
name: "service.name not defined in resource attributes",
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
tracker: NewMockTracker(),
generateLogs: func() plog.Logs {
ld := plog.NewLogs()
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
{
name: "resource attributes and scope attributes stored as structured metadata",
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
generateLogs: func() plog.Logs {
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty()
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
{
name: "attributes with nested data",
otlpConfig: DefaultOTLPConfig,
otlpConfig: DefaultOTLPConfig(defaultGlobalOTLPConfig),
generateLogs: func() plog.Logs {
ld := plog.NewLogs()
ld.ResourceLogs().AppendEmpty()
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestOTLPLogToPushEntry(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig))
require.Equal(t, tc.expectedResp, otlpLogToPushEntry(tc.buildLogRecord(), DefaultOTLPConfig(defaultGlobalOTLPConfig)))
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/loghttp/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Limits interface {
type EmptyLimits struct{}

func (EmptyLimits) OTLPConfig(string) OTLPConfig {
return DefaultOTLPConfig
return DefaultOTLPConfig(GlobalOTLPConfig{})
}

type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error)
Expand Down
Loading

0 comments on commit 86a1527

Please sign in to comment.