diff --git a/go.mod b/go.mod index 81be6b0584c7..ae8d918ec0ea 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( github.com/DmitriyVTitov/size v1.5.0 github.com/IBM/go-sdk-core/v5 v5.13.1 github.com/IBM/ibm-cos-sdk-go v1.10.0 + github.com/aws/smithy-go v1.11.1 github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc github.com/efficientgo/core v1.0.0-rc.2 github.com/fsnotify/fsnotify v1.6.0 @@ -176,7 +177,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.1 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.11.1 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect - github.com/aws/smithy-go v1.11.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 5611708df82b..d4e4c5dd24e5 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -287,7 +287,7 @@ func (ns *NamedStores) populateStoreType() error { return nil } -func (ns *NamedStores) validate() error { +func (ns *NamedStores) Validate() error { for name, awsCfg := range ns.AWS { if err := awsCfg.Validate(); err != nil { return errors.Wrap(err, fmt.Sprintf("invalid AWS Storage config with name %s", name)) @@ -391,7 +391,7 @@ func (cfg *Config) Validate() error { return errors.Wrap(err, "invalid tsdb config") } - return cfg.NamedStores.validate() + return cfg.NamedStores.Validate() } // NewIndexClient creates a new index client of the desired type specified in the PeriodConfig diff --git a/pkg/storage/factory_test.go b/pkg/storage/factory_test.go index faeb6f038617..4ab82af32219 100644 --- a/pkg/storage/factory_test.go +++ b/pkg/storage/factory_test.go @@ -112,7 +112,7 @@ func TestNamedStores(t *testing.T) { }, BoltDBShipperConfig: boltdbShipperConfig, } - require.NoError(t, cfg.NamedStores.validate()) + require.NoError(t, cfg.NamedStores.Validate()) schemaConfig := config.SchemaConfig{ Configs: []config.PeriodConfig{ diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 6eef4433ab57..5c7158d1f1d1 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -1021,7 +1021,7 @@ func TestStore_indexPrefixChange(t *testing.T) { }, }, } - require.NoError(t, cfg.NamedStores.validate()) + require.NoError(t, cfg.NamedStores.Validate()) firstPeriodDate := parseDate("2019-01-01") secondPeriodDate := parseDate("2019-01-02") @@ -1198,7 +1198,7 @@ func TestStore_MultiPeriod(t *testing.T) { }, }, } - require.NoError(t, cfg.NamedStores.validate()) + require.NoError(t, cfg.NamedStores.Validate()) periodConfigV9 := config.PeriodConfig{ From: config.DayTime{Time: timeToModelTime(firstStoreDate)}, diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go new file mode 100644 index 000000000000..a1ba78b724d1 --- /dev/null +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -0,0 +1,381 @@ +package bloomshipper + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/prometheus/common/model" + + "github.com/grafana/dskit/concurrency" + + "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/chunk/client" + "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/util/math" +) + +const ( + rootFolder = "bloom" + metasFolder = "metas" + bloomsFolder = "blooms" + delimiter = "/" + fileNamePartDelimiter = "-" +) + +type Ref struct { + TenantID string + TableName string + MinFingerprint, MaxFingerprint uint64 + StartTimestamp, EndTimestamp int64 + Checksum uint32 +} + +type BlockRef struct { + Ref + IndexPath string + BlockPath string +} + +type MetaRef struct { + Ref + FilePath string +} + +// todo rename it +type Meta struct { + MetaRef `json:"-"` + + Tombstones []BlockRef + Blocks []BlockRef +} + +type MetaSearchParams struct { + TenantID string + MinFingerprint uint64 + MaxFingerprint uint64 + StartTimestamp int64 + EndTimestamp int64 +} + +type MetaClient interface { + // Returns all metas that are within MinFingerprint-MaxFingerprint fingerprint range + // and intersect time period from StartTimestamp to EndTimestamp. + GetMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) + PutMeta(ctx context.Context, meta Meta) (Meta, error) + DeleteMeta(ctx context.Context, meta Meta) error +} + +type Block struct { + BlockRef + + Data io.ReadCloser +} + +type BlockClient interface { + GetBlocks(ctx context.Context, references []BlockRef) (chan Block, chan error) + PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) + DeleteBlocks(ctx context.Context, blocks []BlockRef) error +} + +type Client interface { + MetaClient + BlockClient + Stop() +} + +// todo add logger +func NewBloomClient(periodicConfigs []config.PeriodConfig, storageConfig storage.Config, clientMetrics storage.ClientMetrics) (*BloomClient, error) { + periodicObjectClients := make(map[config.DayTime]client.ObjectClient) + for _, periodicConfig := range periodicConfigs { + objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageConfig, clientMetrics) + if err != nil { + return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err) + } + periodicObjectClients[periodicConfig.From] = objectClient + } + return &BloomClient{ + periodicConfigs: periodicConfigs, + storageConfig: storageConfig, + periodicObjectClients: periodicObjectClients, + }, nil +} + +type BloomClient struct { + periodicConfigs []config.PeriodConfig + storageConfig storage.Config + periodicObjectClients map[config.DayTime]client.ObjectClient +} + +func (b *BloomClient) GetMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) { + start := model.TimeFromUnix(params.StartTimestamp) + end := model.TimeFromUnix(params.EndTimestamp) + tablesByPeriod := tablesByPeriod(b.periodicConfigs, start, end) + + var metas []Meta + for periodFrom, tables := range tablesByPeriod { + periodClient := b.periodicObjectClients[periodFrom] + for _, table := range tables { + prefix := filepath.Join(rootFolder, table, params.TenantID, metasFolder) + list, _, err := periodClient.List(ctx, prefix, delimiter) + if err != nil { + return nil, fmt.Errorf("error listing metas under prefix [%s]: %w", prefix, err) + } + for _, object := range list { + metaRef, err := createMetaRef(object.Key, params.TenantID, table) + if err != nil { + return nil, err + } + if metaRef.MaxFingerprint < params.MinFingerprint || params.MaxFingerprint < metaRef.MinFingerprint || + metaRef.StartTimestamp < params.StartTimestamp || params.EndTimestamp < metaRef.EndTimestamp { + continue + } + meta, err := b.downloadMeta(ctx, metaRef, periodClient) + if err != nil { + return nil, err + } + metas = append(metas, meta) + } + } + } + return metas, nil +} + +func (b *BloomClient) PutMeta(ctx context.Context, meta Meta) error { + periodFrom, err := findPeriod(b.periodicConfigs, meta.StartTimestamp) + if err != nil { + return fmt.Errorf("error updloading meta file: %w", err) + } + data, err := json.Marshal(meta) + if err != nil { + return fmt.Errorf("can not marshal the meta to json: %w", err) + } + key := createMetaObjectKey(meta.MetaRef.Ref) + return b.periodicObjectClients[periodFrom].PutObject(ctx, key, bytes.NewReader(data)) +} + +func createBlockObjectKey(meta Ref) string { + blockParentFolder := fmt.Sprintf("%x-%x", meta.MinFingerprint, meta.MaxFingerprint) + filename := fmt.Sprintf("%v-%v-%x", meta.StartTimestamp, meta.EndTimestamp, meta.Checksum) + return strings.Join([]string{rootFolder, meta.TableName, meta.TenantID, bloomsFolder, blockParentFolder, filename}, delimiter) +} + +func createMetaObjectKey(meta Ref) string { + filename := fmt.Sprintf("%x-%x-%v-%v-%x", meta.MinFingerprint, meta.MaxFingerprint, meta.StartTimestamp, meta.EndTimestamp, meta.Checksum) + return strings.Join([]string{rootFolder, meta.TableName, meta.TenantID, metasFolder, filename}, delimiter) +} + +func findPeriod(configs []config.PeriodConfig, timestamp int64) (config.DayTime, error) { + ts := model.TimeFromUnix(timestamp) + for i := len(configs) - 1; i >= 0; i-- { + periodConfig := configs[i] + if periodConfig.From.Before(ts) || periodConfig.From.Equal(ts) { + return periodConfig.From, nil + } + } + return config.DayTime{}, fmt.Errorf("can not find period for timestamp %d", timestamp) +} +func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error { + periodFrom, err := findPeriod(b.periodicConfigs, meta.StartTimestamp) + if err != nil { + return fmt.Errorf("error updloading meta file: %w", err) + } + key := createMetaObjectKey(meta.MetaRef.Ref) + return b.periodicObjectClients[periodFrom].DeleteObject(ctx, key) +} + +// GetBlocks downloads all the blocks from objectStorage in parallel and sends the downloaded blocks +// via the channel Block that is closed only if all the blocks are downloaded without errors. +// If an error happens, the error will be sent via error channel. +func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (chan Block, chan error) { + blocksChannel := make(chan Block, len(references)) + errChannel := make(chan error) + go func() { + //todo move concurrency to the config + err := concurrency.ForEachJob(ctx, len(references), 100, func(ctx context.Context, idx int) error { + reference := references[idx] + period, err := findPeriod(b.periodicConfigs, reference.StartTimestamp) + if err != nil { + return fmt.Errorf("error while period lookup: %w", err) + } + objectClient := b.periodicObjectClients[period] + readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref)) + if err != nil { + return fmt.Errorf("error while fetching object from storage: %w", err) + } + blocksChannel <- Block{ + BlockRef: reference, + Data: readCloser, + } + return nil + }) + if err != nil { + errChannel <- fmt.Errorf("error downloading block file: %w", err) + return + } + //close blocks channel only if there is no error + close(blocksChannel) + }() + return blocksChannel, errChannel +} + +func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) { + results := make([]Block, len(blocks)) + //todo move concurrency to the config + err := concurrency.ForEachJob(ctx, len(blocks), 100, func(ctx context.Context, idx int) error { + block := blocks[idx] + defer func(Data io.ReadCloser) { + _ = Data.Close() + }(block.Data) + + period, err := findPeriod(b.periodicConfigs, block.StartTimestamp) + if err != nil { + return fmt.Errorf("error updloading block file: %w", err) + } + key := createBlockObjectKey(block.Ref) + objectClient := b.periodicObjectClients[period] + data, err := io.ReadAll(block.Data) + if err != nil { + return fmt.Errorf("error while reading object data: %w", err) + } + err = objectClient.PutObject(ctx, key, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("error updloading block file: %w", err) + } + block.BlockPath = key + results[idx] = block + return nil + }) + return results, err +} + +func (b *BloomClient) DeleteBlocks(ctx context.Context, references []BlockRef) error { + //todo move concurrency to the config + return concurrency.ForEachJob(ctx, len(references), 100, func(ctx context.Context, idx int) error { + ref := references[idx] + period, err := findPeriod(b.periodicConfigs, ref.StartTimestamp) + if err != nil { + return fmt.Errorf("error deleting block file: %w", err) + } + key := createBlockObjectKey(ref.Ref) + objectClient := b.periodicObjectClients[period] + err = objectClient.DeleteObject(ctx, key) + if err != nil { + return fmt.Errorf("error deleting block file: %w", err) + } + return nil + }) +} + +func (b *BloomClient) Stop() { + for _, objectClient := range b.periodicObjectClients { + objectClient.Stop() + } +} + +func (b *BloomClient) downloadMeta(ctx context.Context, metaRef MetaRef, client client.ObjectClient) (Meta, error) { + meta := Meta{ + MetaRef: metaRef, + } + reader, _, err := client.GetObject(ctx, metaRef.FilePath) + if err != nil { + return Meta{}, fmt.Errorf("error downloading meta file %s : %w", metaRef.FilePath, err) + } + defer func() { _ = reader.Close() }() + + buf, err := io.ReadAll(reader) + if err != nil { + return Meta{}, fmt.Errorf("error reading meta file %s: %w", metaRef.FilePath, err) + } + err = json.Unmarshal(buf, &meta) + if err != nil { + return Meta{}, fmt.Errorf("error unmarshalling content of meta file %s: %w", metaRef.FilePath, err) + } + return meta, nil +} + +// todo cover with tests +func createMetaRef(objectKey string, tenantID string, tableName string) (MetaRef, error) { + fileName := objectKey[strings.LastIndex(objectKey, delimiter)+1:] + parts := strings.Split(fileName, fileNamePartDelimiter) + if len(parts) != 5 { + return MetaRef{}, fmt.Errorf("%s filename parts count must be 5 but was %d: [%s]", objectKey, len(parts), strings.Join(parts, ", ")) + } + + minFingerprint, err := strconv.ParseUint(parts[0], 16, 64) + if err != nil { + return MetaRef{}, fmt.Errorf("error parsing minFingerprint %s : %w", parts[0], err) + } + maxFingerprint, err := strconv.ParseUint(parts[1], 16, 64) + if err != nil { + return MetaRef{}, fmt.Errorf("error parsing maxFingerprint %s : %w", parts[1], err) + } + startTimestamp, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return MetaRef{}, fmt.Errorf("error parsing startTimestamp %s : %w", parts[2], err) + } + endTimestamp, err := strconv.ParseInt(parts[3], 10, 64) + if err != nil { + return MetaRef{}, fmt.Errorf("error parsing endTimestamp %s : %w", parts[3], err) + } + checksum, err := strconv.ParseUint(parts[4], 16, 64) + if err != nil { + return MetaRef{}, fmt.Errorf("error parsing checksum %s : %w", parts[4], err) + } + return MetaRef{ + Ref: Ref{ + TenantID: tenantID, + TableName: tableName, + MinFingerprint: minFingerprint, + MaxFingerprint: maxFingerprint, + StartTimestamp: startTimestamp, + EndTimestamp: endTimestamp, + Checksum: uint32(checksum), + }, + FilePath: objectKey, + }, nil +} + +func tablesByPeriod(periodicConfigs []config.PeriodConfig, start, end model.Time) map[config.DayTime][]string { + result := make(map[config.DayTime][]string) + for i := len(periodicConfigs) - 1; i >= 0; i-- { + periodConfig := periodicConfigs[i] + if end.Before(periodConfig.From.Time) { + continue + } + owningPeriodStartTs := math.Max64(periodConfig.From.Unix(), start.Unix()) + owningPeriodEndTs := end.Unix() + if i != len(periodicConfigs)-1 { + nextPeriodConfig := periodicConfigs[i+1] + owningPeriodEndTs = math.Min64(nextPeriodConfig.From.Add(-1*time.Second).Unix(), owningPeriodEndTs) + } + result[periodConfig.From] = tablesForRange(periodicConfigs[i], owningPeriodStartTs, owningPeriodEndTs) + if !start.Before(periodConfig.From.Time) { + break + } + } + return result +} + +func tablesForRange(periodConfig config.PeriodConfig, from, to int64) []string { + interval := periodConfig.IndexTables.Period + intervalSeconds := interval.Seconds() + lower := from / int64(intervalSeconds) + upper := to / int64(intervalSeconds) + tables := make([]string, 0, 1+upper-lower) + prefix := periodConfig.IndexTables.Prefix + for i := lower; i <= upper; i++ { + tables = append(tables, joinTableName(prefix, i)) + } + return tables +} + +func joinTableName(prefix string, tableNumber int64) string { + return fmt.Sprintf("%s%d", prefix, tableNumber) +} diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go new file mode 100644 index 000000000000..585328241282 --- /dev/null +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -0,0 +1,614 @@ +package bloomshipper + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "os" + "path/filepath" + "strings" + "testing" + "time" + + aws_io "github.com/aws/smithy-go/io" + "github.com/google/uuid" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/config" +) + +const ( + day = 24 * time.Hour +) + +var ( + // table 19627 + fixedDay = model.TimeFromUnix(time.Date(2023, time.September, 27, 0, 0, 0, 0, time.UTC).Unix()) +) + +func Test_BloomClient_GetMetas(t *testing.T) { + shipper := createShipper(t) + + var expected []Meta + folder1 := shipper.storageConfig.NamedStores.Filesystem["folder-1"].Directory + // must not be present in results because it is outside of time range + createMetaInStorage(t, folder1, "first-period-19621", "tenantA", 0, 100, fixedDay.Add(-7*day)) + // must be present in the results + expected = append(expected, createMetaInStorage(t, folder1, "first-period-19621", "tenantA", 0, 100, fixedDay.Add(-6*day))) + // must not be present in results because it belongs to another tenant + createMetaInStorage(t, folder1, "first-period-19621", "tenantB", 0, 100, fixedDay.Add(-6*day)) + // must be present in the results + expected = append(expected, createMetaInStorage(t, folder1, "first-period-19621", "tenantA", 101, 200, fixedDay.Add(-6*day))) + + folder2 := shipper.storageConfig.NamedStores.Filesystem["folder-2"].Directory + // must not be present in results because it's out of the time range + createMetaInStorage(t, folder2, "second-period-19626", "tenantA", 0, 100, fixedDay.Add(-1*day)) + // must be present in the results + expected = append(expected, createMetaInStorage(t, folder2, "second-period-19625", "tenantA", 0, 100, fixedDay.Add(-2*day))) + // must not be present in results because it belongs to another tenant + createMetaInStorage(t, folder2, "second-period-19624", "tenantB", 0, 100, fixedDay.Add(-3*day)) + + actual, err := shipper.GetMetas(context.Background(), MetaSearchParams{ + TenantID: "tenantA", + MinFingerprint: 50, + MaxFingerprint: 150, + StartTimestamp: fixedDay.Add(-6 * day).Unix(), + EndTimestamp: fixedDay.Add(-1*day - 1*time.Hour).Unix(), + }) + require.NoError(t, err) + require.ElementsMatch(t, expected, actual) +} + +func Test_BloomClient_PutMeta(t *testing.T) { + tests := map[string]struct { + source Meta + expectedFilePath string + expectedStorage string + }{ + "expected meta to be uploaded to the first folder": { + source: createMetaEntity("tenantA", + "first-period-19621", + 0xff, + 0xfff, + time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), + time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + 0xaaa, + "ignored-file-path-during-uploading", + ), + expectedStorage: "folder-1", + expectedFilePath: "bloom/first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa", + }, + "expected meta to be uploaded to the second folder": { + source: createMetaEntity("tenantA", + "second-period-19625", + 200, + 300, + time.Date(2023, time.September, 25, 0, 0, 0, 0, time.UTC).Unix(), + time.Date(2023, time.September, 25, 1, 0, 0, 0, time.UTC).Unix(), + 0xbbb, + "ignored-file-path-during-uploading", + ), + expectedStorage: "folder-2", + expectedFilePath: "bloom/second-period-19625/tenantA/metas/c8-12c-1695600000-1695603600-bbb", + }, + } + for name, data := range tests { + t.Run(name, func(t *testing.T) { + shipper := createShipper(t) + + err := shipper.PutMeta(context.Background(), data.source) + require.NoError(t, err) + + directory := shipper.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory + filePath := filepath.Join(directory, data.expectedFilePath) + require.FileExists(t, filePath) + content, err := os.ReadFile(filePath) + require.NoError(t, err) + result := Meta{} + err = json.Unmarshal(content, &result) + require.NoError(t, err) + + require.Equal(t, data.source.Blocks, result.Blocks) + require.Equal(t, data.source.Tombstones, result.Tombstones) + }) + } + +} + +func Test_BloomClient_DeleteMeta(t *testing.T) { + tests := map[string]struct { + source Meta + expectedFilePath string + expectedStorage string + }{ + "expected meta to be deleted from the first folder": { + source: createMetaEntity("tenantA", + "first-period-19621", + 0xff, + 0xfff, + time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), + time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + 0xaaa, + "ignored-file-path-during-uploading", + ), + expectedStorage: "folder-1", + expectedFilePath: "bloom/first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa", + }, + "expected meta to be delete from the second folder": { + source: createMetaEntity("tenantA", + "second-period-19625", + 200, + 300, + time.Date(2023, time.September, 25, 0, 0, 0, 0, time.UTC).Unix(), + time.Date(2023, time.September, 25, 1, 0, 0, 0, time.UTC).Unix(), + 0xbbb, + "ignored-file-path-during-uploading", + ), + expectedStorage: "folder-2", + expectedFilePath: "bloom/second-period-19625/tenantA/metas/c8-12c-1695600000-1695603600-bbb", + }, + } + for name, data := range tests { + t.Run(name, func(t *testing.T) { + shipper := createShipper(t) + directory := shipper.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory + file := filepath.Join(directory, data.expectedFilePath) + err := os.MkdirAll(file[:strings.LastIndex(file, delimiter)], 0755) + require.NoError(t, err) + err = os.WriteFile(file, []byte("dummy content"), 0700) + require.NoError(t, err) + + err = shipper.DeleteMeta(context.Background(), data.source) + require.NoError(t, err) + + require.NoFileExists(t, file) + }) + } + +} + +func Test_BloomClient_GetBlocks(t *testing.T) { + shipper := createShipper(t) + fsNamedStores := shipper.storageConfig.NamedStores.Filesystem + firstBlockPath := "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1" + firstBlockFullPath := filepath.Join(fsNamedStores["folder-1"].Directory, firstBlockPath) + firstBlockData := createBlockFile(t, firstBlockFullPath) + secondBlockPath := "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2" + secondBlockFullPath := filepath.Join(fsNamedStores["folder-2"].Directory, secondBlockPath) + secondBlockData := createBlockFile(t, secondBlockFullPath) + require.FileExists(t, firstBlockFullPath) + require.FileExists(t, secondBlockFullPath) + + firstBlockRef := BlockRef{ + Ref: Ref{ + TenantID: "tenantA", + TableName: "first-period-19621", + MinFingerprint: 0xeeee, + MaxFingerprint: 0xffff, + StartTimestamp: time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), + EndTimestamp: time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + Checksum: 1, + }, + BlockPath: firstBlockPath, + } + secondBlockRef := BlockRef{ + Ref: Ref{ + TenantID: "tenantA", + TableName: "second-period-19624", + MinFingerprint: 0xaaaa, + MaxFingerprint: 0xbbbb, + StartTimestamp: time.Date(2023, time.September, 24, 5, 0, 0, 0, time.UTC).Unix(), + EndTimestamp: time.Date(2023, time.September, 24, 6, 0, 0, 0, time.UTC).Unix(), + Checksum: 2, + }, + BlockPath: secondBlockPath, + } + + blocksToDownload := []BlockRef{firstBlockRef, secondBlockRef} + + blocksCh, errorsCh := shipper.GetBlocks(context.Background(), blocksToDownload) + blocks := make(map[string]string) + func() { + timout := time.After(5 * time.Second) + for { + select { + case <-timout: + t.Fatalf("the test had to be completed before the timeout") + return + case err := <-errorsCh: + require.NoError(t, err) + case block, ok := <-blocksCh: + if !ok { + return + } + blockData, err := io.ReadAll(block.Data) + require.NoError(t, err) + blocks[block.BlockRef.BlockPath] = string(blockData) + + } + } + }() + + firstBlockActualData, exists := blocks[firstBlockRef.BlockPath] + require.Truef(t, exists, "data for the first block must be present in the results: %+v", blocks) + require.Equal(t, firstBlockData, firstBlockActualData) + + secondBlockActualData, exists := blocks[secondBlockRef.BlockPath] + require.True(t, exists, "data for the second block must be present in the results: %+v", blocks) + require.Equal(t, secondBlockData, secondBlockActualData) + + require.Len(t, blocks, 2) +} + +func Test_BloomClient_PutBlocks(t *testing.T) { + shipper := createShipper(t) + blockForFirstFolderData := "data1" + blockForFirstFolder := Block{ + BlockRef: BlockRef{ + Ref: Ref{ + TenantID: "tenantA", + TableName: "first-period-19621", + MinFingerprint: 0xeeee, + MaxFingerprint: 0xffff, + StartTimestamp: time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), + EndTimestamp: time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + Checksum: 1, + }, + IndexPath: uuid.New().String(), + }, + Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForFirstFolderData))}, + } + + blockForSecondFolderData := "data2" + blockForSecondFolder := Block{ + BlockRef: BlockRef{ + Ref: Ref{ + TenantID: "tenantA", + TableName: "second-period-19624", + MinFingerprint: 0xaaaa, + MaxFingerprint: 0xbbbb, + StartTimestamp: time.Date(2023, time.September, 24, 5, 0, 0, 0, time.UTC).Unix(), + EndTimestamp: time.Date(2023, time.September, 24, 6, 0, 0, 0, time.UTC).Unix(), + Checksum: 2, + }, + IndexPath: uuid.New().String(), + }, + Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))}, + } + + results, err := shipper.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder}) + require.NoError(t, err) + require.Len(t, results, 2) + firstResultBlock := results[0] + path := firstResultBlock.BlockPath + require.Equal(t, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1", path) + require.Equal(t, blockForFirstFolder.TenantID, firstResultBlock.TenantID) + require.Equal(t, blockForFirstFolder.TableName, firstResultBlock.TableName) + require.Equal(t, blockForFirstFolder.MinFingerprint, firstResultBlock.MinFingerprint) + require.Equal(t, blockForFirstFolder.MaxFingerprint, firstResultBlock.MaxFingerprint) + require.Equal(t, blockForFirstFolder.StartTimestamp, firstResultBlock.StartTimestamp) + require.Equal(t, blockForFirstFolder.EndTimestamp, firstResultBlock.EndTimestamp) + require.Equal(t, blockForFirstFolder.Checksum, firstResultBlock.Checksum) + require.Equal(t, blockForFirstFolder.IndexPath, firstResultBlock.IndexPath) + folder1 := shipper.storageConfig.NamedStores.Filesystem["folder-1"].Directory + savedFilePath := filepath.Join(folder1, path) + require.FileExists(t, savedFilePath) + savedData, err := os.ReadFile(savedFilePath) + require.NoError(t, err) + require.Equal(t, blockForFirstFolderData, string(savedData)) + + secondResultBlock := results[1] + path = secondResultBlock.BlockPath + require.Equal(t, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2", path) + require.Equal(t, blockForSecondFolder.TenantID, secondResultBlock.TenantID) + require.Equal(t, blockForSecondFolder.TableName, secondResultBlock.TableName) + require.Equal(t, blockForSecondFolder.MinFingerprint, secondResultBlock.MinFingerprint) + require.Equal(t, blockForSecondFolder.MaxFingerprint, secondResultBlock.MaxFingerprint) + require.Equal(t, blockForSecondFolder.StartTimestamp, secondResultBlock.StartTimestamp) + require.Equal(t, blockForSecondFolder.EndTimestamp, secondResultBlock.EndTimestamp) + require.Equal(t, blockForSecondFolder.Checksum, secondResultBlock.Checksum) + require.Equal(t, blockForSecondFolder.IndexPath, secondResultBlock.IndexPath) + folder2 := shipper.storageConfig.NamedStores.Filesystem["folder-2"].Directory + + savedFilePath = filepath.Join(folder2, path) + require.FileExists(t, savedFilePath) + savedData, err = os.ReadFile(savedFilePath) + require.NoError(t, err) + require.Equal(t, blockForSecondFolderData, string(savedData)) +} + +func Test_BloomClient_DeleteBlocks(t *testing.T) { + shipper := createShipper(t) + fsNamedStores := shipper.storageConfig.NamedStores.Filesystem + block1Path := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1") + createBlockFile(t, block1Path) + block2Path := filepath.Join(fsNamedStores["folder-2"].Directory, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2") + createBlockFile(t, block2Path) + require.FileExists(t, block1Path) + require.FileExists(t, block2Path) + + blocksToDelete := []BlockRef{ + { + Ref: Ref{ + TenantID: "tenantA", + TableName: "second-period-19624", + MinFingerprint: 0xaaaa, + MaxFingerprint: 0xbbbb, + StartTimestamp: time.Date(2023, time.September, 24, 5, 0, 0, 0, time.UTC).Unix(), + EndTimestamp: time.Date(2023, time.September, 24, 6, 0, 0, 0, time.UTC).Unix(), + Checksum: 2, + }, + IndexPath: uuid.New().String(), + }, + { + Ref: Ref{ + TenantID: "tenantA", + TableName: "first-period-19621", + MinFingerprint: 0xeeee, + MaxFingerprint: 0xffff, + StartTimestamp: time.Date(2023, time.September, 21, 5, 0, 0, 0, time.UTC).Unix(), + EndTimestamp: time.Date(2023, time.September, 21, 6, 0, 0, 0, time.UTC).Unix(), + Checksum: 1, + }, + IndexPath: uuid.New().String(), + }, + } + err := shipper.DeleteBlocks(context.Background(), blocksToDelete) + require.NoError(t, err) + require.NoFileExists(t, block1Path) + require.NoFileExists(t, block2Path) +} + +func createBlockFile(t *testing.T, path string) string { + err := os.MkdirAll(path[:strings.LastIndex(path, "/")], 0755) + require.NoError(t, err) + fileContent := uuid.NewString() + err = os.WriteFile(path, []byte(fileContent), 0700) + require.NoError(t, err) + return fileContent +} + +func Test_TablesByPeriod(t *testing.T) { + configs := createPeriodConfigs() + firstPeriodFrom := configs[0].From + secondPeriodFrom := configs[1].From + tests := map[string]struct { + from, to model.Time + expectedTablesByPeriod map[config.DayTime][]string + }{ + "expected 1 table": { + from: model.TimeFromUnix(time.Date(2023, time.September, 20, 0, 0, 0, 0, time.UTC).Unix()), + to: model.TimeFromUnix(time.Date(2023, time.September, 20, 23, 59, 59, 0, time.UTC).Unix()), + expectedTablesByPeriod: map[config.DayTime][]string{ + firstPeriodFrom: {"first-period-19620"}}, + }, + "expected tables for both periods": { + from: model.TimeFromUnix(time.Date(2023, time.September, 21, 0, 0, 0, 0, time.UTC).Unix()), + to: model.TimeFromUnix(time.Date(2023, time.September, 25, 23, 59, 59, 0, time.UTC).Unix()), + expectedTablesByPeriod: map[config.DayTime][]string{ + firstPeriodFrom: {"first-period-19621", "first-period-19622", "first-period-19623"}, + secondPeriodFrom: {"second-period-19624", "second-period-19625"}, + }, + }, + "expected tables for the second period": { + from: model.TimeFromUnix(time.Date(2023, time.September, 24, 0, 0, 0, 0, time.UTC).Unix()), + to: model.TimeFromUnix(time.Date(2023, time.September, 25, 1, 0, 0, 0, time.UTC).Unix()), + expectedTablesByPeriod: map[config.DayTime][]string{ + secondPeriodFrom: {"second-period-19624", "second-period-19625"}, + }, + }, + "expected only one table from the second period": { + from: model.TimeFromUnix(time.Date(2023, time.September, 25, 0, 0, 0, 0, time.UTC).Unix()), + to: model.TimeFromUnix(time.Date(2023, time.September, 25, 1, 0, 0, 0, time.UTC).Unix()), + expectedTablesByPeriod: map[config.DayTime][]string{ + secondPeriodFrom: {"second-period-19625"}, + }, + }, + } + for name, data := range tests { + t.Run(name, func(t *testing.T) { + result := tablesByPeriod(configs, data.from, data.to) + for periodFrom, expectedTables := range data.expectedTablesByPeriod { + actualTables, exists := result[periodFrom] + require.Truef(t, exists, "tables for %s period must be provided but was not in the result: %+v", periodFrom.String(), result) + require.ElementsMatchf(t, expectedTables, actualTables, "tables mismatch for period %s", periodFrom.String()) + } + require.Len(t, result, len(data.expectedTablesByPeriod)) + }) + } +} + +func Test_createMetaRef(t *testing.T) { + tests := map[string]struct { + objectKey string + tenantID string + tableName string + + expectedRef MetaRef + expectedErr string + }{ + "ValidObjectKey": { + objectKey: "bloom/ignored-during-parsing-table-name/ignored-during-parsing-tenant-ID/metas/aaa-bbb-1234567890-9876543210-abcdef", + tenantID: "tenant1", + tableName: "table1", + expectedRef: MetaRef{ + Ref: Ref{ + TenantID: "tenant1", + TableName: "table1", + MinFingerprint: 0xaaa, + MaxFingerprint: 0xbbb, + StartTimestamp: 1234567890, + EndTimestamp: 9876543210, + Checksum: 0xabcdef, + }, + FilePath: "bloom/ignored-during-parsing-table-name/ignored-during-parsing-tenant-ID/metas/aaa-bbb-1234567890-9876543210-abcdef", + }, + }, + "InvalidObjectKeyDelimiterCount": { + objectKey: "invalid/key/with/too/many/objectKeyWithoutDelimiters", + tenantID: "tenant1", + tableName: "table1", + expectedRef: MetaRef{}, + expectedErr: "filename parts count must be 5 but was 1: [objectKeyWithoutDelimiters]", + }, + "InvalidMinFingerprint": { + objectKey: "invalid/folder/key/metas/zzz-bbb-123-9876543210-abcdef", + tenantID: "tenant1", + tableName: "table1", + expectedErr: "error parsing minFingerprint zzz", + }, + "InvalidMaxFingerprint": { + objectKey: "invalid/folder/key/metas/123-zzz-1234567890-9876543210-abcdef", + tenantID: "tenant1", + tableName: "table1", + expectedErr: "error parsing maxFingerprint zzz", + }, + "InvalidStartTimestamp": { + objectKey: "invalid/folder/key/metas/aaa-bbb-abc-9876543210-abcdef", + tenantID: "tenant1", + tableName: "table1", + expectedErr: "error parsing startTimestamp abc", + }, + "InvalidEndTimestamp": { + objectKey: "invalid/folder/key/metas/aaa-bbb-1234567890-xyz-abcdef", + tenantID: "tenant1", + tableName: "table1", + expectedErr: "error parsing endTimestamp xyz", + }, + "InvalidChecksum": { + objectKey: "invalid/folder/key/metas/aaa-bbb-1234567890-9876543210-ghijklm", + tenantID: "tenant1", + tableName: "table1", + expectedErr: "error parsing checksum ghijklm", + }, + } + for name, data := range tests { + t.Run(name, func(t *testing.T) { + actualRef, err := createMetaRef(data.objectKey, data.tenantID, data.tableName) + if data.expectedErr != "" { + require.ErrorContains(t, err, data.expectedErr) + return + } + require.NoError(t, err) + require.Equal(t, data.expectedRef, actualRef) + }) + } +} + +func createShipper(t *testing.T) *BloomClient { + periodicConfigs := createPeriodConfigs() + namedStores := storage.NamedStores{ + Filesystem: map[string]storage.NamedFSConfig{ + "folder-1": {Directory: t.TempDir()}, + "folder-2": {Directory: t.TempDir()}, + }} + //required to populate StoreType map in named config + require.NoError(t, namedStores.Validate()) + storageConfig := storage.Config{NamedStores: namedStores} + + metrics := storage.NewClientMetrics() + t.Cleanup(metrics.Unregister) + bshipper, err := NewBloomClient(periodicConfigs, storageConfig, metrics) + require.NoError(t, err) + return bshipper +} + +func createPeriodConfigs() []config.PeriodConfig { + periodicConfigs := []config.PeriodConfig{ + { + ObjectType: "folder-1", + // from 2023-09-20: table range [19620:19623] + From: config.DayTime{Time: model.TimeFromUnix(time.Date(2023, time.September, 20, 0, 0, 0, 0, time.UTC).Unix())}, + IndexTables: config.PeriodicTableConfig{ + Period: day, + Prefix: "first-period-", + }, + }, + { + ObjectType: "folder-2", + // from 2023-09-24: table range [19624:19627] + From: config.DayTime{Time: model.TimeFromUnix(time.Date(2023, time.September, 24, 0, 0, 0, 0, time.UTC).Unix())}, + IndexTables: config.PeriodicTableConfig{ + Period: day, + Prefix: "second-period-", + }, + }, + } + return periodicConfigs +} + +func createMetaInStorage(t *testing.T, folder string, tableName string, tenant string, minFingerprint uint64, maxFingerprint uint64, start model.Time) Meta { + startTimestamp := start.Unix() + endTimestamp := start.Add(12 * time.Hour).Unix() + + metaChecksum := rand.Uint32() + metaFileName := fmt.Sprintf("%x-%x-%v-%v-%x", minFingerprint, maxFingerprint, startTimestamp, endTimestamp, metaChecksum) + metaFilePath := filepath.Join(rootFolder, tableName, tenant, metasFolder, metaFileName) + err := os.MkdirAll(filepath.Join(folder, metaFilePath[:strings.LastIndex(metaFilePath, delimiter)]), 0700) + require.NoError(t, err) + meta := createMetaEntity(tenant, tableName, minFingerprint, maxFingerprint, startTimestamp, endTimestamp, metaChecksum, metaFilePath) + + metaFileContent, err := json.Marshal(meta) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(folder, metaFilePath), metaFileContent, 0644) + require.NoError(t, err) + return meta +} + +func createMetaEntity( + tenant string, + tableName string, + minFingerprint uint64, + maxFingerprint uint64, + startTimestamp int64, + endTimestamp int64, + metaChecksum uint32, + metaFilePath string) Meta { + return Meta{ + MetaRef: MetaRef{ + Ref: Ref{ + TenantID: tenant, + TableName: tableName, + MinFingerprint: minFingerprint, + MaxFingerprint: maxFingerprint, + StartTimestamp: startTimestamp, + EndTimestamp: endTimestamp, + Checksum: metaChecksum, + }, + FilePath: metaFilePath, + }, + Tombstones: []BlockRef{ + { + Ref: Ref{ + TenantID: tenant, + Checksum: metaChecksum + 1, + MinFingerprint: minFingerprint, + MaxFingerprint: maxFingerprint, + StartTimestamp: startTimestamp, + EndTimestamp: endTimestamp, + }, + IndexPath: uuid.New().String(), + BlockPath: uuid.New().String(), + }, + }, + Blocks: []BlockRef{ + { + Ref: Ref{ + TenantID: tenant, + Checksum: metaChecksum + 2, + MinFingerprint: minFingerprint, + MaxFingerprint: maxFingerprint, + StartTimestamp: startTimestamp, + EndTimestamp: endTimestamp, + }, + IndexPath: uuid.New().String(), + BlockPath: uuid.New().String(), + }, + }, + } +}