Skip to content

Commit

Permalink
Allow support for ruler to use thanos/obj storage
Browse files Browse the repository at this point in the history
Signed-off-by: Joao Marcal <[email protected]>
  • Loading branch information
JoaoBraveCoding committed Nov 14, 2023
1 parent f6ea86d commit 0c88216
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 18 deletions.
7 changes: 6 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,12 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
}
}

t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger)
overrides, err := validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits)
if err != nil {
return nil, err
}

t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, overrides, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger)

return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/base/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func buildRuler(t *testing.T, rulerConfig Config, q storage.Querier, clientMetri
require.NoError(t, rulerConfig.Validate(log.NewNopLogger()))

engine, queryable, pusher, logger, overrides, reg := testSetup(t, q)
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, nil, hedging.Config{}, clientMetrics, promRules.FileLoader{}, log.NewNopLogger())
require.NoError(t, err)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, reg, constants.Loki)
Expand Down
10 changes: 9 additions & 1 deletion pkg/ruler/base/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/pkg/storage/chunk/client/ibmcloud"
"github.com/grafana/loki/pkg/storage/chunk/client/openstack"
utilLog "github.com/grafana/loki/pkg/util/log"
)

// RuleStoreConfig configures a rule store.
Expand All @@ -44,6 +45,9 @@ type RuleStoreConfig struct {
COS ibmcloud.COSConfig `yaml:"cos" doc:"description=Configures backend rule storage for IBM Cloud Object Storage (COS)."`
Local local.Config `yaml:"local" doc:"description=Configures backend rule storage for a local file system directory."`

ThanosObjStore bool `yaml:"thanos_objstore"`
ObjStoreConf rulestore.Config `yaml:"objstore_config"`

mock rulestore.RuleStore `yaml:"-"`
}

Expand Down Expand Up @@ -82,7 +86,7 @@ func (cfg *RuleStoreConfig) IsDefaults() bool {
// NewLegacyRuleStore returns a rule store backend client based on the provided cfg.
// The client used by the function is based a legacy object store clients that shouldn't
// be used anymore.
func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
func NewLegacyRuleStore(cfg RuleStoreConfig, cfgProvider bucket.TenantConfigProvider, hedgeCfg hedging.Config, clientMetrics storage.ClientMetrics, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
if cfg.mock != nil {
return cfg.mock, nil
}
Expand All @@ -94,6 +98,10 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, hedgeCfg hedging.Config, clientMetr
var err error
var client client.ObjectClient

if cfg.ThanosObjStore {
return NewRuleStore(context.Background(), cfg.ObjStoreConf, cfgProvider, loader, utilLog.Logger, prometheus.DefaultRegisterer)
}

switch cfg.Type {
case "azure":
client, err = azure.NewBlobStorage(&cfg.Azure, clientMetrics.AzureMetrics, hedgeCfg)
Expand Down
83 changes: 69 additions & 14 deletions pkg/storage/bucket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"errors"
"flag"
"fmt"
"regexp"
"strings"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
opentracing "github.com/thanos-io/objstore/tracing/opentracing"
objstoretracing "github.com/thanos-io/objstore/tracing/opentracing"

"github.com/grafana/loki/pkg/storage/bucket/azure"
"github.com/grafana/loki/pkg/storage/bucket/filesystem"
Expand All @@ -35,17 +36,22 @@ const (

// Filesystem is the value for the filesystem storage backend.
Filesystem = "filesystem"

// validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation
validPrefixCharactersRegex = `^[\da-zA-Z]+$`
)

var (
SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem}

ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrUnsupportedStorageBackend = errors.New("unsupported storage backend")
ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters")
)

// Config holds configuration for accessing long-term storage.
type Config struct {
// StorageBackendConfig holds configuration for accessing long-term storage.
type StorageBackendConfig struct {
Backend string `yaml:"backend"`

// Backends
S3 s3.Config `yaml:"s3"`
GCS gcs.Config `yaml:"gcs"`
Expand All @@ -63,26 +69,30 @@ type Config struct {
}

// Returns the supportedBackends for the package and any custom backends injected into the config.
func (cfg *Config) supportedBackends() []string {
func (cfg *StorageBackendConfig) supportedBackends() []string {
return append(SupportedBackends, cfg.ExtraBackends...)
}

// RegisterFlags registers the backend storage config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *StorageBackendConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
cfg.S3.RegisterFlagsWithPrefix(prefix, f)
cfg.GCS.RegisterFlagsWithPrefix(prefix, f)
cfg.Azure.RegisterFlagsWithPrefix(prefix, f)
cfg.Swift.RegisterFlagsWithPrefix(prefix, f)
cfg.Filesystem.RegisterFlagsWithPrefix(prefix, f)
cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)

f.StringVar(&cfg.Backend, prefix+"backend", S3, fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(cfg.supportedBackends(), ", ")))
f.StringVar(&cfg.Backend, prefix+"backend", Filesystem, fmt.Sprintf("Backend storage to use. Supported backends are: %s.", strings.Join(cfg.supportedBackends(), ", ")))
}

func (cfg *Config) Validate() error {
func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}

func (cfg *StorageBackendConfig) Validate() error {
if !util.StringsContain(cfg.supportedBackends(), cfg.Backend) {
return ErrUnsupportedStorageBackend
}
Expand All @@ -96,8 +106,49 @@ func (cfg *Config) Validate() error {
return nil
}

// Config holds configuration for accessing long-term storage.
type Config struct {
StorageBackendConfig `yaml:",inline"`

StoragePrefix string `yaml:"storage_prefix"`

// Not used internally, meant to allow callers to wrap Buckets
// created using this config
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"`
}

// RegisterFlags registers the backend storage config.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
cfg.StorageBackendConfig.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f)
f.StringVar(&cfg.StoragePrefix, prefix+"storage-prefix", "", "Prefix for all objects stored in the backend storage. For simplicity, it may only contain digits and English alphabet letters.")
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}

func (cfg *Config) Validate() error {
if cfg.StoragePrefix != "" {
acceptablePrefixCharacters := regexp.MustCompile(validPrefixCharactersRegex)
if !acceptablePrefixCharacters.MatchString(cfg.StoragePrefix) {
return ErrInvalidCharactersInStoragePrefix
}
}

return cfg.StorageBackendConfig.Validate()
}

// NewClient creates a new bucket client based on the configured backend
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) {
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
var (
client objstore.Bucket
err error
)

switch cfg.Backend {
case S3:
client, err = s3.NewBucketClient(cfg.S3, name, logger)
Expand All @@ -117,17 +168,21 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
return nil, err
}

client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg))
if cfg.StoragePrefix != "" {
client = NewPrefixedBucketClient(client, cfg.StoragePrefix)
}

instrumentedClient := objstoretracing.WrapWithTraces(bucketWithMetrics(client, name, reg))

// Wrap the client with any provided middleware
for _, wrap := range cfg.Middlewares {
client, err = wrap(client)
instrumentedClient, err = wrap(instrumentedClient)
if err != nil {
return nil, err
}
}

return client, nil
return instrumentedClient, nil
}

func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket {
Expand Down
8 changes: 7 additions & 1 deletion pkg/storage/bucket/filesystem/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefixAndDefaultDirectory registers the flags for filesystem
// storage with the provided prefix and sets the default directory to dir.
func (cfg *Config) RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir string, f *flag.FlagSet) {
f.StringVar(&cfg.Directory, prefix+"filesystem.dir", dir, "Local filesystem storage directory.")
}

// RegisterFlagsWithPrefix registers the flags for filesystem storage with the provided prefix
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Directory, prefix+"filesystem.dir", "", "Local filesystem storage directory.")
cfg.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, "", f)
}
21 changes: 21 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ type Limits struct {
PerTenantOverrideConfig string `yaml:"per_tenant_override_config" json:"per_tenant_override_config"`
PerTenantOverridePeriod model.Duration `yaml:"per_tenant_override_period" json:"per_tenant_override_period"`

// This config doesn't have a CLI flag registered here because they're registered in
// their own original config struct.
S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."`
S3SSEKMSKeyID string `yaml:"s3_sse_kms_key_id" json:"s3_sse_kms_key_id" doc:"nocli|description=S3 server-side encryption KMS Key ID. Ignored if the SSE type override is not set."`
S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. Ignored if the SSE type override is not set."`

// Deprecated
CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes" doc:"deprecated|description=Use deletion_mode per tenant configuration instead."`

Expand Down Expand Up @@ -610,6 +616,21 @@ func (o *Overrides) RulerRemoteWriteDisabled(userID string) bool {
return o.getOverridesForUser(userID).RulerRemoteWriteDisabled
}

// S3SSEType returns the per-tenant S3 SSE type.
func (o *Overrides) S3SSEType(user string) string {
return o.getOverridesForUser(user).S3SSEType
}

// S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id.
func (o *Overrides) S3SSEKMSKeyID(user string) string {
return o.getOverridesForUser(user).S3SSEKMSKeyID
}

// S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE encryption context.
func (o *Overrides) S3SSEKMSEncryptionContext(user string) string {
return o.getOverridesForUser(user).S3SSEKMSEncryptionContext
}

// Deprecated: use RulerRemoteWriteConfig instead
// RulerRemoteWriteURL returns the remote-write URL to use for a given user.
func (o *Overrides) RulerRemoteWriteURL(userID string) string {
Expand Down

0 comments on commit 0c88216

Please sign in to comment.