Skip to content

Commit

Permalink
Pass in a label prefix from static to otelcolconvert so multiple trac… (
Browse files Browse the repository at this point in the history
#6700)

* Pass in a label prefix from static to otelcolconvert so multiple trace configs don't bump into each other

Signed-off-by: erikbaranowski <[email protected]>

* only add a label prefix if we have more than one trace config

Signed-off-by: erikbaranowski <[email protected]>

---------

Signed-off-by: erikbaranowski <[email protected]>
  • Loading branch information
erikbaranowski authored Mar 14, 2024
1 parent b04dbf9 commit 0ed62e5
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 15 deletions.
16 changes: 11 additions & 5 deletions internal/converter/internal/otelcolconvert/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ type state struct {
// extensionLookup maps OTel extensions to Flow component IDs.
extensionLookup map[component.ID]componentID

componentID component.InstanceID // ID of the current component being converted.
componentConfig component.Config // Config of the current component being converted.
componentID component.InstanceID // ID of the current component being converted.
componentConfig component.Config // Config of the current component being converted.
componentLabelPrefix string // Prefix for the label of the current component being converted.
}

type converterKey struct {
Expand Down Expand Up @@ -120,9 +121,13 @@ func (state *state) flowLabelForComponent(c component.InstanceID) string {
//
// Otherwise, we'll replace empty group and component names with "default"
// and concatenate them with an underscore.
unsanitizedLabel := state.componentLabelPrefix
if unsanitizedLabel != "" {
unsanitizedLabel += "_"
}
switch {
case groupName == "" && componentName == "":
return defaultLabel
unsanitizedLabel += defaultLabel

default:
if groupName == "" {
Expand All @@ -131,9 +136,10 @@ func (state *state) flowLabelForComponent(c component.InstanceID) string {
if componentName == "" {
componentName = defaultLabel
}
identifier := fmt.Sprintf("%s_%s", groupName, componentName)
return common.SanitizeIdentifierPanics(identifier)
unsanitizedLabel += fmt.Sprintf("%s_%s", groupName, componentName)
}

return common.SanitizeIdentifierPanics(unsanitizedLabel)
}

// Next returns the set of Flow component IDs for a given data type that the
Expand Down
14 changes: 8 additions & 6 deletions internal/converter/internal/otelcolconvert/otelcolconvert.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func Convert(in []byte, extraArgs []string) ([]byte, diag.Diagnostics) {

f := builder.NewFile()

diags.AddAll(AppendConfig(f, cfg))
diags.AddAll(AppendConfig(f, cfg, ""))
diags.AddAll(common.ValidateNodes(f))

var buf bytes.Buffer
Expand Down Expand Up @@ -143,7 +143,7 @@ func getFactories() otelcol.Factories {

// AppendConfig converts the provided OpenTelemetry config into an equivalent
// Flow config and appends the result to the provided file.
func AppendConfig(file *builder.File, cfg *otelcol.Config) diag.Diagnostics {
func AppendConfig(file *builder.File, cfg *otelcol.Config, labelPrefix string) diag.Diagnostics {
var diags diag.Diagnostics

groups, err := createPipelineGroups(cfg.Service.Pipelines)
Expand Down Expand Up @@ -198,8 +198,9 @@ func AppendConfig(file *builder.File, cfg *otelcol.Config) diag.Diagnostics {

converterLookup: converterTable,

componentConfig: cfg.Extensions,
componentID: cid,
componentConfig: cfg.Extensions,
componentID: cid,
componentLabelPrefix: labelPrefix,
}

key := converterKey{Kind: component.KindExtension, Type: ext.Type()}
Expand Down Expand Up @@ -244,8 +245,9 @@ func AppendConfig(file *builder.File, cfg *otelcol.Config) diag.Diagnostics {
converterLookup: converterTable,
extensionLookup: extensionTable,

componentConfig: componentSet.configLookup[id],
componentID: componentID,
componentConfig: componentSet.configLookup[id],
componentID: componentID,
componentLabelPrefix: labelPrefix,
}

key := converterKey{Kind: componentSet.kind, Type: id.Type()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,19 @@ func (b *ConfigBuilder) appendTraces() {

for _, cfg := range b.cfg.Traces.Configs {
otelCfg, err := cfg.OtelConfig()
removeReceiver(otelCfg, "traces", "push_receiver")

if err != nil {
b.diags.Add(diag.SeverityLevelCritical, fmt.Sprintf("failed to load otelConfig from agent traces config: %s", err))
continue
}
// TODO: what prefix should each generated flow label get? each trace instance will need something unique
b.diags.AddAll(otelcolconvert.AppendConfig(b.f, otelCfg))

removeReceiver(otelCfg, "traces", "push_receiver")

// Let's only prefix things if we are doing more than 1 trace config
labelPrefix := ""
if len(b.cfg.Traces.Configs) > 1 {
labelPrefix = cfg.Name
}
b.diags.AddAll(otelcolconvert.AppendConfig(b.f, otelCfg, labelPrefix))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(Warning) Please review your agent command line flags and ensure they are set in your Flow mode config file where necessary.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
otelcol.receiver.otlp "trace_config_1_default" {
grpc {
include_metadata = true
}

http { }

output {
metrics = []
logs = []
traces = [otelcol.processor.attributes.trace_config_1_default.input]
}
}

otelcol.processor.attributes "trace_config_1_default" {
action {
key = "db.table"
action = "delete"
}

output {
metrics = []
logs = []
traces = [otelcol.exporter.otlp.trace_config_1_default_0.input]
}
}

otelcol.exporter.otlp "trace_config_1_default_0" {
sending_queue {
queue_size = 1000
}

retry_on_failure {
max_elapsed_time = "1m0s"
}

client {
endpoint = "http://localhost:1234/write"
balancer_name = ""
}
}

otelcol.receiver.otlp "trace_config_2_default" {
grpc {
include_metadata = true
}

http { }

output {
metrics = []
logs = []
traces = [otelcol.processor.attributes.trace_config_2_default.input]
}
}

otelcol.processor.attributes "trace_config_2_default" {
action {
key = "redacted_span"
value = true
action = "upsert"
}

output {
metrics = []
logs = []
traces = [otelcol.exporter.otlp.trace_config_2_default_0.input]
}
}

otelcol.exporter.otlp "trace_config_2_default_0" {
sending_queue {
queue_size = 1000
}

retry_on_failure {
max_elapsed_time = "1m0s"
}

client {
endpoint = "http://localhost:1234/write"
balancer_name = ""
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
traces:
configs:
- name: trace_config_1
receivers:
otlp:
protocols:
grpc:
http:
remote_write:
- endpoint: http://localhost:1234/write
attributes:
actions:
- key: db.table
action: delete
- name: trace_config_2
receivers:
otlp:
protocols:
grpc:
http:
remote_write:
- endpoint: http://localhost:1234/write
attributes:
actions:
- key: redacted_span
value: true
action: upsert

0 comments on commit 0ed62e5

Please sign in to comment.