Skip to content

Commit

Permalink
Move interfaces to the consumer side, return type
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-p committed Oct 18, 2024
1 parent 2a71df4 commit 00516e1
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 39 deletions.
50 changes: 17 additions & 33 deletions pkg/experiment/metastore/blockcleaner/block_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.CompactedBlocksCleanupInterval, prefix+"compacted-blocks-cleanup-interval", time.Minute, "The interval at which block cleanup is performed.")
}

type CleanerLifecycler interface {
Cleaner

Start()
Stop()
}

type Cleaner interface {
MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error
IsMarked(blockId string) bool

RemoveExpiredBlocks(now int64) error
}

type RaftLog[Req, Resp proto.Message] interface {
ApplyCommand(req Req) (resp Resp, err error)
}
Expand Down Expand Up @@ -96,7 +82,7 @@ func newMetrics(reg prometheus.Registerer) *metrics {
return m
}

type blockCleaner struct {
type BlockCleaner struct {
blocks map[string]struct{}
blocksMu sync.Mutex

Expand All @@ -121,7 +107,7 @@ func New(
config *Config,
bkt objstore.Bucket,
reg prometheus.Registerer,
) CleanerLifecycler {
) *BlockCleaner {
return newBlockCleaner(raftLog, db, logger, config, bkt, reg)
}

Expand All @@ -132,8 +118,8 @@ func newBlockCleaner(
config *Config,
bkt objstore.Bucket,
reg prometheus.Registerer,
) *blockCleaner {
return &blockCleaner{
) *BlockCleaner {
return &BlockCleaner{
blocks: make(map[string]struct{}),
raftLog: raftLog,
db: db,
Expand All @@ -149,7 +135,7 @@ type blockRemovalContext struct {
expiryTs int64
}

func (c *blockCleaner) MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error {
func (c *BlockCleaner) MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error {
if c.IsMarked(blockId) {
return nil
}
Expand Down Expand Up @@ -177,14 +163,14 @@ func (c *blockCleaner) MarkBlock(shard uint32, tenant string, blockId string, de
return nil
}

func (c *blockCleaner) IsMarked(blockId string) bool {
func (c *BlockCleaner) IsMarked(blockId string) bool {
c.blocksMu.Lock()
defer c.blocksMu.Unlock()
_, ok := c.blocks[blockId]
return ok
}

func (c *blockCleaner) Start() {
func (c *BlockCleaner) Start() {
c.mu.Lock()
defer c.mu.Unlock()
if c.started {
Expand All @@ -199,7 +185,7 @@ func (c *blockCleaner) Start() {
level.Info(c.logger).Log("msg", "block cleaner started")
}

func (c *blockCleaner) Stop() {
func (c *BlockCleaner) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.started {
Expand All @@ -213,7 +199,7 @@ func (c *blockCleaner) Stop() {
level.Info(c.logger).Log("msg", "block cleaner stopped")
}

func (c *blockCleaner) loop(ctx context.Context) {
func (c *BlockCleaner) loop(ctx context.Context) {
t := time.NewTicker(c.cfg.CompactedBlocksCleanupInterval)
defer func() {
t.Stop()
Expand All @@ -223,17 +209,15 @@ func (c *blockCleaner) loop(ctx context.Context) {
case <-ctx.Done():
return
case <-t.C:
if c.isLeader {
_, err := c.raftLog.ApplyCommand(&raftlogpb.CleanBlocksRequest{})
if err != nil {
_ = level.Error(c.logger).Log("msg", "failed to apply truncate command", "err", err)
}
_, err := c.raftLog.ApplyCommand(&raftlogpb.CleanBlocksRequest{})
if err != nil {
_ = level.Error(c.logger).Log("msg", "failed to apply clean blocks command", "err", err)
}
}
}
}

func (c *blockCleaner) RemoveExpiredBlocks(now int64) error {
func (c *BlockCleaner) RemoveExpiredBlocks(now int64) error {
shards, err := c.listShards()
if err != nil {
panic(fmt.Errorf("failed to list shards for pending block removals: %w", err))
Expand All @@ -253,7 +237,7 @@ func (c *blockCleaner) RemoveExpiredBlocks(now int64) error {
return err
}

func (c *blockCleaner) listShards() ([]uint32, error) {
func (c *BlockCleaner) listShards() ([]uint32, error) {
shards := make([]uint32, 0)
err := c.db().View(func(tx *bbolt.Tx) error {
bkt, err := getPendingBlockRemovalsBucket(tx)
Expand All @@ -271,7 +255,7 @@ func (c *blockCleaner) listShards() ([]uint32, error) {
return shards, nil
}

func (c *blockCleaner) cleanShard(ctx context.Context, shard uint32, now int64) error {
func (c *BlockCleaner) cleanShard(ctx context.Context, shard uint32, now int64) error {
blocks, err := c.listBlocks(shard)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to list removed blocks for shard", "err", err, "shard", shard)
Expand Down Expand Up @@ -335,7 +319,7 @@ func (c *blockCleaner) cleanShard(ctx context.Context, shard uint32, now int64)
return nil
}

func (c *blockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext, error) {
func (c *BlockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext, error) {
blocks := make(map[string]*blockRemovalContext)
err := c.db().View(func(tx *bbolt.Tx) error {
bkt, err := getPendingBlockRemovalsBucket(tx)
Expand Down Expand Up @@ -364,7 +348,7 @@ func (c *blockCleaner) listBlocks(shard uint32) (map[string]*blockRemovalContext
return blocks, nil
}

func (c *blockCleaner) removeBlock(blockId string, shard uint32, removalContext *blockRemovalContext) error {
func (c *BlockCleaner) removeBlock(blockId string, shard uint32, removalContext *blockRemovalContext) error {
err := c.db().Update(func(tx *bbolt.Tx) error {
bkt, err := getPendingBlockRemovalsBucket(tx)
if err != nil {
Expand Down
12 changes: 9 additions & 3 deletions pkg/experiment/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (cfg *RaftConfig) Validate() error {
return nil
}

type RaftLeaderRoutine interface {
Start()
Stop()
}

type Metastore struct {
service services.Service
metastorev1.MetastoreServiceServer
Expand Down Expand Up @@ -146,7 +151,7 @@ type Metastore struct {
placementMgr *adaptiveplacement.Manager
dnsProvider *dns.Provider
dlq *dlq.Recovery
blockCleaner blockcleaner.CleanerLifecycler
blockCleaner RaftLeaderRoutine
}

func New(
Expand All @@ -168,15 +173,16 @@ func New(
placementMgr: placementMgr,
}
m.leaderhealth = raftleader.NewRaftLeaderHealthObserver(logger, reg)
m.blockCleaner = blockcleaner.New(
blockCleaner := blockcleaner.New(
m,
func() *bbolt.DB { return m.db.boltdb },
m.logger,
&m.config.BlockCleaner,
bucket,
reg,
)
m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index, m.blockCleaner)
m.blockCleaner = blockCleaner
m.state = newMetastoreState(m.logger, m.db, m.reg, &m.config.Compaction, &m.config.Index, blockCleaner)
m.dlq = dlq.NewRecovery(dlq.RecoveryConfig{
Period: config.DLQRecoveryPeriod,
}, logger, m, bucket)
Expand Down
11 changes: 8 additions & 3 deletions pkg/experiment/metastore/metastore_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/bbolt"

"github.com/grafana/pyroscope/pkg/experiment/metastore/blockcleaner"
"github.com/grafana/pyroscope/pkg/experiment/metastore/compactionpb"
"github.com/grafana/pyroscope/pkg/experiment/metastore/index"
)
Expand All @@ -25,14 +24,20 @@ type tenantShard struct {
shard uint32
}

type BlockCleaner interface {
MarkBlock(shard uint32, tenant string, blockId string, deletedTs int64) error
IsMarked(blockId string) bool
RemoveExpiredBlocks(now int64) error
}

type metastoreState struct {
logger log.Logger
compactionMetrics *compactionMetrics
compactionConfig *CompactionConfig
indexConfig *index.Config

index *index.Index
blockCleaner blockcleaner.Cleaner
blockCleaner BlockCleaner

compactionMutex sync.Mutex
compactionJobBlockQueues map[tenantShard]*compactionJobBlockQueue
Expand All @@ -52,7 +57,7 @@ func newMetastoreState(
reg prometheus.Registerer,
compactionCfg *CompactionConfig,
indexCfg *index.Config,
blockCleaner blockcleaner.Cleaner,
blockCleaner BlockCleaner,
) *metastoreState {
return &metastoreState{
logger: logger,
Expand Down

0 comments on commit 00516e1

Please sign in to comment.