diff --git a/cmd/backup-manager/app/backup/manager.go b/cmd/backup-manager/app/backup/manager.go index 72e9ac9b5e..2425c322a2 100644 --- a/cmd/backup-manager/app/backup/manager.go +++ b/cmd/backup-manager/app/backup/manager.go @@ -319,21 +319,24 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d defer func() { // Calculate the backup size for ebs backup job even if it fails if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && !bm.Initialize { - backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider) + fullBackupSize, incrementalBackupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider) if err != nil { - klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err) + klog.Errorf("Failed to calc volume snapshot backup, err: %v", err) return } - backupSizeReadable := humanize.Bytes(uint64(backupSize)) - + backupSizeReadable := humanize.Bytes(uint64(fullBackupSize)) + incrementalBackupSizeReadable := humanize.Bytes(uint64(incrementalBackupSize)) updateStatus := &controller.BackupUpdateStatus{ - BackupSize: &backupSize, - BackupSizeReadable: &backupSizeReadable, + BackupSize: &fullBackupSize, + BackupSizeReadable: &backupSizeReadable, + IncrementalBackupSize: &incrementalBackupSize, + IncrementalBackupSizeReadable: &incrementalBackupSizeReadable, } - bm.StatusUpdater.Update(backup, nil, - updateStatus) + if err := bm.StatusUpdater.Update(backup, nil, updateStatus); err != nil { + klog.Errorf("update backup size to status error: %v", err) + } } }() diff --git a/cmd/backup-manager/app/util/backup_size.go b/cmd/backup-manager/app/util/backup_size.go index b8a015aac1..18c47d9871 100644 --- a/cmd/backup-manager/app/util/backup_size.go +++ b/cmd/backup-manager/app/util/backup_size.go @@ -17,12 +17,15 @@ import ( "context" "encoding/json" "fmt" + "sort" "strings" "sync/atomic" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ebs" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/dustin/go-humanize" "github.com/pingcap/errors" "github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" @@ -42,6 +45,8 @@ import ( // interface CalcVolSnapBackupSize called by backup and backup clean. const ( + // This value can be between 5 and 1,000; if MaxResults is given a value larger than 1,000, only 1,000 results are returned. + DescribeSnapMaxReturnResult = 1000 // This value can be between 100 and 1,0000, and charge ~0.6$/1 million request ListSnapMaxReturnResult = 10000 // This value can be between 1 and 50 due to aws service quota @@ -49,26 +54,26 @@ const ( ) // CalcVolSnapBackupSize get snapshots from backup meta and then calc the backup size of snapshots. -func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvider) (int64, error) { +func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvider) (fullBackupSize int64, incrementalBackupSize int64, err error) { start := time.Now() // retrieves all snapshots from backup meta file volSnapshots, err := getSnapshotsFromBackupmeta(ctx, provider) if err != nil { - return 0, err + return 0, 0, err } if err != nil { - return 0, err + return 0, 0, err } - backupSize, err := calcBackupSize(ctx, volSnapshots) + fullBackupSize, incrementalBackupSize, err = calcBackupSize(ctx, volSnapshots) if err != nil { - return 0, err + return 0, 0, err } elapsed := time.Since(start) klog.Infof("calculate volume-snapshot backup size takes %v", elapsed) - return int64(backupSize), nil + return } // getSnapshotsFromBackupmeta read all snapshots from backupmeta @@ -131,39 +136,58 @@ func getSnapshotsFromBackupmeta(ctx context.Context, provider v1alpha1.StoragePr } // calcBackupSize get a volume-snapshots backup size -func calcBackupSize(ctx context.Context, volumes map[string]string) (uint64, error) { - var backupSize uint64 - var apiReqCount uint64 +func calcBackupSize(ctx context.Context, volumes map[string]string) (fullBackupSize int64, incrementalBackupSize int64, err error) { + var apiReqCount, incrementalApiReqCount uint64 workerPool := util.NewWorkerPool(EbsApiConcurrency, "list snapshot size") eg, _ := errgroup.WithContext(ctx) - for _, id := range volumes { - snapshotId := id + for vid, sid := range volumes { + snapshotId := sid + volumeId := vid // sort snapshots by timestamp workerPool.ApplyOnErrorGroup(eg, func() error { - snapSize, apiReq, err := calculateSnapshotSize(snapshotId) + snapSize, apiReq, err := calculateSnapshotSize(volumeId, snapshotId) if err != nil { return err } - atomic.AddUint64(&backupSize, snapSize) + atomic.AddInt64(&fullBackupSize, int64(snapSize)) atomic.AddUint64(&apiReqCount, apiReq) + + volSnapshots, err := getVolSnapshots(volumeId) + if err != nil { + return err + } + prevSnapshotId, existed := getPrevSnapshotId(snapshotId, volSnapshots) + if !existed { + // if there is no previous snapshot, means it's the first snapshot, uses its full size as incremental size + atomic.AddInt64(&incrementalBackupSize, int64(snapSize)) + return nil + } + klog.Infof("get previous snapshot %s of snapshot %s, volume %s", prevSnapshotId, snapshotId, volumeId) + incrementalSnapSize, incrementalApiReq, err := calculateChangedBlocksSize(volumeId, prevSnapshotId, snapshotId) + if err != nil { + return err + } + atomic.AddInt64(&incrementalBackupSize, int64(incrementalSnapSize)) + atomic.AddUint64(&incrementalApiReqCount, incrementalApiReq) return nil }) } if err := eg.Wait(); err != nil { - klog.Errorf("failed to get snapshots size %d, number of api request %d", backupSize, apiReqCount) - return 0, err + klog.Errorf("failed to get snapshots size %d, number of api request %d", fullBackupSize, apiReqCount) + return 0, 0, err } // currently, we do not count api request fees, since it is very few of cost, however, we print it in log in case "very few" is not correct - klog.Infof("backup size %d bytes, number of api request %d", backupSize, apiReqCount) - return backupSize, nil + klog.Infof("backup size %d bytes, number of api request %d, incremental backup size %d bytes, numbers of incremental size's api request %d", + fullBackupSize, apiReqCount, incrementalBackupSize, incrementalApiReqCount) + return } // calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks. -func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) { +func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) { var snapshotSize uint64 var numApiReq uint64 ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency) @@ -174,7 +198,6 @@ func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) { var nextToken *string for { - resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{ SnapshotId: aws.String(snapshotId), MaxResults: aws.Int64(ListSnapMaxReturnResult), @@ -194,6 +217,118 @@ func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) { } nextToken = resp.NextToken } - klog.Infof("full backup snapshot size %d bytes, num of ListSnapshotBlocks request %d", snapshotSize, numApiReq) + klog.Infof("full snapshot size %s, num of ListSnapshotBlocks request %d, snapshot id %s, volume id %s", + humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId) + return snapshotSize, numApiReq, nil +} + +// calculateChangedBlocksSize calculates changed blocks total size in bytes between two snapshots with common ancestry. +func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uint64, uint64, error) { + var numBlocks int + var snapshotSize uint64 + var numApiReq uint64 + + klog.Infof("start to calculate incremental snapshot size for %s, base on prev snapshot %s, volume id %s", + snapshotId, preSnapshotId, volumeId) + ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency) + if err != nil { + klog.Errorf("new a ebs session failure.") + return 0, numApiReq, err + } + + var nextToken *string + + for { + resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{ + FirstSnapshotId: aws.String(preSnapshotId), + MaxResults: aws.Int64(ListSnapMaxReturnResult), + SecondSnapshotId: aws.String(snapshotId), + NextToken: nextToken, + }) + numApiReq += 1 + if err != nil { + return 0, numApiReq, err + } + numBlocks += len(resp.ChangedBlocks) + + // retrieve only changed block and blocks only existed in current snapshot (new add blocks) + for _, block := range resp.ChangedBlocks { + if block.SecondBlockToken != nil && resp.BlockSize != nil { + snapshotSize += uint64(*resp.BlockSize) + } + } + + // check if there is more to retrieve + if resp.NextToken == nil { + break + } + nextToken = resp.NextToken + } + klog.Infof("incremental snapshot size %s, num of api ListChangedBlocks request %d, snapshot id %s, volume id %s", + humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId) return snapshotSize, numApiReq, nil } + +// getBackupVolSnapshots get a volume-snapshots map contains map[volumeId]{snapshot1, snapshot2, snapshot3} +func getVolSnapshots(volumeId string) ([]*ec2.Snapshot, error) { + // read all snapshots from aws + ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency) + if err != nil { + klog.Errorf("new a ec2 session failure.") + return nil, err + } + + filters := []*ec2.Filter{{Name: aws.String("volume-id"), Values: []*string{aws.String(volumeId)}}} + // describe snapshot is heavy operator, try to call only once + // api has limit with max 1000 snapshots + // search with filter volume id the backupmeta contains + var nextToken *string + var snapshots []*ec2.Snapshot + for { + resp, err := ec2Session.EC2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{ + OwnerIds: aws.StringSlice([]string{"self"}), + MaxResults: aws.Int64(DescribeSnapMaxReturnResult), + Filters: filters, + NextToken: nextToken, + }) + + if err != nil { + return nil, err + } + + for _, s := range resp.Snapshots { + if *s.State == ec2.SnapshotStateCompleted { + klog.Infof("get the snapshot %s created for volume %s", *s.SnapshotId, *s.VolumeId) + snapshots = append(snapshots, s) + } else { // skip ongoing snapshots + klog.Infof("the snapshot %s is creating... skip it, volume %s", *s.SnapshotId, *s.VolumeId) + continue + } + } + + // check if there's more to retrieve + if resp.NextToken == nil { + break + } + klog.Infof("the total number of snapshot is %d", len(resp.Snapshots)) + nextToken = resp.NextToken + } + + sort.Slice(snapshots, func(i, j int) bool { + return snapshots[i].StartTime.Before(*snapshots[j].StartTime) + }) + return snapshots, nil +} + +func getPrevSnapshotId(snapshotId string, sortedVolSnapshots []*ec2.Snapshot) (string, bool) { + for i, snapshot := range sortedVolSnapshots { + if snapshotId == *snapshot.SnapshotId { + if i == 0 { + return "", false + } else { + return *sortedVolSnapshots[i-1].SnapshotId, true + } + } + } + return "", false +} diff --git a/docs/api-references/docs.md b/docs/api-references/docs.md index 7b1abb9066..faea1c7fa3 100644 --- a/docs/api-references/docs.md +++ b/docs/api-references/docs.md @@ -4307,6 +4307,29 @@ int64 +incrementalBackupSizeReadable
+ +string + + + +

the difference with IncrementalBackupSize is that its format is human readable

+ + + + +incrementalBackupSize
+ +int64 + + + +

IncrementalBackupSize is the incremental data size of the backup, it is only used for volume snapshot backup +it is the real size of volume snapshot backup

+ + + + commitTs
string diff --git a/manifests/crd.yaml b/manifests/crd.yaml index b1f1bacc4b..8f188f681b 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -40,6 +40,12 @@ spec: jsonPath: .status.backupSizeReadable name: BackupSize type: string + - description: The real size of volume snapshot backup, only valid to volume snapshot + backup + jsonPath: .status.incrementalBackupSizeReadable + name: IncrementalBackupSize + priority: 10 + type: string - description: The commit ts of the backup jsonPath: .status.commitTs name: CommitTS @@ -1445,6 +1451,11 @@ spec: type: object nullable: true type: array + incrementalBackupSize: + format: int64 + type: integer + incrementalBackupSizeReadable: + type: string logCheckpointTs: type: string logSubCommandStatuses: diff --git a/manifests/crd/v1/pingcap.com_backups.yaml b/manifests/crd/v1/pingcap.com_backups.yaml index ea85a16eab..2fe52ecc38 100644 --- a/manifests/crd/v1/pingcap.com_backups.yaml +++ b/manifests/crd/v1/pingcap.com_backups.yaml @@ -40,6 +40,12 @@ spec: jsonPath: .status.backupSizeReadable name: BackupSize type: string + - description: The real size of volume snapshot backup, only valid to volume snapshot + backup + jsonPath: .status.incrementalBackupSizeReadable + name: IncrementalBackupSize + priority: 10 + type: string - description: The commit ts of the backup jsonPath: .status.commitTs name: CommitTS @@ -1445,6 +1451,11 @@ spec: type: object nullable: true type: array + incrementalBackupSize: + format: int64 + type: integer + incrementalBackupSizeReadable: + type: string logCheckpointTs: type: string logSubCommandStatuses: diff --git a/manifests/crd/v1beta1/pingcap.com_backups.yaml b/manifests/crd/v1beta1/pingcap.com_backups.yaml index 519b7862f4..b15cb1d45f 100644 --- a/manifests/crd/v1beta1/pingcap.com_backups.yaml +++ b/manifests/crd/v1beta1/pingcap.com_backups.yaml @@ -30,6 +30,12 @@ spec: description: The data size of the backup name: BackupSize type: string + - JSONPath: .status.incrementalBackupSizeReadable + description: The real size of volume snapshot backup, only valid to volume snapshot + backup + name: IncrementalBackupSize + priority: 10 + type: string - JSONPath: .status.commitTs description: The commit ts of the backup name: CommitTS @@ -1440,6 +1446,11 @@ spec: type: object nullable: true type: array + incrementalBackupSize: + format: int64 + type: integer + incrementalBackupSizeReadable: + type: string logCheckpointTs: type: string logSubCommandStatuses: diff --git a/manifests/crd_v1beta1.yaml b/manifests/crd_v1beta1.yaml index 5cabbd945d..8526f52171 100644 --- a/manifests/crd_v1beta1.yaml +++ b/manifests/crd_v1beta1.yaml @@ -30,6 +30,12 @@ spec: description: The data size of the backup name: BackupSize type: string + - JSONPath: .status.incrementalBackupSizeReadable + description: The real size of volume snapshot backup, only valid to volume snapshot + backup + name: IncrementalBackupSize + priority: 10 + type: string - JSONPath: .status.commitTs description: The commit ts of the backup name: CommitTS @@ -1440,6 +1446,11 @@ spec: type: object nullable: true type: array + incrementalBackupSize: + format: int64 + type: integer + incrementalBackupSizeReadable: + type: string logCheckpointTs: type: string logSubCommandStatuses: diff --git a/pkg/apis/pingcap/v1alpha1/types.go b/pkg/apis/pingcap/v1alpha1/types.go index 70e89c00a1..58c2ac2a2a 100644 --- a/pkg/apis/pingcap/v1alpha1/types.go +++ b/pkg/apis/pingcap/v1alpha1/types.go @@ -1644,6 +1644,7 @@ type TLSCluster struct { // +kubebuilder:printcolumn:name="Status",type=string,JSONPath=`.status.phase`,description="The current status of the backup" // +kubebuilder:printcolumn:name="BackupPath",type=string,JSONPath=`.status.backupPath`,description="The full path of backup data" // +kubebuilder:printcolumn:name="BackupSize",type=string,JSONPath=`.status.backupSizeReadable`,description="The data size of the backup" +// +kubebuilder:printcolumn:name="IncrementalBackupSize",type=string,JSONPath=`.status.incrementalBackupSizeReadable`,description="The real size of volume snapshot backup, only valid to volume snapshot backup",priority=10 // +kubebuilder:printcolumn:name="CommitTS",type=string,JSONPath=`.status.commitTs`,description="The commit ts of the backup" // +kubebuilder:printcolumn:name="LogTruncateUntil",type=string,JSONPath=`.status.logSuccessTruncateUntil`,description="The log backup truncate until ts" // +kubebuilder:printcolumn:name="Started",type=date,JSONPath=`.status.timeStarted`,description="The time at which the backup was started",priority=1 @@ -2176,6 +2177,11 @@ type BackupStatus struct { BackupSizeReadable string `json:"backupSizeReadable,omitempty"` // BackupSize is the data size of the backup. BackupSize int64 `json:"backupSize,omitempty"` + // the difference with IncrementalBackupSize is that its format is human readable + IncrementalBackupSizeReadable string `json:"incrementalBackupSizeReadable,omitempty"` + // IncrementalBackupSize is the incremental data size of the backup, it is only used for volume snapshot backup + // it is the real size of volume snapshot backup + IncrementalBackupSize int64 `json:"incrementalBackupSize,omitempty"` // CommitTs is the commit ts of the backup, snapshot ts for full backup or start ts for log backup. CommitTs string `json:"commitTs,omitempty"` // LogSuccessTruncateUntil is log backup already successfully truncate until timestamp. diff --git a/pkg/backup/backup/backup_manager.go b/pkg/backup/backup/backup_manager.go index e949d4e418..c84090b524 100644 --- a/pkg/backup/backup/backup_manager.go +++ b/pkg/backup/backup/backup_manager.go @@ -176,6 +176,7 @@ func (bm *backupManager) validateBackup(backup *v1alpha1.Backup) error { } var tc *v1alpha1.TidbCluster + // bm.deps.TiDBClusterAutoScalerLister.List() tc, err = bm.deps.TiDBClusterLister.TidbClusters(backupNamespace).Get(backup.Spec.BR.Cluster) if err != nil { reason := fmt.Sprintf("failed to fetch tidbcluster %s/%s", backupNamespace, backup.Spec.BR.Cluster) diff --git a/pkg/controller/backup_status_updater.go b/pkg/controller/backup_status_updater.go index 2c67b53d72..370ecfacb2 100644 --- a/pkg/controller/backup_status_updater.go +++ b/pkg/controller/backup_status_updater.go @@ -45,6 +45,11 @@ type BackupUpdateStatus struct { BackupSizeReadable *string // BackupSize is the data size of the backup. BackupSize *int64 + // the difference with IncrementalBackupSize is that its format is human readable + IncrementalBackupSizeReadable *string + // IncrementalBackupSize is the incremental data size of the backup, it is only used for volume snapshot backup + // it is the real size of volume snapshot backup + IncrementalBackupSize *int64 // CommitTs is the snapshot time point of tidb cluster. CommitTs *string // LogCheckpointTs is the ts of log backup process. @@ -159,6 +164,14 @@ func updateBackupStatus(status *v1alpha1.BackupStatus, newStatus *BackupUpdateSt status.BackupSize = *newStatus.BackupSize isUpdate = true } + if newStatus.IncrementalBackupSizeReadable != nil && status.IncrementalBackupSizeReadable != *newStatus.IncrementalBackupSizeReadable { + status.IncrementalBackupSizeReadable = *newStatus.IncrementalBackupSizeReadable + isUpdate = true + } + if newStatus.IncrementalBackupSize != nil && status.IncrementalBackupSize != *newStatus.IncrementalBackupSize { + status.IncrementalBackupSize = *newStatus.IncrementalBackupSize + isUpdate = true + } if newStatus.CommitTs != nil && status.CommitTs != *newStatus.CommitTs { status.CommitTs = *newStatus.CommitTs isUpdate = true