Skip to content

Commit

Permalink
Load deletion markers at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-p committed Oct 18, 2024
1 parent 00516e1 commit 89fd482
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 33 deletions.
49 changes: 41 additions & 8 deletions pkg/experiment/metastore/blockcleaner/block_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/grafana/pyroscope/pkg/experiment/metastore/raftleader"
"github.com/grafana/pyroscope/pkg/experiment/metastore/raftlogpb"
"github.com/grafana/pyroscope/pkg/util"
)
Expand All @@ -39,6 +40,11 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.CompactedBlocksCleanupInterval, prefix+"compacted-blocks-cleanup-interval", time.Minute, "The interval at which block cleanup is performed.")
}

// CleanerLifecycler is the contract for a block cleaner: it embeds
// raftleader.LeaderRoutine and adds a LoadMarkers method.
type CleanerLifecycler interface {
raftleader.LeaderRoutine
// LoadMarkers takes no arguments and returns nothing, so implementations
// cannot report a failure to the caller through a return value.
LoadMarkers()
}

// RaftLog is a generic interface parameterized by a request type Req and a
// response type Resp, both constrained to proto.Message.
type RaftLog[Req, Resp proto.Message] interface {
// ApplyCommand accepts a request of type Req and returns a response of
// type Resp together with an error.
ApplyCommand(req Req) (resp Resp, err error)
}
Expand Down Expand Up @@ -87,7 +93,7 @@ type BlockCleaner struct {
blocksMu sync.Mutex

raftLog RaftLogCleanBlocks
db func() *bbolt.DB
db *bbolt.DB
bkt objstore.Bucket
logger log.Logger
cfg *Config
Expand All @@ -102,7 +108,7 @@ type BlockCleaner struct {

func New(
raftLog RaftLogCleanBlocks,
db func() *bbolt.DB,
db *bbolt.DB,
logger log.Logger,
config *Config,
bkt objstore.Bucket,
Expand All @@ -113,7 +119,7 @@ func New(

func newBlockCleaner(
raftLog RaftLogCleanBlocks,
db func() *bbolt.DB,
db *bbolt.DB,
logger log.Logger,
config *Config,
bkt objstore.Bucket,
Expand All @@ -135,11 +141,38 @@ type blockRemovalContext struct {
expiryTs int64
}

// LoadMarkers fills the in-memory set of marked block IDs (c.blocks) from the
// deletion markers stored in the bbolt database.
//
// It opens a read-only transaction, walks every nested bucket under
// removedBlocksBucketNameBytes, and records the leading 26 bytes of each key as
// a block ID. A missing top-level bucket is not an error; there is simply
// nothing to load. A key shorter than 34 bytes aborts the walk.
//
// The method has no return value, so a failed read cannot be returned to the
// caller. It is logged at error level instead of being silently dropped, and
// the success message is only logged when the whole walk completed.
func (c *BlockCleaner) LoadMarkers() {
	const (
		// Length of the block ID prefix of a marker key.
		// NOTE(review): presumably the length of a ULID string — confirm against getBlockKey.
		blockIDLen = 26
		// Shortest key accepted as a valid marker.
		// NOTE(review): presumably block ID plus an 8-byte expiry timestamp — confirm against getBlockKey.
		minBlockKeyLen = 34
	)

	// NOTE(review): the BlockCleaner struct also declares a blocksMu mutex;
	// confirm that c.mu is the intended guard for writes to c.blocks.
	c.mu.Lock()
	defer c.mu.Unlock()

	err := c.db.View(func(tx *bbolt.Tx) error {
		root := tx.Bucket(removedBlocksBucketNameBytes)
		if root == nil {
			return nil
		}
		return root.ForEachBucket(func(shardKey []byte) error {
			shardBkt := root.Bucket(shardKey)
			if shardBkt == nil {
				return nil
			}
			return shardBkt.ForEach(func(blockKey, _ []byte) error {
				if len(blockKey) < minBlockKeyLen {
					return fmt.Errorf("block key too short (expected %d chars, was %d)", minBlockKeyLen, len(blockKey))
				}
				blockID := string(blockKey[:blockIDLen])
				c.blocks[blockID] = struct{}{}
				return nil
			})
		})
	})
	if err != nil {
		// Markers recorded before the failure stay in c.blocks; report the
		// count so the partial state is visible in the logs.
		level.Error(c.logger).Log("msg", "failed to load metastore block deletion markers", "err", err, "marker_count", len(c.blocks))
		return
	}
	level.Info(c.logger).Log("msg", "loaded metastore block deletion markers", "marker_count", len(c.blocks))
}

func (c *BlockCleaner) MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error {
if c.IsMarked(blockId) {
return nil
}
err := c.db().Update(func(tx *bbolt.Tx) error {
err := c.db.Update(func(tx *bbolt.Tx) error {
bkt, err := tx.CreateBucketIfNotExists(removedBlocksBucketNameBytes)
if err != nil {
return err
Expand All @@ -148,7 +181,7 @@ func (c *BlockCleaner) MarkBlock(shard uint32, tenant string, blockId string, de
if err != nil {
return err
}
expiryTs := deletedTs + time.Duration(c.cfg.CompactedBlocksCleanupDelay).Milliseconds()
expiryTs := deletedTs + c.cfg.CompactedBlocksCleanupDelay.Milliseconds()
blockKey := getBlockKey(blockId, expiryTs, tenant)

return shardBkt.Put(blockKey, []byte{})
Expand Down Expand Up @@ -239,7 +272,7 @@ func (c *BlockCleaner) RemoveExpiredBlocks(now int64) error {

func (c *BlockCleaner) listShards() ([]uint32, error) {
shards := make([]uint32, 0)
err := c.db().View(func(tx *bbolt.Tx) error {
err := c.db.View(func(tx *bbolt.Tx) error {
bkt, err := getPendingBlockRemovalsBucket(tx)
if err != nil {
return err
Expand Down Expand Up @@ -321,7 +354,7 @@ func (c *BlockCleaner) cleanShard(ctx context.Context, shard uint32, now int64)

func (c *BlockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext, error) {
blocks := make(map[string]*blockRemovalContext)
err := c.db().View(func(tx *bbolt.Tx) error {
err := c.db.View(func(tx *bbolt.Tx) error {
bkt, err := getPendingBlockRemovalsBucket(tx)
if err != nil {
return err
Expand Down Expand Up @@ -349,7 +382,7 @@ func (c *BlockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext
}

func (c *BlockCleaner) removeBlock(blockId string, shard uint32, removalContext *blockRemovalContext) error {
err := c.db().Update(func(tx *bbolt.Tx) error {
err := c.db.Update(func(tx *bbolt.Tx) error {
bkt, err := getPendingBlockRemovalsBucket(tx)
if err != nil {
return err
Expand Down
10 changes: 2 additions & 8 deletions pkg/experiment/metastore/blockcleaner/block_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import (
)

func Test_AddAndCheck(t *testing.T) {
db := createDb(t)
cleaner := newBlockCleaner(nil, func() *bbolt.DB {
return db
}, util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil)
cleaner := newBlockCleaner(nil, createDb(t), util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil)

blockId := ulid.MustNew(ulid.Now(), rand.Reader).String()
err := cleaner.MarkBlock(0, "tenant", blockId, 1000)
Expand All @@ -31,10 +28,7 @@ func Test_AddAndCheck(t *testing.T) {
}

func Test_AddAndRemove(t *testing.T) {
db := createDb(t)
cleaner := newBlockCleaner(nil, func() *bbolt.DB {
return db
}, util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil)
cleaner := newBlockCleaner(nil, createDb(t), util.Logger, &Config{CompactedBlocksCleanupDelay: time.Second * 2}, memory.NewInMemBucket(), nil)
cleaner.isLeader = true

blockId := ulid.MustNew(ulid.Now(), rand.Reader).String()
Expand Down
31 changes: 14 additions & 17 deletions pkg/experiment/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"go.etcd.io/bbolt"

compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1"
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
Expand Down Expand Up @@ -108,11 +107,6 @@ func (cfg *RaftConfig) Validate() error {
return nil
}

type RaftLeaderRoutine interface {
Start()
Stop()
}

type Metastore struct {
service services.Service
metastorev1.MetastoreServiceServer
Expand All @@ -122,6 +116,7 @@ type Metastore struct {
config Config
logger log.Logger
reg prometheus.Registerer
bucket objstore.Bucket

// In-memory state.
state *metastoreState
Expand Down Expand Up @@ -151,7 +146,7 @@ type Metastore struct {
placementMgr *adaptiveplacement.Manager
dnsProvider *dns.Provider
dlq *dlq.Recovery
blockCleaner RaftLeaderRoutine
blockCleaner blockcleaner.CleanerLifecycler
}

func New(
Expand All @@ -167,22 +162,13 @@ func New(
config: config,
logger: logger,
reg: reg,
bucket: bucket,
db: newDB(config, logger, metrics),
metrics: metrics,
client: client,
placementMgr: placementMgr,
}
m.leaderhealth = raftleader.NewRaftLeaderHealthObserver(logger, reg)
blockCleaner := blockcleaner.New(
m,
func() *bbolt.DB { return m.db.boltdb },
m.logger,
&m.config.BlockCleaner,
bucket,
reg,
)
m.blockCleaner = blockCleaner
m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index, blockCleaner)
m.dlq = dlq.NewRecovery(dlq.RecoveryConfig{
Period: config.DLQRecoveryPeriod,
}, logger, m, bucket)
Expand All @@ -204,6 +190,17 @@ func (m *Metastore) starting(context.Context) error {
if err := m.db.open(false); err != nil {
return fmt.Errorf("failed to initialize database: %w", err)
}
blockCleaner := blockcleaner.New(
m,
m.db.boltdb,
m.logger,
&m.config.BlockCleaner,
m.bucket,
m.reg,
)
blockCleaner.LoadMarkers()
m.blockCleaner = blockCleaner
m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index, blockCleaner)
if err := m.initRaft(); err != nil {
return fmt.Errorf("failed to initialize raft: %w", err)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/experiment/metastore/raftleader/raftleader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ func newMetrics(reg prometheus.Registerer) *metrics {
return m
}

// LeaderRoutine is implemented by components that expose parameterless Start
// and Stop methods.
// NOTE(review): presumably Start/Stop are driven by raft leadership changes —
// confirm against the callers.
type LeaderRoutine interface {
Start()
Stop()
}

func NewRaftLeaderHealthObserver(logger log.Logger, reg prometheus.Registerer) *LeaderObserver {
return &LeaderObserver{
logger: logger,
Expand Down
1 change: 1 addition & 0 deletions tools/dev/experiment/values-micro-services-experiment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pyroscope:
write-path: "segment-writer"
enable-query-backend: "true"
querier.max-query-length: '7d'
metastore.block-cleaner.compacted-blocks-cleanup-delay: '1m'

components:
distributor:
Expand Down

0 comments on commit 89fd482

Please sign in to comment.