From 5c773d2b721a3cec4988a2bbad796c44377e5970 Mon Sep 17 00:00:00 2001 From: Tiago Rossi <34634082+tiagorossig@users.noreply.github.com> Date: Tue, 1 Oct 2024 20:32:47 -0400 Subject: [PATCH] Support for TLS client settings for clustering (#1724) * add support for TLS in cluster transport communications * add docs and changelog * Apply suggestions from code review docs review Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * add clientAuth to tls config * revert last commit * remove pin, go get ckit@latest-commit, go mod tidy * spacing * use %s for formatting string in fmt.Errorf --------- Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- CHANGELOG.md | 2 + docs/sources/reference/cli/run.md | 8 +++ go.mod | 4 +- go.sum | 8 +-- internal/alloycli/cluster_builder.go | 10 ++++ internal/alloycli/cmd_run.go | 20 +++++++ internal/service/cluster/cluster.go | 82 ++++++++++++++++++++++------ 7 files changed, 112 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4511af790d..7eeef37d3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,8 @@ Main (unreleased) - Changed OTEL alerts in Alloy mixin to use success rate for tracing. (@thampiotr) +- Support TLS client settings for clustering (@tiagorossig) + v1.4.1 ----------------- diff --git a/docs/sources/reference/cli/run.md b/docs/sources/reference/cli/run.md index 6c36739280..5b0340ec92 100644 --- a/docs/sources/reference/cli/run.md +++ b/docs/sources/reference/cli/run.md @@ -52,6 +52,11 @@ The following flags are supported: * `--cluster.max-join-peers`: Number of peers to join from the discovered set (default `5`). * `--cluster.name`: Name to prevent nodes without this identifier from joining the cluster (default `""`). * `--cluster.use-discovery-v1`: Use the older, v1 version of cluster peer discovery mechanism (default `false`). Note that this flag will be deprecated in the future and eventually removed. +* `--cluster.enable-tls`: Specifies whether TLS should be used for communication between peers (default `false`). +* `--cluster.tls-ca-path`: Path to the CA certificate file used for peer communication over TLS. +* `--cluster.tls-cert-path`: Path to the certificate file used for peer communication over TLS. +* `--cluster.tls-key-path`: Path to the key file used for peer communication over TLS. +* `--cluster.tls-server-name`: Server name used for peer communication over TLS. * `--config.format`: The format of the source file. Supported formats: `alloy`, `otelcol`, `prometheus`, `promtail`, `static` (default `"alloy"`). * `--config.bypass-conversion-errors`: Enable bypassing errors when converting (default `false`). * `--config.extra-args`: Extra arguments from the original format used by the converter. @@ -112,6 +117,9 @@ The comma-separated list of addresses provided in `--cluster.join-addresses` can In both cases, the port number can be specified with a `:` suffix. If ports are not provided, default of the port used for the HTTP listener is used. If you do not provide the port number explicitly, you must ensure that all instances use the same port for the HTTP listener. +The `--cluster.enable-tls` flag can be set to enable TLS for peer-to-peer communications. +Additional arguments are required to configure the TLS client, including the CA certificate, the TLS certificate, the key, and the server name. + The `--cluster.discover-peers` command-line flag expects a list of tuples in the form of `provider=XXX key=val key=val ...`. Clustering uses the [go-discover] package to discover peers and fetch their IP addresses, based on the chosen provider and the filtering key-values it supports. Clustering supports the default set of providers available in go-discover and registers the `k8s` provider on top. diff --git a/go.mod b/go.mod index 6ccb723151..5a880856f3 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/grafana/alloy/syntax v0.1.0 github.com/grafana/beyla v1.8.2 github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d - github.com/grafana/ckit v0.0.0-20240913130805-0ee98bafad88 + github.com/grafana/ckit v0.0.0-20241001124237-ee134485edd3 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb github.com/grafana/go-gelf/v2 v2.0.1 @@ -154,7 +154,7 @@ require ( github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring v0.66.0 github.com/prometheus-operator/prometheus-operator/pkg/client v0.66.0 github.com/prometheus/blackbox_exporter v0.24.1-0.20230623125439-bd22efa1c900 - github.com/prometheus/client_golang v1.20.3 + github.com/prometheus/client_golang v1.20.4 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.55.0 github.com/prometheus/common/sigv4 v0.1.0 diff --git a/go.sum b/go.sum index 4140e2242d..39e11916b0 100644 --- a/go.sum +++ b/go.sum @@ -1198,8 +1198,8 @@ github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2 h1:ju6EcY2aEobeBg github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2/go.mod h1:8sLW/G7rcFe1CKMaA4pYT4mX3P1xQVGqM6luzEzx/2g= github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d h1:6sNPBwOokfCxAyateu7iLdtyWDUzaLLShPs7F4eTLfw= github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d/go.mod h1:aGPSALDAkw18nn8M7gumhM/MbJG+zgOA3jNWTwPYtLg= -github.com/grafana/ckit v0.0.0-20240913130805-0ee98bafad88 h1:GgbYRGz2+/Vgz8/lk19Ht8TQDsAudl51Qenuw+COs5k= -github.com/grafana/ckit v0.0.0-20240913130805-0ee98bafad88/go.mod h1:dDqep1rKTbq2ppMYEgIM88GaPXHp4i6Cp3qantiloA0= +github.com/grafana/ckit v0.0.0-20241001124237-ee134485edd3 h1:t1oO5eBWAwnryyW5e+MvJocf88HRtR0x/UsR4M0z290= +github.com/grafana/ckit v0.0.0-20241001124237-ee134485edd3/go.mod h1:h3W376FaLy2VAhqm2w6IAkJjJxr2bGkdsdIcjadWMbs= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb h1:AWE6+kvtE18HP+lRWNUCyvymyrFSXs6TcS2vXIXGIuw= @@ -2149,8 +2149,8 @@ github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= -github.com/prometheus/client_golang v1.20.3 h1:oPksm4K8B+Vt35tUhw6GbSNSgVlVSBH0qELP/7u83l4= -github.com/prometheus/client_golang v1.20.3/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= +github.com/prometheus/client_golang v1.20.4 h1:Tgh3Yr67PaOv/uTqloMsCEdeuFTatm5zIq5+qNN23vI= +github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= diff --git a/internal/alloycli/cluster_builder.go b/internal/alloycli/cluster_builder.go index 3b74d1eb60..754fbe1b6f 100644 --- a/internal/alloycli/cluster_builder.go +++ b/internal/alloycli/cluster_builder.go @@ -38,6 +38,11 @@ type clusterOptions struct { ClusterName string EnableStateUpdatesLimiter bool EnableDiscoveryV2 bool + EnableTLS bool + TLSCAPath string + TLSCertPath string + TLSKeyPath string + TLSServerName string } func buildClusterService(opts clusterOptions) (*cluster.Service, error) { @@ -54,6 +59,11 @@ func buildClusterService(opts clusterOptions) (*cluster.Service, error) { ClusterMaxJoinPeers: opts.ClusterMaxJoinPeers, ClusterName: opts.ClusterName, EnableStateUpdatesLimiter: opts.EnableStateUpdatesLimiter, + EnableTLS: opts.EnableTLS, + TLSCAPath: opts.TLSCAPath, + TLSCertPath: opts.TLSCertPath, + TLSKeyPath: opts.TLSKeyPath, + TLSServerName: opts.TLSServerName, } if config.NodeName == "" { diff --git a/internal/alloycli/cmd_run.go b/internal/alloycli/cmd_run.go index 7bd9c99ae4..5d1a833f77 100644 --- a/internal/alloycli/cmd_run.go +++ b/internal/alloycli/cmd_run.go @@ -131,6 +131,16 @@ depending on the nature of the reload error. IntVar(&r.clusterMaxJoinPeers, "cluster.max-join-peers", r.clusterMaxJoinPeers, "Number of peers to join from the discovered set") cmd.Flags(). StringVar(&r.clusterName, "cluster.name", r.clusterName, "The name of the cluster to join") + cmd.Flags(). + BoolVar(&r.clusterEnableTLS, "cluster.enable-tls", r.clusterEnableTLS, "Specifies whether TLS should be used for communication between peers") + cmd.Flags(). + StringVar(&r.clusterTLSCAPath, "cluster.tls-ca-path", r.clusterTLSCAPath, "Path to the CA certificate file") + cmd.Flags(). + StringVar(&r.clusterTLSCertPath, "cluster.tls-cert-path", r.clusterTLSCertPath, "Path to the certificate file") + cmd.Flags(). + StringVar(&r.clusterTLSKeyPath, "cluster.tls-key-path", r.clusterTLSKeyPath, "Path to the key file") + cmd.Flags(). + StringVar(&r.clusterTLSServerName, "cluster.tls-server-name", r.clusterTLSServerName, "Server name to use for TLS communication") // TODO(alloy/#1274): make this flag a no-op once we have more confidence in this feature, and add issue to // remove it in the next major release cmd.Flags(). @@ -168,6 +178,11 @@ type alloyRun struct { clusterMaxJoinPeers int clusterName string clusterUseDiscoveryV1 bool + clusterEnableTLS bool + clusterTLSCAPath string + clusterTLSCertPath string + clusterTLSKeyPath string + clusterTLSServerName string configFormat string configBypassConversionErrors bool configExtraArgs string @@ -258,6 +273,11 @@ func (fr *alloyRun) Run(configPath string) error { //TODO(alloy/#1274): graduate to GA once we have more confidence in this feature EnableStateUpdatesLimiter: fr.minStability.Permits(featuregate.StabilityPublicPreview), EnableDiscoveryV2: !fr.clusterUseDiscoveryV1, + EnableTLS: fr.clusterEnableTLS, + TLSCertPath: fr.clusterTLSCertPath, + TLSCAPath: fr.clusterTLSCAPath, + TLSKeyPath: fr.clusterTLSKeyPath, + TLSServerName: fr.clusterTLSServerName, }) if err != nil { return err diff --git a/internal/service/cluster/cluster.go b/internal/service/cluster/cluster.go index b3c7de811e..8abc6410e5 100644 --- a/internal/service/cluster/cluster.go +++ b/internal/service/cluster/cluster.go @@ -5,6 +5,7 @@ package cluster import ( "context" "crypto/tls" + "crypto/x509" "fmt" "math/rand" "net" @@ -76,6 +77,11 @@ type Options struct { NodeName string // Name to use for this node in the cluster. AdvertiseAddress string // Address to advertise to other nodes in the cluster. + EnableTLS bool // Specifies whether TLS should be used for communication between peers. + TLSCAPath string // Path to the CA file. + TLSCertPath string // Path to the certificate file. + TLSKeyPath string // Path to the key file. + TLSServerName string // Server name to use for TLS communication. RejoinInterval time.Duration // How frequently to rejoin the cluster to address split brain issues. ClusterMaxJoinPeers int // Number of initial peers to join from the discovered set. ClusterName string // Name to prevent nodes without this identifier from joining the cluster. @@ -121,26 +127,35 @@ func New(opts Options) (*Service, error) { Log: l, Sharder: shard.Ring(tokensPerNode), Label: opts.ClusterName, + EnableTLS: opts.EnableTLS, } - httpClient := &http.Client{ - Transport: &http2.Transport{ - AllowHTTP: true, - DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { - // Set a maximum timeout for establishing the connection. If our - // context has a deadline earlier than our timeout, we shrink the - // timeout to it. - // - // TODO(rfratto): consider making the max timeout configurable. - timeout := 30 * time.Second - if dur, ok := deadlineDuration(ctx); ok && dur < timeout { - timeout = dur - } - - return net.DialTimeout(network, addr, timeout) - }, + httpTransport := &http2.Transport{ + AllowHTTP: false, + DialTLSContext: func(ctx context.Context, network, addr string, _ *tls.Config) (net.Conn, error) { + return net.DialTimeout(network, addr, calcTimeout(ctx)) }, } + if opts.EnableTLS { + tlsConfig, err := loadTLSConfigFromFile(opts.TLSCAPath, opts.TLSCertPath, opts.TLSKeyPath, opts.TLSServerName) + if err != nil { + return nil, fmt.Errorf("failed to load TLS config from file: %w", err) + } + level.Debug(l).Log( + "msg", "loaded TLS config for cluster http transport", + "TLSCAPath", opts.TLSCAPath, + "TLSCertPath", opts.TLSCertPath, + "TLSKeyPath", opts.TLSKeyPath, + "TLSServerName", opts.TLSServerName, + ) + httpTransport.TLSClientConfig = tlsConfig + httpTransport.DialTLSContext = func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) { + return tls.DialWithDialer(&net.Dialer{Timeout: calcTimeout(ctx)}, network, addr, cfg) + } + } + httpClient := &http.Client{ + Transport: httpTransport, + } node, err := ckit.NewNode(httpClient, ckitConfig) if err != nil { @@ -163,6 +178,41 @@ func New(opts Options) (*Service, error) { }, nil } +func loadTLSConfigFromFile(TLSCAPath string, TLSCertPath string, TLSKeyPath string, serverName string) (*tls.Config, error) { + pem, err := os.ReadFile(TLSCAPath) + if err != nil { + return nil, fmt.Errorf("failed to read TLS CA file: %w", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(pem) + if !caCertPool.AppendCertsFromPEM(pem) { + return nil, fmt.Errorf("failed to append CA from PEM with path %s", TLSCAPath) + } + + cert, err := tls.LoadX509KeyPair(TLSCertPath, TLSKeyPath) + if err != nil { + return nil, fmt.Errorf("failed to load X509 key pair: %w", err) + } + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + ServerName: serverName, + }, nil +} + +// TODO(rfratto): consider making the max timeout configurable. +// Set a maximum timeout for establishing the connection. If our +// context has a deadline earlier than our timeout, we shrink the +// timeout to it. +func calcTimeout(ctx context.Context) time.Duration { + timeout := 30 * time.Second + if dur, ok := deadlineDuration(ctx); ok && dur < timeout { + timeout = dur + } + return timeout +} + func deadlineDuration(ctx context.Context) (d time.Duration, ok bool) { if t, ok := ctx.Deadline(); ok { return time.Until(t), true