Skip to content

Commit

Permalink
Prepare cherry picks for v0.37.3 (#5604)
Browse files Browse the repository at this point in the history
* go get google.golang.org/[email protected] (#5511)

(cherry picked from commit 1d11d37)

* Upgrade Agent to Collector 0.87 (#5529)

* Upgrade Agent to Collector 0.87

* Parametrize the OTel version in docs.

* Document another batch processor metric

* Don't accept routing keys for metrics.

* Add tests for otelcol.receiver.kafka

---------

Co-authored-by: Mischa Thompson <[email protected]>
(cherry picked from commit b307c02)

* prometheus.remote_write: fix missing series ref mapping for native histogram (#5517)

Signed-off-by: György Krajcsovits <[email protected]>
Co-authored-by: Paschalis Tsilias <[email protected]>
(cherry picked from commit 5794224)

* pkg/metrics/wal: drop out-of-order exemplars (#5580)

Signed-off-by: Paschalis Tsilias <[email protected]>
(cherry picked from commit cd9d185)

* remote.vault: respect value of `namespace` argument (#5582)

Unfortunately, this setting can't be easily tested because Vault
namespaces are an enterprise-only feature.

Co-authored-by: Paschalis Tsilias <[email protected]>
(cherry picked from commit c0a52bc)

* reorganize prometheus converter code and limit surface area of dependencies between converters (#5406)

Signed-off-by: erikbaranowski <[email protected]>
(cherry picked from commit 5d39145)

* Clearer converter diagnostics (#5505)

* Update a bunch of converter messages and use standardized functions where possible for validation.

* default promtail log level from static converter so warnings don't show

* update migration doc output to match current behaviour

* refactor validation helper functions

Signed-off-by: erikbaranowski <[email protected]>

---------

Signed-off-by: erikbaranowski <[email protected]>
(cherry picked from commit 06c6792)

* wire up the agent integration for the static converter (#5545)

* wire up the agent integration for the static converter

Signed-off-by: erikbaranowski <[email protected]>

---------

Signed-off-by: erikbaranowski <[email protected]>
(cherry picked from commit f4e6ac4)

* converters: Default with localhost in static targets when none provided (#5546)

* promtail: Default with localhost in static targets in converter

* Add tests and changelog

* prettify

* feedback

(cherry picked from commit 8850660)

---------

Co-authored-by: Mischa Thompson <[email protected]>
Co-authored-by: Paulin Todev <[email protected]>
Co-authored-by: George Krajcsovits <[email protected]>
Co-authored-by: Robert Fratto <[email protected]>
Co-authored-by: Erik Baranowski <[email protected]>
Co-authored-by: Piotr <[email protected]>
  • Loading branch information
7 people authored Oct 26, 2023
1 parent e11be62 commit 8df2731
Show file tree
Hide file tree
Showing 123 changed files with 2,447 additions and 1,347 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ v0.37.2 (2023-10-16)
- Fix an issue with the static to flow converter for blackbox exporter modules
config not being included in the river output. (@erikbaranowski)

- Fix issue with default values in `discovery.nomad`. (@marctc)

### Enhancements

- Update Prometheus dependency to v2.47.2. (@tpaschalis)
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ func (a *Auth) Update(args component.Arguments) error {

TracerProvider: a.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (p *Connector) Update(args component.Arguments) error {

TracerProvider: p.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
2 changes: 1 addition & 1 deletion component/otelcol/connector/spanmetrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
// The unit is a private type in an internal Otel package,
// so we need to convert it to a map and then back to the internal type.
// ConvertMetricUnit matches the Unit type in this internal package:
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.85.0/connector/spanmetricsconnector/internal/metrics/unit.go
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.87.0/connector/spanmetricsconnector/internal/metrics/unit.go
func ConvertMetricUnit(unit string) (map[string]interface{}, error) {
switch unit {
case MetricsUnitMilliseconds:
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func (e *Exporter) Update(args component.Arguments) error {

TracerProvider: e.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metricOpts...),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
18 changes: 18 additions & 0 deletions component/otelcol/exporter/loadbalancing/loadbalancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package loadbalancing

import (
"fmt"
"time"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -48,6 +49,7 @@ type Arguments struct {
var (
_ exporter.Arguments = Arguments{}
_ river.Defaulter = &Arguments{}
_ river.Validator = &Arguments{}
)

var (
Expand All @@ -72,6 +74,22 @@ func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator. It rejects any routing key other
// than the two currently supported values, "service" and "traceID".
//
// TODO(ptodev): Add support for the "resource" and "metric" routing keys
// later. They cannot be added yet because otelcol.exporter.loadbalancing
// is labeled as "beta" while those routing keys are experimental; we need
// a way to label the component as "beta" for logs and traces but
// "experimental" for metrics.
func (args *Arguments) Validate() error {
	if args.RoutingKey != "service" && args.RoutingKey != "traceID" {
		return fmt.Errorf("invalid routing key %q", args.RoutingKey)
	}
	return nil
}

// Convert implements exporter.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &loadbalancingexporter.Config{
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func (e *Extension) Update(args component.Arguments) error {

TracerProvider: e.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
4 changes: 4 additions & 0 deletions component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func (p *Processor) Update(args component.Arguments) error {

TracerProvider: p.opts.Tracer,
MeterProvider: metric.NewMeterProvider(metric.WithReader(promExporter)),

ReportComponentStatus: func(*otelcomponent.StatusEvent) error {
return nil
},
},

BuildInfo: otelcomponent.BuildInfo{
Expand Down
125 changes: 77 additions & 48 deletions component/otelcol/receiver/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/agent/component/otelcol/receiver"
otel_service "github.com/grafana/agent/service/otel"
"github.com/grafana/river/rivertypes"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
otelcomponent "go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -38,10 +39,11 @@ type Arguments struct {
ClientID string `river:"client_id,attr,optional"`
InitialOffset string `river:"initial_offset,attr,optional"`

Authentication AuthenticationArguments `river:"authentication,block,optional"`
Metadata MetadataArguments `river:"metadata,block,optional"`
AutoCommit AutoCommitArguments `river:"autocommit,block,optional"`
MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`
Authentication AuthenticationArguments `river:"authentication,block,optional"`
Metadata MetadataArguments `river:"metadata,block,optional"`
AutoCommit AutoCommitArguments `river:"autocommit,block,optional"`
MessageMarking MessageMarkingArguments `river:"message_marking,block,optional"`
HeaderExtraction HeaderExtraction `river:"header_extraction,block,optional"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcol.DebugMetricsArguments `river:"debug_metrics,block,optional"`
Expand Down Expand Up @@ -79,6 +81,10 @@ var DefaultArguments = Arguments{
AfterExecution: false,
IncludeUnsuccessful: false,
},
HeaderExtraction: HeaderExtraction{
ExtractHeaders: false,
Headers: []string{},
},
}

// SetToDefault implements river.Defaulter.
Expand All @@ -88,20 +94,28 @@ func (args *Arguments) SetToDefault() {

// Convert implements receiver.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &kafkareceiver.Config{
Brokers: args.Brokers,
ProtocolVersion: args.ProtocolVersion,
Topic: args.Topic,
Encoding: args.Encoding,
GroupID: args.GroupID,
ClientID: args.ClientID,
InitialOffset: args.InitialOffset,

Authentication: args.Authentication.Convert(),
Metadata: args.Metadata.Convert(),
AutoCommit: args.AutoCommit.Convert(),
MessageMarking: args.MessageMarking.Convert(),
}, nil
input := make(map[string]interface{})
input["auth"] = args.Authentication.Convert()

var result kafkareceiver.Config
err := mapstructure.Decode(input, &result)
if err != nil {
return nil, err
}

result.Brokers = args.Brokers
result.ProtocolVersion = args.ProtocolVersion
result.Topic = args.Topic
result.Encoding = args.Encoding
result.GroupID = args.GroupID
result.ClientID = args.ClientID
result.InitialOffset = args.InitialOffset
result.Metadata = args.Metadata.Convert()
result.AutoCommit = args.AutoCommit.Convert()
result.MessageMarking = args.MessageMarking.Convert()
result.HeaderExtraction = args.HeaderExtraction.Convert()

return &result, nil
}

// Extensions implements receiver.Arguments.
Expand All @@ -128,26 +142,26 @@ type AuthenticationArguments struct {
}

// Convert converts args into the upstream type.
func (args AuthenticationArguments) Convert() kafkaexporter.Authentication {
var res kafkaexporter.Authentication
func (args AuthenticationArguments) Convert() map[string]interface{} {
auth := make(map[string]interface{})

if args.Plaintext != nil {
conv := args.Plaintext.Convert()
res.PlainText = &conv
auth["plain_text"] = &conv
}
if args.SASL != nil {
conv := args.SASL.Convert()
res.SASL = &conv
auth["sasl"] = &conv
}
if args.TLS != nil {
res.TLS = args.TLS.Convert()
auth["tls"] = args.TLS.Convert()
}
if args.Kerberos != nil {
conv := args.Kerberos.Convert()
res.Kerberos = &conv
auth["kerberos"] = &conv
}

return res
return auth
}

// PlaintextArguments configures plaintext authentication against the Kafka
Expand All @@ -158,10 +172,10 @@ type PlaintextArguments struct {
}

// Convert converts args into the upstream type.
func (args PlaintextArguments) Convert() kafkaexporter.PlainTextConfig {
return kafkaexporter.PlainTextConfig{
Username: args.Username,
Password: string(args.Password),
func (args PlaintextArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"username": args.Username,
"password": string(args.Password),
}
}

Expand All @@ -170,16 +184,18 @@ type SASLArguments struct {
Username string `river:"username,attr"`
Password rivertypes.Secret `river:"password,attr"`
Mechanism string `river:"mechanism,attr"`
Version int `river:"version,attr,optional"`
AWSMSK AWSMSKArguments `river:"aws_msk,block,optional"`
}

// Convert converts args into the upstream type.
func (args SASLArguments) Convert() kafkaexporter.SASLConfig {
return kafkaexporter.SASLConfig{
Username: args.Username,
Password: string(args.Password),
Mechanism: args.Mechanism,
AWSMSK: args.AWSMSK.Convert(),
func (args SASLArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"username": args.Username,
"password": string(args.Password),
"mechanism": args.Mechanism,
"version": args.Version,
"aws_msk": args.AWSMSK.Convert(),
}
}

Expand All @@ -191,10 +207,10 @@ type AWSMSKArguments struct {
}

// Convert converts args into the upstream type.
func (args AWSMSKArguments) Convert() kafkaexporter.AWSMSKConfig {
return kafkaexporter.AWSMSKConfig{
Region: args.Region,
BrokerAddr: args.BrokerAddr,
func (args AWSMSKArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"region": args.Region,
"broker_addr": args.BrokerAddr,
}
}

Expand All @@ -211,15 +227,15 @@ type KerberosArguments struct {
}

// Convert converts args into the upstream type.
func (args KerberosArguments) Convert() kafkaexporter.KerberosConfig {
return kafkaexporter.KerberosConfig{
ServiceName: args.ServiceName,
Realm: args.Realm,
UseKeyTab: args.UseKeyTab,
Username: args.Username,
Password: string(args.Password),
ConfigPath: args.ConfigPath,
KeyTabPath: args.KeyTabPath,
func (args KerberosArguments) Convert() map[string]interface{} {
return map[string]interface{}{
"service_name": args.ServiceName,
"realm": args.Realm,
"use_keytab": args.UseKeyTab,
"username": args.Username,
"password": string(args.Password),
"config_file": args.ConfigPath,
"keytab_file": args.KeyTabPath,
}
}

Expand Down Expand Up @@ -283,6 +299,19 @@ func (args MessageMarkingArguments) Convert() kafkareceiver.MessageMarking {
}
}

// HeaderExtraction configures how Kafka message headers are extracted
// by the receiver.
type HeaderExtraction struct {
	ExtractHeaders bool     `river:"extract_headers,attr,optional"`
	Headers        []string `river:"headers,attr,optional"`
}

// Convert converts HeaderExtraction into the upstream
// kafkareceiver.HeaderExtraction type.
func (h HeaderExtraction) Convert() kafkareceiver.HeaderExtraction {
	var out kafkareceiver.HeaderExtraction
	out.ExtractHeaders = h.ExtractHeaders
	out.Headers = h.Headers
	return out
}

// DebugMetricsConfig implements receiver.Arguments.
func (args Arguments) DebugMetricsConfig() otelcol.DebugMetricsArguments {
return args.DebugMetrics
Expand Down
Loading

0 comments on commit 8df2731

Please sign in to comment.