Skip to content

Commit

Permalink
chore(blooms): misc fixes (#12369)
Browse files Browse the repository at this point in the history
unbounds bloom-gw-client concurrency

removes unused log_gateway_requests param

adds note wrt bloom-gw results cache keygen issue

add msg field to stats log

sets bloomshipper download workers count to 16 default
  • Loading branch information
owen-d authored Mar 27, 2024
1 parent 19c046f commit 120b76d
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 16 deletions.
7 changes: 1 addition & 6 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1915,11 +1915,6 @@ client:
# bloom-gateway-client.grpc
[grpc_client_config: <grpc_client>]
# Flag to control whether requests sent to the gateway should be logged or
# not.
# CLI flag: -bloom-gateway-client.log-gateway-requests
[log_gateway_requests: <boolean> | default = false]
results_cache:
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is:
Expand Down Expand Up @@ -2370,7 +2365,7 @@ bloom_shipper:
blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
[workers_count: <int> | default = 100]
[workers_count: <int> | default = 16]

# Maximum number of task in queue per tenant per bloom-gateway. Enqueuing
# the tasks above this limit will fail an error.
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func newCacheKeyGen(limits CacheLimits) keyGen {
return keyGen{limits}
}

// TODO(owen-d): need to implement our own key-generation which accounts for fingerprint ranges requested.
func (k keyGen) GenerateCacheKey(ctx context.Context, tenant string, r resultscache.Request) string {
return resultscache.ConstSplitter(k.BloomGatewayCacheKeyInterval(tenant)).GenerateCacheKey(ctx, tenant, r)
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ var (
}
},
}

// NB(chaudum): Should probably be configurable, but I don't want yet another user setting.
maxQueryParallelism = 10
)

type ringGetBuffers struct {
Expand Down Expand Up @@ -107,10 +104,6 @@ type ClientConfig struct {
// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`

// LogGatewayRequests configures if requests sent to the gateway should be logged or not.
// The log messages are of type debug and contain the address of the gateway and the relevant tenant.
LogGatewayRequests bool `yaml:"log_gateway_requests"`

// Ring is the Bloom Gateway ring used to find the appropriate Bloom Gateway instance
// this client should talk to.
Ring ring.ReadRing `yaml:"-"`
Expand All @@ -130,7 +123,6 @@ func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc", f)
i.Cache.RegisterFlagsWithPrefix(prefix+"cache.", f)
f.BoolVar(&i.CacheResults, prefix+"cache_results", false, "Flag to control whether to cache bloom gateway client requests/responses.")
f.BoolVar(&i.LogGatewayRequests, prefix+"log-gateway-requests", false, "Flag to control whether requests sent to the gateway should be logged or not.")
}

func (i *ClientConfig) Validate() error {
Expand Down Expand Up @@ -258,7 +250,7 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t

results := make([][]*logproto.GroupedChunkRefs, len(servers))
count := 0
err = concurrency.ForEachJob(ctx, len(servers), maxQueryParallelism, func(ctx context.Context, i int) error {
err = concurrency.ForEachJob(ctx, len(servers), len(servers), func(ctx context.Context, i int) error {
rs := servers[i]

// randomize order of addresses so we don't hotspot the first server in the list
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (s *Stats) KVArgs() []any {
filterRatio := float64(s.ChunksFiltered) / float64(max(s.ChunksRequested, 1))

return []any{
"msg", "stats-report",
"status", s.Status,
"tasks", s.NumTasks,
"series_requested", s.SeriesRequested,
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/bloomshipper/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type DownloadingQueueConfig struct {
}

func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 100, "The count of parallel workers that download Bloom Blocks.")
f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 16, "The count of parallel workers that download Bloom Blocks.")
f.IntVar(&cfg.MaxTasksEnqueuedPerTenant, prefix+"max_tasks_enqueued_per_tenant", 10_000, "Maximum number of task in queue per tenant per bloom-gateway. Enqueuing the tasks above this limit will fail an error.")
}

Expand Down

0 comments on commit 120b76d

Please sign in to comment.