diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go
index 31f73740c1ff7..c9b3ecae35c2c 100644
--- a/pkg/bloomcompactor/controller.go
+++ b/pkg/bloomcompactor/controller.go
@@ -10,6 +10,7 @@ import (
 	"github.com/pkg/errors"
 
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
 )
 
@@ -178,7 +179,7 @@ func (s *SimpleBloomController) loadWorkForGap(id tsdb.Identifier, gap gapWithBl
 
 type gapWithBlocks struct {
 	bounds v1.FingerprintBounds
-	blocks []BlockRef
+	blocks []bloomshipper.BlockRef
 }
 
 // blockPlan is a plan for all the work needed to build a meta.json
@@ -220,7 +221,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) {
 				}
 
 				for _, block := range meta.Blocks {
-					if block.OwnershipRange.Intersection(gap) == nil {
+					if block.Bounds.Intersection(gap) == nil {
 						// this block doesn't overlap the gap, skip
 						continue
 					}
@@ -232,27 +233,27 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) {
 
 			// ensure we sort blocks so deduping iterator works as expected
 			sort.Slice(planGap.blocks, func(i, j int) bool {
-				return planGap.blocks[i].OwnershipRange.Less(planGap.blocks[j].OwnershipRange)
+				return planGap.blocks[i].Bounds.Less(planGap.blocks[j].Bounds)
 			})
 
-			peekingBlocks := v1.NewPeekingIter[BlockRef](
-				v1.NewSliceIter[BlockRef](
+			peekingBlocks := v1.NewPeekingIter[bloomshipper.BlockRef](
+				v1.NewSliceIter[bloomshipper.BlockRef](
 					planGap.blocks,
 				),
 			)
 			// dedupe blocks which could be in multiple metas
-			itr := v1.NewDedupingIter[BlockRef, BlockRef](
-				func(a, b BlockRef) bool {
+			itr := v1.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
+				func(a, b bloomshipper.BlockRef) bool {
 					return a == b
 				},
-				v1.Identity[BlockRef],
-				func(a, _ BlockRef) BlockRef {
+				v1.Identity[bloomshipper.BlockRef],
+				func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
 					return a
 				},
 				peekingBlocks,
 			)
 
-			deduped, err := v1.Collect[BlockRef](itr)
+			deduped, err := v1.Collect[bloomshipper.BlockRef](itr)
 			if err != nil {
 				return nil, errors.Wrap(err, "failed to dedupe blocks")
 			}
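A note on the dedupe step above: with the path strings removed, `bloomshipper.BlockRef` is a plain comparable value, so `a == b` works and sorting followed by collapsing adjacent equal refs is sufficient. A minimal, self-contained sketch of the same idea — the types here are simplified stand-ins, not the real `v1.FingerprintBounds`/`bloomshipper.BlockRef`:

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for illustration only.
type Bounds struct{ Min, Max uint64 }

type BlockRef struct {
	Bounds   Bounds
	Checksum uint32
}

// dedupeSorted collapses adjacent equal refs, mirroring what sorting
// followed by v1.NewDedupingIter achieves in blockPlansForGaps.
func dedupeSorted(refs []BlockRef) []BlockRef {
	sort.Slice(refs, func(i, j int) bool {
		if refs[i].Bounds.Min != refs[j].Bounds.Min {
			return refs[i].Bounds.Min < refs[j].Bounds.Min
		}
		return refs[i].Bounds.Max < refs[j].Bounds.Max
	})
	out := refs[:0]
	for _, r := range refs {
		// plain struct equality, possible because the ref carries no path strings
		if len(out) == 0 || out[len(out)-1] != r {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	refs := []BlockRef{
		{Bounds{9, 20}, 3},
		{Bounds{1, 4}, 1},
		{Bounds{9, 20}, 3}, // same block referenced by a second meta
	}
	fmt.Println(dedupeSorted(refs)) // [{{1 4} 1} {{9 20} 3}]
}
```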
diff --git a/pkg/bloomcompactor/controller_test.go b/pkg/bloomcompactor/controller_test.go
index 9f3f56153af32..1f89a0e318efd 100644
--- a/pkg/bloomcompactor/controller_test.go
+++ b/pkg/bloomcompactor/controller_test.go
@@ -8,6 +8,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
 )
 
@@ -119,7 +120,7 @@ func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier {
 	}
 }
 
-func genMeta(min, max model.Fingerprint, sources []int, blocks []BlockRef) Meta {
+func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) Meta {
 	m := Meta{
 		OwnershipRange: v1.NewBounds(min, max),
 		Blocks:         blocks,
 	}
@@ -224,10 +225,12 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
 	}
 }
 
-func genBlockRef(min, max model.Fingerprint) BlockRef {
+func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
 	bounds := v1.NewBounds(min, max)
-	return BlockRef{
-		OwnershipRange: bounds,
+	return bloomshipper.BlockRef{
+		Ref: bloomshipper.Ref{
+			Bounds: bounds,
+		},
 	}
 }
 
@@ -245,7 +248,7 @@ func Test_blockPlansForGaps(t *testing.T) {
 			ownershipRange: v1.NewBounds(0, 10),
 			tsdbs:          []tsdb.Identifier{tsdbID(0)},
 			metas: []Meta{
-				genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(11, 20)}),
+				genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}),
 			},
 			exp: []blockPlan{
 				{
@@ -263,7 +266,7 @@ func Test_blockPlansForGaps(t *testing.T) {
 			ownershipRange: v1.NewBounds(0, 10),
 			tsdbs:          []tsdb.Identifier{tsdbID(0)},
 			metas: []Meta{
-				genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(9, 20)}),
+				genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}),
 			},
 			exp: []blockPlan{
 				{
@@ -271,7 +274,7 @@ func Test_blockPlansForGaps(t *testing.T) {
 					gaps: []gapWithBlocks{
 						{
 							bounds: v1.NewBounds(0, 10),
-							blocks: []BlockRef{genBlockRef(9, 20)},
+							blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)},
 						},
 					},
 				},
@@ -285,8 +288,8 @@ func Test_blockPlansForGaps(t *testing.T) {
 			ownershipRange: v1.NewBounds(0, 10),
 			tsdbs:          []tsdb.Identifier{tsdbID(0)},
 			metas: []Meta{
-				genMeta(9, 20, []int{0}, []BlockRef{genBlockRef(9, 20)}), // block for same tsdb
-				genMeta(9, 20, []int{1}, []BlockRef{genBlockRef(9, 20)}), // block for different tsdb
+				genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
+				genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb
 			},
 			exp: []blockPlan{
 				{
@@ -304,8 +307,8 @@ func Test_blockPlansForGaps(t *testing.T) {
 			ownershipRange: v1.NewBounds(0, 10),
 			tsdbs:          []tsdb.Identifier{tsdbID(0)},
 			metas: []Meta{
-				genMeta(9, 20, []int{0}, []BlockRef{genBlockRef(9, 20)}), // block for same tsdb
-				genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(5, 20)}), // block for different tsdb
+				genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
+				genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb
 			},
 			exp: []blockPlan{
 				{
@@ -313,7 +316,7 @@ func Test_blockPlansForGaps(t *testing.T) {
 					gaps: []gapWithBlocks{
 						{
 							bounds: v1.NewBounds(0, 8),
-							blocks: []BlockRef{genBlockRef(5, 20)},
+							blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)},
 						},
 					},
 				},
@@ -324,14 +327,14 @@ func Test_blockPlansForGaps(t *testing.T) {
 			ownershipRange: v1.NewBounds(0, 10),
 			tsdbs:          []tsdb.Identifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs
 			metas: []Meta{
-				genMeta(0, 2, []int{0}, []BlockRef{
+				genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{
 					genBlockRef(0, 1),
 					genBlockRef(1, 2),
 				}), // tsdb_0
-				genMeta(6, 8, []int{0}, []BlockRef{genBlockRef(6, 8)}), // tsdb_0
+				genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0
 
-				genMeta(3, 5, []int{1}, []BlockRef{genBlockRef(3, 5)}),   // tsdb_1
-				genMeta(8, 10, []int{1}, []BlockRef{genBlockRef(8, 10)}), // tsdb_1
+				genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}),   // tsdb_1
+				genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1
 			},
 			exp: []blockPlan{
 				{
@@ -340,11 +343,11 @@ func Test_blockPlansForGaps(t *testing.T) {
 					gaps: []gapWithBlocks{
 						// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
 						{
 							bounds: v1.NewBounds(3, 5),
-							blocks: []BlockRef{genBlockRef(3, 5)},
+							blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)},
 						},
 						{
 							bounds: v1.NewBounds(9, 10),
-							blocks: []BlockRef{genBlockRef(8, 10)},
+							blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)},
 						},
 					},
 				},
@@ -354,14 +357,14 @@ func Test_blockPlansForGaps(t *testing.T) {
 					gaps: []gapWithBlocks{
 						{
 							bounds: v1.NewBounds(0, 2),
-							blocks: []BlockRef{
+							blocks: []bloomshipper.BlockRef{
 								genBlockRef(0, 1),
 								genBlockRef(1, 2),
 							},
 						},
 						{
 							bounds: v1.NewBounds(6, 7),
-							blocks: []BlockRef{genBlockRef(6, 8)},
+							blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)},
 						},
 					},
 				},
@@ -372,11 +375,11 @@ func Test_blockPlansForGaps(t *testing.T) {
 			ownershipRange: v1.NewBounds(0, 10),
 			tsdbs:          []tsdb.Identifier{tsdbID(0)},
 			metas: []Meta{
-				genMeta(9, 20, []int{1}, []BlockRef{
+				genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{
 					genBlockRef(1, 4),
 					genBlockRef(9, 20),
 				}), // blocks for first diff tsdb
-				genMeta(5, 20, []int{2}, []BlockRef{
+				genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{
 					genBlockRef(5, 10),
 					genBlockRef(9, 20), // same block references in prior meta (will be deduped)
 				}), // block for second diff tsdb
@@ -387,7 +390,7 @@ func Test_blockPlansForGaps(t *testing.T) {
 					gaps: []gapWithBlocks{
 						{
 							bounds: v1.NewBounds(0, 10),
-							blocks: []BlockRef{
+							blocks: []bloomshipper.BlockRef{
 								genBlockRef(1, 4),
 								genBlockRef(5, 10),
 								genBlockRef(9, 20),
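Most of these cases reduce to whether a block's bounds intersect a gap, i.e. the `block.Bounds.Intersection(gap) == nil` check in `blockPlansForGaps` above. A small sketch, assuming only what that check implies (`Intersection` returns nil when there is no overlap):

```go
package main

import (
	"fmt"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func main() {
	gap := v1.NewBounds(0, 10)
	outside := v1.NewBounds(11, 20)
	overlapping := v1.NewBounds(9, 20)

	// nil intersection -> the block cannot help fill this gap (first case above)
	fmt.Println(outside.Intersection(gap) == nil) // true
	// non-nil -> the block overlaps the gap, at (9, 10) here, and is kept (second case)
	fmt.Println(overlapping.Intersection(gap) == nil) // false
}
```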
diff --git a/pkg/bloomcompactor/meta.go b/pkg/bloomcompactor/meta.go
index adffb61dff5ed..c0a333c5c907e 100644
--- a/pkg/bloomcompactor/meta.go
+++ b/pkg/bloomcompactor/meta.go
@@ -2,14 +2,13 @@ package bloomcompactor
 
 import (
 	"fmt"
-	"hash"
 	"path"
 
 	"github.com/pkg/errors"
 
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
-	"github.com/grafana/loki/pkg/util/encoding"
 )
 
 const (
@@ -17,27 +16,6 @@ const (
 	MetasPrefix = "metas"
 )
 
-// TODO(owen-d): Probably want to integrate against the block shipper
-// instead of defining here, but only (min,max,fp) should be required for
-// the ref. Things like index-paths, etc are not needed and possibly harmful
-// in the case we want to do migrations. It's easier to load a block-ref or similar
-// within the context of a specific tenant+period+index path and not couple them.
-type BlockRef struct {
-	OwnershipRange v1.FingerprintBounds
-	Checksum       uint32
-}
-
-func (r BlockRef) Hash(h hash.Hash32) error {
-	if err := r.OwnershipRange.Hash(h); err != nil {
-		return err
-	}
-
-	var enc encoding.Encbuf
-	enc.PutBE32(r.Checksum)
-	_, err := h.Write(enc.Get())
-	return errors.Wrap(err, "writing BlockRef")
-}
-
 type MetaRef struct {
 	OwnershipRange v1.FingerprintBounds
 	Checksum       uint32
@@ -63,13 +41,13 @@ type Meta struct {
 	OwnershipRange v1.FingerprintBounds
 
 	// Old blocks which can be deleted in the future. These should be from previous compaction rounds.
-	Tombstones []BlockRef
+	Tombstones []bloomshipper.BlockRef
 
 	// The specific TSDB files used to generate the block.
 	Sources []tsdb.SingleTenantTSDBIdentifier
 
 	// A list of blocks that were generated
-	Blocks []BlockRef
+	Blocks []bloomshipper.BlockRef
 }
 
 // Generate MetaRef from Meta
@@ -131,6 +109,6 @@ type MetaStore interface {
 
 type BlockStore interface {
 	// TODO(owen-d): flesh out|integrate against bloomshipper.Client
-	GetBlocks([]BlockRef) ([]*v1.Block, error)
+	GetBlocks([]bloomshipper.BlockRef) ([]*v1.Block, error)
 	PutBlock(interface{}) error
 }
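The deleted TODO spells out the rationale for this whole change: storing index/block paths on a ref couples data to a fixed location and complicates migrations. With the shared type, a block's address is computed on demand from the ref's identity fields — the `BlockRef.String()` method and the resolver are added later in this diff. A sketch; the printed key shape in the comment is illustrative:

```go
package main

import (
	"fmt"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

func main() {
	ref := bloomshipper.BlockRef{
		Ref: bloomshipper.Ref{
			TenantID:       "tenantA",
			TableName:      "table_19621",
			Bounds:         v1.NewBounds(0xeeee, 0xffff),
			StartTimestamp: 1695272400000,
			EndTimestamp:   1695276000000,
			Checksum:       1,
		},
	}
	// No stored path: the key is derived from the ref, roughly
	// bloom/table_19621/tenantA/blocks/<bounds>/1695272400000-1695276000000-1
	fmt.Println(ref) // String() delegates to the default resolver's Addr()
}
```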
diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go
index a705d1965780f..d60ab5f13a190 100644
--- a/pkg/bloomgateway/util_test.go
+++ b/pkg/bloomgateway/util_test.go
@@ -2,7 +2,6 @@ package bloomgateway
 
 import (
 	"context"
-	"fmt"
 	"math/rand"
 	"testing"
 	"time"
@@ -343,9 +342,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
 			EndTimestamp:   through,
 		}
 		block := bloomshipper.BlockRef{
-			Ref:       ref,
-			IndexPath: "index.tsdb.gz",
-			BlockPath: fmt.Sprintf("block-%d", i),
+			Ref: ref,
 		}
 		meta := bloomshipper.Meta{
 			MetaRef: bloomshipper.MetaRef{
@@ -459,8 +456,6 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp
 				EndTimestamp:   0,
 				Checksum:       0,
 			},
-			IndexPath: fmt.Sprintf("index-%d", i),
-			BlockPath: fmt.Sprintf("block-%d", i),
 		})
 	}
 	return res
diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go
index 81355f78e84ec..8d28ca03dcc63 100644
--- a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go
+++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go
@@ -4,10 +4,10 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"os"
 	"path"
 	"path/filepath"
-	"strconv"
 	"sync"
 	"time"
 
@@ -175,12 +175,12 @@ type cacheDownloadingStrategy struct {
 }
 
 func (s *cacheDownloadingStrategy) downloadBlock(task *BlockDownloadingTask, logger log.Logger) (blockWithQuerier, error) {
-	blockPath := task.block.BlockPath
-	s.keyMutex.LockKey(blockPath)
+	key := s.blockClient.Block(task.block).Addr()
+	s.keyMutex.LockKey(key)
 	defer func() {
-		_ = s.keyMutex.UnlockKey(blockPath)
+		_ = s.keyMutex.UnlockKey(key)
 	}()
-	blockFromCache, exists := s.blocksCache.Get(task.ctx, task.block.BlockPath)
+	blockFromCache, exists := s.blocksCache.Get(task.ctx, key)
 	if exists {
 		return blockWithQuerier{
 			BlockRef: task.block,
@@ -193,10 +193,10 @@ func (s *cacheDownloadingStrategy) downloadBlock(task *BlockDownloadingTask, log
 		return blockWithQuerier{}, err
 	}
 	blockFromCache = newCachedBlock(directory, s.config.BlocksCache.RemoveDirectoryGracefulPeriod, logger)
-	err = s.blocksCache.Store(task.ctx, []string{task.block.BlockPath}, []*cachedBlock{blockFromCache})
+	err = s.blocksCache.Store(task.ctx, []string{key}, []*cachedBlock{blockFromCache})
 	if err != nil {
-		level.Error(logger).Log("msg", "error storing the block in the cache", "block", blockPath, "err", err)
-		return blockWithQuerier{}, fmt.Errorf("error storing the block %s in the cache : %w", blockPath, err)
+		level.Error(logger).Log("msg", "error storing the block in the cache", "block", key, "err", err)
+		return blockWithQuerier{}, fmt.Errorf("error storing the block %s in the cache : %w", key, err)
 	}
 	return blockWithQuerier{
 		BlockRef: task.block,
@@ -229,20 +229,20 @@ func (s *storageDownloadingStrategy) close() {
 }
 
 func downloadBlockToDirectory(logger log.Logger, task *BlockDownloadingTask, workingDirectory string, blockClient BlockClient) (string, error) {
-	blockPath := task.block.BlockPath
+	blockPath := filepath.Join(workingDirectory, blockClient.Block(task.block).LocalPath())
 	level.Debug(logger).Log("msg", "start downloading the block", "block", blockPath)
 	block, err := blockClient.GetBlock(task.ctx, task.block)
 	if err != nil {
 		level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err)
 		return "", fmt.Errorf("error downloading the block %s : %w", blockPath, err)
 	}
-	directory, err := extractBlock(&block, time.Now(), workingDirectory, logger)
+	err = extractBlock(block.Data, blockPath, logger)
 	if err != nil {
 		level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err)
 		return "", fmt.Errorf("error extracting the block %s : %w", blockPath, err)
 	}
-	level.Debug(logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory)
-	return directory, nil
+	level.Debug(logger).Log("msg", "block has been downloaded and extracted", "block", blockPath)
+	return blockPath, nil
 }
 
 func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, references []BlockRef) (chan blockWithQuerier, chan error) {
@@ -256,10 +256,10 @@ func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, r
 	for _, reference := range references {
 		task := NewBlockDownloadingTask(ctx, reference, blocksCh, errCh)
-		level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference.BlockPath)
+		level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference)
 		err := d.queue.Enqueue(tenantID, nil, task, nil)
 		if err != nil {
-			errCh <- fmt.Errorf("error enquing downloading task for block %s : %w", reference.BlockPath, err)
+			errCh <- fmt.Errorf("error enqueuing downloading task for block %s : %w", reference, err)
 			return blocksCh, errCh
 		}
 	}
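`cacheDownloadingStrategy.downloadBlock` above locks per cache key — now the resolver's `Addr()` instead of `BlockPath` — so concurrent requests for one block result in a single download while distinct blocks proceed in parallel. A simplified, self-contained sketch of that pattern; `keyedMutex` is a stand-in, not the real `keyMutex` implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// keyedMutex hands out one mutex per key (never cleaned up here; a sketch).
type keyedMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func (k *keyedMutex) lock(key string) *sync.Mutex {
	k.mu.Lock()
	if k.locks == nil {
		k.locks = make(map[string]*sync.Mutex)
	}
	l, ok := k.locks[key]
	if !ok {
		l = &sync.Mutex{}
		k.locks[key] = l
	}
	k.mu.Unlock()
	l.Lock()
	return l
}

func main() {
	var (
		km        keyedMutex
		cacheMu   sync.Mutex
		cache     = map[string]string{} // key -> extracted directory
		downloads int
		wg        sync.WaitGroup
	)

	get := func(key string) string {
		l := km.lock(key) // serialize per block key, not globally
		defer l.Unlock()

		cacheMu.Lock()
		dir, ok := cache[key]
		cacheMu.Unlock()
		if ok {
			return dir // later callers hit the cache
		}
		downloads++ // guarded by this key's lock
		dir = "/cache/" + key
		cacheMu.Lock()
		cache[key] = dir
		cacheMu.Unlock()
		return dir
	}

	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); get("bloom/table/tenant/blocks/a-b/1-2-3") }()
	}
	wg.Wait()
	fmt.Println("downloads:", downloads) // 1
}
```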
@@ -272,15 +272,15 @@ type blockWithQuerier struct {
 }
 
-// extract the files into directory and returns absolute path to this directory.
-func extractBlock(block *LazyBlock, ts time.Time, workingDirectory string, logger log.Logger) (string, error) {
-	workingDirectoryPath := filepath.Join(workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixNano(), 10))
-	err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
+// extractBlock extracts the archive contents into the given block directory.
+func extractBlock(data io.ReadCloser, blockDir string, logger log.Logger) error {
+	err := os.MkdirAll(blockDir, os.ModePerm)
 	if err != nil {
-		return "", fmt.Errorf("can not create directory to extract the block: %w", err)
+		return fmt.Errorf("can not create directory to extract the block: %w", err)
 	}
-	archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
+	archivePath, err := writeDataToTempFile(blockDir, data)
 	if err != nil {
-		return "", fmt.Errorf("error writing data to temp file: %w", err)
+		return fmt.Errorf("error writing data to temp file: %w", err)
 	}
 	defer func() {
 		err = os.Remove(archivePath)
@@ -288,11 +288,11 @@ func extractBlock(block *LazyBlock, ts time.Time, workingDirectory string, logge
 		if err != nil {
 			level.Error(logger).Log("msg", "error removing temp archive file", "err", err)
 		}
 	}()
-	err = extractArchive(archivePath, workingDirectoryPath)
+	err = extractArchive(archivePath, blockDir)
 	if err != nil {
-		return "", fmt.Errorf("error extracting archive: %w", err)
+		return fmt.Errorf("error extracting archive: %w", err)
 	}
-	return workingDirectoryPath, nil
+	return nil
 }
 
 func (d *blockDownloader) stop() {
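The signature change to `extractBlock` is subtle but important: the old code unpacked into `BlockPath` plus a `time.Now().UnixNano()` suffix, producing a fresh directory per download, while the new code receives a destination derived purely from the ref (working directory joined with the resolver's `LocalPath`). A trivial sketch of that property, with an illustrative path:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
	"time"
)

func main() {
	workingDir := "/var/loki/blooms"
	// illustrative resolver output for one block ref
	localPath := filepath.Join("bloom", "table_19621", "tenantA", "blocks", "eeee-ffff", "0-3600000-2a")

	// old scheme: unique directory per download, never reusable
	old := filepath.Join(workingDir, localPath, strconv.FormatInt(time.Now().UnixNano(), 10))

	// new scheme: a stable directory, shared by cache lookups across downloads
	new1 := filepath.Join(workingDir, localPath)
	new2 := filepath.Join(workingDir, localPath)

	fmt.Println(old != new1, new1 == new2) // true true
}
```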
diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
index ffe715c857ec6..8fc1b3e976e0f 100644
--- a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"os"
 	"path/filepath"
-	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -15,6 +14,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 
@@ -42,12 +42,12 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) {
 	}, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer)
 	require.NoError(t, err)
 	blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
-	downloadedBlocks := make(map[string]any, len(blockReferences))
+	downloadedBlocks := make(map[BlockRef]any, len(blockReferences))
 	done := make(chan bool)
 	go func() {
 		for i := 0; i < 20; i++ {
 			block := <-blocksCh
-			downloadedBlocks[block.BlockPath] = nil
+			downloadedBlocks[block.BlockRef] = nil
 		}
 		done <- true
 	}()
@@ -111,12 +111,12 @@ func Test_blockDownloader_downloadBlock(t *testing.T) {
 			require.NoError(t, err)
 
 			blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences)
-			downloadedBlocks := make(map[string]any, len(blockReferences))
+			downloadedBlocks := make(map[BlockRef]any, len(blockReferences))
 			done := make(chan bool)
 			go func() {
 				for i := 0; i < 20; i++ {
 					block := <-blocksCh
-					downloadedBlocks[block.BlockPath] = nil
+					downloadedBlocks[block.BlockRef] = nil
 				}
 				done <- true
 			}()
@@ -132,12 +132,12 @@ func Test_blockDownloader_downloadBlock(t *testing.T) {
 			require.Equal(t, int32(20), blockClient.getBlockCalls.Load())
 
 			blocksCh, errorsCh = downloader.downloadBlocks(context.Background(), "fake", blockReferences)
-			downloadedBlocks = make(map[string]any, len(blockReferences))
+			downloadedBlocks = make(map[BlockRef]any, len(blockReferences))
 			done = make(chan bool)
 			go func() {
 				for i := 0; i < 20; i++ {
 					block := <-blocksCh
-					downloadedBlocks[block.BlockPath] = nil
+					downloadedBlocks[block.BlockRef] = nil
 				}
 				done <- true
 			}()
@@ -313,7 +313,7 @@ func Test_closableBlockQuerier(t *testing.T) {
 
-// creates fake blocks and returns map[block-path]Block and mockBlockClient
+// creates fake blocks and returns map[BlockRef]Block and mockBlockClient
 func createFakeBlocks(t *testing.T, count int) ([]BlockRef, *mockBlockClient) {
-	mockData := make(map[string]blockSupplier, count)
+	mockData := make(map[BlockRef]blockSupplier, count)
 	refs := make([]BlockRef, 0, count)
 	for i := 0; i < count; i++ {
 		archivePath, _, _ := createBlockArchive(t)
 		_, err := os.OpenFile(archivePath, os.O_RDONLY, 0700)
 		//ensure file can be opened
 		require.NoError(t, err)
 		blockRef := BlockRef{
-			BlockPath: fmt.Sprintf("block-path-%d", i),
+			Ref: Ref{
+				TenantID:       "",
+				TableName:      "",
+				Bounds:         v1.NewBounds(model.Fingerprint(i), model.Fingerprint(i+1)),
+				StartTimestamp: 0,
+				EndTimestamp:   0,
+				Checksum:       0,
+			},
 		}
-		mockData[blockRef.BlockPath] = func() LazyBlock {
+		mockData[blockRef] = func() LazyBlock {
 			file, _ := os.OpenFile(archivePath, os.O_RDONLY, 0700)
 			return LazyBlock{
 				BlockRef: blockRef,
@@ -339,19 +346,20 @@ type blockSupplier func() LazyBlock
 
 type mockBlockClient struct {
 	responseDelay time.Duration
-	mockData      map[string]blockSupplier
+	mockData      map[BlockRef]blockSupplier
 	getBlockCalls atomic.Int32
+	defaultKeyResolver
 }
 
 func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (LazyBlock, error) {
 	m.getBlockCalls.Inc()
 	time.Sleep(m.responseDelay)
-	supplier, exists := m.mockData[reference.BlockPath]
+	supplier, exists := m.mockData[reference]
 	if exists {
 		return supplier(), nil
 	}
-	return LazyBlock{}, fmt.Errorf("block %s is not found in mockData", reference.BlockPath)
+	return LazyBlock{}, fmt.Errorf("block %s is not found in mockData", reference)
 }
 
 func (m *mockBlockClient) PutBlocks(_ context.Context, _ []Block) ([]Block, error) {
@@ -368,26 +376,31 @@ func Test_blockDownloader_extractBlock(t *testing.T) {
 	require.NoError(t, err)
 
 	workingDir := t.TempDir()
-	ts := time.Now().UTC()
 	block := LazyBlock{
-		BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"},
-		Data:     blockFile,
+		BlockRef: BlockRef{
+			Ref: Ref{
+				TenantID:       "",
+				TableName:      "",
+				Bounds:         v1.NewBounds(0, 1),
+				StartTimestamp: 0,
+				EndTimestamp:   0,
+				Checksum:       0,
+			},
+		},
+		Data: blockFile,
 	}
 
-	actualPath, err := extractBlock(&block, ts, workingDir, nil)
-
+	err = extractBlock(block.Data, workingDir, nil)
 	require.NoError(t, err)
-	expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixNano(), 10))
-	require.Equal(t, expectedPath, actualPath,
-		"expected archive to be extracted to working directory under the same path as blockPath and with timestamp suffix")
-	require.FileExists(t, filepath.Join(expectedPath, v1.BloomFileName))
-	require.FileExists(t, filepath.Join(expectedPath, v1.SeriesFileName))
 
-	actualBloomFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.BloomFileName))
+	require.FileExists(t, filepath.Join(workingDir, v1.BloomFileName))
+	require.FileExists(t, filepath.Join(workingDir, v1.SeriesFileName))
+
+	actualBloomFileContent, err := os.ReadFile(filepath.Join(workingDir, v1.BloomFileName))
 	require.NoError(t, err)
 	require.Equal(t, bloomFileContent, string(actualBloomFileContent))
 
-	actualSeriesFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.SeriesFileName))
+	actualSeriesFileContent, err := os.ReadFile(filepath.Join(workingDir, v1.SeriesFileName))
 	require.NoError(t, err)
 	require.Equal(t, seriesFileContent, string(actualSeriesFileContent))
 }
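These test changes lean on `BlockRef` being comparable once the path strings are gone: refs with equal fields are interchangeable map keys, which is exactly what `mockData map[BlockRef]blockSupplier` needs. A sketch:

```go
package main

import (
	"fmt"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

func ref(checksum uint32) bloomshipper.BlockRef {
	return bloomshipper.BlockRef{Ref: bloomshipper.Ref{
		TenantID: "fake",
		Bounds:   v1.NewBounds(0, 0xffff),
		Checksum: checksum,
	}}
}

func main() {
	seen := map[bloomshipper.BlockRef]int{}
	seen[ref(1)]++
	seen[ref(1)]++ // identical fields -> identical key
	seen[ref(2)]++
	fmt.Println(len(seen), seen[ref(1)]) // 2 2
}
```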
diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go
index 6944d51da6412..cfdd057db047d 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"hash"
 	"io"
 	"path"
 	"strconv"
@@ -12,11 +13,13 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/concurrency"
+	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/chunk/client"
 	"github.com/grafana/loki/pkg/storage/config"
+	"github.com/grafana/loki/pkg/util/encoding"
 )
 
 const (
@@ -35,6 +38,26 @@ type Ref struct {
 	Checksum       uint32
 }
 
+// Hash hashes the ref
+// NB(owen-d): we don't include the tenant in the hash
+// as it's not included in the data and leaving it out gives
+// flexibility for migrating data between tenants
+func (r Ref) Hash(h hash.Hash32) error {
+	if err := r.Bounds.Hash(h); err != nil {
+		return err
+	}
+
+	var enc encoding.Encbuf
+
+	enc.PutString(r.TableName)
+	enc.PutBE64(uint64(r.StartTimestamp))
+	enc.PutBE64(uint64(r.EndTimestamp))
+	enc.PutBE32(r.Checksum)
+
+	_, err := h.Write(enc.Get())
+	return errors.Wrap(err, "writing Ref")
+}
+
 // Cmp returns the fingerprint's position relative to the bounds
 func (r Ref) Cmp(fp uint64) v1.BoundsCheck {
 	return r.Bounds.Cmp(model.Fingerprint(fp))
@@ -46,8 +69,10 @@ func (r Ref) Interval() Interval {
 
 type BlockRef struct {
 	Ref
-	IndexPath string
-	BlockPath string
+}
+
+func (r BlockRef) String() string {
+	return defaultKeyResolver{}.Block(r).Addr()
 }
 
 type MetaRef struct {
@@ -88,6 +113,7 @@ type Block struct {
 }
 
 type BlockClient interface {
+	KeyResolver
 	GetBlock(ctx context.Context, ref BlockRef) (LazyBlock, error)
 	PutBlocks(ctx context.Context, blocks []Block) ([]Block, error)
 	DeleteBlocks(ctx context.Context, blocks []BlockRef) error
@@ -103,6 +129,7 @@ type Client interface {
 var _ Client = &BloomClient{}
 
 type BloomClient struct {
+	KeyResolver
 	concurrency int
 	client      client.ObjectClient
 	logger      log.Logger
@@ -110,7 +137,8 @@ type BloomClient struct {
 
 func NewBloomClient(client client.ObjectClient, logger log.Logger) (*BloomClient, error) {
 	return &BloomClient{
-		concurrency: 100, // make configurable?
+		KeyResolver: defaultKeyResolver{}, // TODO(owen-d): hook into schema, similar to `{,Parse}ExternalKey`
+		concurrency: 100,                  // make configurable?
 		client:      client,
 		logger:      logger,
 	}, nil
@@ -125,12 +153,6 @@ func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error {
 	return b.client.PutObject(ctx, key, bytes.NewReader(data))
 }
 
-func externalBlockKey(ref BlockRef) string {
-	blockParentFolder := ref.Bounds.String()
-	filename := fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
-	return path.Join(rootFolder, ref.TableName, ref.TenantID, bloomsFolder, blockParentFolder, filename)
-}
-
 func externalMetaKey(ref MetaRef) string {
 	filename := fmt.Sprintf("%s-%d-%d-%x", ref.Bounds.String(), ref.StartTimestamp, ref.EndTimestamp, ref.Checksum)
 	return path.Join(rootFolder, ref.TableName, ref.TenantID, metasFolder, filename)
@@ -153,7 +175,7 @@ func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error {
 
 // GetBlock downloads the blocks from objectStorage and returns the downloaded block
 func (b *BloomClient) GetBlock(ctx context.Context, reference BlockRef) (LazyBlock, error) {
-	readCloser, _, err := b.client.GetObject(ctx, externalBlockKey(reference))
+	readCloser, _, err := b.client.GetObject(ctx, b.Block(reference).Addr())
 	if err != nil {
 		return LazyBlock{}, fmt.Errorf("error while fetching object from storage: %w", err)
 	}
@@ -173,7 +195,7 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
 
 		var err error
 
-		key := externalBlockKey(block.BlockRef)
+		key := b.Block(block.BlockRef).Addr()
 		_, err = block.Data.Seek(0, 0)
 		if err != nil {
 			return fmt.Errorf("error uploading block file: %w", err)
@@ -183,7 +205,6 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
 		if err != nil {
 			return fmt.Errorf("error uploading block file: %w", err)
 		}
-		block.BlockPath = key
 		results[idx] = block
 		return nil
 	})
@@ -193,7 +214,7 @@ func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, e
 func (b *BloomClient) DeleteBlocks(ctx context.Context, references []BlockRef) error {
 	return concurrency.ForEachJob(ctx, len(references), b.concurrency, func(ctx context.Context, idx int) error {
 		ref := references[idx]
-		key := externalBlockKey(ref)
+		key := b.Block(ref).Addr()
 		err := b.client.DeleteObject(ctx, key)
 		if err != nil {
 			return fmt.Errorf("error deleting block file: %w", err)
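`Ref.Hash` takes any `hash.Hash32`. A short usage sketch, which also demonstrates the documented property that `TenantID` is excluded, so the same data hashed under two tenants agrees:

```go
package main

import (
	"fmt"
	"hash/fnv"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

func main() {
	ref := bloomshipper.Ref{
		TenantID:       "tenantA",
		TableName:      "table_19621",
		Bounds:         v1.NewBounds(0, 0xffff),
		StartTimestamp: 0,
		EndTimestamp:   3600000,
		Checksum:       42,
	}

	h := fnv.New32a() // any hash.Hash32 implementation works
	if err := ref.Hash(h); err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", h.Sum32())

	// TenantID is deliberately not part of the hash (see the NB comment above),
	// which leaves room for migrating data between tenants.
	ref.TenantID = "tenantB"
	h2 := fnv.New32a()
	_ = ref.Hash(h2)
	fmt.Println(h.Sum32() == h2.Sum32()) // true
}
```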
diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go
index 3d6eac07521de..80770a5735679 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go
@@ -205,17 +205,6 @@ func Test_BloomClient_DeleteMeta(t *testing.T) {
 }
 
 func Test_BloomClient_GetBlocks(t *testing.T) {
-	bloomClient := createStore(t)
-	fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
-	firstBlockPath := fmt.Sprintf("bloom/first-period-19621/tenantA/blooms/%s/1695272400000-1695276000000-1", v1.NewBounds(0xeeee, 0xffff))
-	firstBlockFullPath := filepath.Join(fsNamedStores["folder-1"].Directory, firstBlockPath)
-	firstBlockData := createBlockFile(t, firstBlockFullPath)
-	secondBlockPath := fmt.Sprintf("bloom/second-period-19624/tenantA/blooms/%s/1695531600000-1695535200000-2", v1.NewBounds(0xaaaa, 0xbbbb))
-	secondBlockFullPath := filepath.Join(fsNamedStores["folder-2"].Directory, secondBlockPath)
-	secondBlockData := createBlockFile(t, secondBlockFullPath)
-	require.FileExists(t, firstBlockFullPath)
-	require.FileExists(t, secondBlockFullPath)
-
 	firstBlockRef := BlockRef{
 		Ref: Ref{
 			TenantID:       "tenantA",
@@ -225,7 +214,6 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
 			EndTimestamp:   Date(2023, time.September, 21, 6, 0, 0),
 			Checksum:       1,
 		},
-		BlockPath: firstBlockPath,
 	}
 	secondBlockRef := BlockRef{
 		Ref: Ref{
@@ -236,9 +224,23 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
 			EndTimestamp:   Date(2023, time.September, 24, 6, 0, 0),
 			Checksum:       2,
 		},
-		BlockPath: secondBlockPath,
 	}
 
+	bloomClient := createStore(t)
+	fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
+	firstBlockFullPath := NewPrefixedResolver(
+		fsNamedStores["folder-1"].Directory,
+		defaultKeyResolver{},
+	).Block(firstBlockRef).LocalPath()
+	firstBlockData := createBlockFile(t, firstBlockFullPath)
+	secondBlockFullPath := NewPrefixedResolver(
+		fsNamedStores["folder-2"].Directory,
+		defaultKeyResolver{},
+	).Block(secondBlockRef).LocalPath()
+	secondBlockData := createBlockFile(t, secondBlockFullPath)
+	require.FileExists(t, firstBlockFullPath)
+	require.FileExists(t, secondBlockFullPath)
+
 	downloadedFirstBlock, err := bloomClient.GetBlock(context.Background(), firstBlockRef)
 	require.NoError(t, err)
 	firstBlockActualData, err := io.ReadAll(downloadedFirstBlock.Data)
@@ -254,8 +256,7 @@ func Test_BloomClient_GetBlocks(t *testing.T) {
 
 func Test_BloomClient_PutBlocks(t *testing.T) {
 	bloomClient := createStore(t)
-	blockForFirstFolderData := "data1"
-	blockForFirstFolder := Block{
+	block := Block{
 		BlockRef: BlockRef{
 			Ref: Ref{
 				TenantID:       "tenantA",
@@ -265,118 +266,44 @@ func Test_BloomClient_PutBlocks(t *testing.T) {
 				EndTimestamp:   Date(2023, time.September, 21, 6, 0, 0),
 				Checksum:       1,
 			},
-			IndexPath: uuid.New().String(),
-		},
-		Data: awsio.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))},
-	}
-
-	blockForSecondFolderData := "data2"
-	blockForSecondFolder := Block{
-		BlockRef: BlockRef{
-			Ref: Ref{
-				TenantID:       "tenantA",
-				TableName:      "second-period-19624",
-				Bounds:         v1.NewBounds(0xaaaa, 0xbbbb),
-				StartTimestamp: Date(2023, time.September, 24, 5, 0, 0),
-				EndTimestamp:   Date(2023, time.September, 24, 6, 0, 0),
-				Checksum:       2,
-			},
-			IndexPath: uuid.New().String(),
 		},
-		Data: awsio.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))},
+		Data: awsio.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte("data"))},
 	}
-
-	results, err := bloomClient.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder})
+	_, err := bloomClient.PutBlocks(context.Background(), []Block{block})
 	require.NoError(t, err)
-	require.Len(t, results, 2)
-	firstResultBlock := results[0]
-	path := firstResultBlock.BlockPath
-	require.Equal(t,
-		fmt.Sprintf(
-			"bloom/first-period-19621/tenantA/blooms/%s/1695272400000-1695276000000-1",
-			v1.NewBounds(0xeeee, 0xffff),
-		),
-		path,
-	)
-	require.Equal(t, blockForFirstFolder.TenantID, firstResultBlock.TenantID)
-	require.Equal(t, blockForFirstFolder.TableName, firstResultBlock.TableName)
-	require.Equal(t, blockForFirstFolder.Bounds.Min, firstResultBlock.Bounds.Min)
-	require.Equal(t, blockForFirstFolder.Bounds.Max, firstResultBlock.Bounds.Max)
-	require.Equal(t, blockForFirstFolder.StartTimestamp, firstResultBlock.StartTimestamp)
-	require.Equal(t, blockForFirstFolder.EndTimestamp, firstResultBlock.EndTimestamp)
-	require.Equal(t, blockForFirstFolder.Checksum, firstResultBlock.Checksum)
-	require.Equal(t, blockForFirstFolder.IndexPath, firstResultBlock.IndexPath)
-	folder1 := bloomClient.storageConfig.NamedStores.Filesystem["folder-1"].Directory
-	savedFilePath := filepath.Join(folder1, path)
-	require.FileExists(t, savedFilePath)
-	savedData, err := os.ReadFile(savedFilePath)
+	got, err := bloomClient.GetBlock(context.Background(), block.BlockRef)
 	require.NoError(t, err)
-	require.Equal(t, blockForFirstFolderData, string(savedData))
-
-	secondResultBlock := results[1]
-	path = secondResultBlock.BlockPath
-	require.Equal(t,
-		fmt.Sprintf(
-			"bloom/second-period-19624/tenantA/blooms/%s/1695531600000-1695535200000-2",
-			v1.NewBounds(0xaaaa, 0xbbbb),
-		),
-		path,
-	)
-	require.Equal(t, blockForSecondFolder.TenantID, secondResultBlock.TenantID)
-	require.Equal(t, blockForSecondFolder.TableName, secondResultBlock.TableName)
-	require.Equal(t, blockForSecondFolder.Bounds.Min, secondResultBlock.Bounds.Min)
-	require.Equal(t, blockForSecondFolder.Bounds.Max, secondResultBlock.Bounds.Max)
-	require.Equal(t, blockForSecondFolder.StartTimestamp, secondResultBlock.StartTimestamp)
-	require.Equal(t, blockForSecondFolder.EndTimestamp, secondResultBlock.EndTimestamp)
-	require.Equal(t, blockForSecondFolder.Checksum, secondResultBlock.Checksum)
-	require.Equal(t, blockForSecondFolder.IndexPath, secondResultBlock.IndexPath)
-	folder2 := bloomClient.storageConfig.NamedStores.Filesystem["folder-2"].Directory
-
-	savedFilePath = filepath.Join(folder2, path)
-	require.FileExists(t, savedFilePath)
-	savedData, err = os.ReadFile(savedFilePath)
+	require.Equal(t, block.BlockRef, got.BlockRef)
+	data, err := io.ReadAll(got.Data)
 	require.NoError(t, err)
-	require.Equal(t, blockForSecondFolderData, string(savedData))
+	require.Equal(t, "data", string(data))
 }
 
 func Test_BloomClient_DeleteBlocks(t *testing.T) {
-	bloomClient := createStore(t)
-	fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
-	block1Path := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom/first-period-19621/tenantA/blooms/000000000000eeee-000000000000ffff/1695272400000-1695276000000-1")
-	createBlockFile(t, block1Path)
-	block2Path := filepath.Join(fsNamedStores["folder-2"].Directory, "bloom/second-period-19624/tenantA/blooms/000000000000aaaa-000000000000bbbb/1695531600000-1695535200000-2")
-	createBlockFile(t, block2Path)
-	require.FileExists(t, block1Path)
-	require.FileExists(t, block2Path)
-
-	blocksToDelete := []BlockRef{
-		{
-			Ref: Ref{
-				TenantID:       "tenantA",
-				TableName:      "second-period-19624",
-				Bounds:         v1.NewBounds(0xaaaa, 0xbbbb),
-				StartTimestamp: Date(2023, time.September, 24, 5, 0, 0),
-				EndTimestamp:   Date(2023, time.September, 24, 6, 0, 0),
-				Checksum:       2,
-			},
-			IndexPath: uuid.New().String(),
-		},
-		{
-			Ref: Ref{
-				TenantID:       "tenantA",
-				TableName:      "first-period-19621",
-				Bounds:         v1.NewBounds(0xeeee, 0xffff),
-				StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
-				EndTimestamp:   Date(2023, time.September, 21, 6, 0, 0),
-				Checksum:       1,
-			},
-			IndexPath: uuid.New().String(),
+	block := BlockRef{
+		Ref: Ref{
+			TenantID:       "tenantA",
+			TableName:      "first-period-19621",
+			Bounds:         v1.NewBounds(0xeeee, 0xffff),
+			StartTimestamp: Date(2023, time.September, 21, 5, 0, 0),
+			EndTimestamp:   Date(2023, time.September, 21, 6, 0, 0),
+			Checksum:       1,
 		},
 	}
-	err := bloomClient.DeleteBlocks(context.Background(), blocksToDelete)
+
+	bloomClient := createStore(t)
+	fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem
+	blockFullPath := NewPrefixedResolver(
+		fsNamedStores["folder-1"].Directory,
+		defaultKeyResolver{},
+	).Block(block).LocalPath()
+	_ = createBlockFile(t, blockFullPath)
+	require.FileExists(t, blockFullPath)
+
+	err := bloomClient.DeleteBlocks(context.Background(), []BlockRef{block})
 	require.NoError(t, err)
-	require.NoFileExists(t, block1Path)
-	require.NoFileExists(t, block2Path)
+	require.NoFileExists(t, blockFullPath)
+
 }
 
 func createBlockFile(t *testing.T, path string) string {
@@ -556,8 +483,6 @@ func createMetaEntity(
 					StartTimestamp: startTimestamp,
 					EndTimestamp:   endTimestamp,
 				},
-				IndexPath: uuid.New().String(),
-				BlockPath: uuid.New().String(),
 			},
 		},
 		Blocks: []BlockRef{
@@ -569,8 +494,6 @@ func createMetaEntity(
 					StartTimestamp: startTimestamp,
 					EndTimestamp:   endTimestamp,
 				},
-				IndexPath: uuid.New().String(),
-				BlockPath: uuid.New().String(),
 			},
 		},
 	}
diff --git a/pkg/storage/stores/shipper/bloomshipper/compress_utils.go b/pkg/storage/stores/shipper/bloomshipper/compress_utils.go
index 96af5e987c3d4..5f11e81a5b3b6 100644
--- a/pkg/storage/stores/shipper/bloomshipper/compress_utils.go
+++ b/pkg/storage/stores/shipper/bloomshipper/compress_utils.go
@@ -32,32 +32,8 @@ func CompressBloomBlock(ref BlockRef, archivePath, localDst string, logger log.L
 	return blockToUpload, nil
 }
 
-func UncompressBloomBlock(block *LazyBlock, workingDirectory string, logger log.Logger) (string, error) {
-	workingDirectoryPath := filepath.Join(workingDirectory, block.BlockPath)
-	err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
-	if err != nil {
-		return "", fmt.Errorf("can not create directory to extract the block: %w", err)
-	}
-	archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
-	if err != nil {
-		return "", fmt.Errorf("error writing data to temp file: %w", err)
-	}
-	level.Info(logger).Log("msg", "extracting archive", "archive", archivePath, "workingDirectory", workingDirectoryPath, "blockPath", block.BlockPath)
-	defer func() {
-		err = os.Remove(archivePath)
-		if err != nil {
-			level.Error(logger).Log("msg", "removing archive file", "err", err, "file", archivePath)
-		}
-	}()
-	err = extractArchive(archivePath, workingDirectoryPath)
-	if err != nil {
-		return "", fmt.Errorf("error extracting archive: %w", err)
-	}
-	return workingDirectoryPath, nil
-}
-
-func writeDataToTempFile(workingDirectoryPath string, block *LazyBlock) (string, error) {
-	defer block.Data.Close()
+func writeDataToTempFile(workingDirectoryPath string, data io.ReadCloser) (string, error) {
+	defer data.Close()
 	archivePath := filepath.Join(workingDirectoryPath, uuid.New().String())
 
 	archiveFile, err := os.Create(archivePath)
@@ -65,7 +41,7 @@ func writeDataToTempFile(workingDirectoryPath string, block *LazyBlock) (string,
 		return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
 	}
 	defer archiveFile.Close()
-	_, err = io.Copy(archiveFile, block.Data)
+	_, err = io.Copy(archiveFile, data)
 	if err != nil {
 		return "", fmt.Errorf("error writing data to archive file: %w", err)
 	}
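`writeDataToTempFile` now accepts a bare `io.ReadCloser` rather than a `*LazyBlock`, so any reader — object-store response, file, test buffer — can be spooled to disk. A self-contained sketch of the same shape; names are illustrative, and `os.CreateTemp` stands in for the UUID-based filename the real helper uses:

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// writeToTempFile spools any reader into a temp file under dir and, like
// the real helper, closes the source via defer.
func writeToTempFile(dir string, data io.ReadCloser) (string, error) {
	defer data.Close()
	f, err := os.CreateTemp(dir, "archive-*")
	if err != nil {
		return "", fmt.Errorf("creating temp file: %w", err)
	}
	defer f.Close()
	if _, err := io.Copy(f, data); err != nil {
		return "", fmt.Errorf("writing data: %w", err)
	}
	return f.Name(), nil
}

func main() {
	dir, _ := os.MkdirTemp("", "blooms")
	defer os.RemoveAll(dir)

	path, err := writeToTempFile(dir, io.NopCloser(strings.NewReader("tar bytes")))
	if err != nil {
		panic(err)
	}
	fmt.Println(filepath.Base(path)) // e.g. archive-123456789
}
```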
diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go
new file mode 100644
index 0000000000000..6278af9c6d04d
--- /dev/null
+++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go
@@ -0,0 +1,121 @@
+package bloomshipper
+
+import (
+	"fmt"
+	"path"
+	"path/filepath"
+)
+
+const (
+	BloomPrefix  = "bloom"
+	MetasPrefix  = "metas"
+	BlocksPrefix = "blocks"
+)
+
+// KeyResolver is an interface for resolving keys to locations.
+// This is used to determine where items are stored in object storage _and_ on disk.
+// Using an interface allows us to abstract away platform specifics
+// (e.g. OS path-specifics, object storage difference, etc)
+// TODO(owen-d): implement resolvers that are schema-aware, allowing us to change
+// the locations of data across schema boundaries (for instance to upgrade|improve).
+type KeyResolver interface {
+	Meta(MetaRef) Location
+	Block(BlockRef) Location
+}
+
+type defaultKeyResolver struct{}
+
+func (defaultKeyResolver) Meta(ref MetaRef) Location {
+	return simpleLocation{
+		BloomPrefix,
+		fmt.Sprintf("%v", ref.TableName),
+		ref.TenantID,
+		MetasPrefix,
+		fmt.Sprintf("%v-%v", ref.Bounds, ref.Checksum),
+	}
+}
+
+func (defaultKeyResolver) Block(ref BlockRef) Location {
+	return simpleLocation{
+		BloomPrefix,
+		fmt.Sprintf("%v", ref.TableName),
+		ref.TenantID,
+		BlocksPrefix,
+		ref.Bounds.String(),
+		fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum),
+	}
+}
+
+type PrefixedResolver struct {
+	prefix string
+	KeyResolver
+}
+
+func NewPrefixedResolver(prefix string, resolver KeyResolver) KeyResolver {
+	return PrefixedResolver{
+		prefix:      prefix,
+		KeyResolver: resolver,
+	}
+}
+
+func (p PrefixedResolver) Meta(ref MetaRef) Location {
+	return locations{
+		key(p.prefix),
+		p.KeyResolver.Meta(ref),
+	}
+}
+
+func (p PrefixedResolver) Block(ref BlockRef) Location {
+	return locations{
+		key(p.prefix),
+		p.KeyResolver.Block(ref),
+	}
+}
+
+type Location interface {
+	Addr() string      // object storage location
+	LocalPath() string // local path version
+}
+
+// simplest Location implementor, just a string
+type key string
+
+func (k key) Addr() string {
+	return string(k)
+}
+
+func (k key) LocalPath() string {
+	return string(k)
+}
+
+// simpleLocation is a simple implementation of Location combining multiple strings
+type simpleLocation []string
+
+func (xs simpleLocation) LocalPath() string {
+	return filepath.Join(xs...)
+}
+
+func (xs simpleLocation) Addr() string {
+	return path.Join(xs...)
+}
+
+// helper type for combining multiple locations into one
+type locations []Location
+
+func (ls locations) Addr() string {
+	xs := make([]string, 0, len(ls))
+	for _, l := range ls {
+		xs = append(xs, l.Addr())
+	}
+
+	return path.Join(xs...)
+}
+
+func (ls locations) LocalPath() string {
+	xs := make([]string, 0, len(ls))
+	for _, l := range ls {
+		xs = append(xs, l.LocalPath())
+	}
+
+	return filepath.Join(xs...)
+}
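To see how the pieces compose: one ref yields an object-storage key via `Addr()` (always `/`-joined) and a filesystem path via `LocalPath()` (OS path separator), and `NewPrefixedResolver` roots either under a directory, as the client tests above do. A sketch written as an in-package test, since `defaultKeyResolver` is unexported:

```go
package bloomshipper

import (
	"testing"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// A sketch, not a real unit test: it just logs what the resolvers produce.
func TestKeyResolverSketch(t *testing.T) {
	ref := BlockRef{Ref: Ref{
		TenantID:       "tenantA",
		TableName:      "table_19621",
		Bounds:         v1.NewBounds(0xeeee, 0xffff),
		StartTimestamp: 1,
		EndTimestamp:   2,
		Checksum:       3,
	}}

	loc := defaultKeyResolver{}.Block(ref)
	t.Log(loc.Addr())      // e.g. bloom/table_19621/tenantA/blocks/<bounds>/1-2-3
	t.Log(loc.LocalPath()) // same segments, joined with the OS separator

	// rooting the same key under a cache or store directory:
	prefixed := NewPrefixedResolver("/var/loki/blooms", defaultKeyResolver{})
	t.Log(prefixed.Block(ref).LocalPath())
}
```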
diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go
index fad9b2616b6bc..9ccffcd643024 100644
--- a/pkg/storage/stores/shipper/bloomshipper/shipper.go
+++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go
@@ -107,7 +107,7 @@ func runCallback(callback ForEachBlockCallback, block blockWithQuerier) error {
 
 	err := callback(block.closableBlockQuerier.BlockQuerier, block.Bounds)
 	if err != nil {
-		return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err)
+		return fmt.Errorf("error running callback function for block %s err: %w", block.BlockRef, err)
 	}
 	return nil
 }
@@ -142,37 +142,36 @@ func (s *Shipper) getActiveBlockRefs(ctx context.Context, tenantID string, inter
 	return BlocksForMetas(metas, interval, bounds), nil
 }
 
+// BlocksForMetas returns all the blocks from all the metas listed that are within the requested bounds
+// and not tombstoned in any of the metas
 func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) []BlockRef {
-	tombstones := make(map[string]interface{})
+	blocks := make(map[BlockRef]bool) // block -> isTombstoned
+
 	for _, meta := range metas {
 		for _, tombstone := range meta.Tombstones {
-			tombstones[tombstone.BlockPath] = nil
+			blocks[tombstone] = true
 		}
-	}
-	blocksSet := make(map[string]BlockRef)
-	for _, meta := range metas {
 		for _, block := range meta.Blocks {
-			if _, contains := tombstones[block.BlockPath]; contains {
+			tombstoned, ok := blocks[block]
+			if ok && tombstoned {
 				// skip tombstoned blocks
 				continue
 			}
-			if isOutsideRange(block, interval, keyspaces) {
-				// skip block that are outside of interval or keyspaces
-				continue
-			}
-			blocksSet[block.BlockPath] = block
+			blocks[block] = false
 		}
 	}
-	blockRefs := make([]BlockRef, 0, len(blocksSet))
-	for _, ref := range blocksSet {
-		blockRefs = append(blockRefs, ref)
-	}
 
-	sort.Slice(blockRefs, func(i, j int) bool {
-		return blockRefs[i].Bounds.Less(blockRefs[j].Bounds)
+	refs := make([]BlockRef, 0, len(blocks))
+	for ref, tombstoned := range blocks {
+		if !tombstoned && !isOutsideRange(ref, interval, keyspaces) {
+			refs = append(refs, ref)
+		}
+	}
+	sort.Slice(refs, func(i, j int) bool {
+		return refs[i].Bounds.Less(refs[j].Bounds)
 	})
 
-	return blockRefs
+	return refs
 }
 
 // isOutsideRange tests if a given BlockRef b is outside of search boundaries
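The rewritten `BlocksForMetas` folds tombstones and blocks into one `map[BlockRef]bool`: a block tombstoned by any meta can never resurface (the `ok && tombstoned` guard), and range filtering happens once on the way out. Since the function and its types are exported, a small program can exercise it directly, mirroring `Test_Shipper_findBlocks` below:

```go
package main

import (
	"fmt"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

func ref(checksum uint32) bloomshipper.BlockRef {
	return bloomshipper.BlockRef{Ref: bloomshipper.Ref{
		Bounds:       v1.NewBounds(0, 0xffff),
		EndTimestamp: 1000,
		Checksum:     checksum,
	}}
}

func main() {
	metas := []bloomshipper.Meta{
		{Blocks: []bloomshipper.BlockRef{ref(1), ref(2)}}, // block 1 produced here...
		{Tombstones: []bloomshipper.BlockRef{ref(1)}},     // ...and tombstoned here
	}
	live := bloomshipper.BlocksForMetas(
		metas,
		bloomshipper.NewInterval(0, 2000),
		[]v1.FingerprintBounds{{Min: 0, Max: 0xffff}},
	)
	fmt.Println(len(live), live[0].Checksum) // 1 2: only block 2 survives
}
```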
diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
index f3ef55a4f3901..8ad776d4164aa 100644
--- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
@@ -18,24 +18,24 @@ func Test_Shipper_findBlocks(t *testing.T) {
 			{
 				Blocks: []BlockRef{
 					//this blockRef is marked as deleted in the next meta
-					createMatchingBlockRef("block1"),
-					createMatchingBlockRef("block2"),
+					createMatchingBlockRef(1),
+					createMatchingBlockRef(2),
 				},
 			},
 			{
 				Blocks: []BlockRef{
 					//this blockRef is marked as deleted in the next meta
-					createMatchingBlockRef("block3"),
-					createMatchingBlockRef("block4"),
+					createMatchingBlockRef(3),
+					createMatchingBlockRef(4),
 				},
 			},
 			{
 				Tombstones: []BlockRef{
-					createMatchingBlockRef("block1"),
-					createMatchingBlockRef("block3"),
+					createMatchingBlockRef(1),
+					createMatchingBlockRef(3),
 				},
 				Blocks: []BlockRef{
-					createMatchingBlockRef("block5"),
+					createMatchingBlockRef(5),
 				},
 			},
 		}
@@ -49,9 +49,9 @@ func Test_Shipper_findBlocks(t *testing.T) {
 		blocks := BlocksForMetas(metas, interval, []v1.FingerprintBounds{{Min: 100, Max: 200}})
 
 		expectedBlockRefs := []BlockRef{
-			createMatchingBlockRef("block2"),
-			createMatchingBlockRef("block4"),
-			createMatchingBlockRef("block5"),
+			createMatchingBlockRef(2),
+			createMatchingBlockRef(4),
+			createMatchingBlockRef(5),
 		}
 		require.ElementsMatch(t, expectedBlockRefs, blocks)
 	})
@@ -98,7 +98,7 @@ func Test_Shipper_findBlocks(t *testing.T) {
 	}
 	for name, data := range tests {
 		t.Run(name, func(t *testing.T) {
-			ref := createBlockRef("fake-block", data.minFingerprint, data.maxFingerprint, data.startTimestamp, data.endTimestamp)
+			ref := createBlockRef(data.minFingerprint, data.maxFingerprint, data.startTimestamp, data.endTimestamp)
 			blocks := BlocksForMetas([]Meta{{Blocks: []BlockRef{ref}}}, NewInterval(300, 400), []v1.FingerprintBounds{{Min: 100, Max: 200}})
 			if data.filtered {
 				require.Empty(t, blocks)
@@ -115,78 +115,79 @@ func TestIsOutsideRange(t *testing.T) {
 	endTs := model.Time(2000)
 
 	t.Run("is outside if startTs > through", func(t *testing.T) {
-		b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
+		b := createBlockRef(0, math.MaxUint64, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(0, 900), []v1.FingerprintBounds{})
 		require.True(t, isOutside)
 	})
 
 	t.Run("is outside if startTs == through ", func(t *testing.T) {
-		b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
+		b := createBlockRef(0, math.MaxUint64, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(900, 1000), []v1.FingerprintBounds{})
 		require.True(t, isOutside)
 	})
 
 	t.Run("is outside if endTs < from", func(t *testing.T) {
-		b := createBlockRef("block", 0, math.MaxUint64, startTs, endTs)
+		b := createBlockRef(0, math.MaxUint64, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(2100, 3000), []v1.FingerprintBounds{})
 		require.True(t, isOutside)
 	})
 
 	t.Run("is outside if endFp < first fingerprint", func(t *testing.T) {
-		b := createBlockRef("block", 0, 90, startTs, endTs)
+		b := createBlockRef(0, 90, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 100, Max: 199}})
 		require.True(t, isOutside)
 	})
 
 	t.Run("is outside if startFp > last fingerprint", func(t *testing.T) {
-		b := createBlockRef("block", 200, math.MaxUint64, startTs, endTs)
+		b := createBlockRef(200, math.MaxUint64, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 49}, {Min: 100, Max: 149}})
 		require.True(t, isOutside)
 	})
 
 	t.Run("is outside if within gaps in fingerprints", func(t *testing.T) {
-		b := createBlockRef("block", 100, 199, startTs, endTs)
+		b := createBlockRef(100, 199, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
 		require.True(t, isOutside)
 	})
 
 	t.Run("is not outside if within fingerprints 1", func(t *testing.T) {
-		b := createBlockRef("block", 10, 90, startTs, endTs)
+		b := createBlockRef(10, 90, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
 		require.False(t, isOutside)
 	})
 
 	t.Run("is not outside if within fingerprints 2", func(t *testing.T) {
-		b := createBlockRef("block", 210, 290, startTs, endTs)
+		b := createBlockRef(210, 290, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
 		require.False(t, isOutside)
 	})
 
 	t.Run("is not outside if spans across multiple fingerprint ranges", func(t *testing.T) {
-		b := createBlockRef("block", 50, 250, startTs, endTs)
+		b := createBlockRef(50, 250, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0, Max: 99}, {Min: 200, Max: 299}})
 		require.False(t, isOutside)
 	})
 
 	t.Run("is not outside if fingerprint range and time range are larger than block", func(t *testing.T) {
-		b := createBlockRef("block", math.MaxUint64/3, math.MaxUint64/3*2, startTs, endTs)
+		b := createBlockRef(math.MaxUint64/3, math.MaxUint64/3*2, startTs, endTs)
 		isOutside := isOutsideRange(b, NewInterval(0, 3000), []v1.FingerprintBounds{{Min: 0, Max: math.MaxUint64}})
 		require.False(t, isOutside)
 	})
 
 	t.Run("is not outside if block fingerprint range is bigger that search keyspace", func(t *testing.T) {
-		b := createBlockRef("block", 0x0000, 0xffff, model.Earliest, model.Latest)
+		b := createBlockRef(0x0000, 0xffff, model.Earliest, model.Latest)
 		isOutside := isOutsideRange(b, NewInterval(startTs, endTs), []v1.FingerprintBounds{{Min: 0x0100, Max: 0xff00}})
 		require.False(t, isOutside)
 	})
 }
 
-func createMatchingBlockRef(blockPath string) BlockRef {
-	return createBlockRef(blockPath, 0, math.MaxUint64, model.Time(0), model.Time(math.MaxInt64))
+func createMatchingBlockRef(checksum uint32) BlockRef {
+	block := createBlockRef(0, math.MaxUint64, model.Time(0), model.Time(math.MaxInt64))
+	block.Checksum = checksum
+	return block
 }
 
 func createBlockRef(
-	blockPath string,
 	minFingerprint, maxFingerprint uint64,
 	startTimestamp, endTimestamp model.Time,
 ) BlockRef {
@@ -200,7 +201,5 @@ func createBlockRef(
 			EndTimestamp:   endTimestamp,
 			Checksum:       0,
 		},
-		// block path is unique, and it's used to distinguish the blocks so the rest of the fields might be skipped in this test
-		BlockPath: blockPath,
 	}
 }
diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go
index d4da6573ba5df..8bcb65d304ca7 100644
--- a/pkg/storage/stores/shipper/bloomshipper/store.go
+++ b/pkg/storage/stores/shipper/bloomshipper/store.go
@@ -32,11 +32,12 @@ var _ Client = &bloomStoreEntry{}
 var _ Store = &bloomStoreEntry{}
 
 type bloomStoreEntry struct {
-	start        model.Time
-	cfg          config.PeriodConfig
-	objectClient client.ObjectClient
-	bloomClient  Client
-	fetcher      *Fetcher
+	start              model.Time
+	cfg                config.PeriodConfig
+	objectClient       client.ObjectClient
+	bloomClient        Client
+	fetcher            *Fetcher
+	defaultKeyResolver // TODO(owen-d): impl schema aware resolvers
 }
 
 // ResolveMetas implements store.
@@ -191,6 +192,38 @@ func NewBloomStore(
 	return store, nil
 }
 
+// Implements KeyResolver
+func (b *BloomStore) Meta(ref MetaRef) (loc Location) {
+	_ = b.storeDo(ref.StartTimestamp, func(s *bloomStoreEntry) error {
+		loc = s.Meta(ref)
+		return nil
+	})
+
+	// NB(owen-d): should not happen unless a ref is requested outside the store's accepted range.
+	// This should be prevented during query validation
+	if loc == nil {
+		loc = defaultKeyResolver{}.Meta(ref)
+	}
+
+	return
+}
+
+// Implements KeyResolver
+func (b *BloomStore) Block(ref BlockRef) (loc Location) {
+	_ = b.storeDo(ref.StartTimestamp, func(s *bloomStoreEntry) error {
+		loc = s.Block(ref)
+		return nil
+	})
+
+	// NB(owen-d): should not happen unless a ref is requested outside the store's accepted range.
+	// This should be prevented during query validation
+	if loc == nil {
+		loc = defaultKeyResolver{}.Block(ref)
+	}
+
+	return
+}
+
 // Fetcher implements Store.
 func (b *BloomStore) Fetcher(ts model.Time) *Fetcher {
 	if store := b.getStore(ts); store != nil {
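`BloomStore.Meta`/`Block` delegate to the period store owning the ref's `StartTimestamp` and fall back to the default resolver when no period matches. A sketch of that select-with-fallback pattern using stand-in types — the real `storeDo` and period wiring are not shown in this diff:

```go
package main

import "fmt"

type keyFn func(ts int64) string

type period struct {
	from int64 // period start; periods are sorted ascending
	key  keyFn
}

// resolve picks the newest period whose start precedes ts, defaulting
// otherwise, mirroring the nil-Location fallback in BloomStore above.
func resolve(periods []period, def keyFn, ts int64) string {
	for i := len(periods) - 1; i >= 0; i-- {
		if periods[i].from <= ts {
			return periods[i].key(ts)
		}
	}
	// should not happen for validated queries; the default keeps
	// resolution total instead of returning nothing
	return def(ts)
}

func main() {
	periods := []period{
		{from: 0, key: func(ts int64) string { return fmt.Sprintf("v1/%d", ts) }},
		{from: 1000, key: func(ts int64) string { return fmt.Sprintf("v2/%d", ts) }},
	}
	def := func(ts int64) string { return fmt.Sprintf("default/%d", ts) }

	fmt.Println(resolve(periods, def, 500))  // v1/500
	fmt.Println(resolve(periods, def, 1500)) // v2/1500
	fmt.Println(resolve(periods, def, -10))  // default/-10
}
```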