Skip to content

Commit

Permalink
Implementation for static traces promsdprocessor conversion (#6722)
Browse files Browse the repository at this point in the history
* static traces promsdprocessor conversion to flow mode

Signed-off-by: erikbaranowski <[email protected]>

---------

Signed-off-by: erikbaranowski <[email protected]>
  • Loading branch information
erikbaranowski authored Mar 26, 2024
1 parent 5be00e4 commit 586fcb9
Show file tree
Hide file tree
Showing 37 changed files with 318 additions and 514 deletions.
26 changes: 13 additions & 13 deletions internal/converter/internal/otelcolconvert/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"go.opentelemetry.io/collector/otelcol"
)

// componentConverter represents a converter which converts an OpenTelemetry
// ComponentConverter represents a converter which converts an OpenTelemetry
// Collector component into a Flow component.
type componentConverter interface {
type ComponentConverter interface {
// Factory should return the factory for the OpenTelemetry Collector
// component.
Factory() component.Factory
Expand All @@ -39,25 +39,25 @@ type componentConverter interface {
// ConvertAndAppend may be called more than once with the same component used
// in different pipelines. Use [state.FlowComponentLabel] to get a guaranteed
// unique Flow component label for the current state.
ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics
ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics
}

// List of component converters. This slice is appended to by init functions in
// other files.
var converters []componentConverter
var converters []ComponentConverter

// state represents the state of the conversion. The state tracks:
// State represents the State of the conversion. The State tracks:
//
// - The OpenTelemetry Collector config being converted.
// - The current OpenTelemetry Collector pipelines being converted.
// - The current OpenTelemetry Collector component being converted.
type state struct {
type State struct {
cfg *otelcol.Config // Input config.
file *builder.File // Output file.
group *pipelineGroup // Current pipeline group being converted.

// converterLookup maps a converter key to the associated converter instance.
converterLookup map[converterKey]componentConverter
converterLookup map[converterKey]ComponentConverter

// extensionLookup maps OTel extensions to Flow component IDs.
extensionLookup map[component.ID]componentID
Expand All @@ -74,18 +74,18 @@ type converterKey struct {

// Body returns the body of the file being generated. Implementations of
// [componentConverter] should use this to append components.
func (state *state) Body() *builder.Body { return state.file.Body() }
func (state *State) Body() *builder.Body { return state.file.Body() }

// FlowComponentLabel returns the unique Flow label for the OpenTelemetry
// Component component being converted. It is safe to use this label to create
// multiple Flow components in a chain.
func (state *state) FlowComponentLabel() string {
func (state *State) FlowComponentLabel() string {
return state.flowLabelForComponent(state.componentID)
}

// flowLabelForComponent returns the unique Flow label for the given
// OpenTelemetry Collector component.
func (state *state) flowLabelForComponent(c component.InstanceID) string {
func (state *State) flowLabelForComponent(c component.InstanceID) string {
const defaultLabel = "default"

// We need to prove that it's possible to statelessly compute the label for a
Expand Down Expand Up @@ -144,7 +144,7 @@ func (state *state) flowLabelForComponent(c component.InstanceID) string {

// Next returns the set of Flow component IDs for a given data type that the
// current component being converted should forward data to.
func (state *state) Next(c component.InstanceID, dataType component.DataType) []componentID {
func (state *State) Next(c component.InstanceID, dataType component.DataType) []componentID {
instances := state.nextInstances(c, dataType)

var ids []componentID
Expand Down Expand Up @@ -177,7 +177,7 @@ func (state *state) Next(c component.InstanceID, dataType component.DataType) []
return ids
}

func (state *state) nextInstances(c component.InstanceID, dataType component.DataType) []component.InstanceID {
func (state *State) nextInstances(c component.InstanceID, dataType component.DataType) []component.InstanceID {
switch dataType {
case component.DataTypeMetrics:
return state.group.NextMetrics(c)
Expand All @@ -191,7 +191,7 @@ func (state *state) nextInstances(c component.InstanceID, dataType component.Dat
}
}

func (state *state) LookupExtension(id component.ID) componentID {
func (state *State) LookupExtension(id component.ID) componentID {
cid, ok := state.extensionLookup[id]
if !ok {
panic(fmt.Sprintf("no component name found for extension %q", id.Name()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (attributesProcessorConverter) InputComponentName() string {
return "otelcol.processor.attributes"
}

func (attributesProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (attributesProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -35,14 +35,14 @@ func (attributesProcessorConverter) ConvertAndAppend(state *state, id component.

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toAttributesProcessor(state *state, id component.InstanceID, cfg *attributesprocessor.Config) *attributes.Arguments {
func toAttributesProcessor(state *State, id component.InstanceID, cfg *attributesprocessor.Config) *attributes.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextTraces = state.Next(id, component.DataTypeTraces)
Expand All @@ -53,9 +53,9 @@ func toAttributesProcessor(state *state, id component.InstanceID, cfg *attribute
Match: toMatchConfig(cfg),
Actions: toAttrActionKeyValue(encodeMapslice(cfg.Actions)),
Output: &otelcol.ConsumerArguments{
Metrics: toTokenizedConsumers(nextMetrics),
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces)},
Metrics: ToTokenizedConsumers(nextMetrics),
Logs: ToTokenizedConsumers(nextLogs),
Traces: ToTokenizedConsumers(nextTraces)},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (basicAuthConverterConverter) Factory() component.Factory {

func (basicAuthConverterConverter) InputComponentName() string { return "otelcol.auth.basic" }

func (basicAuthConverterConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (basicAuthConverterConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -33,7 +33,7 @@ func (basicAuthConverterConverter) ConvertAndAppend(state *state, id component.I

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (batchProcessorConverter) InputComponentName() string {
return "otelcol.processor.batch"
}

func (batchProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (batchProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -35,14 +35,14 @@ func (batchProcessorConverter) ConvertAndAppend(state *state, id component.Insta

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toBatchProcessor(state *state, id component.InstanceID, cfg *batchprocessor.Config) *batch.Arguments {
func toBatchProcessor(state *State, id component.InstanceID, cfg *batchprocessor.Config) *batch.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
Expand All @@ -56,9 +56,9 @@ func toBatchProcessor(state *state, id component.InstanceID, cfg *batchprocessor
MetadataKeys: cfg.MetadataKeys,
MetadataCardinalityLimit: cfg.MetadataCardinalityLimit,
Output: &otelcol.ConsumerArguments{
Metrics: toTokenizedConsumers(nextMetrics),
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces),
Metrics: ToTokenizedConsumers(nextMetrics),
Logs: ToTokenizedConsumers(nextLogs),
Traces: ToTokenizedConsumers(nextTraces),
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (bearerTokenAuthExtensionConverter) Factory() component.Factory {

func (bearerTokenAuthExtensionConverter) InputComponentName() string { return "otelcol.auth.bearer" }

func (bearerTokenAuthExtensionConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (bearerTokenAuthExtensionConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -52,7 +52,7 @@ func (bearerTokenAuthExtensionConverter) ConvertAndAppend(state *state, id compo

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
Expand All @@ -65,7 +65,7 @@ func toBearerTokenAuthExtension(cfg *bearertokenauthextension.Config) *bearer.Ar
Token: rivertypes.Secret(string(cfg.BearerToken)),
}
}
func toBearerTokenAuthExtensionWithFilename(state *state, cfg *bearertokenauthextension.Config) (*bearer.Arguments, string) {
func toBearerTokenAuthExtensionWithFilename(state *State, cfg *bearertokenauthextension.Config) (*bearer.Arguments, string) {
label := state.FlowComponentLabel()
args := &file.Arguments{
Filename: cfg.Filename,
Expand All @@ -78,5 +78,5 @@ func toBearerTokenAuthExtensionWithFilename(state *state, cfg *bearertokenauthex

return &bearer.Arguments{
Scheme: cfg.Scheme,
}, fmt.Sprintf("%s.content", stringifyBlock(block))
}, fmt.Sprintf("%s.content", StringifyBlock(block))
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (filterProcessorConverter) InputComponentName() string {
return "otelcol.processor.filter"
}

func (filterProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (filterProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -35,14 +35,14 @@ func (filterProcessorConverter) ConvertAndAppend(state *state, id component.Inst

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toFilterProcessor(state *state, id component.InstanceID, cfg *filterprocessor.Config) *filter.Arguments {
func toFilterProcessor(state *State, id component.InstanceID, cfg *filterprocessor.Config) *filter.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
Expand All @@ -63,9 +63,9 @@ func toFilterProcessor(state *state, id component.InstanceID, cfg *filterprocess
LogRecord: cfg.Logs.LogConditions,
},
Output: &otelcol.ConsumerArguments{
Metrics: toTokenizedConsumers(nextMetrics),
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces),
Metrics: ToTokenizedConsumers(nextMetrics),
Logs: ToTokenizedConsumers(nextLogs),
Traces: ToTokenizedConsumers(nextTraces),
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (headersSetterExtensionConverter) Factory() component.Factory {

func (headersSetterExtensionConverter) InputComponentName() string { return "otelcol.auth.headers" }

func (headersSetterExtensionConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (headersSetterExtensionConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -33,7 +33,7 @@ func (headersSetterExtensionConverter) ConvertAndAppend(state *state, id compone

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (tc tokenizedConsumer) RiverTokenize() []builder.Token {
}}
}

func toTokenizedConsumers(components []componentID) []otelcol.Consumer {
func ToTokenizedConsumers(components []componentID) []otelcol.Consumer {
res := make([]otelcol.Consumer, 0, len(components))

for _, component := range components {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (jaegerReceiverConverter) Factory() component.Factory { return jaegerreceiv

func (jaegerReceiverConverter) InputComponentName() string { return "" }

func (jaegerReceiverConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (jaegerReceiverConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -34,14 +34,14 @@ func (jaegerReceiverConverter) ConvertAndAppend(state *state, id component.Insta

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toJaegerReceiver(state *state, id component.InstanceID, cfg *jaegerreceiver.Config) *jaeger.Arguments {
func toJaegerReceiver(state *State, id component.InstanceID, cfg *jaegerreceiver.Config) *jaeger.Arguments {
var (
nextTraces = state.Next(id, component.DataTypeTraces)
)
Expand All @@ -57,7 +57,7 @@ func toJaegerReceiver(state *state, id component.InstanceID, cfg *jaegerreceiver
DebugMetrics: common.DefaultValue[jaeger.Arguments]().DebugMetrics,

Output: &otelcol.ConsumerArguments{
Traces: toTokenizedConsumers(nextTraces),
Traces: ToTokenizedConsumers(nextTraces),
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (jaegerRemoteSamplingExtensionConverter) InputComponentName() string {
return "otelcol.extension.jaeger_remote_sampling"
}

func (jaegerRemoteSamplingExtensionConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (jaegerRemoteSamplingExtensionConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -34,7 +34,7 @@ func (jaegerRemoteSamplingExtensionConverter) ConvertAndAppend(state *state, id

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (k8sAttributesProcessorConverter) InputComponentName() string {
return "otelcol.processor.k8sattributes"
}

func (k8sAttributesProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
func (k8sAttributesProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()
Expand All @@ -35,14 +35,14 @@ func (k8sAttributesProcessorConverter) ConvertAndAppend(state *state, id compone

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toK8SAttributesProcessor(state *state, id component.InstanceID, cfg *k8sattributesprocessor.Config) *k8sattributes.Arguments {
func toK8SAttributesProcessor(state *State, id component.InstanceID, cfg *k8sattributesprocessor.Config) *k8sattributes.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
Expand All @@ -67,9 +67,9 @@ func toK8SAttributesProcessor(state *state, id component.InstanceID, cfg *k8satt
Exclude: toExclude(cfg.Exclude),

Output: &otelcol.ConsumerArguments{
Metrics: toTokenizedConsumers(nextMetrics),
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces),
Metrics: ToTokenizedConsumers(nextMetrics),
Logs: ToTokenizedConsumers(nextLogs),
Traces: ToTokenizedConsumers(nextTraces),
},
}
}
Expand Down
Loading

0 comments on commit 586fcb9

Please sign in to comment.