Skip to content

Commit

Permalink
Per tenant RF setting for index gateway sharding
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Jun 1, 2023
1 parent f4abc45 commit cc3f04d
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 54 deletions.
7 changes: 7 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2589,6 +2589,13 @@ shard_streams:

# Minimum number of label matchers a query should contain.
[minimum_labels_number: <int>]

# Experimental. The sharding factor defines how many index gateways should be
# used for querying. A factor of 0.0 means that replication factor amount of
# servers are used, a factor of 1.0 means all instances of the index gateway
# ring are used.
# CLI flag: -index-gateway.sharding-factor
[gateway_sharding_factor: <float> | default = 0]
```
### frontend_worker
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (t *Loki) setupModuleManager() error {
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing},
IngesterQuerier: {Ring},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
IndexGatewayRing: {Overrides, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor},
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
tableRange := period.GetIndexTableNumberRange(periodEndTime)

indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())),
prometheus.DefaultRegisterer, t.Overrides, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
managerMode = indexgateway.ServerMode
}
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer)
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)

if err != nil {
return nil, gerrors.Wrap(err, "new index gateway ring manager")
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ func ResetBoltDBIndexClientsWithShipper() {
type StoreLimits interface {
downloads.Limits
stores.StoreLimits
indexgateway.Limits
CardinalityLimit(string) int
}

Expand Down Expand Up @@ -256,7 +258,7 @@ func (cfg *Config) Validate() error {
}

// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, logger log.Logger) (index.Client, error) {
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, overrides indexgateway.Limits, logger log.Logger) (index.Client, error) {
switch periodCfg.IndexType {
case config.StorageTypeInMemory:
store := testutils.NewMockStorage()
Expand Down Expand Up @@ -289,7 +291,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
return indexGatewayClient, nil
}

gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, logger)
gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, overrides, logger)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan
if p.IndexType == config.TSDBType {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, indexClientLogger)
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan
}, nil
}

idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, indexClientLogger)
idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, s.limits, indexClientLogger)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error creating index client")
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type IndexGatewayClientConfig struct {
// this client should talk to.
//
// Only relevant for the ring mode.
Ring ring.ReadRing `yaml:"-"`
Ring ring.DynamicReplicationReadRing `yaml:"-"`

// GRPCClientConfig configures the gRPC connection between the Index Gateway client and the server.
//
Expand Down Expand Up @@ -97,17 +97,19 @@ type GatewayClient struct {

pool *ring_client.Pool

ring ring.ReadRing
ring ring.DynamicReplicationReadRing

stringBufPool *sync.Pool
instanceBufPool *sync.Pool

overrides indexgateway.Limits
}

// NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance.
//
// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created.
// Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode.
func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, logger log.Logger) (*GatewayClient, error) {
func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, overrides indexgateway.Limits, logger log.Logger) (*GatewayClient, error) {
latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "index_gateway_request_duration_seconds",
Expand All @@ -129,6 +131,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, log
cfg: cfg,
storeGatewayClientRequestDuration: latency,
ring: cfg.Ring,
overrides: overrides,
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration))
Expand Down Expand Up @@ -346,8 +349,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log
bufZones := s.stringBufPool.Get().([]string)
defer s.stringBufPool.Put(bufZones) //nolint:staticcheck

key := util.TokenFor(userID, "" /* labels */)
rs, err := s.ring.Get(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0])
f := s.overrides.GatewayShardingFactor(userID)
rf := util.DynamicReplicationFactor(s.ring, f)
key := util.TokenFor(userID, "")
rs, err := s.ring.GetWithRF(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0], rf)
if err != nil {
return errors.Wrap(err, "index gateway get ring")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)

const (
Expand Down Expand Up @@ -116,7 +117,8 @@ func TestGatewayClient(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.Address = storeAddress

gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, util_log.Logger)
overrides, _ := validation.NewOverrides(validation.Limits{}, nil)
gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, overrides, util_log.Logger)
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "fake")
Expand Down Expand Up @@ -296,16 +298,17 @@ func Benchmark_QueriesMatchingLargeNumOfRows(b *testing.B) {

func TestDoubleRegistration(t *testing.T) {
r := prometheus.NewRegistry()
o, _ := validation.NewOverrides(validation.Limits{}, nil)

clientCfg := IndexGatewayClientConfig{
Address: "my-store-address:1234",
}

client, err := NewGatewayClient(clientCfg, r, util_log.Logger)
client, err := NewGatewayClient(clientCfg, r, o, util_log.Logger)
require.NoError(t, err)
defer client.Stop()

client, err = NewGatewayClient(clientCfg, r, util_log.Logger)
client, err = NewGatewayClient(clientCfg, r, o, util_log.Logger)
require.NoError(t, err)
defer client.Stop()
}
16 changes: 7 additions & 9 deletions pkg/storage/stores/shipper/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
maxIndexEntriesPerResponse = 1000
)

type Limits interface {
GatewayShardingFactor(tenantID string) float64
}

type IndexQuerier interface {
stores.ChunkFetcher
index.BaseReader
Expand All @@ -52,15 +56,13 @@ type Gateway struct {

cfg Config
log log.Logger

shipper IndexQuerier
}

// NewIndexGateway instantiates a new Index Gateway and start its services.
//
// In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started.
// Otherwise, it starts an Idle Service that doesn't have lifecycle hooks.
func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange) (*Gateway, error) {
func NewIndexGateway(cfg Config, log log.Logger, _ prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange) (*Gateway, error) {
g := &Gateway{
indexQuerier: indexQuerier,
cfg: cfg,
Expand All @@ -73,7 +75,7 @@ func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registere
return g.indexClients[i].TableRange.Start > g.indexClients[j].TableRange.Start
})

g.Service = services.NewIdleService(nil, func(failureCase error) error {
g.Service = services.NewIdleService(nil, func(_ error) error {
g.indexQuerier.Stop()
for _, indexClient := range g.indexClients {
indexClient.Stop()
Expand Down Expand Up @@ -136,11 +138,7 @@ func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, server logprot
return server.Send(response)
})

if innerErr != nil {
return false
}

return true
return innerErr == nil
})

if innerErr != nil {
Expand Down
23 changes: 19 additions & 4 deletions pkg/storage/stores/shipper/indexgateway/ringmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,18 @@ type RingManager struct {
Ring *ring.Ring
managerMode ManagerMode

cfg Config
cfg Config
overrides Limits

log log.Logger
}

// NewRingManager is the recommended way of instantiating a RingManager.
//
// The other functions will assume the RingManager was instantiated through this function.
func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer, overrides Limits) (*RingManager, error) {
rm := &RingManager{
cfg: cfg, log: log, managerMode: managerMode,
cfg: cfg, log: log, managerMode: managerMode, overrides: overrides,
}

if cfg.Mode != RingMode {
Expand Down Expand Up @@ -242,7 +243,21 @@ func (rm *RingManager) IndexGatewayOwnsTenant(tenant string) bool {
return true
}

return loki_util.IsAssignedKey(rm.Ring, rm.RingLifecycler.GetInstanceAddr(), tenant)
return rm.isAssigned(rm.RingLifecycler.GetInstanceAddr(), tenant)
}

// isAssigned replies wether the given instance address is in the ReplicationSet responsible for the given tenant or not.
// The result will be defined based on the tokens assigned to each ring component, queried through the ring client.
func (rm *RingManager) isAssigned(instanceAddress, tenant string) bool {
f := rm.overrides.GatewayShardingFactor(tenant)
rf := loki_util.DynamicReplicationFactor(rm.Ring, f)
token := loki_util.TokenFor(tenant, "" /* labels */)
inSet, err := loki_util.IsInReplicationSetWithFactor(rm.Ring, token, instanceAddress, rf)
if err != nil {
level.Error(rm.log).Log("msg", "error checking if tenant is in replicationset", "error", err, "tenant", tenant, "token", token)
return false
}
return inSet
}

// ServeHTTP serves the HTTP route /indexgateway/ring.
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/limiter/combined_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
)

type CombinedLimits interface {
Expand All @@ -20,4 +21,5 @@ type CombinedLimits interface {
ruler.RulesLimits
scheduler.Limits
storage.StoreLimits
indexgateway.Limits
}
39 changes: 20 additions & 19 deletions pkg/util/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package util

import (
"hash/fnv"
"math"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"

util_log "github.com/grafana/loki/pkg/util/log"
)

// TokenFor generates a token used for finding ingesters from ring
Expand All @@ -25,25 +23,28 @@ func IsInReplicationSet(r ring.ReadRing, ringKey uint32, address string) (bool,
if err != nil {
return false, err
}
return StringsContain(rs.GetAddresses(), address), nil
}

addrs := rs.GetAddresses()
for _, a := range addrs {
if a == address {
return true, nil
}
// IsInReplicationSetWithFactor will query the provided ring for the provided key
// and see if the provided address is in the resulting ReplicationSet.
// Same as IsInReplicationSet, but you can additionally provide a replication factor.
func IsInReplicationSetWithFactor(r ring.DynamicReplicationReadRing, ringKey uint32, address string, rf int) (bool, error) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
rs, err := r.GetWithRF(ringKey, ring.Write, bufDescs, bufHosts, bufZones, rf)
if err != nil {
return false, err
}
return false, nil
return StringsContain(rs.GetAddresses(), address), nil
}

// IsAssignedKey replies wether the given instance address is in the ReplicationSet responsible for the given key or not, based on the tokens.
//
// The result will be defined based on the tokens assigned to each ring component, queried through the ring client.
func IsAssignedKey(ringClient ring.ReadRing, instanceAddress string, key string) bool {
token := TokenFor(key, "" /* labels */)
inSet, err := IsInReplicationSet(ringClient, token, instanceAddress)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error checking if key is in replicationset", "error", err, "key", key)
return false
// DynamicReplicatioFactor returns a RF between ring.ReplicationFactor and ring.InstanceCount
// where a factor f=0 is mapped to the lower bound and f=1 is mapped to the upper bound.
func DynamicReplicationFactor(r ring.ReadRing, f float64) int {
rf := r.ReplicationFactor()
if f > 0 {
f = math.Min(f, 1)
rf = rf + int(float64(r.InstancesCount()-rf)*f)
}
return inSet
return rf
}
Loading

0 comments on commit cc3f04d

Please sign in to comment.