From 0c88216cf4564c25911fc8163c78977ad9dd501c Mon Sep 17 00:00:00 2001 From: Joao Marcal Date: Thu, 9 Nov 2023 17:15:54 +0000 Subject: [PATCH] Allow support for ruler to use thanos/obj storage Signed-off-by: Joao Marcal --- pkg/loki/modules.go | 7 ++- pkg/ruler/base/ruler_test.go | 2 +- pkg/ruler/base/storage.go | 10 ++- pkg/storage/bucket/client.go | 83 ++++++++++++++++++++----- pkg/storage/bucket/filesystem/config.go | 8 ++- pkg/validation/limits.go | 21 +++++++ 6 files changed, 113 insertions(+), 18 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 4c14a4872655..9bc969dc7745 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1036,7 +1036,12 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) { } } - t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger) + overrides, err := validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits) + if err != nil { + return nil, err + } + + t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, overrides, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger) return } diff --git a/pkg/ruler/base/ruler_test.go b/pkg/ruler/base/ruler_test.go index 99839ed65253..ffa554c2cf90 100644 --- a/pkg/ruler/base/ruler_test.go +++ b/pkg/ruler/base/ruler_test.go @@ -205,7 +205,7 @@ func buildRuler(t *testing.T, rulerConfig Config, q storage.Querier, clientMetri require.NoError(t, rulerConfig.Validate(log.NewNopLogger())) engine, queryable, pusher, logger, overrides, reg := testSetup(t, q) - storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger()) + storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, nil, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger()) require.NoError(t, err) managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, reg, constants.Loki) diff --git a/pkg/ruler/base/storage.go b/pkg/ruler/base/storage.go index 4a79fd569122..5c887f64ad06 100644 --- a/pkg/ruler/base/storage.go +++ b/pkg/ruler/base/storage.go @@ -27,6 +27,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/hedging" "github.com/grafana/loki/pkg/storage/chunk/client/ibmcloud" "github.com/grafana/loki/pkg/storage/chunk/client/openstack" + utilLog "github.com/grafana/loki/pkg/util/log" ) // RuleStoreConfig configures a rule store. @@ -44,6 +45,9 @@ type RuleStoreConfig struct { COS ibmcloud.COSConfig `yaml:"cos" doc:"description=Configures backend rule storage for IBM Cloud Object Storage (COS)."` Local local.Config `yaml:"local" doc:"description=Configures backend rule storage for a local file system directory."` + ThanosObjStore bool `yaml:"thanos_objstore"` + ObjStoreConf rulestore.Config `yaml:"objstore_config"` + mock rulestore.RuleStore `yaml:"-"` } @@ -82,7 +86,7 @@ func (cfg *RuleStoreConfig) IsDefaults() bool { // NewLegacyRuleStore returns a rule store backend client based on the provided cfg. // The client used by the function is based a legacy object store clients that shouldn't // be used anymore. -func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) { +func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProvider, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) { if cfg.mock != nil { return cfg.mock, nil } @@ -94,6 +98,10 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetr var err error var client client.ObjectClient + if cfg.ThanosObjStore { + return NewRuleStore(context.Background(), cfg.ObjStoreConf, cfgProvider, loader, utilLog.Logger, prometheus.DefaultRegisterer) + } + switch cfg.Type { case "azure": client, err = azure.NewBlobStorage(&cfg.Azure, clientMetrics.AzureMetrics, hedgeCfg) diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 57751afe3654..5f19b3954879 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -5,12 +5,13 @@ import ( "errors" "flag" "fmt" + "regexp" "strings" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" - opentracing "github.com/thanos-io/objstore/tracing/opentracing" + objstoretracing "github.com/thanos-io/objstore/tracing/opentracing" "github.com/grafana/loki/pkg/storage/bucket/azure" "github.com/grafana/loki/pkg/storage/bucket/filesystem" @@ -35,17 +36,22 @@ const ( // Filesystem is the value for the filesystem storage backend. Filesystem = "filesystem" + + // validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation + validPrefixCharactersRegex = `^[\da-zA-Z]+$` ) var ( SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem} - ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") + ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") + ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters") ) -// Config holds configuration for accessing long-term storage. -type Config struct { +// StorageBackendConfig holds configuration for accessing long-term storage. +type StorageBackendConfig struct { Backend string `yaml:"backend"` + // Backends S3 s3.Config `yaml:"s3"` GCS gcs.Config `yaml:"gcs"` @@ -63,26 +69,30 @@ type Config struct { } // Returns the supportedBackends for the package and any custom backends injected into the config. -func (cfg *Config) supportedBackends() []string { +func (cfg *StorageBackendConfig) supportedBackends() []string { return append(SupportedBackends, cfg.ExtraBackends...) } // RegisterFlags registers the backend storage config. -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { +func (cfg *StorageBackendConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { +func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { cfg.S3.RegisterFlagsWithPrefix(prefix, f) cfg.GCS.RegisterFlagsWithPrefix(prefix, f) cfg.Azure.RegisterFlagsWithPrefix(prefix, f) cfg.Swift.RegisterFlagsWithPrefix(prefix, f) - cfg.Filesystem.RegisterFlagsWithPrefix(prefix, f) + cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) - f.StringVar(&cfg.Backend, prefix+"backend", S3, fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(cfg.supportedBackends(), ", "))) + f.StringVar(&cfg.Backend, prefix+"backend", Filesystem, fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(cfg.supportedBackends(), ", "))) } -func (cfg *Config) Validate() error { +func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f) +} + +func (cfg *StorageBackendConfig) Validate() error { if !util.StringsContain(cfg.supportedBackends(), cfg.Backend) { return ErrUnsupportedStorageBackend } @@ -96,8 +106,49 @@ func (cfg *Config) Validate() error { return nil } +// Config holds configuration for accessing long-term storage. +type Config struct { + StorageBackendConfig `yaml:",inline"` + + StoragePrefix string `yaml:"storage_prefix"` + + // Not used internally, meant to allow callers to wrap Buckets + // created using this config + Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"` +} + +// RegisterFlags registers the backend storage config. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + +func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { + cfg.StorageBackendConfig.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) + f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.") +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f) +} + +func (cfg *Config) Validate() error { + if cfg.StoragePrefix != "" { + acceptablePrefixCharacters := regexp.MustCompile(validPrefixCharactersRegex) + if !acceptablePrefixCharacters.MatchString(cfg.StoragePrefix) { + return ErrInvalidCharactersInStoragePrefix + } + } + + return cfg.StorageBackendConfig.Validate() +} + // NewClient creates a new bucket client based on the configured backend -func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) { +func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { + var ( + client objstore.Bucket + err error + ) + switch cfg.Backend { case S3: client, err = s3.NewBucketClient(cfg.S3, name, logger) @@ -117,17 +168,21 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, return nil, err } - client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg)) + if cfg.StoragePrefix != "" { + client = NewPrefixedBucketClient(client, cfg.StoragePrefix) + } + + instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, reg)) // Wrap the client with any provided middleware for _, wrap := range cfg.Middlewares { - client, err = wrap(client) + instrumentedClient, err = wrap(instrumentedClient) if err != nil { return nil, err } } - return client, nil + return instrumentedClient, nil } func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket { diff --git a/pkg/storage/bucket/filesystem/config.go b/pkg/storage/bucket/filesystem/config.go index 923923a03290..873a2eb1ba28 100644 --- a/pkg/storage/bucket/filesystem/config.go +++ b/pkg/storage/bucket/filesystem/config.go @@ -12,7 +12,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } +// RegisterFlagsWithPrefixAndDefaultDirectory registers the flags for filesystem +// storage with the provided prefix and sets the default directory to dir. +func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) { + f.StringVar(&cfg.Directory, prefix+"filesystem.dir", dir, "Local filesystem storage directory.") +} + // RegisterFlagsWithPrefix registers the flags for filesystem storage with the provided prefix func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.StringVar(&cfg.Directory, prefix+"filesystem.dir", "", "Local filesystem storage directory.") + cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f) } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 823b23dd9311..54077bb4f7c6 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -167,6 +167,12 @@ type Limits struct { PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"` PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"` + // This config doesn't have a CLI flag registered here because they're registered in + // their own original config struct. + S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."` + S3SSEKMSKeyID string `yaml:"s3_sse_kms_key_id" json:"s3_sse_kms_key_id" doc:"nocli|description=S3 server-side encryption KMS Key ID. Ignored if the SSE type override is not set."` + S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set."` + // Deprecated CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes" doc:"deprecated|description=Use deletion_mode per tenant configuration instead."` @@ -610,6 +616,21 @@ func (o *Overrides) RulerRemoteWriteDisabled(userID string) bool { return o.getOverridesForUser(userID).RulerRemoteWriteDisabled } +// S3SSEType returns the per-tenant S3 SSE type. +func (o *Overrides) S3SSEType(user string) string { + return o.getOverridesForUser(user).S3SSEType +} + +// S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id. +func (o *Overrides) S3SSEKMSKeyID(user string) string { + return o.getOverridesForUser(user).S3SSEKMSKeyID +} + +// S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE encryption context. +func (o *Overrides) S3SSEKMSEncryptionContext(user string) string { + return o.getOverridesForUser(user).S3SSEKMSEncryptionContext +} + // Deprecated: use RulerRemoteWriteConfig instead // RulerRemoteWriteURL returns the remote-write URL to use for a given user. func (o *Overrides) RulerRemoteWriteURL(userID string) string {