Skip to content

Commit

Permalink
Load partition meta on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-p committed Oct 1, 2024
1 parent b398012 commit c5f46ce
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 90 deletions.
33 changes: 18 additions & 15 deletions pkg/experiment/metastore/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type BlockWithPartition struct {

type Store interface {
ListPartitions() []PartitionKey
ReadPartitionMeta(p PartitionKey) (*PartitionMeta, error)

ListShards(p PartitionKey) []uint32
ListTenants(p PartitionKey, shard uint32) []string
ListBlocks(p PartitionKey, shard uint32, tenant string) []*metastorev1.BlockMeta
Expand Down Expand Up @@ -112,17 +110,30 @@ func (i *Index) LoadPartitions() {

i.allPartitions = make([]*PartitionMeta, 0)
for _, key := range i.store.ListPartitions() {
pMeta, err := i.store.ReadPartitionMeta(key)
if err != nil {
level.Error(i.logger).Log("msg", "error reading partition metadata", "key", key, "err", err)
continue
}
pMeta := i.loadPartitionMeta(key)
i.allPartitions = append(i.allPartitions, pMeta)
}

i.sortPartitions()
}

func (i *Index) loadPartitionMeta(key PartitionKey) *PartitionMeta {
t, dur, _ := key.Parse()
pMeta := &PartitionMeta{
Key: key,
Ts: t,
Duration: dur,
Tenants: make([]string, 0),
tenantMap: make(map[string]struct{}),
}
for _, s := range i.store.ListShards(key) {
for _, t := range i.store.ListTenants(key, s) {
pMeta.AddTenant(t)
}
}
return pMeta
}

// ForEachPartition executes the given function concurrently for each partition. It will be called for all partitions,
// regardless if they are fully loaded in memory or not.
func (i *Index) ForEachPartition(ctx context.Context, fn func(meta *PartitionMeta) error) error {
Expand Down Expand Up @@ -248,14 +259,6 @@ func (i *Index) insertBlock(b *metastorev1.BlockMeta) {
ten.blocks[b.Id] = b
}

// GetOrCreatePartitionMeta creates the mapping between blocks and partitions. It may assign the block to an existing
// partition or create a new partition altogether. Meant to be used only in the context of new blocks.
func (i *Index) GetOrCreatePartitionMeta(b *metastorev1.BlockMeta) *PartitionMeta {
i.partitionMu.Lock()
defer i.partitionMu.Unlock()
return i.getOrCreatePartitionMeta(b)
}

func (i *Index) getOrCreatePartitionMeta(b *metastorev1.BlockMeta) *PartitionMeta {
key := i.CreatePartitionKey(b.Id)
meta := i.findPartitionMeta(key)
Expand Down
26 changes: 0 additions & 26 deletions pkg/experiment/metastore/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,6 @@ func TestIndex_FindBlocksInRange(t *testing.T) {
}

func mockPartition(store *mockindex.MockStore, key index.PartitionKey, blocks []*metastorev1.BlockMeta) {
t, d, _ := key.Parse()
store.On("ReadPartitionMeta", key).Return(&index.PartitionMeta{
Key: key,
Ts: t,
Duration: d,
}, nil).Maybe()
store.On("ListShards", key).Return([]uint32{0}).Maybe()
store.On("ListTenants", key, uint32(0)).Return([]string{""}).Maybe()
store.On("ListBlocks", key, uint32(0), "").Return(blocks).Maybe()
Expand Down Expand Up @@ -156,20 +150,6 @@ func TestIndex_ForEachPartition(t *testing.T) {
require.Len(t, visited, 5)
}

func TestIndex_GetOrCreatePartitionMeta(t *testing.T) {
store := mockindex.NewMockStore(t)
i := index.NewIndex(store, util.Logger, &index.Config{PartitionDuration: time.Hour, PartitionCacheSize: 1})

block := &metastorev1.BlockMeta{
Id: createUlidString("2024-09-23T08:00:00.123Z"),
TenantId: "tenant-1",
}
pMeta := i.GetOrCreatePartitionMeta(block)
require.Equal(t, index.PartitionKey("20240923T08.1h"), pMeta.Key)
require.Equal(t, time.UnixMilli(createTime("2024-09-23T08:00:00.000Z")).UTC(), pMeta.Ts)
require.Equal(t, []string{"tenant-1"}, pMeta.Tenants)
}

func TestIndex_GetPartitionKey(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -269,12 +249,6 @@ func TestIndex_LoadPartitions(t *testing.T) {

partitionKey := i.CreatePartitionKey(blocks[0].Id)
store.On("ListPartitions").Return([]index.PartitionKey{partitionKey})
store.On("ReadPartitionMeta", mock.Anything).Return(&index.PartitionMeta{
Key: partitionKey,
Ts: time.Now().UTC(),
Duration: time.Hour,
Tenants: []string{""},
}, nil)
store.On("ListShards", mock.Anything).Return([]uint32{0})
store.On("ListTenants", mock.Anything, mock.Anything).Return([]string{""})
store.On("ListBlocks", mock.Anything, mock.Anything, mock.Anything).Return(blocks)
Expand Down
51 changes: 5 additions & 46 deletions pkg/experiment/metastore/metastore_block_store.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package metastore

import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"slices"

Expand All @@ -20,12 +18,10 @@ type indexStore struct {

const (
partitionBucketName = "partition"
partitionMetaKeyName = "meta"
emptyTenantBucketName = "-"
)

var partitionBucketNameBytes = []byte(partitionBucketName)
var partitionMetaKeyNameBytes = []byte(partitionMetaKeyName)
var emptyTenantBucketNameBytes = []byte(emptyTenantBucketName)

func getPartitionBucket(tx *bbolt.Tx) (*bbolt.Bucket, error) {
Expand Down Expand Up @@ -55,31 +51,6 @@ func (m *indexStore) ListPartitions() []index.PartitionKey {
return partitionKeys
}

func (m *indexStore) ReadPartitionMeta(key index.PartitionKey) (*index.PartitionMeta, error) {
var meta index.PartitionMeta
err := m.db.View(func(tx *bbolt.Tx) error {
bkt, err := getPartitionBucket(tx)
if err != nil {
return errors.Wrap(err, "root partition bucket missing")
}
partBkt := bkt.Bucket([]byte(key))
if partBkt == nil {
return fmt.Errorf("partition meta not found for %s", key)
}
data := partBkt.Get(partitionMetaKeyNameBytes)
dec := gob.NewDecoder(bytes.NewReader(data))
err = dec.Decode(&meta)
if err != nil {
return errors.Wrapf(err, "failed to read partition meta for %s", key)
}
return nil
})
if err != nil {
return nil, err
}
return &meta, nil
}

func (m *indexStore) ListShards(key index.PartitionKey) []uint32 {
shards := make([]uint32, 0)
err := m.db.View(func(tx *bbolt.Tx) error {
Expand Down Expand Up @@ -174,34 +145,22 @@ func (m *indexStore) ListBlocks(key index.PartitionKey, shard uint32, tenant str
return blocks
}

func updateBlockMetadataBucket(tx *bbolt.Tx, partitionMeta *index.PartitionMeta, shard uint32, tenant string, fn func(*bbolt.Bucket) error) error {
func updateBlockMetadataBucket(tx *bbolt.Tx, partKey index.PartitionKey, shard uint32, tenant string, fn func(*bbolt.Bucket) error) error {
bkt, err := getPartitionBucket(tx)
if err != nil {
return errors.Wrap(err, "root partition bucket missing")
}

partBkt, err := getOrCreateSubBucket(bkt, []byte(partitionMeta.Key))
if err != nil {
return errors.Wrapf(err, "error creating partition bucket for %s", partitionMeta.Key)
}

var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err = enc.Encode(partitionMeta)
if err != nil {
return errors.Wrapf(err, "could not encode partition meta for %s", partitionMeta.Key)
}

err = partBkt.Put(partitionMetaKeyNameBytes, buf.Bytes())
partBkt, err := getOrCreateSubBucket(bkt, []byte(partKey))
if err != nil {
return errors.Wrapf(err, "could not write partition meta for %s", partitionMeta.Key)
return errors.Wrapf(err, "error creating partition bucket for %s", partKey)
}

shardBktName := make([]byte, 4)
binary.BigEndian.PutUint32(shardBktName, shard)
shardBkt, err := getOrCreateSubBucket(partBkt, shardBktName)
if err != nil {
return errors.Wrapf(err, "error creating shard bucket for partiton %s and shard %d", partitionMeta.Key, shard)
return errors.Wrapf(err, "error creating shard bucket for partiton %s and shard %d", partKey, shard)
}

tenantBktName := []byte(tenant)
Expand All @@ -210,7 +169,7 @@ func updateBlockMetadataBucket(tx *bbolt.Tx, partitionMeta *index.PartitionMeta,
}
tenantBkt, err := getOrCreateSubBucket(shardBkt, tenantBktName)
if err != nil {
return errors.Wrapf(err, "error creating tenant bucket for partition %s, shard %d and tenant %s", partitionMeta.Key, shard, tenant)
return errors.Wrapf(err, "error creating tenant bucket for partition %s, shard %d and tenant %s", partKey, shard, tenant)
}

return fn(tenantBkt)
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/metastore/metastore_state_add_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ func (m *metastoreState) persistBlock(tx *bbolt.Tx, block *metastorev1.BlockMeta
return err
}

partMeta := m.index.GetOrCreatePartitionMeta(block)
partKey := m.index.CreatePartitionKey(block.Id)

return updateBlockMetadataBucket(tx, partMeta, block.Shard, block.TenantId, func(bucket *bbolt.Bucket) error {
return updateBlockMetadataBucket(tx, partKey, block.Shard, block.TenantId, func(bucket *bbolt.Bucket) error {
return bucket.Put(key, value)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (m *metastoreState) writeToDb(sTable *pollStateUpdate) error {
func (m *metastoreState) deleteBlock(tx *bbolt.Tx, blockWithPartition *index.BlockWithPartition) error {
meta := blockWithPartition.Meta
block := blockWithPartition.Block
return updateBlockMetadataBucket(tx, meta, block.Shard, block.TenantId, func(bucket *bbolt.Bucket) error {
return updateBlockMetadataBucket(tx, meta.Key, block.Shard, block.TenantId, func(bucket *bbolt.Bucket) error {
return bucket.Delete([]byte(block.Id))
})
}

0 comments on commit c5f46ce

Please sign in to comment.