Skip to content

Commit

Permalink
Merge remote-tracking branch 'grafana/main' into karsten/query-plan-json
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies committed Nov 13, 2023
2 parents c3ad706 + e5b006c commit fc6e792
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 4 deletions.
6 changes: 5 additions & 1 deletion docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1799,7 +1799,7 @@ ring:
# CLI flag: -bloom-gateway.replication-factor
[replication_factor: <int> | default = 3]
# Flag to enable or disable the usage of the bloom gatway component.
# Flag to enable or disable the bloom gateway component globally.
# CLI flag: -bloom-gateway.enabled
[enabled: <boolean> | default = false]
Expand Down Expand Up @@ -2938,6 +2938,10 @@ shard_streams:
# CLI flag: -bloom-gateway.shard-size
[bloom_gateway_shard_size: <int> | default = 1]

# Whether to use the bloom gateway component in the read path to filter chunks.
# CLI flag: -bloom-gateway.enable-filtering
[bloom_gateway_enable_filtering: <boolean> | default = false]

# The shard size defines how many bloom compactors should be used by a tenant
# when computing blooms. If it's set to 0, shuffle sharding is disabled.
# CLI flag: -bloom-compactor.shard-size
Expand Down
4 changes: 4 additions & 0 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func shuffleAddrs(addrs []string) []string {

// FilterChunkRefs implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
if !c.limits.BloomGatewayEnabled(tenant) {
return groups, nil
}

// Get the addresses of corresponding bloom gateways for each series.
fingerprints, addrs, err := c.serverAddrsForFingerprints(tenant, groups)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f)
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the usage of the bloom gatway component.")
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (

type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
}

type ShardingStrategy interface {
Expand Down
14 changes: 12 additions & 2 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ type Limits struct {
RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."`
RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."`

IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`

BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`

BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
Expand Down Expand Up @@ -291,7 +294,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
dskit_flagext.DeprecatedFlag(f, "compactor.allow-deletes", "Deprecated. Instead, see compactor.deletion-mode which is another per tenant configuration", util_log.Logger)

f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")

f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.")
f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Whether to use the bloom gateway component in the read path to filter chunks.")

f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
Expand Down Expand Up @@ -774,6 +780,10 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomGatewayShardSize
}

func (o *Overrides) BloomGatewayEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomGatewayEnabled
}

func (o *Overrides) BloomCompactorShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomCompactorShardSize
}
Expand Down

0 comments on commit fc6e792

Please sign in to comment.