Skip to content

Commit

Permalink
Make compactTenant function more readable
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Dec 5, 2023
1 parent 79693d7 commit efe32bb
Showing 1 changed file with 27 additions and 14 deletions.
41 changes: 27 additions & 14 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,14 +343,27 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
bt, _ := v1.NewBloomTokenizer(c.reg, NGramLength, NGramSkip)

errs := multierror.New()
if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex { // TODO: handle multitenant tables
return fmt.Errorf("unexpected multi-tenant")
_ = sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error {
if isMultiTenantIndex {
// Skip multi-tenant indexes
return nil
}

tsdbFile, ok := idx.(*tsdb.TSDBFile)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBFile"))
return nil
}

tsdbIndex, ok := tsdbFile.Index.(*tsdb.TSDBIndex)
if !ok {
errs.Add(fmt.Errorf("failed to cast to TSDBIndex"))
return nil
}

var seriesMetas []seriesMeta
// TODO: Make these casts safely
if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries(

err := tsdbIndex.ForSeries(
ctx, nil,
0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod
func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) {
Expand All @@ -371,26 +384,26 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
//All seriesMetas given a table within fp of this compactor shard
seriesMetas = append(seriesMetas, seriesMeta{seriesFP: fingerprint, seriesLbs: labels, chunkRefs: temp})
},
); err != nil {
)

if err != nil {
errs.Add(err)
return nil
}

job := NewJob(tenant, tableName, idx.Path(), seriesMetas)
jobLogger := log.With(logger, "job", job.String())
c.metrics.compactionRunJobStarted.Inc()

if err := c.runCompact(ctx, jobLogger, job, bt, sc); err != nil {
err = c.runCompact(ctx, jobLogger, job, bt, sc)
if err != nil {
c.metrics.compactionRunJobFailed.Inc()
errs.Add(errors.Wrap(err, "runBloomCompact failed"))
return errs.Err()
} else {
c.metrics.compactionRunJobSuceeded.Inc()
}

c.metrics.compactionRunJobSuceeded.Inc()

return nil
}); err != nil {
errs.Add(err)
}
})

return errs.Err()
}
Expand Down

0 comments on commit efe32bb

Please sign in to comment.