
Commit

changed batch size to int
Signed-off-by: Vladyslav Diachenko <[email protected]>
vlad-diachenko committed Jan 11, 2024
1 parent e90f8d5 commit a97a9e4
Showing 6 changed files with 16 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/chunkcompactor.go
@@ -189,7 +189,7 @@ type lazyBloomBuilder struct {
 	bt              compactorTokenizer
 	fpRate          float64
 	logger          log.Logger
-	chunksBatchSize uint
+	chunksBatchSize int
 
 	cur v1.SeriesWithBloom // retured by At()
 	err error              // returned by Err()
8 changes: 4 additions & 4 deletions pkg/bloomcompactor/chunksbatchesiterator.go
@@ -11,14 +11,14 @@ type chunksBatchesIterator struct {
 	context          context.Context
 	client           chunkClient
 	chunksToDownload []chunk.Chunk
-	batchSize        uint
+	batchSize        int
 
 	currentBatch []chunk.Chunk
 	err          error
 }
 
-func newChunkBatchesIterator(context context.Context, client chunkClient, chunksToDownload []chunk.Chunk, batchSize uint) (*chunksBatchesIterator, error) {
-	if batchSize == 0 {
+func newChunkBatchesIterator(context context.Context, client chunkClient, chunksToDownload []chunk.Chunk, batchSize int) (*chunksBatchesIterator, error) {
+	if batchSize <= 0 {
 		return nil, errors.New("batchSize must be greater than 0")
 	}
 	return &chunksBatchesIterator{context: context, client: client, chunksToDownload: chunksToDownload, batchSize: batchSize}, nil
@@ -29,7 +29,7 @@ func (c *chunksBatchesIterator) Next() bool {
 		return false
 	}
 	batchSize := c.batchSize
-	chunksToDownloadCount := uint(len(c.chunksToDownload))
+	chunksToDownloadCount := len(c.chunksToDownload)
 	if chunksToDownloadCount < batchSize {
 		batchSize = chunksToDownloadCount
 	}
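
For context, a minimal standalone sketch of the same clamp-to-remaining batching behaviour (not the Loki code itself; the batches helper and the string items are illustrative only), showing why the guard has to become batchSize <= 0 rather than == 0 once the parameter is a signed int:

package main

import (
	"errors"
	"fmt"
)

// batches mirrors the clamp-to-remaining logic of chunksBatchesIterator.Next:
// each step takes at most batchSize items until the input is exhausted.
func batches(items []string, batchSize int) ([][]string, error) {
	// With a signed int, the guard must reject negatives as well as zero.
	if batchSize <= 0 {
		return nil, errors.New("batchSize must be greater than 0")
	}
	var out [][]string
	for len(items) > 0 {
		n := batchSize
		if len(items) < n {
			n = len(items) // clamp the last batch to whatever is left
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out, nil
}

func main() {
	got, err := batches([]string{"c1", "c2", "c3", "c4", "c5"}, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(got) // prints: [[c1 c2] [c3 c4] [c5]]
}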
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/chunksbatchesiterator_test.go
@@ -13,7 +13,7 @@ import (
 
 func Test_chunksBatchesIterator(t *testing.T) {
 	tests := map[string]struct {
-		batchSize        uint
+		batchSize        int
 		chunksToDownload []chunk.Chunk
 		constructorError error
 
@@ -63,7 +63,7 @@ func Test_chunksBatchesIterator(t *testing.T) {
 			hadNextCount++
 			downloaded := iterator.At()
 			downloadedChunks = append(downloadedChunks, downloaded...)
-			require.LessOrEqual(t, uint(len(downloaded)), data.batchSize)
+			require.LessOrEqual(t, len(downloaded), data.batchSize)
 		}
 		require.Equal(t, data.chunksToDownload, downloadedChunks)
 		require.Equal(t, data.hadNextCount, client.callsCount)
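
A hypothetical extra test case, not part of this commit, that would exercise the new non-positive guard directly; it assumes it sits in package bloomcompactor alongside the existing test and reuses its context and require imports:

func Test_chunkBatchesIterator_rejectsNonPositiveBatchSize(t *testing.T) {
	// With a signed batch size, zero and negative values must both be rejected
	// by the constructor instead of being unrepresentable, as they were with uint.
	for _, batchSize := range []int{0, -1} {
		_, err := newChunkBatchesIterator(context.Background(), nil, nil, batchSize)
		require.Error(t, err)
	}
}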
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/config.go
@@ -41,7 +41,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 type Limits interface {
 	downloads.Limits
 	BloomCompactorShardSize(tenantID string) int
-	BloomCompactorChunksBatchSize(userID string) uint
+	BloomCompactorChunksBatchSize(userID string) int
 	BloomCompactorMaxTableAge(tenantID string) time.Duration
 	BloomCompactorEnabled(tenantID string) bool
 	BloomNGramLength(tenantID string) int
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/sharding_test.go
@@ -129,14 +129,14 @@ func TestShuffleSharding(t *testing.T) {
 type mockLimits struct {
 	*validation.Overrides
 	bloomCompactorShardSize    int
-	chunksDownloadingBatchSize uint
+	chunksDownloadingBatchSize int
 }
 
 func (m mockLimits) BloomCompactorShardSize(_ string) int {
 	return m.bloomCompactorShardSize
 }
 
-func (m mockLimits) BloomCompactorChunksBatchSize(_ string) uint {
+func (m mockLimits) BloomCompactorChunksBatchSize(_ string) int {
 	if m.chunksDownloadingBatchSize != 0 {
 		return m.chunksDownloadingBatchSize
 	}
13 changes: 6 additions & 7 deletions pkg/validation/limits.go
@@ -184,11 +184,10 @@ type Limits struct {
 	BloomGatewayShardSize int  `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
 	BloomGatewayEnabled   bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`
 
-	BloomCompactorShardSize   int           `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
-	BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
-	BloomCompactorEnabled     bool          `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
-
-	BloomCompactorChunksBatchSize uint `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
+	BloomCompactorShardSize       int           `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
+	BloomCompactorMaxTableAge     time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
+	BloomCompactorEnabled         bool          `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
+	BloomCompactorChunksBatchSize int           `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
 	BloomNGramLength              int           `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
 	BloomNGramSkip                int           `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
 	BloomFalsePositiveRate        float64       `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
@@ -314,7 +313,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
 	f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
 	f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
-	f.UintVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.")
+	f.IntVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.")
 	f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
 	f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
 	f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.")
@@ -831,7 +830,7 @@ func (o *Overrides) BloomGatewayEnabled(userID string) bool {
 	return o.getOverridesForUser(userID).BloomGatewayEnabled
 }
 
-func (o *Overrides) BloomCompactorChunksBatchSize(userID string) uint {
+func (o *Overrides) BloomCompactorChunksBatchSize(userID string) int {
 	return o.getOverridesForUser(userID).BloomCompactorChunksBatchSize
 }
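
A small standalone sketch of what the UintVar to IntVar switch changes at parse time (only the flag name and the default of 100 come from this commit; the throwaway FlagSet and local variable are illustrative): negative values are now accepted by the flag package and are caught later by the batchSize <= 0 guard in newChunkBatchesIterator instead.

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Register the limit the same way limits.go does after this commit, but on a
	// throwaway FlagSet so the sketch runs on its own.
	fs := flag.NewFlagSet("limits", flag.ContinueOnError)
	var batchSize int
	fs.IntVar(&batchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.")

	// With UintVar this parse would fail; with IntVar the negative value is parsed
	// successfully and must be rejected by the iterator constructor instead.
	if err := fs.Parse([]string{"-bloom-compactor.chunks-batch-size=-5"}); err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println("parsed batch size:", batchSize) // prints: parsed batch size: -5
}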
