Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per-tenant replication (sharding) factor for index gateway #9574

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [9574](https://github.com/grafana/loki/pull/9574) **chaudum**: Add per-tenant replication factor for index gateways.
* [9497](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
* [9515](https://github.com/grafana/loki/pull/9515) **MichelHollands**: Fix String() on vector aggregation LogQL expressions that contain `without ()`.
* [8067](https://github.com/grafana/loki/pull/8067) **DylanGuedes**: Distributor: Add auto-forget unhealthy members support.
Expand Down
7 changes: 7 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2589,6 +2589,13 @@ shard_streams:

# Minimum number of label matchers a query should contain.
[minimum_labels_number: <int>]

# Experimental. The sharding factor defines how many index gateways should be
# used for querying. A factor of 0.0 means that only as many instances as the
# replication factor are used; a factor of 1.0 means that all instances of the
# index gateway ring are used.
# CLI flag: -index-gateway.sharding-factor
[gateway_sharding_factor: <float> | default = 0]
```

### frontend_worker
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e
github.com/grafana/dskit v0.0.0-20230531063521-9bd0e035aecd
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1089,8 +1089,8 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e h1:ODjv+9dmklDS33O2B4zPgIDKdnji18o9ofD9qWA+mAs=
github.com/grafana/dskit v0.0.0-20230518162305-3c92c534827e/go.mod h1:M03k2fzuQ2n9TVE1xfVKTESibxsXdw0wYfWT3+9Owp4=
github.com/grafana/dskit v0.0.0-20230531063521-9bd0e035aecd h1:lPl/cLANWA5rvlXxs1v05lkDG/aTiGRikhZqEoONaSM=
github.com/grafana/dskit v0.0.0-20230531063521-9bd0e035aecd/go.mod h1:M03k2fzuQ2n9TVE1xfVKTESibxsXdw0wYfWT3+9Owp4=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (t *Loki) setupModuleManager() error {
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing},
IngesterQuerier: {Ring},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
IndexGatewayRing: {Overrides, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor},
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
tableRange := period.GetIndexTableNumberRange(periodEndTime)

indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())),
prometheus.DefaultRegisterer, t.Overrides, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1201,7 +1201,7 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
if t.Cfg.isModuleEnabled(IndexGateway) || legacyReadMode || t.Cfg.isModuleEnabled(Backend) {
managerMode = indexgateway.ServerMode
}
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer)
rm, err := indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)

if err != nil {
return nil, gerrors.Wrap(err, "new index gateway ring manager")
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -61,6 +62,7 @@ func ResetBoltDBIndexClientsWithShipper() {
type StoreLimits interface {
downloads.Limits
stores.StoreLimits
indexgateway.Limits
CardinalityLimit(string) int
}

Expand Down Expand Up @@ -256,7 +258,7 @@ func (cfg *Config) Validate() error {
}

// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, logger log.Logger) (index.Client, error) {
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, overrides indexgateway.Limits, logger log.Logger) (index.Client, error) {
switch periodCfg.IndexType {
case config.StorageTypeInMemory:
store := testutils.NewMockStorage()
Expand Down Expand Up @@ -289,7 +291,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
return indexGatewayClient, nil
}

gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, logger)
gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, overrides, logger)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan
if p.IndexType == config.TSDBType {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, indexClientLogger)
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan
}, nil
}

idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, indexClientLogger)
idx, err := NewIndexClient(p, tableRange, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg, s.limits, indexClientLogger)
if err != nil {
return nil, nil, nil, errors.Wrap(err, "error creating index client")
}
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type IndexGatewayClientConfig struct {
// this client should talk to.
//
// Only relevant for the ring mode.
Ring ring.ReadRing `yaml:"-"`
Ring ring.DynamicReplicationReadRing `yaml:"-"`

// GRPCClientConfig configures the gRPC connection between the Index Gateway client and the server.
//
Expand Down Expand Up @@ -97,17 +97,19 @@ type GatewayClient struct {

pool *ring_client.Pool

ring ring.ReadRing
ring ring.DynamicReplicationReadRing

stringBufPool *sync.Pool
instanceBufPool *sync.Pool

overrides indexgateway.Limits
}

// NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance.
//
// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created.
// Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode.
func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, logger log.Logger) (*GatewayClient, error) {
func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, overrides indexgateway.Limits, logger log.Logger) (*GatewayClient, error) {
latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "index_gateway_request_duration_seconds",
Expand All @@ -129,6 +131,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, log
cfg: cfg,
storeGatewayClientRequestDuration: latency,
ring: cfg.Ring,
overrides: overrides,
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration))
Expand Down Expand Up @@ -346,8 +349,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log
bufZones := s.stringBufPool.Get().([]string)
defer s.stringBufPool.Put(bufZones) //nolint:staticcheck

key := util.TokenFor(userID, "" /* labels */)
rs, err := s.ring.Get(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0])
f := s.overrides.GatewayShardingFactor(userID)
rf := util.DynamicReplicationFactor(s.ring, f)
key := util.TokenFor(userID, "")
rs, err := s.ring.GetWithRF(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0], rf)
if err != nil {
return errors.Wrap(err, "index gateway get ring")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)

const (
Expand Down Expand Up @@ -116,7 +117,8 @@ func TestGatewayClient(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.Address = storeAddress

gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, util_log.Logger)
overrides, _ := validation.NewOverrides(validation.Limits{}, nil)
gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, overrides, util_log.Logger)
require.NoError(t, err)

ctx := user.InjectOrgID(context.Background(), "fake")
Expand Down Expand Up @@ -296,16 +298,17 @@ func Benchmark_QueriesMatchingLargeNumOfRows(b *testing.B) {

func TestDoubleRegistration(t *testing.T) {
r := prometheus.NewRegistry()
o, _ := validation.NewOverrides(validation.Limits{}, nil)

clientCfg := IndexGatewayClientConfig{
Address: "my-store-address:1234",
}

client, err := NewGatewayClient(clientCfg, r, util_log.Logger)
client, err := NewGatewayClient(clientCfg, r, o, util_log.Logger)
require.NoError(t, err)
defer client.Stop()

client, err = NewGatewayClient(clientCfg, r, util_log.Logger)
client, err = NewGatewayClient(clientCfg, r, o, util_log.Logger)
require.NoError(t, err)
defer client.Stop()
}
16 changes: 7 additions & 9 deletions pkg/storage/stores/shipper/indexgateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
maxIndexEntriesPerResponse = 1000
)

// Limits exposes the per-tenant settings that the index gateway components
// (ring manager and gateway client) read at runtime.
type Limits interface {
// GatewayShardingFactor returns the index gateway sharding factor configured
// for the given tenant. Callers in this change map it to a ring replication
// factor via util.DynamicReplicationFactor.
GatewayShardingFactor(tenantID string) float64
}

type IndexQuerier interface {
stores.ChunkFetcher
index.BaseReader
Expand All @@ -52,15 +56,13 @@ type Gateway struct {

cfg Config
log log.Logger

shipper IndexQuerier
}

// NewIndexGateway instantiates a new Index Gateway and starts its services.
//
// In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started.
// Otherwise, it starts an Idle Service that doesn't have lifecycle hooks.
func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange) (*Gateway, error) {
func NewIndexGateway(cfg Config, log log.Logger, _ prometheus.Registerer, indexQuerier IndexQuerier, indexClients []IndexClientWithRange) (*Gateway, error) {
g := &Gateway{
indexQuerier: indexQuerier,
cfg: cfg,
Expand All @@ -73,7 +75,7 @@ func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registere
return g.indexClients[i].TableRange.Start > g.indexClients[j].TableRange.Start
})

g.Service = services.NewIdleService(nil, func(failureCase error) error {
g.Service = services.NewIdleService(nil, func(_ error) error {
g.indexQuerier.Stop()
for _, indexClient := range g.indexClients {
indexClient.Stop()
Expand Down Expand Up @@ -136,11 +138,7 @@ func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, server logprot
return server.Send(response)
})

if innerErr != nil {
return false
}

return true
return innerErr == nil
})

if innerErr != nil {
Expand Down
23 changes: 19 additions & 4 deletions pkg/storage/stores/shipper/indexgateway/ringmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,18 @@ type RingManager struct {
Ring *ring.Ring
managerMode ManagerMode

cfg Config
cfg Config
overrides Limits

log log.Logger
}

// NewRingManager is the recommended way of instantiating a RingManager.
//
// The other functions will assume the RingManager was instantiated through this function.
func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer, overrides Limits) (*RingManager, error) {
rm := &RingManager{
cfg: cfg, log: log, managerMode: managerMode,
cfg: cfg, log: log, managerMode: managerMode, overrides: overrides,
}

if cfg.Mode != RingMode {
Expand Down Expand Up @@ -242,7 +243,21 @@ func (rm *RingManager) IndexGatewayOwnsTenant(tenant string) bool {
return true
}

return loki_util.IsAssignedKey(rm.Ring, rm.RingLifecycler.GetInstanceAddr(), tenant)
return rm.isAssigned(rm.RingLifecycler.GetInstanceAddr(), tenant)
}

// isAssigned reports whether the instance with the given address is part of the
// ReplicationSet responsible for the given tenant.
//
// The replication factor used for the ring lookup is derived from the tenant's
// gateway sharding factor. If the lookup fails, the error is logged and the
// tenant is treated as not assigned to the instance.
func (rm *RingManager) isAssigned(instanceAddress, tenant string) bool {
	shardingFactor := rm.overrides.GatewayShardingFactor(tenant)
	replicationFactor := loki_util.DynamicReplicationFactor(rm.Ring, shardingFactor)
	tenantToken := loki_util.TokenFor(tenant, "" /* labels */)

	assigned, err := loki_util.IsInReplicationSetWithFactor(rm.Ring, tenantToken, instanceAddress, replicationFactor)
	if err == nil {
		return assigned
	}

	level.Error(rm.log).Log("msg", "error checking if tenant is in replicationset", "error", err, "tenant", tenant, "token", tenantToken)
	return false
}

// ServeHTTP serves the HTTP route /indexgateway/ring.
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/limiter/combined_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
)

type CombinedLimits interface {
Expand All @@ -20,4 +21,5 @@ type CombinedLimits interface {
ruler.RulesLimits
scheduler.Limits
storage.StoreLimits
indexgateway.Limits
}
39 changes: 20 additions & 19 deletions pkg/util/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package util

import (
"hash/fnv"
"math"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"

util_log "github.com/grafana/loki/pkg/util/log"
)

// TokenFor generates a token used for finding ingesters from ring
Expand All @@ -25,25 +23,28 @@ func IsInReplicationSet(r ring.ReadRing, ringKey uint32, address string) (bool,
if err != nil {
return false, err
}
return StringsContain(rs.GetAddresses(), address), nil
}

addrs := rs.GetAddresses()
for _, a := range addrs {
if a == address {
return true, nil
}
// IsInReplicationSetWithFactor queries the provided ring for the provided key
// and reports whether the provided address is in the resulting ReplicationSet.
// It is the same as IsInReplicationSet, except that the caller additionally
// provides the replication factor rf used for the lookup.
//
// If the ring lookup fails, false is returned together with the lookup error.
func IsInReplicationSetWithFactor(r ring.DynamicReplicationReadRing, ringKey uint32, address string, rf int) (bool, error) {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
	rs, err := r.GetWithRF(ringKey, ring.Write, bufDescs, bufHosts, bufZones, rf)
	if err != nil {
		return false, err
	}
	// The result must come from the membership check; an unconditional
	// `return false, nil` here would make every address look unassigned.
	return StringsContain(rs.GetAddresses(), address), nil
}

// IsAssignedKey replies wether the given instance address is in the ReplicationSet responsible for the given key or not, based on the tokens.
//
// The result will be defined based on the tokens assigned to each ring component, queried through the ring client.
func IsAssignedKey(ringClient ring.ReadRing, instanceAddress string, key string) bool {
token := TokenFor(key, "" /* labels */)
inSet, err := IsInReplicationSet(ringClient, token, instanceAddress)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error checking if key is in replicationset", "error", err, "key", key)
return false
// DynamicReplicationFactor returns a replication factor between
// r.ReplicationFactor() and r.InstancesCount(), where a factor f <= 0 is mapped
// to the lower bound and a factor f >= 1 is mapped to the upper bound. Values
// in between are interpolated linearly and truncated towards zero.
//
// If the ring currently has no more instances than its replication factor,
// the replication factor is returned unchanged, so the result never drops
// below the lower bound.
func DynamicReplicationFactor(r ring.ReadRing, f float64) int {
	rf := r.ReplicationFactor()
	if f > 0 {
		// Only scale up when there are instances beyond the base replication
		// factor; a negative difference would shrink rf below the lower bound.
		if extra := r.InstancesCount() - rf; extra > 0 {
			rf += int(float64(extra) * math.Min(f, 1))
		}
	}
	return rf
}
Loading