Skip to content

Commit

Permalink
Merge branch 'main' into lshippy/updates-api-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
lshippy authored Jan 4, 2024
2 parents f11c61d + 599eed7 commit 6ab6627
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 21 deletions.
7 changes: 0 additions & 7 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3084,13 +3084,6 @@ shard_streams:
# CLI flag: -bloom-compactor.max-table-age
[bloom_compactor_max_table_age: <duration> | default = 168h]

# The minimum age of a table before it is compacted. Do not compact tables newer
# than the the configured time. Default to 1 hour. 0s means no limit. This is
# useful to avoid compacting tables that will be updated with out-of-order
# writes.
# CLI flag: -bloom-compactor.min-table-age
[bloom_compactor_min_table_age: <duration> | default = 1h]

# Whether to compact chunks into bloom filters.
# CLI flag: -bloom-compactor.enable-compaction
[bloom_compactor_enable_compaction: <boolean> | default = false]
Expand Down
9 changes: 2 additions & 7 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,11 @@ func (c *Compactor) runCompaction(ctx context.Context) error {
_ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error {
tableName := tables[i]
logger := log.With(c.logger, "table", tableName)
level.Info(logger).Log("msg", "compacting table")
err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName])
if err != nil {
errs.Add(err)
return nil
}
level.Info(logger).Log("msg", "finished compacting table")
return nil
})

Expand Down Expand Up @@ -307,12 +305,7 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor

// Skip this table if it is too new/old for the tenant limits.
now := model.Now()
tableMinAge := c.limits.BloomCompactorMinTableAge(tenant)
tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant)
if tableMinAge > 0 && tableInterval.End.After(now.Add(-tableMinAge)) {
level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too new ", "table-min-age", tableMinAge, "table-end", tableInterval.End, "now", now)
continue
}
if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) {
level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too old", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now)
continue
Expand Down Expand Up @@ -507,6 +500,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
}()

level.Info(logger).Log("msg", "started compacting table", "table", job.tableName, "tenant", job.tenantID)
if len(blocksMatchingJob) == 0 && len(metasMatchingJob) > 0 {
// There is no change to any blocks, no compaction needed
level.Info(logger).Log("msg", "No changes to tsdb, no compaction needed")
Expand Down Expand Up @@ -597,5 +591,6 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err)
return err
}
level.Info(logger).Log("msg", "finished compacting table", "table", job.tableName, "tenant", job.tenantID)
return nil
}
1 change: 0 additions & 1 deletion pkg/bloomcompactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int
BloomCompactorMaxTableAge(tenantID string) time.Duration
BloomCompactorMinTableAge(tenantID string) time.Duration
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int
BloomNGramSkip(tenantID string) int
Expand Down
6 changes: 0 additions & 6 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ type Limits struct {

BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
Expand Down Expand Up @@ -312,7 +311,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
Expand Down Expand Up @@ -838,10 +836,6 @@ func (o *Overrides) BloomCompactorMaxTableAge(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomCompactorMaxTableAge
}

func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomCompactorMinTableAge
}

func (o *Overrides) BloomCompactorEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomCompactorEnabled
}
Expand Down

0 comments on commit 6ab6627

Please sign in to comment.