Skip to content

Commit

Permalink
This is an automated cherry-pick of #4999
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
WangLe1321 authored and ti-chi-bot committed Jun 30, 2023
1 parent 379880f commit fd4f496
Show file tree
Hide file tree
Showing 20 changed files with 5,582 additions and 5,074 deletions.
45 changes: 37 additions & 8 deletions cmd/backup-manager/app/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (bo *Options) backupData(

var logCallback func(line string)
// Add extra args for volume snapshot backup.
if backup.Spec.Mode == v1alpha1.BackupModeVolumeSnapshot {
if bo.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && !bo.Initialize {
var (
progressFile = "progress.txt"
progressStep = "Full Backup"
Expand Down Expand Up @@ -112,7 +112,7 @@ func (bo *Options) backupData(
go bo.updateProgressFromFile(progressCtx.Done(), backup, progressFile, progressStep, statusUpdater)
}

fullArgs, err := bo.backupCommandTemplate(backup, specificArgs)
fullArgs, err := bo.backupCommandTemplate(backup, specificArgs, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (bo *Options) doStartLogBackup(ctx context.Context, backup *v1alpha1.Backup
if bo.CommitTS != "" && bo.CommitTS != "0" {
specificArgs = append(specificArgs, fmt.Sprintf("--start-ts=%s", bo.CommitTS))
}
fullArgs, err := bo.backupCommandTemplate(backup, specificArgs)
fullArgs, err := bo.backupCommandTemplate(backup, specificArgs, false)
if err != nil {
return err
}
Expand All @@ -169,15 +169,15 @@ func (bo *Options) doStopLogBackup(ctx context.Context, backup *v1alpha1.Backup)
"stop",
fmt.Sprintf("--task-name=%s", backup.Name),
}
fullArgs, err := bo.backupCommandTemplate(backup, specificArgs)
fullArgs, err := bo.backupCommandTemplate(backup, specificArgs, false)
if err != nil {
return err
}
return bo.brCommandRun(ctx, fullArgs)
}

// doTruncatelogBackup generates br args about log backup truncate and runs br binary to do the real backup work.
func (bo *Options) doTruncatelogBackup(ctx context.Context, backup *v1alpha1.Backup) error {
// doTruncateLogBackup generates br args about log backup truncate and runs br binary to do the real backup work.
func (bo *Options) doTruncateLogBackup(ctx context.Context, backup *v1alpha1.Backup) error {
specificArgs := []string{
"log",
"truncate",
Expand All @@ -187,15 +187,39 @@ func (bo *Options) doTruncatelogBackup(ctx context.Context, backup *v1alpha1.Bac
} else {
return fmt.Errorf("log backup truncate until %s is invalid", bo.TruncateUntil)
}
fullArgs, err := bo.backupCommandTemplate(backup, specificArgs)
fullArgs, err := bo.backupCommandTemplate(backup, specificArgs, false)
if err != nil {
return err
}
return bo.brCommandRun(ctx, fullArgs)
}

// doInitializeVolumeBackup assembles the br "operator pause-gc-and-schedulers"
// command line and runs it. A VolumeBackupInitializeManager is created for the
// given backup and its UpdateBackupStatus method is handed to br as the log
// callback, so the backup can be marked VolumeBackupInitialized based on what
// br logs.
//
// It returns the error from building the arguments or from running br.
func (bo *Options) doInitializeVolumeBackup(
	ctx context.Context,
	backup *v1alpha1.Backup,
	statusUpdater controller.BackupConditionUpdaterInterface,
) error {
	// Passing true makes the template return before the backup-specific
	// options are appended.
	fullArgs, err := bo.backupCommandTemplate(backup, []string{"operator", "pause-gc-and-schedulers"}, true)
	if err != nil {
		return err
	}

	initMgr := &VolumeBackupInitializeManager{
		backup:        backup,
		statusUpdater: statusUpdater,
	}
	return bo.brCommandRunWithLogCallback(ctx, fullArgs, initMgr.UpdateBackupStatus)
}

// backupCommandTemplate is the template to generate br args.
func (bo *Options) backupCommandTemplate(backup *v1alpha1.Backup, specificArgs []string) ([]string, error) {
func (bo *Options) backupCommandTemplate(backup *v1alpha1.Backup, specificArgs []string, skipBackupArgs bool) ([]string, error) {
if len(specificArgs) == 0 {
return nil, fmt.Errorf("backup command is invalid, Args: %v", specificArgs)
}
Expand All @@ -211,6 +235,11 @@ func (bo *Options) backupCommandTemplate(backup *v1alpha1.Backup, specificArgs [
args = append(args, fmt.Sprintf("--cert=%s", path.Join(util.ClusterClientTLSPath, corev1.TLSCertKey)))
args = append(args, fmt.Sprintf("--key=%s", path.Join(util.ClusterClientTLSPath, corev1.TLSPrivateKeyKey)))
}

if skipBackupArgs {
return append(specificArgs, args...), nil
}

// `options` in spec are put to the last because we want them to have higher priority than generated arguments
dataArgs, err := constructOptions(backup)
if err != nil {
Expand Down
125 changes: 110 additions & 15 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/Masterminds/semver"
Expand All @@ -38,6 +39,11 @@ import (
"k8s.io/klog/v2"
)

// Substrings that VolumeBackupInitializeManager.UpdateBackupStatus searches for
// in each log line it receives.
const (
// gcPausedKeyword, when found in a log line, marks GC as stopped.
gcPausedKeyword = "GC is paused"
// pdSchedulesPausedKeyword, when found in a log line, marks the PD schedules as stopped.
pdSchedulesPausedKeyword = "Schedulers are paused"
)

// Manager is mainly used to manage backup-related work
type Manager struct {
backupLister listers.BackupLister
Expand Down Expand Up @@ -117,6 +123,10 @@ func (bm *Manager) ProcessBackup() error {
return bm.performLogBackup(ctx, backup.DeepCopy())
}

if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && bm.Initialize {
return bm.performVolumeBackupInitialize(ctx, backup.DeepCopy())
}

if backup.Spec.From == nil {
// skip the DB initialization if spec.from is not specified
return bm.performBackup(ctx, backup.DeepCopy(), nil)
Expand Down Expand Up @@ -333,8 +343,16 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
if backupErr != nil {
errs = append(errs, backupErr)
klog.Errorf("backup cluster %s data failed, err: %s", bm, backupErr)
failedCondition := v1alpha1.BackupFailed
if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) {
if bm.Initialize {
failedCondition = v1alpha1.VolumeBackupInitializeFailed
} else {
failedCondition = v1alpha1.VolumeBackupFailed
}
}
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Type: failedCondition,
Status: corev1.ConditionTrue,
Reason: "BackupDataToRemoteFailed",
Message: backupErr.Error(),
Expand All @@ -345,23 +363,27 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
klog.Infof("backup cluster %s data to %s success", bm, backupFullPath)

var updateStatus *controller.BackupUpdateStatus
completeCondition := v1alpha1.BackupComplete
switch bm.Mode {
case string(v1alpha1.BackupModeVolumeSnapshot):
// In volume snapshot mode, commitTS have been updated according to the
// br command output, so we don't need to update it here.
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
if !bm.Initialize {
completeCondition = v1alpha1.VolumeBackupComplete
// In volume snapshot mode, commitTS have been updated according to the
// br command output, so we don't need to update it here.
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)

if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
}
if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))
backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus = &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
updateStatus = &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
}
}
default:
backupMeta, err := util.GetBRMetaData(ctx, backup.Spec.StorageProvider)
Expand Down Expand Up @@ -392,8 +414,9 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
CommitTs: &ts,
}
}

return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupComplete,
Type: completeCondition,
Status: corev1.ConditionTrue,
}, updateStatus)
}
Expand Down Expand Up @@ -546,7 +569,7 @@ func (bm *Manager) truncateLogBackup(ctx context.Context, backup *v1alpha1.Backu
}

// run br binary to do the real job
backupErr := bm.doTruncatelogBackup(ctx, backup)
backupErr := bm.doTruncateLogBackup(ctx, backup)

if backupErr != nil {
klog.Errorf("Truncate log backup of cluster %s failed, err: %s", bm, backupErr)
Expand All @@ -564,6 +587,35 @@ func (bm *Manager) truncateLogBackup(ctx context.Context, backup *v1alpha1.Backu
return updateStatus, "", nil
}

// performVolumeBackupInitialize first records the BackupRunning condition and
// then executes br to stop GC and PD schedules; br is expected to keep running
// until the process is killed.
//
// If br fails, the VolumeBackupInitializeFailed condition is written with the
// failure message, and the returned error aggregates the br error with any
// error from that status update.
func (bm *Manager) performVolumeBackupInitialize(ctx context.Context, backup *v1alpha1.Backup) error {
	running := &v1alpha1.BackupCondition{
		Type:   v1alpha1.BackupRunning,
		Status: corev1.ConditionTrue,
	}
	if err := bm.StatusUpdater.Update(backup, running, nil); err != nil {
		return err
	}

	initErr := bm.doInitializeVolumeBackup(ctx, backup, bm.StatusUpdater)
	if initErr == nil {
		return nil
	}

	// Report the failure on the Backup object, keeping both errors if the
	// status update itself fails.
	failed := &v1alpha1.BackupCondition{
		Type:    v1alpha1.VolumeBackupInitializeFailed,
		Status:  corev1.ConditionTrue,
		Reason:  "InitializeVolumeBackupFailed",
		Message: initErr.Error(),
	}
	errs := []error{initErr}
	if updateErr := bm.StatusUpdater.Update(backup, failed, nil); updateErr != nil {
		errs = append(errs, updateErr)
	}
	return errorutils.NewAggregate(errs)
}

func (bm *Manager) cleanSnapshotBackupEnv(ctx context.Context, backup *v1alpha1.Backup) error {
if backup.Spec.Mode != v1alpha1.BackupModeSnapshot {
return nil
Expand All @@ -582,3 +634,46 @@ func (bm *Manager) isBRCanContinueRunByCheckpoint() bool {
lessThanV651, _ := semver.NewConstraint("<v6.5.1-0")
return !lessThanV651.Check(v)
}

// VolumeBackupInitializeManager manages volume backup initializing status.
// It remembers which of the expected log keywords have been seen and whether
// the VolumeBackupInitialized condition has already been written.
type VolumeBackupInitializeManager struct {
// done is set after the VolumeBackupInitialized status update succeeds;
// once set, UpdateBackupStatus ignores further log lines.
done bool
// gcStopped is set when a log line contains gcPausedKeyword.
gcStopped bool
// pdSchedulesStopped is set when a log line contains pdSchedulesPausedKeyword.
pdSchedulesStopped bool

// backup is the Backup object whose status is updated.
backup *v1alpha1.Backup
// statusUpdater performs the condition update on backup.
statusUpdater controller.BackupConditionUpdaterInterface
}

// UpdateBackupStatus inspects a single log line for the GC-paused and
// schedulers-paused keywords, records whichever one is found first, and then
// attempts to move the backup to VolumeBackupInitialized. It is a no-op once
// the status has been updated successfully.
func (vb *VolumeBackupInitializeManager) UpdateBackupStatus(logLine string) {
	if vb.done {
		return
	}

	// Only the first matching keyword is recorded per line.
	switch {
	case strings.Contains(logLine, gcPausedKeyword):
		vb.gcStopped = true
	case strings.Contains(logLine, pdSchedulesPausedKeyword):
		vb.pdSchedulesStopped = true
	}

	vb.tryUpdateBackupStatus()
}

// tryUpdateBackupStatus writes the VolumeBackupInitialized condition once both
// GC and PD schedules have been observed as stopped. On success it sets done;
// on failure it only logs a warning, leaving done unset so a later call can
// try again.
func (vb *VolumeBackupInitializeManager) tryUpdateBackupStatus() {
	if !(vb.gcStopped && vb.pdSchedulesStopped) {
		return
	}

	initialized := &v1alpha1.BackupCondition{
		Type:   v1alpha1.VolumeBackupInitialized,
		Status: corev1.ConditionTrue,
	}
	if err := vb.statusUpdater.Update(vb.backup, initialized, nil); err != nil {
		klog.Warningf("backup %s/%s update status to VolumeBackupInitialized failed, err: %s",
			vb.backup.Namespace, vb.backup.Name, err.Error())
		return
	}
	vb.done = true
}
1 change: 1 addition & 0 deletions cmd/backup-manager/app/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewBackupCommand() *cobra.Command {
cmd.Flags().StringVar(&bo.SubCommand, "subcommand", string(v1alpha1.LogStartCommand), "the log backup subcommand")
cmd.Flags().StringVar(&bo.CommitTS, "commit-ts", "0", "the log backup start ts")
cmd.Flags().StringVar(&bo.TruncateUntil, "truncate-until", "0", "the log backup truncate until")
cmd.Flags().BoolVar(&bo.Initialize, "initialize", false, "Whether execute initialize process for volume backup")
return cmd
}

Expand Down
1 change: 1 addition & 0 deletions cmd/backup-manager/app/util/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type GenericOptions struct {
CommitTS string
TruncateUntil string
PitrRestoredTs string
Initialize bool
}

func (bo *GenericOptions) String() string {
Expand Down
Loading

0 comments on commit fd4f496

Please sign in to comment.