From 89fd482a6acad8c9bae2d05caf1b440a9df24931 Mon Sep 17 00:00:00 2001 From: Aleksandar Petrov <8142643+aleks-p@users.noreply.github.com> Date: Fri, 18 Oct 2024 10:55:56 -0300 Subject: [PATCH] Load deletion markers at startup --- .../metastore/blockcleaner/block_cleaner.go | 49 ++++++++++++++++--- .../blockcleaner/block_cleaner_test.go | 10 +--- pkg/experiment/metastore/metastore.go | 31 ++++++------ .../metastore/raftleader/raftleader.go | 5 ++ .../values-micro-services-experiment.yaml | 1 + 5 files changed, 63 insertions(+), 33 deletions(-) diff --git a/pkg/experiment/metastore/blockcleaner/block_cleaner.go b/pkg/experiment/metastore/blockcleaner/block_cleaner.go index 7f143e4ae0..f70868bb3a 100644 --- a/pkg/experiment/metastore/blockcleaner/block_cleaner.go +++ b/pkg/experiment/metastore/blockcleaner/block_cleaner.go @@ -19,6 +19,7 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" + "github.com/grafana/pyroscope/pkg/experiment/metastore/raftleader" "github.com/grafana/pyroscope/pkg/experiment/metastore/raftlogpb" "github.com/grafana/pyroscope/pkg/util" ) @@ -39,6 +40,11 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.CompactedBlocksCleanupInterval, prefix+"compacted-blocks-cleanup-interval", time.Minute, "The interval at which block cleanup is performed.") } +type CleanerLifecycler interface { + raftleader.LeaderRoutine + LoadMarkers() +} + type RaftLog[Req, Resp proto.Message] interface { ApplyCommand(req Req) (resp Resp, err error) } @@ -87,7 +93,7 @@ type BlockCleaner struct { blocksMu sync.Mutex raftLog RaftLogCleanBlocks - db func() *bbolt.DB + db *bbolt.DB bkt objstore.Bucket logger log.Logger cfg *Config @@ -102,7 +108,7 @@ type BlockCleaner struct { func New( raftLog RaftLogCleanBlocks, - db func() *bbolt.DB, + db *bbolt.DB, logger log.Logger, config *Config, bkt objstore.Bucket, @@ -113,7 +119,7 @@ func New( func newBlockCleaner( raftLog RaftLogCleanBlocks, - db func() *bbolt.DB, + db *bbolt.DB, logger log.Logger, config *Config, bkt objstore.Bucket, @@ -135,11 +141,38 @@ type blockRemovalContext struct { expiryTs int64 } +func (c *BlockCleaner) LoadMarkers() { + c.mu.Lock() + defer c.mu.Unlock() + + _ = c.db.View(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(removedBlocksBucketNameBytes) + if bkt == nil { + return nil + } + return bkt.ForEachBucket(func(k []byte) error { + shardBkt := bkt.Bucket(k) + if shardBkt == nil { + return nil + } + return shardBkt.ForEach(func(k, v []byte) error { + if len(k) < 34 { + return fmt.Errorf("block key too short (expected 34 chars, was %d)", len(k)) + } + blockId := string(k[:26]) + c.blocks[blockId] = struct{}{} + return nil + }) + }) + }) + level.Info(c.logger).Log("msg", "loaded metastore block deletion markers", "marker_count", len(c.blocks)) +} + func (c *BlockCleaner) MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error { if c.IsMarked(blockId) { return nil } - err := c.db().Update(func(tx *bbolt.Tx) error { + err := c.db.Update(func(tx *bbolt.Tx) error { bkt, err := tx.CreateBucketIfNotExists(removedBlocksBucketNameBytes) if err != nil { return err @@ -148,7 +181,7 @@ func (c *BlockCleaner) MarkBlock(shard uint32, tenant string, blockId string, de if err != nil { return err } - expiryTs := deletedTs + time.Duration(c.cfg.CompactedBlocksCleanupDelay).Milliseconds() + expiryTs := deletedTs + c.cfg.CompactedBlocksCleanupDelay.Milliseconds() blockKey := getBlockKey(blockId, expiryTs, tenant) return shardBkt.Put(blockKey, []byte{}) @@ -239,7 +272,7 @@ func (c *BlockCleaner) RemoveExpiredBlocks(now int64) error { func (c *BlockCleaner) listShards() ([]uint32, error) { shards := make([]uint32, 0) - err := c.db().View(func(tx *bbolt.Tx) error { + err := c.db.View(func(tx *bbolt.Tx) error { bkt, err := getPendingBlockRemovalsBucket(tx) if err != nil { return err @@ -321,7 +354,7 @@ func (c *BlockCleaner) cleanShard(ctx context.Context, shard uint32, now int64) func (c *BlockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext, error) { blocks := make(map[string]*blockRemovalContext) - err := c.db().View(func(tx *bbolt.Tx) error { + err := c.db.View(func(tx *bbolt.Tx) error { bkt, err := getPendingBlockRemovalsBucket(tx) if err != nil { return err @@ -349,7 +382,7 @@ func (c *BlockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext } func (c *BlockCleaner) removeBlock(blockId string, shard uint32, removalContext *blockRemovalContext) error { - err := c.db().Update(func(tx *bbolt.Tx) error { + err := c.db.Update(func(tx *bbolt.Tx) error { bkt, err := getPendingBlockRemovalsBucket(tx) if err != nil { return err diff --git a/pkg/experiment/metastore/blockcleaner/block_cleaner_test.go b/pkg/experiment/metastore/blockcleaner/block_cleaner_test.go index 5c5a143005..f48025d89b 100644 --- a/pkg/experiment/metastore/blockcleaner/block_cleaner_test.go +++ b/pkg/experiment/metastore/blockcleaner/block_cleaner_test.go @@ -18,10 +18,7 @@ import ( ) func Test_AddAndCheck(t *testing.T) { - db := createDb(t) - cleaner := newBlockCleaner(nil, func() *bbolt.DB { - return db - }, util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil) + cleaner := newBlockCleaner(nil, createDb(t), util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil) blockId := ulid.MustNew(ulid.Now(), rand.Reader).String() err := cleaner.MarkBlock(0, "tenant", blockId, 1000) @@ -31,10 +28,7 @@ func Test_AddAndCheck(t *testing.T) { } func Test_AddAndRemove(t *testing.T) { - db := createDb(t) - cleaner := newBlockCleaner(nil, func() *bbolt.DB { - return db - }, util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil) + cleaner := newBlockCleaner(nil, createDb(t), util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil) cleaner.isLeader = true blockId := ulid.MustNew(ulid.Now(), rand.Reader).String() diff --git a/pkg/experiment/metastore/metastore.go b/pkg/experiment/metastore/metastore.go index 8a0cae784d..d4d70cdf2f 100644 --- a/pkg/experiment/metastore/metastore.go +++ b/pkg/experiment/metastore/metastore.go @@ -22,7 +22,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" - "go.etcd.io/bbolt" compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" @@ -108,11 +107,6 @@ func (cfg *RaftConfig) Validate() error { return nil } -type RaftLeaderRoutine interface { - Start() - Stop() -} - type Metastore struct { service services.Service metastorev1.MetastoreServiceServer @@ -122,6 +116,7 @@ type Metastore struct { config Config logger log.Logger reg prometheus.Registerer + bucket objstore.Bucket // In-memory state. state *metastoreState @@ -151,7 +146,7 @@ type Metastore struct { placementMgr *adaptiveplacement.Manager dnsProvider *dns.Provider dlq *dlq.Recovery - blockCleaner RaftLeaderRoutine + blockCleaner blockcleaner.CleanerLifecycler } func New( @@ -167,22 +162,13 @@ func New( config: config, logger: logger, reg: reg, + bucket: bucket, db: newDB(config, logger, metrics), metrics: metrics, client: client, placementMgr: placementMgr, } m.leaderhealth = raftleader.NewRaftLeaderHealthObserver(logger, reg) - blockCleaner := blockcleaner.New( - m, - func() *bbolt.DB { return m.db.boltdb }, - m.logger, - &m.config.BlockCleaner, - bucket, - reg, - ) - m.blockCleaner = blockCleaner - m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index, blockCleaner) m.dlq = dlq.NewRecovery(dlq.RecoveryConfig{ Period: config.DLQRecoveryPeriod, }, logger, m, bucket) @@ -204,6 +190,17 @@ func (m *Metastore) starting(context.Context) error { if err := m.db.open(false); err != nil { return fmt.Errorf("failed to initialize database: %w", err) } + blockCleaner := blockcleaner.New( + m, + m.db.boltdb, + m.logger, + &m.config.BlockCleaner, + m.bucket, + m.reg, + ) + blockCleaner.LoadMarkers() + m.blockCleaner = blockCleaner + m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index, blockCleaner) if err := m.initRaft(); err != nil { return fmt.Errorf("failed to initialize raft: %w", err) } diff --git a/pkg/experiment/metastore/raftleader/raftleader.go b/pkg/experiment/metastore/raftleader/raftleader.go index 3c15c80f0c..360a9d329f 100644 --- a/pkg/experiment/metastore/raftleader/raftleader.go +++ b/pkg/experiment/metastore/raftleader/raftleader.go @@ -36,6 +36,11 @@ func newMetrics(reg prometheus.Registerer) *metrics { return m } +type LeaderRoutine interface { + Start() + Stop() +} + func NewRaftLeaderHealthObserver(logger log.Logger, reg prometheus.Registerer) *LeaderObserver { return &LeaderObserver{ logger: logger, diff --git a/tools/dev/experiment/values-micro-services-experiment.yaml b/tools/dev/experiment/values-micro-services-experiment.yaml index db588fa2e1..2d0dfa4504 100644 --- a/tools/dev/experiment/values-micro-services-experiment.yaml +++ b/tools/dev/experiment/values-micro-services-experiment.yaml @@ -13,6 +13,7 @@ pyroscope: write-path: "segment-writer" enable-query-backend: "true" querier.max-query-length: '7d' + metastore.block-cleaner.compacted-blocks-cleanup-delay: '1m' components: distributor: