Bloom Shipper (grafana#10806)
Implemented a bloom shipper that allows iterating over all active blocks.
This new component will be used by the bloom-compactor and bloom-gateway.

#### How BloomShipper works
1. looks up all Metas (meta.json files)
2. gets the list of all blocks from the metas
3. gets the list of tombstones from the metas
4. removes tombstoned blocks from the list of all blocks
5. passes all active blockReferences to bloomClient
6. once bloomClient returns the downloaded block, extracts all files from the bloom block archive into the working directory
7. creates a `*BlockQuerier` and runs a callback function with it (see the usage sketch below)

**Note:** the `BlockQuerier` is created using a `DirectoryBlockReader` over the path from step 6.
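
A caller-side sketch of the flow above (e.g. what the bloom-gateway might do). This is a minimal illustration, not part of the commit: the tenant ID, the ranges, the timestamp unit (assumed Unix milliseconds), and the unwired `Client` are all assumptions; only `NewShipper` and `ForEachBlock` come from this change.

```go
package main

import (
	"context"
	"log"
	"time"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/bloomshipperconfig"
)

func main() {
	cfg := bloomshipperconfig.Config{WorkingDirectory: "/tmp/bloom-shipper"}

	// Obtaining a Client is out of scope here; assume it is wired to the
	// object store elsewhere. It is left nil so the sketch compiles, but a
	// real client is needed to actually run this.
	var client bloomshipper.Client

	shipper, err := bloomshipper.NewShipper(client, cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Visit every active (non-tombstoned) block overlapping the given
	// fingerprint and time ranges; each block is downloaded, extracted
	// into the working directory, and handed to the callback as a querier.
	err = shipper.ForEachBlock(
		context.Background(),
		"tenant-1",                // tenant ID
		0, time.Now().UnixMilli(), // start/end timestamps
		0, ^uint64(0),             // min/max fingerprints
		func(bq *v1.BlockQuerier) error {
			// Inspect the block here; returning an error stops iteration.
			return nil
		},
	)
	if err != nil {
		log.Fatal(err)
	}
}
```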

---------

Signed-off-by: Vladyslav Diachenko <[email protected]>
vlad-diachenko authored and rhnasc committed Apr 12, 2024
1 parent eb1bae8 commit e84d728
Showing 6 changed files with 443 additions and 3 deletions.
docs/sources/configure/_index.md (+6 -0)
@@ -2111,6 +2111,12 @@ tsdb_shipper:
# index-read-cache will be used as the backend.
# CLI flag: -tsdb.enable-postings-cache
[enable_postings_cache: <boolean> | default = false]

# Configures Bloom Shipper.
bloom_shipper:
# Working directory to store downloaded Bloom Blocks.
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]
```
### chunk_store_config
pkg/loki/config_wrapper.go (+3 -0)
@@ -365,6 +365,9 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) {
if r.CompactorConfig.WorkingDirectory == defaults.CompactorConfig.WorkingDirectory {
r.CompactorConfig.WorkingDirectory = fmt.Sprintf("%s/compactor", prefix)
}
if r.StorageConfig.BloomShipperConfig.WorkingDirectory == defaults.StorageConfig.BloomShipperConfig.WorkingDirectory {
r.StorageConfig.BloomShipperConfig.WorkingDirectory = fmt.Sprintf("%s/bloom-shipper", prefix)
}
}
}
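
For illustration only, the defaulting rule added here can be restated as a tiny standalone function (a hypothetical name, not the real `ConfigWrapper` plumbing): a bloom shipper working directory still at its flag default is re-rooted under `common.path_prefix`, while an explicitly configured one is left untouched.

```go
package main

import "fmt"

// applyBloomShipperPrefix restates the rule above: only a working directory
// still equal to its default is re-rooted under the common path prefix.
func applyBloomShipperPrefix(current, defaultDir, prefix string) string {
	if current == defaultDir {
		return fmt.Sprintf("%s/bloom-shipper", prefix)
	}
	return current
}

func main() {
	fmt.Println(applyBloomShipperPrefix("bloom-shipper", "bloom-shipper", "/loki")) // /loki/bloom-shipper
	fmt.Println(applyBloomShipperPrefix("/custom/dir", "bloom-shipper", "/loki"))   // /custom/dir
}
```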

pkg/storage/factory.go (+9 -3)
@@ -32,6 +32,7 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/bloomshipperconfig"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
@@ -332,9 +333,10 @@ type Config struct {
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
MaxParallelGetChunk int `yaml:"max_parallel_get_chunk"`

MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."`
TSDBShipperConfig tsdb.IndexCfg `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."`
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."`
TSDBShipperConfig tsdb.IndexCfg `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."`
BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures Bloom Shipper."`

// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
// It is required for getting chunk ids of recently flushed chunks from the ingesters.
@@ -365,6 +367,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.BoltDBShipperConfig.RegisterFlags(f)
f.IntVar(&cfg.MaxChunkBatchSize, "store.max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.")
cfg.TSDBShipperConfig.RegisterFlagsWithPrefix("tsdb.", f)
cfg.BloomShipperConfig.RegisterFlagsWithPrefix("bloom.", f)
}

// Validate config and returns error on failure
@@ -390,6 +393,9 @@ func (cfg *Config) Validate() error {
if err := cfg.TSDBShipperConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid tsdb config")
}
if err := cfg.BloomShipperConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid bloom shipper config")
}

return cfg.NamedStores.Validate()
}
pkg/storage/stores/shipper/bloomshipper/bloomshipperconfig (new file, +22 -0)
@@ -0,0 +1,22 @@
package bloomshipperconfig

import (
"errors"
"flag"
"strings"
)

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
}

func (c *Config) Validate() error {
if strings.TrimSpace(c.WorkingDirectory) == "" {
return errors.New("working directory must be specified")
}
return nil
}
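
A minimal sketch of how this config is registered and validated, mirroring the `cfg.BloomShipperConfig.RegisterFlagsWithPrefix("bloom.", f)` call in `pkg/storage/factory.go`; the standalone flag set and the sample directory value are illustrative.

```go
package main

import (
	"flag"
	"fmt"
	"log"

	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/bloomshipperconfig"
)

func main() {
	var cfg bloomshipperconfig.Config

	// Registers -bloom.shipper.working-directory with default "bloom-shipper",
	// exactly as pkg/storage/factory.go does on Loki's main flag set.
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlagsWithPrefix("bloom.", fs)
	if err := fs.Parse([]string{"-bloom.shipper.working-directory", "/data/blooms"}); err != nil {
		log.Fatal(err)
	}

	// Validate rejects an empty or whitespace-only working directory.
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.WorkingDirectory) // /data/blooms
}
```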
pkg/storage/stores/shipper/bloomshipper/shipper.go (new file, +208 -0)
@@ -0,0 +1,208 @@
package bloomshipper

import (
"archive/zip"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/bloomshipperconfig"
)

type Shipper struct {
client Client
config bloomshipperconfig.Config
}

func NewShipper(client Client, config bloomshipperconfig.Config) (*Shipper, error) {
return &Shipper{
client: client,
config: config,
}, nil
}

func (s *Shipper) ForEachBlock(
ctx context.Context,
tenantID string,
startTimestamp, endTimestamp int64,
minFingerprint, maxFingerprint uint64,
callback func(bq *v1.BlockQuerier) error) error {

blockRefs, err := s.getActiveBlockRefs(ctx, tenantID, startTimestamp, endTimestamp, minFingerprint, maxFingerprint)
if err != nil {
return fmt.Errorf("error fetching active block references : %w", err)
}

blocksChannel, errorsChannel := s.client.GetBlocks(ctx, blockRefs)
for {
select {
case block, ok := <-blocksChannel:
if !ok {
return nil
}
directory, err := s.extractBlock(&block, time.Now().UTC())
if err != nil {
return fmt.Errorf("error unarchiving block %s err: %w", block.BlockPath, err)
}
blockQuerier := s.createBlockQuerier(directory)
err = callback(blockQuerier)
if err != nil {
return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err)
}
case err := <-errorsChannel:
if err != nil {
return fmt.Errorf("error downloading blocks : %w", err)
}
}
}
}

func (s *Shipper) getActiveBlockRefs(
ctx context.Context,
tenantID string,
startTimestamp, endTimestamp int64,
minFingerprint, maxFingerprint uint64) ([]BlockRef, error) {
metas, err := s.client.GetMetas(ctx, MetaSearchParams{
TenantID: tenantID,
MinFingerprint: minFingerprint,
MaxFingerprint: maxFingerprint,
StartTimestamp: startTimestamp,
EndTimestamp: endTimestamp,
})
if err != nil {
return []BlockRef{}, fmt.Errorf("error fetching meta.json files: %w", err)
}
activeBlocks := s.findBlocks(metas, minFingerprint, maxFingerprint, startTimestamp, endTimestamp)
slices.SortStableFunc(activeBlocks, func(a, b BlockRef) bool {
return a.MinFingerprint < b.MinFingerprint
})
return activeBlocks, nil
}

// findBlocks returns the deduplicated set of block references from the metas,
// excluding tombstoned blocks and blocks outside the requested ranges.
func (s *Shipper) findBlocks(
metas []Meta,
minFingerprint uint64,
maxFingerprint uint64,
startTimestamp int64,
endTimestamp int64,
) []BlockRef {
outdatedBlocks := make(map[string]interface{})
for _, meta := range metas {
for _, tombstone := range meta.Tombstones {
outdatedBlocks[tombstone.BlockPath] = nil
}
}
blocksSet := make(map[string]BlockRef)
for _, meta := range metas {
for _, block := range meta.Blocks {
if _, contains := outdatedBlocks[block.BlockPath]; contains {
continue
}
if isOutsideRange(&block, minFingerprint, maxFingerprint, startTimestamp, endTimestamp) {
continue
}
blocksSet[block.BlockPath] = block
}
}
blockRefs := make([]BlockRef, 0, len(blocksSet))
for _, ref := range blocksSet {
blockRefs = append(blockRefs, ref)
}
return blockRefs
}

// isOutsideRange reports whether the block's fingerprint range or its
// timestamp range has no overlap with the queried ranges.
func isOutsideRange(
b *BlockRef,
minFingerprint uint64,
maxFingerprint uint64,
startTimestamp int64,
endTimestamp int64,
) bool {
return b.MaxFingerprint < minFingerprint || b.MinFingerprint > maxFingerprint ||
b.EndTimestamp < startTimestamp || b.StartTimestamp > endTimestamp
}

// extractBlock unzips the block archive into a working directory and returns the absolute path to that directory.
func (s *Shipper) extractBlock(block *Block, ts time.Time) (string, error) {
workingDirectoryPath := filepath.Join(s.config.WorkingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("can not create directory to extract the block: %w", err)
}
archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
if err != nil {
return "", fmt.Errorf("error writing data to temp file: %w", err)
}
defer func() {
os.Remove(archivePath)
// todo log err
}()
err = extractArchive(archivePath, workingDirectoryPath)
if err != nil {
return "", fmt.Errorf("error extracting archive: %w", err)
}
return workingDirectoryPath, nil
}

func (s *Shipper) createBlockQuerier(directory string) *v1.BlockQuerier {
reader := v1.NewDirectoryBlockReader(directory)
block := v1.NewBlock(reader)
return v1.NewBlockQuerier(block)
}

func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
defer block.Data.Close()
archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])

archiveFile, err := os.Create(archivePath)
if err != nil {
return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
}
defer archiveFile.Close()
_, err = io.Copy(archiveFile, block.Data)
if err != nil {
return "", fmt.Errorf("error writing data to archive file: %w", err)
}
return archivePath, nil
}

func extractArchive(archivePath string, workingDirectoryPath string) error {
reader, err := zip.OpenReader(archivePath)
if err != nil {
return fmt.Errorf("error opening archive: %w", err)
}
defer reader.Close()
for _, file := range reader.File {
err := extractInnerFile(file, workingDirectoryPath)
if err != nil {
return fmt.Errorf("error extracting %s file from archive: %w", file.Name, err)
}
}
return nil
}

func extractInnerFile(file *zip.File, workingDirectoryPath string) error {
innerFile, err := file.Open()
if err != nil {
return fmt.Errorf("error opening file: %w", err)
}
defer innerFile.Close()
extractedInnerFile, err := os.Create(filepath.Join(workingDirectoryPath, file.Name))
if err != nil {
return fmt.Errorf("error creating empty file: %w", err)
}
defer extractedInnerFile.Close()
_, err = io.Copy(extractedInnerFile, innerFile)
if err != nil {
return fmt.Errorf("error writing data: %w", err)
}
return nil
}