diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 12bccc31b63b9..9084fc41db3cd 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "storage", srcs = [ "azblob.go", + "batch.go", "compress.go", "flags.go", "gcs.go", @@ -27,6 +28,7 @@ go_library( deps = [ "//br/pkg/errors", "//br/pkg/logutil", + "//br/pkg/utils/iter", "//pkg/lightning/log", "//pkg/sessionctx/variable", "//pkg/util", @@ -76,6 +78,7 @@ go_library( "@org_golang_x_net//http2", "@org_golang_x_oauth2//google", "@org_uber_go_atomic//:atomic", + "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", ], ) @@ -85,6 +88,7 @@ go_test( timeout = "short", srcs = [ "azblob_test.go", + "batch_test.go", "compress_test.go", "gcs_test.go", "local_test.go", diff --git a/br/pkg/storage/batch.go b/br/pkg/storage/batch.go new file mode 100644 index 0000000000000..76e3a40f8a65d --- /dev/null +++ b/br/pkg/storage/batch.go @@ -0,0 +1,165 @@ +package storage + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "path" + "sync" + + "github.com/pingcap/errors" + berrors "github.com/pingcap/tidb/br/pkg/errors" + "go.uber.org/multierr" +) + +// Effect is an side effect that happens in the batch storage. +type Effect any + +// EffPut is the side effect of a call to `WriteFile`. +type EffPut struct { + File string `json:"file"` + Content []byte `json:"content"` +} + +// EffDeleteFiles is the side effect of a call to `DeleteFiles`. +type EffDeleteFiles struct { + Files []string `json:"files"` +} + +// EffDeleteFile is the side effect of a call to `DeleteFile`. +type EffDeleteFile string + +// EffRename is the side effect of a call to `Rename`. +type EffRename struct { + From string `json:"from"` + To string `json:"to"` +} + +// JSONEffects converts a slices of effects into json. +// The json will be a tagged union: `{"type": $go_type_name, "effect": $effect}` +func JSONEffects(es []Effect, output io.Writer) error { + type Typed struct { + Type string `json:"type"` + Eff Effect `json:"effect"` + } + + out := make([]Typed, 0, len(es)) + for _, eff := range es { + out = append(out, Typed{ + Type: fmt.Sprintf("%T", eff), + Eff: eff, + }) + } + + return json.NewEncoder(output).Encode(out) +} + +func SaveJSONEffectsToTmp(es []Effect) (string, error) { + // Save the json to a subdir so user can redirect the output path by symlinking... + tmp, err := os.CreateTemp(path.Join(os.TempDir(), "tidb_br"), "br-effects-*.json") + if err != nil { + return "", err + } + if err := JSONEffects(es, tmp); err != nil { + return "", err + } + return tmp.Name(), nil +} + +// Batched is a wrapper of an external storage that suspends all write operations ("effects"). +// If `Close()` without calling `Commit()`, nothing will happen in the underlying external storage. +// In that case, we have done a "dry run". +// +// You may use `ReadOnlyEffects()` to get the history of the effects. +// But don't modify the returned slice! +// +// You may use `Commit()` to execute all suspended effects. +type Batched struct { + ExternalStorage + effectsMu sync.Mutex + // It will be one of: + // EffPut, EffDeleteFiles, EffDeleteFile, EffRename + effects []Effect +} + +// Batch wraps an external storage instance to a batched version. +func Batch(s ExternalStorage) *Batched { + return &Batched{ExternalStorage: s} +} + +// Fetch all effects from the batched storage. 
+// +// **The returned slice should not be modified.** +func (d *Batched) ReadOnlyEffects() []Effect { + d.effectsMu.Lock() + defer d.effectsMu.Unlock() + return d.effects +} + +// CleanEffects cleans all suspended effects. +func (d *Batched) CleanEffects() { + d.effectsMu.Lock() + defer d.effectsMu.Unlock() + d.effects = nil +} + +func (d *Batched) DeleteFiles(ctx context.Context, names []string) error { + d.effectsMu.Lock() + defer d.effectsMu.Unlock() + d.effects = append(d.effects, EffDeleteFiles{Files: names}) + return nil +} + +func (d *Batched) DeleteFile(ctx context.Context, name string) error { + d.effectsMu.Lock() + defer d.effectsMu.Unlock() + d.effects = append(d.effects, EffDeleteFile(name)) + return nil +} + +func (d *Batched) WriteFile(ctx context.Context, name string, data []byte) error { + d.effectsMu.Lock() + defer d.effectsMu.Unlock() + d.effects = append(d.effects, EffPut{File: name, Content: data}) + return nil +} + +func (d *Batched) Rename(ctx context.Context, oldName, newName string) error { + d.effectsMu.Lock() + defer d.effectsMu.Unlock() + d.effects = append(d.effects, EffRename{From: oldName, To: newName}) + return nil +} + +func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) { + return nil, errors.Annotatef(berrors.ErrStorageUnknown, "ExternalStorage.Create isn't allowed in batch mode for now.") +} + +// Commit performs all effects recorded so long in the REAL external storage. +// This will cleanup all of the suspended effects. +func (d *Batched) Commit(ctx context.Context) error { + d.effectsMu.Lock() + defer d.effectsMu.Unlock() + + var err error + for _, eff := range d.effects { + switch e := eff.(type) { + case EffPut: + err = multierr.Combine(d.ExternalStorage.WriteFile(ctx, e.File, e.Content), err) + case EffDeleteFiles: + err = multierr.Combine(d.ExternalStorage.DeleteFiles(ctx, e.Files), err) + case EffDeleteFile: + err = multierr.Combine(d.ExternalStorage.DeleteFile(ctx, string(e)), err) + case EffRename: + err = multierr.Combine(d.ExternalStorage.Rename(ctx, e.From, e.To), err) + default: + return errors.Annotatef(berrors.ErrStorageUnknown, "Unknown effect type %T", eff) + } + } + + d.effects = nil + + return nil +} diff --git a/br/pkg/storage/batch_test.go b/br/pkg/storage/batch_test.go new file mode 100644 index 0000000000000..967e28dc4d4b7 --- /dev/null +++ b/br/pkg/storage/batch_test.go @@ -0,0 +1,108 @@ +package storage_test + +import ( + "context" + "io" + "os" + "testing" + + . 
"github.com/pingcap/tidb/br/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestBatched(t *testing.T) { + ctx := context.Background() + bat := Batch(nil) // Passing nil as we don't need actual storage operations + + // Test operations + operations := []struct { + name string + op func() error + expected []Effect + }{ + { + name: "DeleteFiles", + op: func() error { + return bat.DeleteFiles(ctx, []string{"file1.txt", "file2.txt"}) + }, + expected: []Effect{EffDeleteFiles{Files: []string{"file1.txt", "file2.txt"}}}, + }, + { + name: "DeleteFile", + op: func() error { + return bat.DeleteFile(ctx, "file3.txt") + }, + expected: []Effect{EffDeleteFile("file3.txt")}, + }, + { + name: "WriteFile", + op: func() error { + return bat.WriteFile(ctx, "file4.txt", []byte("content")) + }, + expected: []Effect{EffPut{File: "file4.txt", Content: []byte("content")}}, + }, + { + name: "Rename", + op: func() error { + return bat.Rename(ctx, "oldName.txt", "newName.txt") + }, + expected: []Effect{EffRename{From: "oldName.txt", To: "newName.txt"}}, + }, + { + name: "SequenceOfOperations", + op: func() error { + if err := bat.DeleteFile(ctx, "file5.txt"); err != nil { + return err + } + if err := bat.WriteFile(ctx, "file6.txt", []byte("new content")); err != nil { + return err + } + return bat.Rename(ctx, "file6.txt", "fileRenamed.txt") + }, + expected: []Effect{ + EffDeleteFile("file5.txt"), + EffPut{File: "file6.txt", Content: []byte("new content")}, + EffRename{From: "file6.txt", To: "fileRenamed.txt"}, + }}, + } + + for _, op := range operations { + t.Run(op.name, func(t *testing.T) { + require.NoError(t, op.op()) + + effects := bat.ReadOnlyEffects() + require.Equal(t, len(op.expected), len(effects)) + for i, effect := range effects { + require.Equal(t, op.expected[i], effect) + } + + // Reset effects for the next test + bat.CleanEffects() + }) + } +} + +func TestJSONEffects(t *testing.T) { + effects := []Effect{ + EffPut{File: "example.txt", Content: []byte("Hello, world")}, + EffDeleteFiles{Files: []string{"old_file.txt", "temp.txt"}}, + EffDeleteFile("obsolete.txt"), + EffRename{From: "old_name.txt", To: "new_name.txt"}, + } + + tmp, err := SaveJSONEffectsToTmp(effects) + require.NoError(t, err) + f, err := os.Open(tmp) + require.NoError(t, err) + buf, err := io.ReadAll(f) + require.NoError(t, err) + + expectedJSON := `[ + {"type":"storage.EffPut","effect":{"file":"example.txt","content":"SGVsbG8sIHdvcmxk"}}, + {"type":"storage.EffDeleteFiles","effect":{"files":["old_file.txt","temp.txt"]}}, + {"type":"storage.EffDeleteFile","effect":"obsolete.txt"}, + {"type":"storage.EffRename","effect":{"from":"old_name.txt","to":"new_name.txt"}} + ]` + + require.JSONEq(t, expectedJSON, string(buf), "Output JSON should match expected JSON") +} diff --git a/br/pkg/storage/helper.go b/br/pkg/storage/helper.go index ae88ec9ae9d6c..c0c5c63ba0747 100644 --- a/br/pkg/storage/helper.go +++ b/br/pkg/storage/helper.go @@ -6,6 +6,8 @@ import ( "context" "sync/atomic" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/sessionctx/variable" ) @@ -37,3 +39,50 @@ var activeUploadWorkerCnt atomic.Int64 func GetActiveUploadWorkerCount() int64 { return activeUploadWorkerCnt.Load() } + +// UnmarshalDir iterates over a prefix, then "unmarshal" the content of each file it met with the unmarshal function. +// Returning an iterator that yields the unmarshaled content. +// The "unmarshal" function should put the result of unmarshalling to the `target` argument. 
+func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalStorage, unmarshal func(target *T, name string, content []byte) error) iter.TryNextor[*T] { + ch := make(chan *T) + errCh := make(chan error, 1) + reader := func() { + defer close(ch) + err := s.WalkDir(ctx, walkOpt, func(path string, size int64) error { + metaBytes, err := s.ReadFile(ctx, path) + if err != nil { + return errors.Annotatef(err, "failed during reading file %s", path) + } + var meta T + if err := unmarshal(&meta, path, metaBytes); err != nil { + return errors.Annotatef(err, "failed to parse subcompaction meta of file %s", path) + } + select { + case ch <- &meta: + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) + if err != nil { + select { + case errCh <- err: + case <-ctx.Done(): + } + } + } + go reader() + return iter.Func(func(ctx context.Context) iter.IterResult[*T] { + select { + case <-ctx.Done(): + return iter.Throw[*T](ctx.Err()) + case err := <-errCh: + return iter.Throw[*T](err) + case meta, ok := <-ch: + if !ok { + return iter.Done[*T]() + } + return iter.Emit(meta) + } + }) +} diff --git a/br/pkg/storage/local.go b/br/pkg/storage/local.go index bfc0fc9907626..2d9deb076e06d 100644 --- a/br/pkg/storage/local.go +++ b/br/pkg/storage/local.go @@ -5,13 +5,16 @@ package storage import ( "bufio" "context" + stderrors "errors" "io" "os" "path/filepath" "strings" + "syscall" "github.com/google/uuid" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "go.uber.org/zap" ) @@ -28,12 +31,30 @@ const ( // export for using in tests. type LocalStorage struct { base string + // Whether ignoring ENOINT while deleting. + // Don't fail when deleting an unexist file is more like + // a normal ExternalStorage implementation does. + IgnoreEnoentForDelete bool +} + +// Base returns the base dir used by this local storage. +func (l *LocalStorage) Base() string { + return l.base } // DeleteFile deletes the file. func (l *LocalStorage) DeleteFile(_ context.Context, name string) error { + failpoint.Inject("local_delete_file_err", func(v failpoint.Value) { + failpoint.Return(errors.New(v.(string))) + }) path := filepath.Join(l.base, name) - return os.Remove(path) + err := os.Remove(path) + if err != nil && + l.IgnoreEnoentForDelete && + stderrors.Is(err, syscall.ENOENT) { + return nil + } + return err } // DeleteFiles deletes the files. @@ -49,6 +70,10 @@ func (l *LocalStorage) DeleteFiles(ctx context.Context, names []string) error { // WriteFile writes data to a file to storage. func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) error { + failpoint.Inject("local_write_file_err", func(v failpoint.Value) { + failpoint.Return(errors.New(v.(string))) + }) + // because `os.WriteFile` is not atomic, directly write into it may reset the file // to an empty file if write is not finished. tmpPath := filepath.Join(l.base, name) + ".tmp." 
+ uuid.NewString() diff --git a/br/pkg/stream/BUILD.bazel b/br/pkg/stream/BUILD.bazel index f2a38a769c45d..ecc38a9ba24fb 100644 --- a/br/pkg/stream/BUILD.bazel +++ b/br/pkg/stream/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//br/pkg/restore/tiflashrec", "//br/pkg/storage", "//br/pkg/streamhelper", + "//br/pkg/utils/iter", "//pkg/ddl", "//pkg/kv", "//pkg/meta", @@ -61,7 +62,7 @@ go_test( ], embed = [":stream"], flaky = True, - shard_count = 38, + shard_count = 43, deps = [ "//br/pkg/storage", "//br/pkg/streamhelper", @@ -78,10 +79,12 @@ go_test( "//pkg/util/table-filter", "@com_github_fsouza_fake_gcs_server//fakestorage", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//oracle", + "@org_golang_x_exp//maps", "@org_uber_go_zap//:zap", ], ) diff --git a/br/pkg/stream/stream_metas.go b/br/pkg/stream/stream_metas.go index c86921cc15bdd..6dd87eb36d6a3 100644 --- a/br/pkg/stream/stream_metas.go +++ b/br/pkg/stream/stream_metas.go @@ -1,26 +1,42 @@ -// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0. +// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0. package stream import ( "context" + "encoding/binary" + "fmt" + "hash/crc64" "math" + "path" + "slices" + "sort" "strconv" + "strings" "sync" + "github.com/docker/go-units" + "github.com/fatih/color" "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" + pb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/mathutil" + "go.uber.org/multierr" "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) -const notDeletedBecameFatalThreshold = 128 +const ( + baseMigrationSN = 0 + baseMigrationName = "BASE" + baseTmp = "BASE_TMP" + migrationPrefix = "v1/migrations" +) type StreamMetadataSet struct { // if set true, the metadata and datafile won't be removed @@ -35,7 +51,7 @@ type StreamMetadataSet struct { Helper *MetadataHelper // for test - BeforeDoWriteBack func(path string, replaced *backuppb.Metadata) (skip bool) + BeforeDoWriteBack func(path string, replaced *pb.Metadata) (skip bool) } // keep these meta-information for statistics and filtering @@ -156,155 +172,56 @@ func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *Fi }) } +type updateFnHook struct { + NoHooks + updateFn func(num int64) +} + +func (hook updateFnHook) DeletedAFileForTruncating(count int) { + hook.updateFn(int64(count)) +} + // RemoveDataFilesAndUpdateMetadataInBatch concurrently remove datafilegroups and update metadata. // Only one metadata is processed in each thread, including deleting its datafilegroup and updating it. // Returns the not deleted datafilegroups. 
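+//
+// Call pattern, as exercised by the tests (`shiftUntilTS` comes from LoadUntilAndCalculateShiftTS):
+//
+//	notDeleted, err := ms.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, st, func(num int64) {})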
func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch( ctx context.Context, from uint64, - storage storage.ExternalStorage, + st storage.ExternalStorage, + // num = deleted files updateFn func(num int64), ) ([]string, error) { - var notDeleted struct { - item []string - sync.Mutex - } - worker := util.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files") - eg, cx := errgroup.WithContext(ctx) - for path, metaInfo := range ms.metadataInfos { - path := path - minTS := metaInfo.MinTS - // It's safety to remove the item within a range loop - delete(ms.metadataInfos, path) - if minTS >= from { - // That means all the datafiles wouldn't be removed, - // so that the metadata is skipped. - continue - } - worker.ApplyOnErrorGroup(eg, func() error { - if cx.Err() != nil { - return cx.Err() - } - - data, err := storage.ReadFile(ctx, path) - if err != nil { - return err - } - - meta, err := ms.Helper.ParseToMetadataHard(data) - if err != nil { - return err - } - - num, notDeletedItems, err := ms.removeDataFilesAndUpdateMetadata(ctx, storage, from, meta, path) - if err != nil { - return err - } - - updateFn(num) + hst := ms.hook(st) + est := MigerationExtension(hst) + est.Hooks = updateFnHook{updateFn: updateFn} + res := MigratedTo{NewBase: new(pb.Migration)} + est.doTruncateLogs(ctx, ms, from, &res) - notDeleted.Lock() - notDeleted.item = append(notDeleted.item, notDeletedItems...) - notDeleted.Unlock() - return nil - }) - } - - if err := eg.Wait(); err != nil { - return nil, errors.Trace(err) - } - - return notDeleted.item, nil -} - -// removeDataFilesAndUpdateMetadata removes some datafilegroups of the metadata, if their max-ts is less than `from` -func (ms *StreamMetadataSet) removeDataFilesAndUpdateMetadata( - ctx context.Context, - storage storage.ExternalStorage, - from uint64, - meta *backuppb.Metadata, - metaPath string, -) (num int64, notDeleted []string, err error) { - removed := make([]*backuppb.DataFileGroup, 0) - remainedDataFiles := make([]*backuppb.DataFileGroup, 0) - notDeleted = make([]string, 0) - // can we assume those files are sorted to avoid traversing here? (by what?) - for _, ds := range meta.FileGroups { - if ds.MaxTs < from { - removed = append(removed, ds) + if bst, ok := hst.ExternalStorage.(*storage.Batched); ok { + effs, err := storage.SaveJSONEffectsToTmp(bst.ReadOnlyEffects()) + if err != nil { + log.Warn("failed to save effects", logutil.ShortError(err)) } else { - // That means some kvs in the datafilegroup shouldn't be removed, - // so it will be kept out being removed. 
- remainedDataFiles = append(remainedDataFiles, ds) + log.Info("effects are saved, you may check them then.", zap.String("path", effs)) } } - num = int64(len(removed)) - - if ms.DryRun { - log.Info("dry run, skip deletion ...") - return num, notDeleted, nil - } - - // remove data file groups - for _, f := range removed { - log.Info("Deleting file", zap.String("path", f.Path)) - if err := storage.DeleteFile(ctx, f.Path); err != nil { - log.Warn("File not deleted.", zap.String("path", f.Path), logutil.ShortError(err)) - notDeleted = append(notDeleted, f.Path) - if len(notDeleted) > notDeletedBecameFatalThreshold { - return num, notDeleted, errors.Annotatef(berrors.ErrPiTRMalformedMetadata, "too many failure when truncating") - } - } - } - - // update metadata - if len(remainedDataFiles) != len(meta.FileGroups) { - // rewrite metadata - log.Info("Updating metadata.", zap.String("file", metaPath), - zap.Int("data-file-before", len(meta.FileGroups)), - zap.Int("data-file-after", len(remainedDataFiles))) - - // replace the filegroups and update the ts of the replaced metadata - ReplaceMetadata(meta, remainedDataFiles) - - if ms.BeforeDoWriteBack != nil && ms.BeforeDoWriteBack(metaPath, meta) { - log.Info("Skipped writeback meta by the hook.", zap.String("meta", metaPath)) - return num, notDeleted, nil - } - - if err := ms.doWriteBackForFile(ctx, storage, metaPath, meta); err != nil { - // NOTE: Maybe we'd better roll back all writebacks? (What will happen if roll back fails too?) - return num, notDeleted, errors.Annotatef(err, "failed to write back file %s", metaPath) - } + notDeleted := []string{} + for _, me := range res.NewBase.EditMeta { + notDeleted = append(notDeleted, me.DeletePhysicalFiles...) } - return num, notDeleted, nil -} - -func (ms *StreamMetadataSet) doWriteBackForFile( - ctx context.Context, - s storage.ExternalStorage, - path string, - meta *backuppb.Metadata, -) error { - // If the metadata file contains no data file, remove it due to it is meanless. - if len(meta.FileGroups) == 0 { - if err := s.DeleteFile(ctx, path); err != nil { - return errors.Annotatef(err, "failed to remove the empty meta %s", path) - } - return nil + // Hacking: if we started to delete some files, we must enter the `cleanUp` phase, + // then, all warnings should be `cannot delete file`. + if len(res.Warnings) > 0 && len(notDeleted) == 0 { + return nil, multierr.Combine(res.Warnings...) } - bs, err := ms.Helper.Marshal(meta) - if err != nil { - return errors.Annotatef(err, "failed to marshal the file %s", path) - } - return truncateAndWrite(ctx, s, path, bs) + return notDeleted, nil } func truncateAndWrite(ctx context.Context, s storage.ExternalStorage, path string, data []byte) error { - // Performance hack: the `Write` implemention would truncate the file if it exists. + // Performance hack: the `Write` implementation would truncate the file if it exists. 
if err := s.WriteFile(ctx, path, data); err != nil { return errors.Annotatef(err, "failed to save the file %s to %s", path, s.URI()) } @@ -355,7 +272,7 @@ func SetTSToFile( return truncateAndWrite(ctx, s, filename, []byte(content)) } -func UpdateShiftTS(m *backuppb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool) { +func UpdateShiftTS(m *pb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool) { var ( minBeginTS uint64 isExist bool @@ -381,11 +298,7 @@ func UpdateShiftTS(m *backuppb.Metadata, startTS uint64, restoreTS uint64) (uint return minBeginTS, isExist } -// replace the filegroups and update the ts of the replaced metadata -func ReplaceMetadata(meta *backuppb.Metadata, filegroups []*backuppb.DataFileGroup) { - // replace the origin metadata - meta.FileGroups = filegroups - +func updateMetadataInternalStat(meta *pb.Metadata) { if len(meta.FileGroups) == 0 { meta.MinTs = 0 meta.MaxTs = 0 @@ -408,3 +321,947 @@ func ReplaceMetadata(meta *backuppb.Metadata, filegroups []*backuppb.DataFileGro } } } + +// replace the filegroups and update the ts of the replaced metadata +func ReplaceMetadata(meta *pb.Metadata, filegroups []*pb.DataFileGroup) { + // replace the origin metadata + meta.FileGroups = filegroups + + updateMetadataInternalStat(meta) +} + +func AddMigrationToTable(m *pb.Migration, table *glue.Table) { + rd := color.New(color.FgHiRed).Sprint + for i, c := range m.Compactions { + addCompactionToTable(c, table, i) + } + + if len(m.EditMeta) > 0 { + totalDeletePhyFile := 0 + totalDeleteLgcFile := 0 + for _, edit := range m.EditMeta { + totalDeletePhyFile += len(edit.DeletePhysicalFiles) + for _, dl := range edit.DeleteLogicalFiles { + totalDeleteLgcFile += len(dl.Spans) + } + } + table.Add( + "edit-meta-files", + fmt.Sprintf("%s meta files will be edited.", rd(len(m.EditMeta))), + ) + table.Add( + "delete-physical-file", + fmt.Sprintf("%s physical files will be deleted.", rd(totalDeletePhyFile)), + ) + table.Add( + "delete-logical-file", + fmt.Sprintf("%s logical segments may be deleted, if possible.", rd(totalDeleteLgcFile)), + ) + } + for i, c := range m.DestructPrefix { + table.Add(fmt.Sprintf("destruct-prefix[%02d]", i), rd(c)) + } + table.Add("truncate-to", rd(m.TruncatedTo)) +} + +func addCompactionToTable(m *pb.LogFileCompaction, table *glue.Table, idx int) { + withIdx := func(s string) string { return fmt.Sprintf("compactions[%d].%s", idx, s) } + table.Add(withIdx("name"), m.Name) + table.Add(withIdx("time"), fmt.Sprintf("%d ~ %d", m.CompactionFromTs, m.CompactionUntilTs)) + table.Add(withIdx("file"), fmt.Sprintf("[%q, %q]", m.Artifacts, m.GeneratedFiles)) +} + +// MigrationExt is an extension to the `ExternalStorage` type. +// This added some support methods for the "migration" system of log backup. +// +// Migrations are idempontent batch modifications (adding a compaction, delete a file, etc..) to the backup files. +// You may check the protocol buffer message `Migration` for more details. +// Idempontence is important for migrations, as they may be executed multi times due to retry or racing. +// +// The encoded migrations will be put in a folder in the external storage, +// they are ordered by a series number. +// +// Not all migrations can be applied to the storage then removed from the migration. +// Small "additions" will be inlined into the migration, for example, a `Compaction`. +// Small "deletions" will also be delayed, for example, deleting a span in a file. +// Such operations will be save to a special migration, the first migration, named "BASE". 
+// +// A simple list of migrations (loaded by `Load()`): +/* +base = [ compaction, deleteSpan, ... ], +layers = { + { sn = 1, content = [ compaction, ... ] }, + { sn = 2, content = [ compaction, deleteFiles, ... ] }, +*/ +type MigrationExt struct { + s storage.ExternalStorage + prefix string + // The hooks used for tracking the execution. + // See the `Hooks` type for more details. + Hooks Hooks +} + +type Hooks interface { + StartLoadingMetaForTruncating() + EndLoadingMetaForTruncating() + + StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64) + DeletedAFileForTruncating(count int) + DeletedAllFilesForTruncating() + + StartHandlingMetaEdits([]*pb.MetaEdit) + HandledAMetaEdit(*pb.MetaEdit) + HandingMetaEditDone() +} + +func NewProgressBarHooks(console glue.ConsoleOperations) *ProgressBarHooks { + return &ProgressBarHooks{console: console} +} + +type ProgressBarHooks struct { + console glue.ConsoleOperations + + readMetaDone func() + deletingForTruncateProg glue.ProgressWaiter + handlingMetaEditProg glue.ProgressWaiter +} + +func (p *ProgressBarHooks) StartLoadingMetaForTruncating() { + log.Info("Start reading metadata.") + p.readMetaDone = p.console.ShowTask("Reading Metadata... ", glue.WithTimeCost()) +} + +func (p *ProgressBarHooks) EndLoadingMetaForTruncating() { + if p.readMetaDone != nil { + p.readMetaDone() + } +} + +func (p *ProgressBarHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64) { + var ( + fileCount int = 0 + kvCount int64 = 0 + totalSize uint64 = 0 + ) + + meta.IterateFilesFullyBefore(shiftTS, func(d *FileGroupInfo) (shouldBreak bool) { + fileCount++ + totalSize += d.Length + kvCount += d.KVCount + return + }) + + p.deletingForTruncateProg = p.console.StartProgressBar( + "Clearing Data Files and Metadata", fileCount, + glue.WithTimeCost(), + glue.WithConstExtraField("kv-count", kvCount), + glue.WithConstExtraField("kv-size", fmt.Sprintf("%d(%s)", totalSize, units.HumanSize(float64(totalSize)))), + ) +} + +func (p *ProgressBarHooks) DeletedAFileForTruncating(count int) { + if p.deletingForTruncateProg != nil { + p.deletingForTruncateProg.IncBy(int64(count)) + } +} + +func (p *ProgressBarHooks) DeletedAllFilesForTruncating() { + if p.deletingForTruncateProg != nil { + p.deletingForTruncateProg.Close() + } +} + +func (p *ProgressBarHooks) StartHandlingMetaEdits(edits []*pb.MetaEdit) { + p.handlingMetaEditProg = p.console.StartProgressBar( + "Applying Meta Edits", len(edits), + glue.WithTimeCost(), + glue.WithConstExtraField("meta-edits", len(edits)), + ) +} + +func (p *ProgressBarHooks) HandledAMetaEdit(edit *pb.MetaEdit) { + if p.handlingMetaEditProg != nil { + p.handlingMetaEditProg.Inc() + } +} + +func (p *ProgressBarHooks) HandingMetaEditDone() { + if p.handlingMetaEditProg != nil { + p.handlingMetaEditProg.Close() + } +} + +// NoHooks is used for non-interactive secnarios. +type NoHooks struct{} + +func (NoHooks) StartLoadingMetaForTruncating() {} +func (NoHooks) EndLoadingMetaForTruncating() {} +func (NoHooks) StartDeletingForTruncating(meta *StreamMetadataSet, shiftTS uint64) {} +func (NoHooks) DeletedAFileForTruncating(count int) {} +func (NoHooks) DeletedAllFilesForTruncating() {} +func (NoHooks) StartHandlingMetaEdits([]*pb.MetaEdit) {} +func (NoHooks) HandledAMetaEdit(*pb.MetaEdit) {} +func (NoHooks) HandingMetaEditDone() {} + +// MigrateionExtnsion installs the extension methods to an `ExternalStorage`. 
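+//
+// Typical flow, mirroring the tests: wrap the storage with `storage.Batch` for a dry run,
+// build the extension, then load and merge the live migrations:
+//
+//	bs := storage.Batch(s)
+//	est := MigerationExtension(bs)
+//	migs, err := est.Load(ctx)
+//	merged := migs.MergeTo(2) // the would-be BASE after merging layers up to sequence number 2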
+func MigerationExtension(s storage.ExternalStorage) MigrationExt { + return MigrationExt{ + s: s, + prefix: migrationPrefix, + Hooks: NoHooks{}, + } +} + +// Merge merges two migrations. +// The merged migration contains all operations from the two arguments. +func MergeMigrations(m1 *pb.Migration, m2 *pb.Migration) *pb.Migration { + out := new(pb.Migration) + out.EditMeta = mergeMetaEdits(m1.EditMeta, m2.EditMeta) + out.Compactions = append(out.Compactions, m1.Compactions...) + out.Compactions = append(out.Compactions, m2.Compactions...) + out.TruncatedTo = mathutil.Max(m1.TruncatedTo, m2.TruncatedTo) + out.DestructPrefix = append(out.DestructPrefix, m1.DestructPrefix...) + out.DestructPrefix = append(out.DestructPrefix, m2.DestructPrefix...) + return out +} + +// MergeAndMigratedTo is the result of a call to `MergeAndMigrateTo`. +type MergeAndMigratedTo struct { + MigratedTo + // The BASE migration of this "migrate to" operation. + Base *pb.Migration + // The migrations have been merged to the BASE migration. + Source []*OrderedMigration +} + +// MigratedTo is the result of trying to "migrate to" a migration. +// +// The term "migrate to" means, try to performance all possible operations +// from a migration to the storage. +type MigratedTo struct { + // Errors happen during executing the migration. + Warnings []error + // The new BASE migration after the operation. + NewBase *pb.Migration +} + +// Migrations represents living migrations from the storage. +type Migrations struct { + // The BASE migration. + Base *pb.Migration `json:"base"` + // Appended migrations. + // They are sorted by their sequence numbers. + Layers []*OrderedMigration `json:"layers"` +} + +// OrderedMigration is a migration with its path and sequence number. +type OrderedMigration struct { + SeqNum int `json:"seq_num"` + Path string `json:"path"` + Content pb.Migration `json:"content"` +} + +// Load loads the current living migrations from the storage. +func (m MigrationExt) Load(ctx context.Context) (Migrations, error) { + opt := &storage.WalkOption{ + SubDir: m.prefix, + } + items := storage.UnmarshalDir(ctx, opt, m.s, func(t *OrderedMigration, name string, b []byte) error { + t.Path = name + var err error + t.SeqNum, err = migIdOf(path.Base(name)) + if err != nil { + return errors.Trace(err) + } + if t.SeqNum == baseMigrationSN { + // NOTE: the legacy truncating isn't implemented by appending a migration. + // We load their checkpoint here to be compatible with them. + // Then we can know a truncation happens so we are safe to remove stale compactions. + truncatedTs, err := GetTSFromFile(ctx, m.s, TruncateSafePointFileName) + if err != nil { + return errors.Annotate(err, "failed to get the truncate safepoint for base migration") + } + t.Content.TruncatedTo = mathutil.Max(truncatedTs, t.Content.TruncatedTo) + } + return t.Content.Unmarshal(b) + }) + collected := iter.CollectAll(ctx, items) + if collected.Err != nil { + return Migrations{}, collected.Err + } + sort.Slice(collected.Item, func(i, j int) bool { + return collected.Item[i].SeqNum < collected.Item[j].SeqNum + }) + + var result Migrations + if len(collected.Item) > 0 && collected.Item[0].SeqNum == baseMigrationSN { + result = Migrations{ + Base: &collected.Item[0].Content, + Layers: collected.Item[1:], + } + } else { + // The BASE migration isn't persisted. + // This happens when `migrate-to` wasn't run ever. 
+ result = Migrations{ + Base: new(pb.Migration), + Layers: collected.Item, + } + } + return result, nil +} + +func (m MigrationExt) DryRun(f func(MigrationExt)) []storage.Effect { + batchSelf := MigrationExt{ + s: storage.Batch(m.s), + prefix: m.prefix, + Hooks: m.Hooks, + } + f(batchSelf) + return batchSelf.s.(*storage.Batched).ReadOnlyEffects() +} + +func (m MigrationExt) AppendMigration(ctx context.Context, mig *pb.Migration) (int, error) { + migs, err := m.Load(ctx) + if err != nil { + return 0, err + } + newSN := migs.Layers[len(migs.Layers)-1].SeqNum + 1 + name := path.Join(migrationPrefix, nameOf(mig, newSN)) + data, err := mig.Marshal() + if err != nil { + return 0, errors.Annotatef(err, "failed to encode the migration %s", mig) + } + return newSN, m.s.WriteFile(ctx, name, data) +} + +// MergeTo merges migrations from the BASE in the live migrations until the specified sequence number. +func (migs Migrations) MergeTo(seq int) *pb.Migration { + return migs.MergeToBy(seq, MergeMigrations) +} + +func (migs Migrations) MergeToBy(seq int, merge func(m1, m2 *pb.Migration) *pb.Migration) *pb.Migration { + newBase := migs.Base + for _, mig := range migs.Layers { + if mig.SeqNum > seq { + return newBase + } + newBase = merge(newBase, &mig.Content) + } + return newBase +} + +type mergeAndMigrateToConfig struct { + interactiveCheck func(context.Context, *pb.Migration) bool + alwaysRunTruncate bool + appendPhantomMigration []pb.Migration +} + +type MergeAndMigrateToOpt func(*mergeAndMigrateToConfig) + +func MMOptInteractiveCheck(f func(context.Context, *pb.Migration) bool) MergeAndMigrateToOpt { + return func(c *mergeAndMigrateToConfig) { + c.interactiveCheck = f + } +} + +// MMOptAlwaysRunTruncate forces the merge and migrate to always run the truncating. +// If not set, when the `truncated-to` wasn'd modified, truncating will be skipped. +// This is necessary because truncating, even a no-op, requires a full scan over metadatas for now. +// This will be useful for retrying failed truncations. +func MMOptAlwaysRunTruncate() MergeAndMigrateToOpt { + return func(c *mergeAndMigrateToConfig) { + c.alwaysRunTruncate = true + } +} + +// MMOptAppendPhantomMigration appends a phantom migration to the merge and migrate to operation. +// The phantom migration will be persised to BASE during executing. +// We call it a "phantom" because it wasn't persisted. +// When the target version isn't the latest version, the order of migrating may be broken. +// Carefully use this in that case. +func MMOptAppendPhantomMigration(migs ...pb.Migration) MergeAndMigrateToOpt { + return func(c *mergeAndMigrateToConfig) { + c.appendPhantomMigration = append(c.appendPhantomMigration, migs...) + } +} + +// MergeAndMigrateTo will merge the migrations from BASE until the specified SN, then migrate to it. +// Finally it writes the new BASE and remove stale migrations from the storage. 
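+//
+// Sketch of the intended call pattern (`targetSpec` is the highest sequence number to merge):
+//
+//	res := est.MergeAndMigrateTo(ctx, targetSpec)
+//	// res.Warnings collects non-fatal failures; operations that could not be executed
+//	// remain in res.NewBase so a later run may retry them.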
+func (m MigrationExt) MergeAndMigrateTo(ctx context.Context, targetSpec int, opts ...MergeAndMigrateToOpt) (result MergeAndMigratedTo) { + config := mergeAndMigrateToConfig{} + for _, o := range opts { + o(&config) + } + + migs, err := m.Load(ctx) + if err != nil { + result.MigratedTo = MigratedTo{ + Warnings: []error{ + errors.Annotate(err, "failed to load migrations, nothing will happen"), + }} + return + } + result.Base = migs.Base + for _, mig := range migs.Layers { + if mig.SeqNum > targetSpec { + break + } + result.Source = append(result.Source, mig) + } + for _, mig := range config.appendPhantomMigration { + result.Source = append(result.Source, &OrderedMigration{ + SeqNum: math.MaxInt, + Path: "", + Content: mig, + }) + } + + newBase := result.Base + canSkipTruncate := true + for _, mig := range result.Source { + if mig.Content.TruncatedTo > newBase.TruncatedTo { + canSkipTruncate = false + } + newBase = MergeMigrations(newBase, &mig.Content) + } + + if config.interactiveCheck != nil && !config.interactiveCheck(ctx, newBase) { + result.Warnings = append(result.Warnings, errors.New("User aborted, nothing will happen")) + return + } + + migTo := &result.MigratedTo + err = m.writeBase(ctx, newBase) + if err != nil { + result.MigratedTo.Warnings = append(result.MigratedTo.Warnings, errors.Annotatef(err, "failed to save the new base")) + return + } + for _, mig := range result.Source { + // Perhaps a phanom migration. + if len(mig.Path) > 0 { + err = m.s.DeleteFile(ctx, mig.Path) + if err != nil { + migTo.Warnings = append( + migTo.Warnings, + errors.Annotatef(err, "failed to delete the merged migration %s", migs.Layers[0].Path), + ) + } + } + } + result.MigratedTo = m.MigrateTo(ctx, newBase, MTMaybeSkipTruncateLog(!config.alwaysRunTruncate && canSkipTruncate)) + return +} + +type MigrateToOpt func(*migToOpt) + +type migToOpt struct { + skipTruncateLog bool +} + +func MTSkipTruncateLog(o *migToOpt) { + o.skipTruncateLog = true +} + +func MTMaybeSkipTruncateLog(cond bool) MigrateToOpt { + if cond { + return MTSkipTruncateLog + } + return func(*migToOpt) {} +} + +// MigrateTo migrates to a migration. +// If encountered some error during executing some operation, the operation will be put +// to the new BASE, which can be retryed then. +func (m MigrationExt) MigrateTo(ctx context.Context, mig *pb.Migration, opts ...MigrateToOpt) MigratedTo { + opt := migToOpt{} + for _, o := range opts { + o(&opt) + } + + result := MigratedTo{ + NewBase: new(pb.Migration), + } + // Fills: EditMeta for new Base. + m.doMetaEdits(ctx, mig, &result) + // Fills: TruncatedTo, Compactions, DesctructPrefix. + if !opt.skipTruncateLog { + m.doTruncating(ctx, mig, &result) + } else { + // Fast path: `truncate_to` wasn't updated, just copy the compactions and truncated to. + result.NewBase.Compactions = mig.Compactions + result.NewBase.TruncatedTo = mig.TruncatedTo + } + + return result +} + +func (m MigrationExt) writeBase(ctx context.Context, mig *pb.Migration) error { + content, err := mig.Marshal() + if err != nil { + return errors.Trace(err) + } + err = m.s.WriteFile(ctx, path.Join(m.prefix, baseTmp), content) + if err != nil { + return errors.Trace(err) + } + return m.s.Rename(ctx, path.Join(m.prefix, baseTmp), path.Join(m.prefix, baseMigrationName)) +} + +// doMetaEdits applies the modification to the meta files in the storage. 
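+// Edits that fail to apply are appended to `out.NewBase.EditMeta` together with a warning in
+// `out.Warnings`, so a later `migrate-to` run can retry them.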
+func (m MigrationExt) doMetaEdits(ctx context.Context, mig *pb.Migration, out *MigratedTo) { + m.Hooks.StartHandlingMetaEdits(mig.EditMeta) + + handleAMetaEdit := func(medit *pb.MetaEdit) { + if isEmptyEdition(medit) { + return + } + err := m.applyMetaEdit(ctx, medit) + if err != nil { + out.NewBase.EditMeta = append(out.NewBase.EditMeta, medit) + out.Warnings = append(out.Warnings, errors.Annotatef(err, "failed to apply meta edit %s to meta file", medit.Path)) + return + } + + m.cleanUpFor(ctx, medit, out) + } + + defer m.Hooks.HandingMetaEditDone() + for _, medit := range mig.EditMeta { + handleAMetaEdit(medit) + m.Hooks.HandledAMetaEdit(medit) + } +} + +// cleanUpFor modifies the real storage, remove the log files removed in the meta file, AFTER the meta edition has been applied. +func (m MigrationExt) cleanUpFor(ctx context.Context, medit *pb.MetaEdit, out *MigratedTo) { + var err error + newMetaEdit := &pb.MetaEdit{ + Path: medit.Path, + } + + if len(medit.DeletePhysicalFiles) > 0 { + err = m.s.DeleteFiles(ctx, medit.DeletePhysicalFiles) + if err != nil { + out.Warnings = append(out.Warnings, errors.Annotate(err, "failed to delete file")) + newMetaEdit.DeletePhysicalFiles = slices.Clone(medit.DeletePhysicalFiles) + } + } + + physicalFilesToDelete := []string{} + for _, spans := range medit.DeleteLogicalFiles { + if physicalFileCanBeDeleted(spans) { + physicalFilesToDelete = append(physicalFilesToDelete, spans.Path) + } else { + newMetaEdit.DeleteLogicalFiles = append(newMetaEdit.DeleteLogicalFiles, spans) + } + } + if len(physicalFilesToDelete) > 0 { + err = m.s.DeleteFiles(ctx, physicalFilesToDelete) + if err != nil { + out.Warnings = append(out.Warnings, errors.Annotate(err, "failed to delete file")) + newMetaEdit.DeletePhysicalFiles = append(newMetaEdit.DeletePhysicalFiles, physicalFilesToDelete...) + } + } + + if !isEmptyEdition(newMetaEdit) { + out.NewBase.EditMeta = append(out.NewBase.EditMeta, newMetaEdit) + } +} + +// applyMetaEdit applies the modifications in the `MetaEdit` to the real meta file. +// But this won't really clean up the real log files. +func (m MigrationExt) applyMetaEdit(ctx context.Context, medit *pb.MetaEdit) (err error) { + if medit.DestructSelf { + return m.s.DeleteFile(ctx, medit.Path) + } + + mContent, err := m.s.ReadFile(ctx, medit.Path) + if err != nil { + return err + } + var metadata pb.Metadata + err = metadata.Unmarshal(mContent) + if err != nil { + return err + } + + return m.applyMetaEditTo(ctx, medit, &metadata) +} + +func (m MigrationExt) applyMetaEditTo(ctx context.Context, medit *pb.MetaEdit, metadata *pb.Metadata) error { + metadata.Files = slices.DeleteFunc(metadata.Files, func(dfi *pb.DataFileInfo) bool { + // Here, `DeletePhysicalFiles` is usually tiny. + // Use a hashmap to filter out if this gets slow in the future. 
+ return slices.Contains(medit.DeletePhysicalFiles, dfi.Path) + }) + metadata.FileGroups = slices.DeleteFunc(metadata.FileGroups, func(dfg *pb.DataFileGroup) bool { + return slices.Contains(medit.DeletePhysicalFiles, dfg.Path) + }) + for _, group := range metadata.FileGroups { + idx := slices.IndexFunc(medit.DeleteLogicalFiles, func(dsof *pb.DeleteSpansOfFile) bool { + return dsof.Path == group.Path + }) + if idx >= 0 { + sort.Slice(medit.DeleteLogicalFiles[idx].Spans, func(i, j int) bool { + return medit.DeleteLogicalFiles[idx].Spans[i].Offset < medit.DeleteLogicalFiles[idx].Spans[j].Offset + }) + var err error + group.DataFilesInfo = slices.DeleteFunc(group.DataFilesInfo, func(dfi *pb.DataFileInfo) bool { + received, ok := slices.BinarySearchFunc( + medit.DeleteLogicalFiles[idx].Spans, + dfi.RangeOffset, + func(s *pb.Span, u uint64) int { + return int(s.Offset - u) + }) + if ok && medit.DeleteLogicalFiles[idx].Spans[received].Length != dfi.RangeLength { + err = errors.Annotatef( + berrors.ErrPiTRMalformedMetadata, + "trying to delete a span that mismatches with metadata: to delete is %v, found %v", + medit.DeleteLogicalFiles[idx].Spans[received], + dfi, + ) + } + return ok + }) + if err != nil { + return err + } + } + } + metadata.FileGroups = slices.DeleteFunc(metadata.FileGroups, func(dfg *pb.DataFileGroup) bool { + // As all spans in the physical data file has been deleted, it will be soonly removed. + return len(dfg.DataFilesInfo) == 0 + }) + + if isEmptyMetadata(metadata) { + // As it is empty, even no hint to destruct self, we can safely delete it. + return m.s.DeleteFile(ctx, medit.Path) + } + + updateMetadataInternalStat(metadata) + newContent, err := metadata.Marshal() + if err != nil { + return err + } + return truncateAndWrite(ctx, m.s, medit.Path, newContent) +} + +func (m MigrationExt) tryRemovePrefix(ctx context.Context, pfx string, out *MigratedTo) { + enumerateAndDelete := func(prefix string) error { + if isInsane(prefix) { + return errors.Annotatef( + berrors.ErrPiTRMalformedMetadata, + "trying to delete a prefix %q that is too wide, skipping deleting", + prefix, + ) + } + files, err := m.loadFilesOfPrefix(ctx, prefix) + if err != nil { + return err + } + return m.s.DeleteFiles(ctx, files) + } + if err := enumerateAndDelete(pfx); err != nil { + out.Warnings = append(out.Warnings, errors.Annotatef(err, "failed to delete prefix %s", pfx)) + out.NewBase.DestructPrefix = append(out.NewBase.DestructPrefix, pfx) + } +} + +// doTruncating tries to remove outdated compaction, filling the not-yet removed compactions to the new migration. +func (m MigrationExt) doTruncating(ctx context.Context, mig *pb.Migration, result *MigratedTo) { + // NOTE: Execution of truncation wasn't implemented here. + // If we are going to truncate some files, for now we still need to use `br log truncate`. + for _, compaction := range mig.Compactions { + // Can we also remove the compaction when `until-ts` is equal to `truncated-to`...? 
+ if compaction.CompactionUntilTs > mig.TruncatedTo { + result.NewBase.Compactions = append(result.NewBase.Compactions, compaction) + } else { + m.tryRemovePrefix(ctx, compaction.Artifacts, result) + m.tryRemovePrefix(ctx, compaction.GeneratedFiles, result) + } + } + for _, pfx := range mig.DestructPrefix { + m.tryRemovePrefix(ctx, pfx, result) + } + + result.NewBase.TruncatedTo = mig.TruncatedTo + + m.Hooks.StartLoadingMetaForTruncating() + mdSet := new(StreamMetadataSet) + mdSet.MetadataDownloadBatchSize = 128 + shiftTS, err := mdSet.LoadUntilAndCalculateShiftTS(ctx, m.s, mig.TruncatedTo) + if err != nil { + result.Warnings = append(result.Warnings, errors.Annotatef(err, "failed to open meta storage")) + return + } + m.Hooks.EndLoadingMetaForTruncating() + + m.doTruncateLogs(ctx, mdSet, shiftTS, result) +} + +func (m MigrationExt) loadFilesOfPrefix(ctx context.Context, prefix string) (out []string, err error) { + err = m.s.WalkDir(ctx, &storage.WalkOption{SubDir: prefix}, func(path string, size int64) error { + out = append(out, path) + return nil + }) + return +} + +// doTruncateLogs truncates the logs until the specified TS. +// This might be slow. +func (m MigrationExt) doTruncateLogs( + ctx context.Context, + metadataInfos *StreamMetadataSet, + from uint64, + out *MigratedTo, +) { + mu := new(sync.Mutex) + updateResult := func(f func(r *MigratedTo)) { + mu.Lock() + defer mu.Unlock() + + f(out) + } + emitErr := func(err error) { + updateResult(func(r *MigratedTo) { + r.Warnings = append(r.Warnings, err) + }) + } + cannotBeRetryByRerunBase := func(err error) error { + return errors.Annotate(err, "this error may not be retry by `migrate-to --base`, you may need to rerun `log truncate`") + } + + worker := util.NewWorkerPool(128, "delete files") + wg := new(sync.WaitGroup) + + m.Hooks.StartDeletingForTruncating(metadataInfos, from) + defer m.Hooks.DeletedAllFilesForTruncating() + for path, metaInfo := range metadataInfos.metadataInfos { + path := path + if metaInfo.MinTS >= from { + continue + } + wg.Add(1) + worker.Apply(func() { + defer wg.Done() + data, err := m.s.ReadFile(ctx, path) + if err != nil { + emitErr(cannotBeRetryByRerunBase( + errors.Annotatef(err, "failed to open meta %s", path))) + return + } + + // Note: maybe make this a static method or just a normal function... + meta, err := (*MetadataHelper).ParseToMetadataHard(nil, data) + if err != nil { + emitErr(cannotBeRetryByRerunBase( + errors.Annotatef(err, "failed to parse meta %s", path))) + return + } + + me := new(pb.MetaEdit) + me.Path = path + for _, ds := range meta.FileGroups { + if ds.MaxTs < from { + me.DeletePhysicalFiles = append(me.DeletePhysicalFiles, ds.Path) + } + } + + err = m.applyMetaEditTo(ctx, me, meta) + if err != nil { + updateResult(func(r *MigratedTo) { + r.Warnings = append(r.Warnings, errors.Annotatef(err, "during handling %s", me.Path)) + r.NewBase.EditMeta = append(r.NewBase.EditMeta, me) + }) + } + m.cleanUpFor(ctx, me, out) + m.Hooks.DeletedAFileForTruncating(len(me.DeletePhysicalFiles)) + }) + } + wg.Wait() +} + +type hookedStorage struct { + storage.ExternalStorage + metaSet *StreamMetadataSet +} + +func (h hookedStorage) WriteFile(ctx context.Context, name string, data []byte) error { + if h.metaSet.BeforeDoWriteBack != nil { + meta, err := h.metaSet.Helper.ParseToMetadataHard(data) + if err != nil { + // Note: will this be too strict? But for now it seems this check won't fail. + // We can remove this in the future if needed. 
+ return errors.Annotatef(err, "Writing non-meta during write back (to = %s)", name) + } + if h.metaSet.BeforeDoWriteBack(name, meta) { + log.Info("Skipped writeback meta by the hook.", zap.String("meta", name)) + return nil + } + } + + return h.ExternalStorage.WriteFile(ctx, name, data) +} + +func (ms *StreamMetadataSet) hook(s storage.ExternalStorage) hookedStorage { + hooked := hookedStorage{ + ExternalStorage: s, + metaSet: ms, + } + if ms.DryRun { + hooked.ExternalStorage = storage.Batch(hooked.ExternalStorage) + } + return hooked +} + +func physicalFileCanBeDeleted(fs *pb.DeleteSpansOfFile) bool { + sort.Slice(fs.Spans, func(i, j int) bool { + return fs.Spans[i].Offset < fs.Spans[j].Offset + }) + lastOffset := uint64(0) + for _, s := range fs.Spans { + if s.Offset != lastOffset { + return false + } + lastOffset += s.Length + } + return lastOffset == fs.WholeFileLength +} + +// mergeMetaEdits merges two meta edits. +// +// If the spans in the `DeleteLogicalFiles` consist a physical file, +// they will be transformed to `DeletePhysicalFiles`. +func mergeMetaEdits(s1, s2 []*pb.MetaEdit) []*pb.MetaEdit { + edits := map[string]*pb.MetaEdit{} + for _, edit := range s1 { + edits[edit.GetPath()] = &pb.MetaEdit{ + Path: edit.Path, + DeletePhysicalFiles: edit.DeletePhysicalFiles[:len(edit.DeletePhysicalFiles):len(edit.DeletePhysicalFiles)], + DeleteLogicalFiles: edit.DeleteLogicalFiles[:len(edit.DeleteLogicalFiles):len(edit.DeleteLogicalFiles)], + } + } + for _, edit := range s2 { + target, ok := edits[edit.GetPath()] + if !ok { + edits[edit.GetPath()] = edit + } else { + target.DeletePhysicalFiles = append(target.DeletePhysicalFiles, edit.GetDeletePhysicalFiles()...) + target.DeleteLogicalFiles = mergeDeleteLogicalFiles(target.GetDeleteLogicalFiles(), edit.GetDeleteLogicalFiles()) + } + } + + val := make([]*pb.MetaEdit, 0, len(edits)) + for _, v := range edits { + val = append(val, v) + } + return val +} + +// mergeDeleteLogicalFiles merges two `DeleteSpansOfFile`. +func mergeDeleteLogicalFiles(s1, s2 []*pb.DeleteSpansOfFile) []*pb.DeleteSpansOfFile { + files := map[string]*pb.DeleteSpansOfFile{} + for _, file := range s1 { + files[file.GetPath()] = &pb.DeleteSpansOfFile{ + Path: file.GetPath(), + Spans: file.GetSpans()[:len(file.GetSpans()):len(file.GetSpans())], + WholeFileLength: file.GetWholeFileLength(), + } + } + for _, file := range s2 { + target, ok := files[file.GetPath()] + if !ok { + files[file.GetPath()] = file + } else { + target.Spans = append(target.Spans, file.GetSpans()...) + } + } + + val := make([]*pb.DeleteSpansOfFile, 0, len(files)) + for _, v := range files { + val = append(val, v) + } + return val +} + +func isEmptyEdition(medit *pb.MetaEdit) bool { + return len(medit.DeletePhysicalFiles) == 0 && len(medit.DeleteLogicalFiles) == 0 && !medit.DestructSelf +} + +func migIdOf(s string) (int, error) { + const ( + migrationPrefixLen = 8 + ) + if s == baseMigrationName { + return baseMigrationSN, nil + } + if len(s) < 8 { + return 0, errors.Annotatef(berrors.ErrUnknown, + "migration name %s is too short, perhaps `migrations` dir corrupted", s) + } + toParse := s[:migrationPrefixLen] + result, err := strconv.Atoi(toParse) + if err != nil { + return 0, errors.Annotatef(err, + "migration name %s is not a valid number, perhaps `migrations` dir corrupted", s) + } + return result, nil +} + +// isInsane checks whether deleting a prefix is insane: say, going to destroy the whole backup storage. +// +// This would be useful when a compaction's output dir is absent or modified. 
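+//
+// For example, "", ".", "/", "v1" and anything starting with ".." are rejected, while a deeper,
+// more specific prefix (say, a single compaction's output directory) passes the check.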
+func isInsane(pfx string) bool { + normalized := path.Clean(pfx) + switch normalized { + case "", ".", "/", "/v1", "v1": + return true + default: + } + + return strings.HasPrefix(pfx, "..") +} + +func isEmptyMetadata(md *pb.Metadata) bool { + return len(md.FileGroups) == 0 && len(md.Files) == 0 +} + +func hashMigration(m *pb.Migration) uint64 { + var crc64 uint64 = 0 + for _, compaction := range m.Compactions { + crc64 ^= compaction.ArtifactsHash + } + for _, metaEdit := range m.EditMeta { + crc64 ^= hashMetaEdit(metaEdit) + } + return crc64 ^ m.TruncatedTo +} + +func hashMetaEdit(metaEdit *pb.MetaEdit) uint64 { + var res uint64 = 0 + for _, df := range metaEdit.DeletePhysicalFiles { + digest := crc64.New(crc64.MakeTable(crc64.ISO)) + digest.Write([]byte(df)) + res ^= digest.Sum64() + } + for _, spans := range metaEdit.DeleteLogicalFiles { + for _, span := range spans.GetSpans() { + crc := crc64.New(crc64.MakeTable(crc64.ISO)) + crc.Write([]byte(spans.GetPath())) + crc.Write(binary.LittleEndian.AppendUint64(nil, span.GetOffset())) + crc.Write(binary.LittleEndian.AppendUint64(nil, span.GetLength())) + res ^= crc.Sum64() + } + } + crc := crc64.New(crc64.MakeTable(crc64.ISO)) + if metaEdit.DestructSelf { + crc.Write([]byte{1}) + } else { + crc.Write([]byte{0}) + } + return res ^ crc.Sum64() +} + +func nameOf(mig *pb.Migration, sn int) string { + return fmt.Sprintf("%08d_%016X.mgrt", sn, hashMigration(mig)) +} diff --git a/br/pkg/stream/stream_metas_test.go b/br/pkg/stream/stream_metas_test.go index 2545f2fd90097..332f31f7f8f8f 100644 --- a/br/pkg/stream/stream_metas_test.go +++ b/br/pkg/stream/stream_metas_test.go @@ -5,23 +5,61 @@ package stream import ( "context" "fmt" + "math" "math/rand" "os" "path" "path/filepath" + "slices" + "strings" "sync" "testing" "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/util/intest" "github.com/stretchr/testify/require" "go.uber.org/zap" + "golang.org/x/exp/maps" ) +func requireMigrationsEqual(t *testing.T, miga, migb *backuppb.Migration) { + require.NotNil(t, miga) + require.NotNil(t, migb) + require.Equal(t, hashMigration(miga), hashMigration(migb), "\n%s\n%s", miga, migb) +} + +type effects struct { + Renames map[string]string + Deletions map[string]struct{} + Edits map[string][]byte +} + +func effectsOf(efs []storage.Effect) effects { + out := effects{Renames: map[string]string{}, Deletions: map[string]struct{}{}, Edits: map[string][]byte{}} + for _, ef := range efs { + switch e := ef.(type) { + case storage.EffDeleteFile: + out.Deletions[string(e)] = struct{}{} + case storage.EffDeleteFiles: + for _, f := range e.Files { + out.Deletions[f] = struct{}{} + } + case storage.EffPut: + out.Edits[e.File] = e.Content + case storage.EffRename: + out.Renames[e.From] = e.To + default: + panic(fmt.Sprintf("unknown effect %T", ef)) + } + } + return out +} + func fakeDataFiles(s storage.ExternalStorage, base, item int) (result []*backuppb.DataFileInfo) { ctx := context.Background() for i := base; i < base+item; i++ { @@ -301,6 +339,8 @@ func TestTruncateSafepoint(t *testing.T) { } func TestTruncateSafepointForGCS(t *testing.T) { + t.SkipNow() + require.True(t, intest.InTest) ctx := context.Background() opts := fakestorage.Options{ @@ -534,6 +574,174 @@ func m(storeId int64, minTS, maxTS uint64) *backuppb.Metadata { } } +type migOP func(*backuppb.Migration) + +func 
mDstrPfx(path ...string) migOP { + return func(m *backuppb.Migration) { + m.DestructPrefix = append(m.DestructPrefix, path...) + } +} + +func mCompaction(cPath, aPath string, fromTs, untilTs uint64) migOP { + return func(m *backuppb.Migration) { + c := &backuppb.LogFileCompaction{} + c.GeneratedFiles = cPath + c.Artifacts = aPath + c.CompactionFromTs = fromTs + c.CompactionUntilTs = untilTs + m.Compactions = append(m.Compactions, c) + } +} + +func mDel(mPath string, files ...string) migOP { + return func(m *backuppb.Migration) { + idx := slices.IndexFunc(m.EditMeta, func(m *backuppb.MetaEdit) bool { return m.Path == mPath }) + var meta *backuppb.MetaEdit + if idx < 0 { + meta = &backuppb.MetaEdit{ + Path: mPath, + } + m.EditMeta = append(m.EditMeta, meta) + } else { + meta = m.EditMeta[idx] + } + meta.DeletePhysicalFiles = append(meta.DeletePhysicalFiles, files...) + } +} + +func sp(offset, length uint64) *backuppb.Span { + return &backuppb.Span{ + Offset: offset, + Length: length, + } +} + +func spans(lPath string, total uint64, spans ...*backuppb.Span) *backuppb.DeleteSpansOfFile { + return &backuppb.DeleteSpansOfFile{ + Path: lPath, + Spans: spans, + WholeFileLength: total, + } +} + +func mLogDel(mPath string, logFiles ...*backuppb.DeleteSpansOfFile) migOP { + return func(m *backuppb.Migration) { + idx := slices.IndexFunc(m.EditMeta, func(m *backuppb.MetaEdit) bool { return m.Path == mPath }) + var meta *backuppb.MetaEdit + if idx < 0 { + meta = &backuppb.MetaEdit{ + Path: mPath, + } + m.EditMeta = append(m.EditMeta, meta) + } else { + meta = m.EditMeta[idx] + } + meta.DeleteLogicalFiles = append(meta.DeleteLogicalFiles, logFiles...) + } +} + +type metaOp func(*backuppb.Metadata) + +func mtGroup(tpath string, files ...*backuppb.DataFileInfo) metaOp { + return func(m *backuppb.Metadata) { + grp := tGroup(tpath, files...) + if m.MaxTs < grp.MaxTs { + m.MaxTs = grp.MaxTs + } + if m.MinTs > grp.MinTs { + m.MinTs = grp.MinTs + } + m.FileGroups = append(m.FileGroups, grp) + } +} + +func tGroup(tPath string, files ...*backuppb.DataFileInfo) *backuppb.DataFileGroup { + grp := &backuppb.DataFileGroup{} + grp.Path = tPath + grp.MinTs = math.MaxUint64 + for _, f := range files { + grp.DataFilesInfo = append(grp.DataFilesInfo, f) + if f.MaxTs > grp.MaxTs { + grp.MaxTs = f.MaxTs + } + if f.MinTs < grp.MinTs { + grp.MinTs = f.MinTs + } + } + return grp +} + +func dFile(sp *backuppb.Span) *backuppb.DataFileInfo { + return &backuppb.DataFileInfo{ + RangeOffset: sp.GetOffset(), + RangeLength: sp.GetLength(), + } +} + +// mt is abbrev. of meta. +func mt(ops ...metaOp) *backuppb.Metadata { + m := &backuppb.Metadata{} + for _, op := range ops { + op(m) + } + return m +} + +// pmt is abbrev. of persisted meta. 
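+// For example, pmt(s, "00001.meta", mt(mtGroup("foo.log"))) marshals the metadata and writes it
+// to "00001.meta" in the storage `s`.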
+func pmt(s storage.ExternalStorage, path string, mt *backuppb.Metadata) { + data, err := mt.Marshal() + if err != nil { + panic(err) + } + err = s.WriteFile(context.Background(), path, data) + if err != nil { + panic(err) + } +} + +func pmig(s storage.ExternalStorage, num uint64, mt *backuppb.Migration) string { + numS := fmt.Sprintf("%08d", num) + if num == baseMigrationSN { + numS = baseMigrationName + } + name := fmt.Sprintf("%s_%08X.mgrt", numS, hashMigration(mt)) + p := path.Join(migrationPrefix, name) + + data, err := mt.Marshal() + if err != nil { + panic(err) + } + err = s.WriteFile(context.Background(), p, data) + if err != nil { + panic(err) + } + return p +} + +func mTruncatedTo(to uint64) migOP { + return func(m *backuppb.Migration) { + m.TruncatedTo = to + } +} + +// tmp creates a temporary storage. +func tmp(t *testing.T) *storage.LocalStorage { + tmpDir := t.TempDir() + s, err := storage.NewLocalStorage(tmpDir) + require.NoError(t, os.MkdirAll(path.Join(tmpDir, migrationPrefix), 0744)) + require.NoError(t, err) + s.IgnoreEnoentForDelete = true + return s +} + +func mig(ops ...migOP) *backuppb.Migration { + mig := &backuppb.Migration{} + for _, op := range ops { + op(mig) + } + return mig +} + func f(storeId int64, minTS, maxTS uint64, cf string, defaultTS uint64) *backuppb.DataFileGroup { return &backuppb.DataFileGroup{ Path: logName(storeId, minTS, maxTS), @@ -2088,22 +2296,23 @@ func TestTruncate3(t *testing.T) { for i, cs := range cases { for j, ts := range cs.testParams { for _, until := range ts.until { - t.Logf("case %d, param %d, until %d", i, j, until) - metas := StreamMetadataSet{ - Helper: NewMetadataHelper(), - MetadataDownloadBatchSize: 128, - } - err := generateFiles(ctx, s, cs.metas, tmpDir) - require.NoError(t, err) - shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, s, until) - require.NoError(t, err) - require.Equal(t, shiftUntilTS, ts.shiftUntilTS(until)) - n, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, s, func(num int64) {}) - require.Equal(t, len(n), 0) - require.NoError(t, err) - - // check the result - checkFiles(ctx, s, ts.restMetadata, t) + t.Run(fmt.Sprintf("case %d, param %d, until %d", i, j, until), func(t *testing.T) { + metas := StreamMetadataSet{ + Helper: NewMetadataHelper(), + MetadataDownloadBatchSize: 128, + } + err := generateFiles(ctx, s, cs.metas, tmpDir) + require.NoError(t, err) + shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, s, until) + require.NoError(t, err) + require.Equal(t, shiftUntilTS, ts.shiftUntilTS(until)) + n, err := metas.RemoveDataFilesAndUpdateMetadataInBatch(ctx, shiftUntilTS, s, func(num int64) {}) + require.Equal(t, len(n), 0) + require.NoError(t, err) + + // check the result + checkFiles(ctx, s, ts.restMetadata, t) + }) } } } @@ -2315,3 +2524,302 @@ func TestCalculateShiftTS(t *testing.T) { } } } + +func TestBasicMigration(t *testing.T) { + s := tmp(t) + dsp := func(o, l uint64) *backuppb.DataFileInfo { return dFile(sp(o, l)) } + + pmt(s, "00001.meta", mt(mtGroup("foo.log"), mtGroup("bar.log"))) + pmt(s, "00002.meta", mt( + mtGroup("00001.log", dsp(0, 42), dsp(42, 18), dsp(60, 1024-60)), + mtGroup("00002.log", dsp(0, 42), dsp(42, 54)), + )) + pmt(s, "00003.meta", mt(mtGroup("3.log", dsp(0, 50)))) + + mig1 := mig( + mDel("00001.meta", "foo.log"), + mLogDel("00002.meta", + spans("00001.log", 1024, sp(0, 42), sp(42, 18)), + spans("00002.log", 96, sp(42, 54)), + ), + ) + mig2 := mig( + mDel("00001.meta", "bar.log"), + mLogDel("00002.meta", + spans("00002.log", 96, 
+			spans("00002.log", 96, sp(0, 42)),
+		),
+		mLogDel("00003.meta", spans("3.log", 50, sp(0, 50))),
+	)
+
+	bs := storage.Batch(s)
+	est := MigerationExtension(bs)
+	res := MergeMigrations(mig1, mig2)
+
+	resE := mig(
+		mDel("00001.meta", "bar.log"),
+		mLogDel("00002.meta",
+			spans("00002.log", 96, sp(0, 42)),
+		),
+		mDel("00001.meta", "foo.log"),
+		mLogDel("00002.meta",
+			spans("00001.log", 1024, sp(0, 42), sp(42, 18)),
+			spans("00002.log", 96, sp(42, 54)),
+		),
+		mLogDel("00003.meta", spans("3.log", 50, sp(0, 50))),
+	)
+	requireMigrationsEqual(t, resE, res)
+
+	ctx := context.Background()
+	mg := est.MigrateTo(ctx, res)
+
+	newBaseE := mig(mLogDel("00002.meta", spans("00001.log", 1024, sp(0, 42), sp(42, 18))))
+	require.Empty(t, mg.Warnings)
+	requireMigrationsEqual(t, newBaseE, mg.NewBase)
+
+	efs := effectsOf(bs.ReadOnlyEffects())
+	require.ElementsMatch(t, maps.Keys(efs.Deletions), []string{"foo.log", "bar.log", "00002.log", "00001.meta", "00003.meta", "3.log"})
+	var meta backuppb.Metadata
+	require.NoError(t, meta.Unmarshal(efs.Edits["00002.meta"]))
+	require.Equal(t, &meta, mt(mtGroup("00001.log", dsp(60, 1024-60))))
+
+	require.NoError(t, bs.Commit(ctx))
+
+	delRem := mig(mLogDel("00002.meta", spans("00001.log", 1024, sp(60, 1024-60))))
+	newNewBase := MergeMigrations(mg.NewBase, delRem)
+	mg = est.MigrateTo(ctx, newNewBase)
+	require.Empty(t, mg.Warnings)
+	requireMigrationsEqual(t, mg.NewBase, mig())
+}
+
+func TestMergeAndMigrateTo(t *testing.T) {
+	s := tmp(t)
+	dfi := func(o, l uint64) *backuppb.DataFileInfo { return dFile(sp(o, l)) }
+	lN := func(n uint64) string { return fmt.Sprintf("%05d.log", n) }
+	mN := func(n uint64) string { return fmt.Sprintf("%05d.meta", n) }
+
+	pmt(s, mN(1), mt(
+		mtGroup(lN(1)), mtGroup(lN(2)),
+		mtGroup(lN(3), dfi(0, 42), dfi(42, 18), dfi(60, 40)),
+		mtGroup(lN(4), dfi(0, 42), dfi(42, 58))),
+	)
+
+	mig1p := pmig(s, 1, mig(
+		mDel(mN(1), lN(2)),
+		mLogDel(mN(1), spans(lN(3), 100, sp(0, 42), sp(42, 18))),
+		mLogDel(mN(1), spans(lN(4), 100, sp(42, 58))),
+	))
+	mig2p := pmig(s, 2, mig(
+		mLogDel(mN(1), spans(lN(3), 100, sp(60, 40))),
+	))
+	mig3 := mig(
+		mDel(mN(1), lN(1)),
+		mLogDel(mN(1), spans(lN(4), 100, sp(0, 42))),
+	)
+	mig3p := pmig(s, 3, mig3)
+
+	bs := storage.Batch(s)
+	est := MigerationExtension(bs)
+
+	ctx := context.Background()
+	migs, err := est.Load(ctx)
+	require.NoError(t, err)
+	requireMigrationsEqual(t, migs.MergeTo(2), mig(
+		mDel(mN(1), lN(2)),
+		mLogDel(mN(1),
+			spans(lN(4), 100, sp(42, 58)),
+			spans(lN(3), 100, sp(0, 42), sp(42, 18), sp(60, 40))),
+	))
+
+	mg := est.MergeAndMigrateTo(ctx, 2)
+
+	require.Len(t, mg.Source, 2)
+	require.Empty(t, mg.Warnings)
+	requireMigrationsEqual(t, mg.NewBase, mig(mLogDel(mN(1), spans(lN(4), 100, sp(42, 58)))))
+
+	effs := effectsOf(bs.ReadOnlyEffects())
+	require.ElementsMatch(t, maps.Keys(effs.Deletions), []string{lN(2), lN(3), mig1p, mig2p})
+	require.NoError(t, bs.Commit(ctx))
+
+	migs, err = est.Load(ctx)
+	require.NoError(t, err)
+
+	requireMigrationsEqual(t, migs.Base, mg.NewBase)
+	require.Len(t, migs.Layers, 1)
+	requireMigrationsEqual(t, &migs.Layers[0].Content, mig3)
+	require.EqualValues(t, migs.Layers[0].SeqNum, 3)
+
+	mg = est.MergeAndMigrateTo(ctx, 3)
+	require.Empty(t, mg.Warnings)
+	requireMigrationsEqual(t, mg.NewBase, mig())
+	effs = effectsOf(bs.ReadOnlyEffects())
+	require.ElementsMatch(t, maps.Keys(effs.Deletions), []string{mN(1), lN(1), lN(4), mig3p})
+}
+
+func TestRemoveCompaction(t *testing.T) {
+	s := tmp(t)
+	ctx := context.Background()
+	placeholder := func(pfx string) string {
+		path := path.Join(pfx, "monolith")
+		require.NoError(t, s.WriteFile(ctx, path, []byte("🪨")))
+		return path
+	}
+	cDir := func(n uint64) string { return fmt.Sprintf("%05d/output", n) }
+	aDir := func(n uint64) string { return fmt.Sprintf("%05d/metas", n) }
+
+	var (
+		ap []string
+		cp []string
+	)
+	for i := 1; i <= 4; i++ {
+		ap = append(ap, placeholder(aDir(uint64(i))))
+		cp = append(cp, placeholder(cDir(uint64(i))))
+	}
+	mig1 := mig(
+		mCompaction(cDir(1), aDir(1), 10, 40),
+		mCompaction(cDir(2), aDir(2), 35, 50),
+		// Should not truncate the full dir...
+		mTruncatedTo(30),
+	)
+	mig2 := mig(
+		mCompaction("", aDir(4), 15, 25),
+		mCompaction(cDir(3), aDir(3), 5, 29),
+		mTruncatedTo(20),
+	)
+	bs := storage.Batch(s)
+	est := MigerationExtension(bs)
+
+	merged := MergeMigrations(mig1, mig2)
+	requireMigrationsEqual(t, merged, mig(
+		mCompaction(cDir(1), aDir(1), 10, 40),
+		mCompaction(cDir(2), aDir(2), 35, 50),
+		mCompaction("", aDir(4), 15, 25),
+		mCompaction(cDir(3), aDir(3), 5, 29),
+		mTruncatedTo(30),
+	))
+
+	mg := est.MigrateTo(ctx, merged)
+	requireMigrationsEqual(t, mg.NewBase, mig(
+		mCompaction(cDir(1), aDir(1), 10, 40),
+		mCompaction(cDir(2), aDir(2), 35, 50),
+		mTruncatedTo(30),
+	))
+
+	ops := effectsOf(bs.ReadOnlyEffects())
+	require.ElementsMatch(t, maps.Keys(ops.Deletions), []string{ap[2], cp[2], ap[3]})
+}
+
+func TestRetry(t *testing.T) {
+	s := tmp(t)
+	lN := func(n uint64) string { return fmt.Sprintf("%05d.log", n) }
+	mN := func(n uint64) string { return fmt.Sprintf("%05d.meta", n) }
+	dfi := func(o, l uint64) *backuppb.DataFileInfo { return dFile(sp(o, l)) }
+
+	pmt(s, mN(1), mt(
+		mtGroup(lN(1), dfi(0, 41)), mtGroup(lN(2), dfi(0, 42)), mtGroup(lN(3), dfi(0, 43))))
+
+	mig1 := mig(mDel(mN(1), lN(1)))
+	pmig(s, 1, mig1)
+	mig2 := mig(mDel(mN(1), lN(2)))
+	pmig(s, 2, mig2)
+
+	require.NoError(t,
+		failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/local_write_file_err", `1*return("this disk remembers nothing")`))
+	ctx := context.Background()
+	est := MigerationExtension(s)
+	mg := est.MergeAndMigrateTo(ctx, 2)
+	require.Len(t, mg.Warnings, 1)
+	require.Error(t, mg.Warnings[0], "this disk remembers nothing")
+	fmt.Printf("mg: %v\n", mg)
+	requireMigrationsEqual(t, mg.NewBase, mig(mDel(mN(1), lN(1), lN(2))))
+
+	mg = est.MergeAndMigrateTo(ctx, 2)
+	require.Empty(t, slices.DeleteFunc(mg.Warnings, func(err error) bool {
+		return strings.Contains(err.Error(), "failed to delete file")
+	}))
+	requireMigrationsEqual(t, mg.NewBase, mig())
+}
+
+func TestRetryRemoveCompaction(t *testing.T) {
+	s := tmp(t)
+	ctx := context.Background()
+	placeholder := func(pfx string) string {
+		path := path.Join(pfx, "monolith")
+		require.NoError(t, s.WriteFile(ctx, path, []byte("🪨")))
+		return path
+	}
+	cDir := func(n uint64) string { return fmt.Sprintf("%05d/output", n) }
+	aDir := func(n uint64) string { return fmt.Sprintf("%05d/metas", n) }
+
+	mig1 := mig(
+		mCompaction(placeholder(cDir(1)), placeholder(aDir(1)), 15, 25),
+		mCompaction(placeholder(cDir(2)), placeholder(aDir(2)), 28, 32),
+		mTruncatedTo(27),
+	)
+
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/storage/local_delete_file_err", `1*return("this disk will never forget")`))
+	est := MigerationExtension(s)
+	mg := est.MigrateTo(ctx, mig1)
+	require.Len(t, mg.Warnings, 1)
+	require.Error(t, mg.Warnings[0], "this disk will never forget")
+	requireMigrationsEqual(t, mg.NewBase, mig(
+		mCompaction(placeholder(cDir(2)), placeholder(aDir(2)), 28, 32),
+		mTruncatedTo(27),
+		mDstrPfx(cDir(1), aDir(1)),
+	))
+
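+	// Retry: the failpoint above fires only once, so replaying the recorded
+	// destruction prefixes should succeed now and drop them from the new base.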
+	mg = est.MigrateTo(ctx, mg.NewBase)
+	require.Empty(t, mg.Warnings)
+	requireMigrationsEqual(t, mg.NewBase, mig(
+		mCompaction(placeholder(cDir(2)), placeholder(aDir(2)), 28, 32),
+		mTruncatedTo(27),
+	))
+
+	// NOTE: `Walk` of the local storage doesn't enumerate the base dir itself,
+	// so the dir won't be deleted; instead, check that its contents are gone.
+	require.NoFileExists(t, path.Join(s.Base(), cDir(1), "monolith"))
+	require.NoFileExists(t, path.Join(s.Base(), aDir(1), "monolith"))
+}
+
+func TestWithSimpleTruncate(t *testing.T) {
+	s := tmp(t)
+	ctx := context.Background()
+	mN := func(n uint64) string { return fmt.Sprintf("v1/backupmeta/%05d.meta", n) }
+
+	pmt(s, mN(1), mf(1, [][]*backuppb.DataFileInfo{
+		{
+			fi(10, 20, DefaultCF, 0),
+			fi(15, 30, WriteCF, 8),
+			fi(25, 35, WriteCF, 11),
+		},
+	}))
+	pmt(s, mN(2), mf(2, [][]*backuppb.DataFileInfo{
+		{
+			fi(45, 64, WriteCF, 32),
+			fi(65, 70, WriteCF, 55),
+			fi(50, 60, DefaultCF, 0),
+			fi(80, 85, WriteCF, 72),
+		},
+	}))
+
+	est := MigerationExtension(s)
+	m := mig(mTruncatedTo(65))
+	var res MigratedTo
+	effs := est.DryRun(func(me MigrationExt) { res = me.MigrateTo(ctx, m) })
+
+	require.Empty(t, res.Warnings)
+	for _, eff := range effs {
+		switch e := eff.(type) {
+		case *storage.EffDeleteFile:
+			require.Equal(t, e, mN(1))
+		case *storage.EffPut:
+			var m backuppb.Metadata
+			require.NoError(t, m.Unmarshal(e.Content))
+			require.Equal(t, e.File, mN(2))
+			require.ElementsMatch(t, m.FileGroups[0].DataFilesInfo, []*backuppb.DataFileInfo{
+				fi(65, 70, WriteCF, 55),
+				fi(50, 60, DefaultCF, 0),
+				fi(80, 85, WriteCF, 72),
+			})
+		}
+	}
+}
diff --git a/br/pkg/utils/iter/source.go b/br/pkg/utils/iter/source.go
index f2ea2fd8fb173..d9ad7a197a098 100644
--- a/br/pkg/utils/iter/source.go
+++ b/br/pkg/utils/iter/source.go
@@ -3,6 +3,8 @@
 package iter
 
 import (
+	"context"
+
 	"golang.org/x/exp/constraints"
 )
 
@@ -26,3 +28,8 @@ func OfRange[T constraints.Integer](begin, end T) TryNextor[T] {
 func Fail[T any](err error) TryNextor[T] {
 	return failure[T]{error: err}
 }
+
+// Func builds a TryNextor whose results are generated by calling g.
+func Func[T any](g func(ctx context.Context) IterResult[T]) TryNextor[T] {
+	return ofFunc[T](g)
+}
diff --git a/br/pkg/utils/iter/source_types.go b/br/pkg/utils/iter/source_types.go
index 41e9810de5286..0b2cac2ef4806 100644
--- a/br/pkg/utils/iter/source_types.go
+++ b/br/pkg/utils/iter/source_types.go
@@ -50,3 +50,9 @@ type failure[T any] struct {
 func (f failure[T]) TryNext(ctx context.Context) IterResult[T] {
 	return Throw[T](f)
 }
+
+type ofFunc[T any] func(ctx context.Context) IterResult[T]
+
+func (g ofFunc[T]) TryNext(ctx context.Context) IterResult[T] {
+	return g(ctx)
+}
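
As a rough usage sketch (not part of the patch), the new `iter.Func` source lets a stateful closure act as a `TryNextor`. The `countTo` helper below is hypothetical, and the sketch assumes the iter package's existing `Emit`/`Done` result constructors and the `Item`/`Err`/`Finished` fields of `IterResult`:

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/utils/iter"
)

// countTo yields 1, 2, ..., n by wrapping a stateful closure with iter.Func.
// countTo is a hypothetical helper used only for illustration.
func countTo(n int) iter.TryNextor[int] {
	i := 0
	return iter.Func(func(ctx context.Context) iter.IterResult[int] {
		if i >= n {
			// Assumed constructor: signals that the iterator is exhausted.
			return iter.Done[int]()
		}
		i++
		// Assumed constructor: yields the next item.
		return iter.Emit(i)
	})
}

func main() {
	ctx := context.Background()
	it := countTo(3)
	for res := it.TryNext(ctx); !res.Finished && res.Err == nil; res = it.TryNext(ctx) {
		fmt.Println(res.Item) // prints 1, 2, 3
	}
}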