From cc3f04d378415fbe0fd5b6f01029c6fccc2c4cb5 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 2 May 2023 17:24:58 +0200 Subject: [PATCH] Per tenant RF setting for index gateway sharding Signed-off-by: Christian Haudum --- docs/sources/configuration/_index.md | 7 +++ pkg/loki/loki.go | 2 +- pkg/loki/modules.go | 4 +- pkg/storage/factory.go | 6 +- pkg/storage/store.go | 4 +- .../gatewayclient/gateway_client.go | 15 +++-- .../gatewayclient/gateway_client_test.go | 9 ++- .../stores/shipper/indexgateway/gateway.go | 16 +++-- .../shipper/indexgateway/ringmanager.go | 23 +++++-- pkg/util/limiter/combined_limits.go | 2 + pkg/util/ring.go | 39 ++++++------ pkg/util/ring_test.go | 60 ++++++++++++++++--- pkg/validation/limits.go | 14 +++++ 13 files changed, 147 insertions(+), 54 deletions(-) diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index acd730b08850..2e93083da6f0 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2589,6 +2589,13 @@ shard_streams: # Minimum number of label matchers a query should contain. [minimum_labels_number: <int>] + +# Experimental. The sharding factor defines how many index gateways are used +# for querying. A factor of 0.0 means that only as many instances as the +# replication factor are used; a factor of 1.0 means that all instances of the +# index gateway ring are used. +# CLI flag: -index-gateway.sharding-factor +[gateway_sharding_factor: <float> | default = 0] ``` ### frontend_worker diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index ccd8417e0ee1..63b364ef0a41 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -664,7 +664,7 @@ func (t *Loki) setupModuleManager() error { Compactor: {Server, Overrides, MemberlistKV, Analytics}, IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing}, IngesterQuerier: {Ring}, - IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV}, + IndexGatewayRing: {Overrides, Server, MemberlistKV}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryFrontend, Querier}, Write: {Ingester, Distributor}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 3c62b7b9acdc..7167619cddef 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1160,7 +1160,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) { tableRange := period.GetIndexTableNumberRange(periodEndTime) indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant, - prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), + prometheus.DefaultRegisterer, t.Overrides, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), ) if err != nil { return nil, err } @@ -1201,7 +1201,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) { managerMode = indexgateway.ServerMode } - rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer) + rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides) if err != nil { return nil, gerrors.Wrap(err, "new index gateway ring manager") diff --git a/pkg/storage/factory.go
b/pkg/storage/factory.go index 803169cdbc1f..6d45778cfe8f 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -33,6 +33,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -61,6 +62,7 @@ func ResetBoltDBIndexClientsWithShipper() { type StoreLimits interface { downloads.Limits stores.StoreLimits + indexgateway.Limits CardinalityLimit(string) int } @@ -256,7 +258,7 @@ func (cfg *Config) Validate() error { } // NewIndexClient makes a new index client of the desired type. -func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, logger log.Logger) (index.Client, error) { +func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, overrides indexgateway.Limits, logger log.Logger) (index.Client, error) { switch periodCfg.IndexType { case config.StorageTypeInMemory: store := testutils.NewMockStorage() @@ -289,7 +291,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, return indexGatewayClient, nil } - gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, logger) + gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, overrides, logger) if err != nil { return nil, err } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 69208d16eb2e..803b5d1d6a2c 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -217,7 +217,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan if p.IndexType == config.TSDBType { if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) { // inject the index-gateway client into the index store - gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, indexClientLogger) + gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger) if err != nil { return nil, nil, nil, err } @@ -274,7 +274,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan }, nil } - idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, indexClientLogger) + idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, s.limits, indexClientLogger) if err != nil { return nil, nil, nil, errors.Wrap(err, "error creating index client") } diff --git a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go index b710d7141a7d..76f73a2ffb40 100644 --- a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go +++ b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go @@ -53,7 +53,7 @@ type IndexGatewayClientConfig struct { // this client should talk to. // // Only relevant for the ring mode. 
- Ring ring.ReadRing `yaml:"-"` + Ring ring.DynamicReplicationReadRing `yaml:"-"` // GRPCClientConfig configures the gRPC connection between the Index Gateway client and the server. // @@ -97,17 +97,19 @@ type GatewayClient struct { pool *ring_client.Pool - ring ring.ReadRing + ring ring.DynamicReplicationReadRing stringBufPool *sync.Pool instanceBufPool *sync.Pool + + overrides indexgateway.Limits } // NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance. // // If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created. // Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode. -func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, logger log.Logger) (*GatewayClient, error) { +func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, overrides indexgateway.Limits, logger log.Logger) (*GatewayClient, error) { latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "loki", Name: "index_gateway_request_duration_seconds", @@ -129,6 +131,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, log cfg: cfg, storeGatewayClientRequestDuration: latency, ring: cfg.Ring, + overrides: overrides, } dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration)) @@ -346,8 +349,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log bufZones := s.stringBufPool.Get().([]string) defer s.stringBufPool.Put(bufZones) //nolint:staticcheck - key := util.TokenFor(userID, "" /* labels */) - rs, err := s.ring.Get(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0]) + f := s.overrides.GatewayShardingFactor(userID) + rf := util.DynamicReplicationFactor(s.ring, f) + key := util.TokenFor(userID, "") + rs, err := s.ring.GetWithRF(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0], rf) if err != nil { return errors.Wrap(err, "index gateway get ring") } diff --git a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client_test.go b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client_test.go index 8e16b00bb63a..bcea59c35b82 100644 --- a/pkg/storage/stores/indexshipper/gatewayclient/gateway_client_test.go +++ b/pkg/storage/stores/indexshipper/gatewayclient/gateway_client_test.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/storage/stores/shipper/util" util_log "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/validation" ) const ( @@ -116,7 +117,8 @@ func TestGatewayClient(t *testing.T) { flagext.DefaultValues(&cfg) cfg.Address = storeAddress - gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, util_log.Logger) + overrides, _ := validation.NewOverrides(validation.Limits{}, nil) + gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, overrides, util_log.Logger) require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "fake") @@ -296,16 +298,17 @@ func Benchmark_QueriesMatchingLargeNumOfRows(b *testing.B) { func TestDoubleRegistration(t *testing.T) { r := prometheus.NewRegistry() + o, _ := validation.NewOverrides(validation.Limits{}, nil) clientCfg := IndexGatewayClientConfig{ Address: "my-store-address:1234", } - client, err := NewGatewayClient(clientCfg, r, util_log.Logger) + client, err := NewGatewayClient(clientCfg, r, o, 
util_log.Logger) require.NoError(t, err) defer client.Stop() - client, err = NewGatewayClient(clientCfg, r, util_log.Logger) + client, err = NewGatewayClient(clientCfg, r, o, util_log.Logger) require.NoError(t, err) defer client.Stop() } diff --git a/pkg/storage/stores/shipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexgateway/gateway.go index 7c37f4faeb58..cb4c61f9c37d 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway.go @@ -28,6 +28,10 @@ const ( maxIndexEntriesPerResponse = 1000 ) +type Limits interface { + GatewayShardingFactor(tenantID string) float64 +} + type IndexQuerier interface { stores.ChunkFetcher index.BaseReader @@ -52,15 +56,13 @@ type Gateway struct { cfg Config log log.Logger - - shipper IndexQuerier } // NewIndexGateway instantiates a new Index Gateway and start its services. // // In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started. // Otherwise, it starts an Idle Service that doesn't have lifecycle hooks. -func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange) (*Gateway, error) { +func NewIndexGateway(cfg Config, log log.Logger, _ prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange) (*Gateway, error) { g := &Gateway{ indexQuerier: indexQuerier, cfg: cfg, @@ -73,7 +75,7 @@ func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registere return g.indexClients[i].TableRange.Start > g.indexClients[j].TableRange.Start }) - g.Service = services.NewIdleService(nil, func(failureCase error) error { + g.Service = services.NewIdleService(nil, func(_ error) error { g.indexQuerier.Stop() for _, indexClient := range g.indexClients { indexClient.Stop() @@ -136,11 +138,7 @@ func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, server logprot return server.Send(response) }) - if innerErr != nil { - return false - } - - return true + return innerErr == nil }) if innerErr != nil { diff --git a/pkg/storage/stores/shipper/indexgateway/ringmanager.go b/pkg/storage/stores/shipper/indexgateway/ringmanager.go index b0dfce18db1d..f62d12265dcf 100644 --- a/pkg/storage/stores/shipper/indexgateway/ringmanager.go +++ b/pkg/storage/stores/shipper/indexgateway/ringmanager.go @@ -60,7 +60,8 @@ type RingManager struct { Ring *ring.Ring managerMode ManagerMode - cfg Config + cfg Config + overrides Limits log log.Logger } @@ -68,9 +69,9 @@ type RingManager struct { // NewRingManager is the recommended way of instantiating a RingManager. // // The other functions will assume the RingManager was instantiated through this function. 
-func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) { +func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer, overrides Limits) (*RingManager, error) { rm := &RingManager{ - cfg: cfg, log: log, managerMode: managerMode, + cfg: cfg, log: log, managerMode: managerMode, overrides: overrides, } if cfg.Mode != RingMode { @@ -242,7 +243,21 @@ func (rm *RingManager) IndexGatewayOwnsTenant(tenant string) bool { return true } - return loki_util.IsAssignedKey(rm.Ring, rm.RingLifecycler.GetInstanceAddr(), tenant) + return rm.isAssigned(rm.RingLifecycler.GetInstanceAddr(), tenant) +} + +// isAssigned reports whether the given instance address is in the ReplicationSet responsible for the given tenant or not. +// The result will be defined based on the tokens assigned to each ring component, queried through the ring client. +func (rm *RingManager) isAssigned(instanceAddress, tenant string) bool { + f := rm.overrides.GatewayShardingFactor(tenant) + rf := loki_util.DynamicReplicationFactor(rm.Ring, f) + token := loki_util.TokenFor(tenant, "" /* labels */) + inSet, err := loki_util.IsInReplicationSetWithFactor(rm.Ring, token, instanceAddress, rf) + if err != nil { + level.Error(rm.log).Log("msg", "error checking if tenant is in replicationset", "error", err, "tenant", tenant, "token", token) + return false + } + return inSet } // ServeHTTP serves the HTTP route /indexgateway/ring. diff --git a/pkg/util/limiter/combined_limits.go b/pkg/util/limiter/combined_limits.go index 70fd9065845b..c835d69aa8fe 100644 --- a/pkg/util/limiter/combined_limits.go +++ b/pkg/util/limiter/combined_limits.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/pkg/scheduler" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" ) type CombinedLimits interface { @@ -20,4 +21,5 @@ type CombinedLimits interface { ruler.RulesLimits scheduler.Limits storage.StoreLimits + indexgateway.Limits } diff --git a/pkg/util/ring.go b/pkg/util/ring.go index 80ab207d1d6e..b66db4f6b310 100644 --- a/pkg/util/ring.go +++ b/pkg/util/ring.go @@ -2,11 +2,9 @@ package util import ( "hash/fnv" + "math" - "github.com/go-kit/log/level" "github.com/grafana/dskit/ring" - - util_log "github.com/grafana/loki/pkg/util/log" ) // TokenFor generates a token used for finding ingesters from ring @@ -25,25 +23,28 @@ func IsInReplicationSet(r ring.ReadRing, ringKey uint32, address string) (bool, if err != nil { return false, err } + return StringsContain(rs.GetAddresses(), address), nil +} - addrs := rs.GetAddresses() - for _, a := range addrs { - if a == address { - return true, nil - } + +// IsInReplicationSetWithFactor will query the provided ring for the provided key +// and see if the provided address is in the resulting ReplicationSet. +// Same as IsInReplicationSet, but you can additionally provide a replication factor.
+func IsInReplicationSetWithFactor(r ring.DynamicReplicationReadRing, ringKey uint32, address string, rf int) (bool, error) { + bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() + rs, err := r.GetWithRF(ringKey, ring.Write, bufDescs, bufHosts, bufZones, rf) + if err != nil { + return false, err } - return false, nil + return StringsContain(rs.GetAddresses(), address), nil } -// IsAssignedKey replies wether the given instance address is in the ReplicationSet responsible for the given key or not, based on the tokens. -// -// The result will be defined based on the tokens assigned to each ring component, queried through the ring client. -func IsAssignedKey(ringClient ring.ReadRing, instanceAddress string, key string) bool { - token := TokenFor(key, "" /* labels */) - inSet, err := IsInReplicationSet(ringClient, token, instanceAddress) - if err != nil { - level.Error(util_log.Logger).Log("msg", "error checking if key is in replicationset", "error", err, "key", key) - return false + +// DynamicReplicationFactor returns an RF between ring.ReplicationFactor and ring.InstancesCount +// where a factor f=0 is mapped to the lower bound and f=1 is mapped to the upper bound. +func DynamicReplicationFactor(r ring.ReadRing, f float64) int { + rf := r.ReplicationFactor() + if f > 0 { + f = math.Min(f, 1) + rf = rf + int(float64(r.InstancesCount()-rf)*f) } - return inSet + return rf } diff --git a/pkg/util/ring_test.go b/pkg/util/ring_test.go index 4cca729f71a5..501e71459ca5 100644 --- a/pkg/util/ring_test.go +++ b/pkg/util/ring_test.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/dskit/ring" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) func TestTokenFor(t *testing.T) { @@ -15,23 +16,23 @@ func TestTokenFor(t *testing.T) { } } -func TestIsAssignedKey(t *testing.T) { +func TestIsInReplicationSet(t *testing.T) { for _, tc := range []struct { desc string - ring ring.ReadRing + ring ring.DynamicReplicationReadRing userID string exp bool addr string }{ { - desc: "basic ring and tenant are assigned key", + desc: "is in replication set", ring: newReadRingMock([]ring.InstanceDesc{{Addr: "127.0.0.1", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}}), userID: "1", exp: true, addr: "127.0.0.1", }, { - desc: "basic ring and tenant are not assigned key", + desc: "is not in replication set", ring: newReadRingMock([]ring.InstanceDesc{{Addr: "127.0.0.2", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}}), userID: "1", exp: false, }, } { t.Run(tc.desc, func(t *testing.T) { - if res := IsAssignedKey(tc.ring, newReadLifecyclerMock(tc.addr).addr, tc.userID); res != tc.exp { - t.Errorf("IsAssignedKey(%v, %v) = %v, want %v", tc.ring, tc.userID, res, tc.exp) - } + token := TokenFor(tc.userID, "") + res, err := IsInReplicationSet(tc.ring, token, newReadLifecyclerMock(tc.addr).addr) + require.NoError(t, err) + require.Equal(t, tc.exp, res, "IsInReplicationSet(%v, %v) = %v, want %v", tc.ring, tc.userID, res, tc.exp) }) } } @@ -69,6 +71,10 @@ func (r *readRingMock) Get(key uint32, op ring.Operation, buf []ring.InstanceDes return r.replicationSet, nil } +func (r *readRingMock) GetWithRF(key uint32, op ring.Operation, buf []ring.InstanceDesc, _ []string, _ []string, _ int) (ring.ReplicationSet, error) { + return r.replicationSet, nil +} + func (r *readRingMock) ShuffleShard(identifier string, size int) ring.ReadRing { // pass by
value to copy return func(r readRingMock) *readRingMock { return &r }(*r) } @@ -139,3 +145,43 @@ func (m *readLifecyclerMock) HealthyInstancesCount() int { func (m *readLifecyclerMock) GetInstanceAddr() string { return m.addr } + +func TestDynamicReplicationFactor(t *testing.T) { + mockRing := newReadRingMock([]ring.InstanceDesc{ + {Addr: "127.0.0.1", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}, + {Addr: "127.0.0.2", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{4, 5, 6}}, + {Addr: "127.0.0.3", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{7, 8, 9}}, + }) + + for _, tc := range []struct { + desc string + factor float64 + expected int + }{ + { + desc: "factor 0.0", + factor: 0, + expected: 1, // ring.ReplicationFactor() + }, + { + desc: "factor 0.5", + factor: 0.5, + expected: 2, // 1 + (3 - 1) * 0.5 + }, + { + desc: "factor 1.0", + factor: 1, + expected: 3, // num instances + }, + { + desc: "factor > 1.0", + factor: 2, + expected: 3, // num instances + }, + } { + tc := tc + t.Run(tc.desc, func(t *testing.T) { + require.Equal(t, tc.expected, DynamicReplicationFactor(mockRing, tc.factor)) + }) + } +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 93989f0078bc..948f7abe4f55 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -172,6 +172,9 @@ type Limits struct { RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."` RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."` + + // Experimental + GatewayShardingFactor float64 `yaml:"gateway_sharding_factor" json:"gateway_sharding_factor"` } type StreamRetention struct { @@ -269,6 +272,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { // Deprecated dskit_flagext.DeprecatedFlag(f, "compactor.allow-deletes", "Deprecated. Instead, see compactor.deletion-mode which is another per tenant configuration", util_log.Logger) + + // Experimental + f.Float64Var(&l.GatewayShardingFactor, "index-gateway.sharding-factor", 0.0, "Experimental. The sharding factor defines how many index gateways are used for querying. A factor of 0.0 means that only as many instances as the replication factor are used; a factor of 1.0 means that all instances of the index gateway ring are used.") + l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) } @@ -313,6 +319,10 @@ func (l *Limits) Validate() error { return err } + if l.GatewayShardingFactor < 0 || l.GatewayShardingFactor > 1 { + return fmt.Errorf("index gateway sharding factor must be between 0.0 and 1.0") + } + if l.CompactorDeletionEnabled { level.Warn(util_log.Logger).Log("msg", "The compactor.allow-deletes configuration option has been deprecated and will be ignored. Instead, use deletion_mode in the limits_configs to adjust deletion functionality") } @@ -723,6 +733,10 @@ func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool { return o.getOverridesForUser(userID).IncrementDuplicateTimestamp } +func (o *Overrides) GatewayShardingFactor(userID string) float64 { + return o.getOverridesForUser(userID).GatewayShardingFactor +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.TenantLimits(userID)
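As review context: the selection logic the patch introduces is (1) hash the tenant ID to a ring token with FNV-32, as `TokenFor` in `pkg/util/ring.go` does, (2) map the per-tenant `gateway_sharding_factor` to an effective replication factor between the ring's configured RF and its instance count, as `DynamicReplicationFactor` does, and (3) ask the ring for that many instances, which both the gateway client (`ringModeDo`) and the server-side ownership check (`isAssigned`) rely on. The sketch below is a self-contained model of those three steps, not the dskit ring implementation: the sorted-token toy ring, the gateway addresses, and the tenant name are made up for illustration.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math"
	"sort"
)

// tokenFor mirrors util.TokenFor: FNV-32 hash of the tenant ID (labels omitted).
func tokenFor(userID string) uint32 {
	h := fnv.New32()
	_, _ = h.Write([]byte(userID))
	return h.Sum32()
}

// dynamicReplicationFactor mirrors util.DynamicReplicationFactor:
// f=0 keeps the configured RF, f=1 selects every instance, values in between interpolate.
func dynamicReplicationFactor(rf, instances int, f float64) int {
	if f > 0 {
		f = math.Min(f, 1)
		rf += int(float64(instances-rf) * f)
	}
	return rf
}

// instance is a stand-in for a ring member that owns a single token.
type instance struct {
	addr  string
	token uint32
}

// ownersFor walks clockwise from the tenant's token and picks rf instances.
// This is a simplified model of what GetWithRF returns, not the dskit code.
func ownersFor(members []instance, token uint32, rf int) []string {
	sort.Slice(members, func(i, j int) bool { return members[i].token < members[j].token })
	start := sort.Search(len(members), func(i int) bool { return members[i].token >= token })
	owners := make([]string, 0, rf)
	for i := 0; i < rf && i < len(members); i++ {
		owners = append(owners, members[(start+i)%len(members)].addr)
	}
	return owners
}

func main() {
	// Hypothetical ring of 6 index gateways with a configured replication factor of 3.
	members := []instance{
		{"gw-0:9095", 0x10000000}, {"gw-1:9095", 0x30000000}, {"gw-2:9095", 0x60000000},
		{"gw-3:9095", 0x90000000}, {"gw-4:9095", 0xc0000000}, {"gw-5:9095", 0xe0000000},
	}
	const configuredRF = 3

	token := tokenFor("tenant-a")
	for _, f := range []float64{0, 0.5, 1} {
		rf := dynamicReplicationFactor(configuredRF, len(members), f)
		fmt.Printf("factor=%.1f -> effective RF %d, owners %v\n", f, rf, ownersFor(members, token, rf))
	}
	// factor=0.0 -> effective RF 3 (the configured replication factor)
	// factor=0.5 -> effective RF 4 (3 + (6-3)*0.5, truncated)
	// factor=1.0 -> effective RF 6 (every instance in the ring)
}
```

The worked values match the new `TestDynamicReplicationFactor` cases in the patch (RF 1, 3 instances: factors 0 / 0.5 / 1 yield 1 / 2 / 3), just scaled up to a larger hypothetical ring.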
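The new limit travels through the usual overrides plumbing (`validation.Limits` plus the `GatewayShardingFactor(userID)` accessor on `Overrides`), so it can be set globally in `limits_config` or per tenant via Loki's runtime overrides under the `gateway_sharding_factor` key shown in the struct tag above. Below is a minimal sketch of reading the value back, modelled on the `validation.NewOverrides(validation.Limits{}, nil)` usage in the updated gateway client tests; the tenant name is hypothetical, and with a nil `TenantLimits` every tenant simply receives the default.

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/validation"
)

func main() {
	// Default limits with the new experimental field set. Per-tenant values would
	// normally come from the runtime overrides file rather than the nil passed here.
	defaults := validation.Limits{GatewayShardingFactor: 0.5}

	overrides, err := validation.NewOverrides(defaults, nil)
	if err != nil {
		panic(err)
	}

	// GatewayShardingFactor is the accessor added by this patch; without a
	// tenant-specific override, every tenant falls back to the default of 0.5.
	fmt.Println(overrides.GatewayShardingFactor("tenant-a")) // 0.5
}
```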