From 52b31ac1312b381d589abb0193749434fed1eb9a Mon Sep 17 00:00:00 2001 From: William Dumont Date: Wed, 15 May 2024 14:15:08 +0200 Subject: [PATCH] backport spawn otel receivers on need (#6898) * backport spawn otel receivers on need * remove default value for topic --- CHANGELOG.md | 2 + .../components/otelcol.receiver.kafka.md | 12 +++- .../component/otelcol/receiver/kafka/kafka.go | 24 ++++++- .../otelcol/receiver/kafka/kafka_test.go | 31 +++++++-- .../component/otelcol/receiver/receiver.go | 46 +++++++------ .../otelcol/receiver/receiver_test.go | 68 +++++++++++++++++++ .../otelcolconvert/testdata/kafka.river | 1 - 7 files changed, 155 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fe8b60d47181..ecadb4f5dc58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,8 @@ Main (unreleased) - Fix a bug with the logs pipeline in static mode which prevented it from shutting down cleanly. +- Fix a bug where a topic was claimed by the wrong consumer type in `otelcol.receiver.kafka`. (@wildum) + ### Other changes - Clustering for Grafana Agent in Flow mode has graduated from beta to stable. diff --git a/docs/sources/flow/reference/components/otelcol.receiver.kafka.md b/docs/sources/flow/reference/components/otelcol.receiver.kafka.md index 042db0227a92..47d8a6305a20 100644 --- a/docs/sources/flow/reference/components/otelcol.receiver.kafka.md +++ b/docs/sources/flow/reference/components/otelcol.receiver.kafka.md @@ -45,13 +45,23 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `brokers` | `array(string)` | Kafka brokers to connect to. | | yes `protocol_version` | `string` | Kafka protocol version to use. | | yes -`topic` | `string` | Kafka topic to read from. | `"otlp_spans"` | no +`topic` | `string` | Kafka topic to read from. | | no `encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no `group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no `client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no `initial_offset` | `string` | Initial offset to use if no offset was previously committed. | `"latest"` | no `resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `"false"` | no +If `topic` is not set, different topics will be used for different telemetry signals: + +* Metrics will be received from an `otlp_metrics` topic. +* Traces will be received from an `otlp_spans` topic. +* Logs will be received from an `otlp_logs` topic. + +If `topic` is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block. +For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces. +If it contains only metrics, then `otelcol.receiver.kafka` should be configured to output only metrics. + The `encoding` argument determines how to decode messages read from Kafka. `encoding` must be one of the following strings: diff --git a/internal/component/otelcol/receiver/kafka/kafka.go b/internal/component/otelcol/receiver/kafka/kafka.go index 529b6c5b2edd..2b0c76143129 100644 --- a/internal/component/otelcol/receiver/kafka/kafka.go +++ b/internal/component/otelcol/receiver/kafka/kafka.go @@ -2,6 +2,8 @@ package kafka import ( + "fmt" + "strings" "time" "github.com/grafana/agent/internal/component" @@ -63,7 +65,6 @@ func (args *Arguments) SetToDefault() { // for compatibility, even though that means using a client and group ID of // "otel-collector". - Topic: "otlp_spans", Encoding: "otlp_proto", Brokers: []string{"localhost:9092"}, ClientID: "otel-collector", @@ -77,6 +78,27 @@ func (args *Arguments) SetToDefault() { args.DebugMetrics.SetToDefault() } +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + var signals []string + + if len(args.Topic) > 0 { + if len(args.Output.Logs) > 0 { + signals = append(signals, "logs") + } + if len(args.Output.Metrics) > 0 { + signals = append(signals, "metrics") + } + if len(args.Output.Traces) > 0 { + signals = append(signals, "traces") + } + if len(signals) > 1 { + return fmt.Errorf("only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: %s", strings.Join(signals, ", ")) + } + } + return nil +} + // Convert implements receiver.Arguments. func (args Arguments) Convert() (otelcomponent.Config, error) { input := make(map[string]interface{}) diff --git a/internal/component/otelcol/receiver/kafka/kafka_test.go b/internal/component/otelcol/receiver/kafka/kafka_test.go index c03c8a63efaf..e1dfcdefdeb5 100644 --- a/internal/component/otelcol/receiver/kafka/kafka_test.go +++ b/internal/component/otelcol/receiver/kafka/kafka_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/grafana/agent/internal/component/otelcol" + "github.com/grafana/agent/internal/component/otelcol/internal/fakeconsumer" "github.com/grafana/agent/internal/component/otelcol/receiver/kafka" "github.com/grafana/river" "github.com/mitchellh/mapstructure" @@ -29,7 +30,6 @@ func TestArguments_UnmarshalRiver(t *testing.T) { expected: kafkareceiver.Config{ Brokers: []string{"10.10.10.10:9092"}, ProtocolVersion: "2.0.0", - Topic: "otlp_spans", Encoding: "otlp_proto", GroupID: "otel-collector", ClientID: "otel-collector", @@ -153,7 +153,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -205,7 +204,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -263,7 +261,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -320,7 +317,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -433,3 +429,28 @@ func TestDebugMetricsConfig(t *testing.T) { }) } } + +func TestArguments_Validate(t *testing.T) { + cfg := ` + brokers = ["10.10.10.10:9092"] + protocol_version = "2.0.0" + topic = "traces" + output { + } + ` + var args kafka.Arguments + require.NoError(t, river.Unmarshal([]byte(cfg), &args)) + + // Adding two traces consumer, expect no error + args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{}) + args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{}) + require.NoError(t, args.Validate()) + + // Adding another signal type + args.Output.Logs = append(args.Output.Logs, &fakeconsumer.Consumer{}) + require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, traces") + + // Adding another signal type + args.Output.Metrics = append(args.Output.Metrics, &fakeconsumer.Consumer{}) + require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, metrics, traces") +} diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 13c52ccfd112..3007f0cf8a7d 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -149,36 +149,40 @@ func (r *Receiver) Update(args component.Arguments) error { return err } - var ( - next = rargs.NextConsumers() - nextTraces = fanoutconsumer.Traces(next.Traces) - nextMetrics = fanoutconsumer.Metrics(next.Metrics) - nextLogs = fanoutconsumer.Logs(next.Logs) - ) + next := rargs.NextConsumers() // Create instances of the receiver from our factory for each of our // supported telemetry signals. var components []otelcomponent.Component - tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces) - if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { - return err - } else if tracesReceiver != nil { - components = append(components, tracesReceiver) + if len(next.Traces) > 0 { + nextTraces := fanoutconsumer.Traces(next.Traces) + tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if tracesReceiver != nil { + components = append(components, tracesReceiver) + } } - metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics) - if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { - return err - } else if metricsReceiver != nil { - components = append(components, metricsReceiver) + if len(next.Metrics) > 0 { + nextMetrics := fanoutconsumer.Metrics(next.Metrics) + metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if metricsReceiver != nil { + components = append(components, metricsReceiver) + } } - logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs) - if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { - return err - } else if logsReceiver != nil { - components = append(components, logsReceiver) + if len(next.Logs) > 0 { + nextLogs := fanoutconsumer.Logs(next.Logs) + logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if logsReceiver != nil { + components = append(components, logsReceiver) + } } // Schedule the components to run once our component is running. diff --git a/internal/component/otelcol/receiver/receiver_test.go b/internal/component/otelcol/receiver/receiver_test.go index 989b40fa9ffa..b626f7b2ff1b 100644 --- a/internal/component/otelcol/receiver/receiver_test.go +++ b/internal/component/otelcol/receiver/receiver_test.go @@ -57,6 +57,74 @@ func TestReceiver(t *testing.T) { require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked") } +func TestReceiverNotStarted(t *testing.T) { + var ( + waitConsumerTrigger = util.NewWaitTrigger() + onTracesConsumer = func(t otelconsumer.Traces) { + waitConsumerTrigger.Trigger() + } + ) + te := newTestEnvironment(t, onTracesConsumer) + te.Start(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + + // Check that no trace receiver was started because it's not needed by the output. + require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded") +} + +func TestReceiverUpdate(t *testing.T) { + var ( + consumer otelconsumer.Traces + + waitConsumerTrigger = util.NewWaitTrigger() + onTracesConsumer = func(t otelconsumer.Traces) { + consumer = t + waitConsumerTrigger.Trigger() + } + + waitTracesTrigger = util.NewWaitTrigger() + nextConsumer = &fakeconsumer.Consumer{ + ConsumeTracesFunc: func(context.Context, ptrace.Traces) error { + waitTracesTrigger.Trigger() + return nil + }, + } + ) + + te := newTestEnvironment(t, onTracesConsumer) + te.Start(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + + // Check that no trace receiver was started because it's not needed by the output. + require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded") + + te.Controller.Update(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{ + Traces: []otelcol.Consumer{nextConsumer}, + }, + }) + + // Now the trace receiver is started. + require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no traces consumer sent") + + err := consumer.ConsumeTraces(context.Background(), ptrace.NewTraces()) + require.NoError(t, err) + + require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked") + + waitConsumerTrigger = util.NewWaitTrigger() + + // Remove the trace receiver. + te.Controller.Update(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + + // Check that after the update no trace receiver is started. + require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded") +} + type testEnvironment struct { t *testing.T diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.river b/internal/converter/internal/otelcolconvert/testdata/kafka.river index c2f11b594b98..b98eabaf2f76 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.river +++ b/internal/converter/internal/otelcolconvert/testdata/kafka.river @@ -1,7 +1,6 @@ otelcol.receiver.kafka "default" { brokers = ["broker:9092"] protocol_version = "2.0.0" - topic = "" authentication { plaintext {