diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 501b57df2876..e65d025c6449 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -1799,7 +1799,7 @@ ring: # CLI flag: -bloom-gateway.replication-factor [replication_factor: | default = 3] -# Flag to enable or disable the usage of the bloom gatway component. +# Flag to enable or disable the bloom gateway component globally. # CLI flag: -bloom-gateway.enabled [enabled: | default = false] @@ -2938,6 +2938,10 @@ shard_streams: # CLI flag: -bloom-gateway.shard-size [bloom_gateway_shard_size: | default = 1] +# Whether to use the bloom gateway component in the read path to filter chunks. +# CLI flag: -bloom-gateway.enable-filtering +[bloom_gateway_enable_filtering: | default = false] + # The shard size defines how many bloom compactors should be used by a tenant # when computing blooms. If it's set to 0, shuffle sharding is disabled. # CLI flag: -bloom-compactor.shard-size diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index 2216e0b43e3b..9e43a32d08e7 100644 --- a/pkg/bloomgateway/client.go +++ b/pkg/bloomgateway/client.go @@ -134,6 +134,10 @@ func shuffleAddrs(addrs []string) []string { // FilterChunkRefs implements Client func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) { + if !c.limits.BloomGatewayEnabled(tenant) { + return groups, nil + } + // Get the addresses of corresponding bloom gateways for each series. fingerprints, addrs, err := c.serverAddrsForFingerprints(tenant, groups) if err != nil { diff --git a/pkg/bloomgateway/config.go b/pkg/bloomgateway/config.go index 6385361066db..68856a45d4c2 100644 --- a/pkg/bloomgateway/config.go +++ b/pkg/bloomgateway/config.go @@ -26,7 +26,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f) - f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the usage of the bloom gatway component.") + f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.") // TODO(chaudum): Figure out what the better place is for registering flags // -bloom-gateway.client.* or -bloom-gateway-client.* cfg.Client.RegisterFlags(f) diff --git a/pkg/bloomgateway/sharding.go b/pkg/bloomgateway/sharding.go index 34d3c63c43b1..4bd288ccfe43 100644 --- a/pkg/bloomgateway/sharding.go +++ b/pkg/bloomgateway/sharding.go @@ -37,6 +37,7 @@ var ( type Limits interface { BloomGatewayShardSize(tenantID string) int + BloomGatewayEnabled(tenantID string) bool } type ShardingStrategy interface { diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 90cbe29e567d..823b23dd9311 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -177,8 +177,11 @@ type Limits struct { RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."` RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."` - IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` - BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` + IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` + + BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` + BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"` + BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"` @@ -291,7 +294,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { dskit_flagext.DeprecatedFlag(f, "compactor.allow-deletes", "Deprecated. Instead, see compactor.deletion-mode which is another per tenant configuration", util_log.Logger) f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.") + f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.") + f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Whether to use the bloom gateway component in the read path to filter chunks.") + f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.") f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.") f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.") @@ -774,6 +780,10 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int { return o.getOverridesForUser(userID).BloomGatewayShardSize } +func (o *Overrides) BloomGatewayEnabled(userID string) bool { + return o.getOverridesForUser(userID).BloomGatewayEnabled +} + func (o *Overrides) BloomCompactorShardSize(userID string) int { return o.getOverridesForUser(userID).BloomCompactorShardSize }