Skip to content

Commit

Permalink
Enable/disable bloom gateway per tenant (#11203)
Browse files Browse the repository at this point in the history
This commits adds a per-tenant setting to enable/disable the use of the
bloom gateway component for filtering chunks.

As an operator, you want to control what tenant can use the filtering of
chunks based on search string, because the creation of the bloom filters
requires a significant amount of resources.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Nov 13, 2023
1 parent 05c4b77 commit e5b006c
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 4 deletions.
6 changes: 5 additions & 1 deletion docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1799,7 +1799,7 @@ ring:
# CLI flag: -bloom-gateway.replication-factor
[replication_factor: <int> | default = 3]
# Flag to enable or disable the usage of the bloom gatway component.
# Flag to enable or disable the bloom gateway component globally.
# CLI flag: -bloom-gateway.enabled
[enabled: <boolean> | default = false]
Expand Down Expand Up @@ -2938,6 +2938,10 @@ shard_streams:
# CLI flag: -bloom-gateway.shard-size
[bloom_gateway_shard_size: <int> | default = 1]

# Whether to use the bloom gateway component in the read path to filter chunks.
# CLI flag: -bloom-gateway.enable-filtering
[bloom_gateway_enable_filtering: <boolean> | default = false]

# The shard size defines how many bloom compactors should be used by a tenant
# when computing blooms. If it's set to 0, shuffle sharding is disabled.
# CLI flag: -bloom-compactor.shard-size
Expand Down
4 changes: 4 additions & 0 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func shuffleAddrs(addrs []string) []string {

// FilterChunkRefs implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
if !c.limits.BloomGatewayEnabled(tenant) {
return groups, nil
}

// Get the addresses of corresponding bloom gateways for each series.
fingerprints, addrs, err := c.serverAddrsForFingerprints(tenant, groups)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f)
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the usage of the bloom gatway component.")
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (

type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
}

type ShardingStrategy interface {
Expand Down
14 changes: 12 additions & 2 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ type Limits struct {
RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."`
RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."`

IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`

BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`

BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
Expand Down Expand Up @@ -291,7 +294,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
dskit_flagext.DeprecatedFlag(f, "compactor.allow-deletes", "Deprecated. Instead, see compactor.deletion-mode which is another per tenant configuration", util_log.Logger)

f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")

f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.")
f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Whether to use the bloom gateway component in the read path to filter chunks.")

f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
Expand Down Expand Up @@ -774,6 +780,10 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomGatewayShardSize
}

func (o *Overrides) BloomGatewayEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomGatewayEnabled
}

func (o *Overrides) BloomCompactorShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomCompactorShardSize
}
Expand Down

0 comments on commit e5b006c

Please sign in to comment.