Skip to content

Commit

Permalink
Make Blooms-Gateway queue settings configurable (#11321)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

This PR adds three new settings to the Bloom-Gateway config:

- `worker_concurrency`: replaces the `numWorkers` constant. Named after
the scheduler's `scheduler_worker_concurrency` config.
- `max_outstanding_per_tenant`: replaces the `maxTasksPerTenant`
constant. Named after the frontend's `max_outstanding_per_tenant`
config.
- `pending_tasks_initial_capacity`: replaces the pendingTasksInitialCap
constant.

**Note to the reviewers:**
- I think in the future `max_outstanding_per_tenant` should be
configurable per-tenant. This would require refactoring the queue pkg
and the frontend/scheduler, so I think it doesn't make sense it in this
PR.
- I moved the `Limits` struct definition to `config.go` so have all the
adjustable settings in the same place.
  • Loading branch information
salvacorts authored Nov 27, 2023
1 parent 2a83cf0 commit 67ecf50
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
8 changes: 8 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1837,6 +1837,14 @@ client:
# not.
# CLI flag: -bloom-gateway-client.log-gateway-requests
[log_gateway_requests: <boolean> | default = false]
# Number of workers to use for filtering chunks concurrently.
# CLI flag: -bloom-gateway.worker-concurrency
[worker_concurrency: <int> | default = 4]
# Maximum number of outstanding tasks per tenant.
# CLI flag: -bloom-gateway.max-outstanding-per-tenant
[max_outstanding_per_tenant: <int> | default = 1024]
```

### storage_config
Expand Down
12 changes: 3 additions & 9 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,9 @@ import (

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")

// TODO(chaudum): Make these configurable
const (
numWorkers = 4
maxTasksPerTenant = 1024
pendingTasksInitialCap = 1024
)

const (
metricsSubsystem = "bloom_gateway"
metricsSubsystem = "bloom_gateway"
)

type metrics struct {
Expand Down Expand Up @@ -180,7 +174,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem),
}

g.queue = queue.NewRequestQueue(maxTasksPerTenant, time.Minute, g.queueMetrics)
g.queue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, time.Minute, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm)
Expand Down Expand Up @@ -212,7 +206,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
func (g *Gateway) initServices() error {
var err error
svcs := []services.Service{g.queue, g.activeUsers}
for i := 0; i < numWorkers; i++ {
for i := 0; i < g.cfg.WorkerConcurrency; i++ {
id := fmt.Sprintf("bloom-query-worker-%d", i)
w := newWorker(id, g.workerConfig, g.queue, g.bloomStore, g.pendingTasks, g.logger, g.workerMetrics)
svcs = append(svcs, w)
Expand Down
6 changes: 5 additions & 1 deletion pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func TestBloomGateway_StartStopService(t *testing.T) {
},
ReplicationFactor: 1,
},
WorkerConcurrency: 4,
MaxOutstandingPerTenant: 1024,
}

gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
Expand All @@ -113,7 +115,7 @@ func TestBloomGateway_StartStopService(t *testing.T) {

// Wait for workers to connect to queue
time.Sleep(50 * time.Millisecond)
require.Equal(t, float64(numWorkers), gw.queue.GetConnectedConsumersMetric())
require.Equal(t, float64(cfg.WorkerConcurrency), gw.queue.GetConnectedConsumersMetric())

err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
Expand Down Expand Up @@ -162,6 +164,8 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
},
ReplicationFactor: 1,
},
WorkerConcurrency: 4,
MaxOutstandingPerTenant: 1024,
}

t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type Config struct {
Enabled bool `yaml:"enabled"`
// Client configures the Bloom Gateway client
Client ClientConfig `yaml:"client,omitempty" doc:""`

WorkerConcurrency int `yaml:"worker_concurrency"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
}

// RegisterFlags registers flags for the Bloom Gateway configuration.
Expand All @@ -27,7 +30,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f)
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
f.IntVar(&cfg.WorkerConcurrency, prefix+"worker-concurrency", 4, "Number of workers to use for filtering chunks concurrently.")
f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.")
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
}

type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
BloomGatewayBlocksDownloadingParallelism(tenantID string) int
}
6 changes: 0 additions & 6 deletions pkg/bloomgateway/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ var (
})
)

type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
BloomGatewayBlocksDownloadingParallelism(userID string) int
}

type ShardingStrategy interface {
// FilterTenants whose indexes should be loaded by the index gateway.
// Returns the list of user IDs that should be synced by the index gateway.
Expand Down

0 comments on commit 67ecf50

Please sign in to comment.