Skip to content

Commit

Permalink
ebs: calc full snapshot size and report backup size for failed backup (
Browse files Browse the repository at this point in the history
…#5007) (#5138)

Signed-off-by: BornChanger <[email protected]>
Co-authored-by: BornChanger <[email protected]>
  • Loading branch information
ti-chi-bot and BornChanger committed Jul 4, 2023
1 parent e9c20f7 commit 3d17117
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 214 deletions.
36 changes: 24 additions & 12 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,27 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
// run br binary to do the real job
backupErr := bm.backupData(ctx, backup, bm.StatusUpdater)

defer func() {
// Calculate the backup size for ebs backup job even if it fails
if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && !bm.Initialize {
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
return
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus := &controller.BackupUpdateStatus{
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
}

bm.StatusUpdater.Update(backup, nil,
updateStatus)
}
}()

if db != nil && oldTikvGCTimeDuration < tikvGCTimeDuration {
// use another context to revert `tikv_gc_life_time` back.
// `DefaultTerminationGracePeriodSeconds` for a pod is 30, so we use a smaller timeout value here.
Expand All @@ -328,6 +349,7 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
}
errs = append(errs, err)
klog.Errorf("cluster %s reset tikv GC life time to %s failed, err: %s", bm, oldTikvGCTime, err)

uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupFailed,
Status: corev1.ConditionTrue,
Expand Down Expand Up @@ -370,19 +392,9 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
completeCondition = v1alpha1.VolumeBackupComplete
// In volume snapshot mode, commitTS have been updated according to the
// br command output, so we don't need to update it here.
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)

if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus = &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
}
}
default:
Expand Down
29 changes: 0 additions & 29 deletions cmd/backup-manager/app/clean/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"fmt"
"sort"

"github.com/dustin/go-humanize"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
Expand Down Expand Up @@ -89,11 +88,6 @@ func (bm *Manager) performCleanBackup(ctx context.Context, backup *v1alpha1.Back
klog.Errorf("delete backup %s for cluster %s backup failure", backup.Name, bm)
}

// update the next backup size
if nextNackup != nil {
bm.updateVolumeSnapshotBackupSize(ctx, nextNackup)
}

} else {
if backup.Spec.BR != nil {
err = bm.CleanBRRemoteBackupData(ctx, backup)
Expand Down Expand Up @@ -156,26 +150,3 @@ func (bm *Manager) getVolumeSnapshotBackup(backups []*v1alpha1.Backup) *v1alpha1
// reach end of backup list, there is no volume snapshot backups
return nil
}

// updateVolumeSnapshotBackupSize update a volume-snapshot backup size
func (bm *Manager) updateVolumeSnapshotBackupSize(ctx context.Context, backup *v1alpha1.Backup) error {
var updateStatus *controller.BackupUpdateStatus

backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)

if err != nil {
klog.Warningf("Failed to parse BackupSize %d KB, %v", backupSize, err)
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus = &controller.BackupUpdateStatus{
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
}

return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupComplete,
Status: corev1.ConditionTrue,
}, updateStatus)
}
179 changes: 7 additions & 172 deletions cmd/backup-manager/app/util/backup_size.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ebs"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
Expand All @@ -44,15 +42,8 @@ import (
// interface CalcVolSnapBackupSize called by backup and backup clean.

const (
// This value can be between 5 and 1,000; if MaxResults is given a value larger than 1,000, only 1,000 results are returned.
DescribeSnapMaxReturnResult = 1000

// This value can be between 100 and 1,0000, and charge ~0.6$/1 million request
ListSnapMaxReturnResult = 10000

// This value can be between 100 and 1,0000, and charge ~0.6$/1 million request
ListBlocksMaxReturnResult = 10000

// This value can be between 1 and 50 due to aws service quota
EbsApiConcurrency = 40
)
Expand All @@ -66,14 +57,11 @@ func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvide
return 0, err
}

// get all snapshots per backup volume from aws
snapshots, err := getBackupVolSnapshots(volSnapshots)

if err != nil {
return 0, err
}

backupSize, err := calcBackupSize(ctx, volSnapshots, snapshots)
backupSize, err := calcBackupSize(ctx, volSnapshots)

if err != nil {
return 0, err
Expand Down Expand Up @@ -142,105 +130,22 @@ func getSnapshotsFromBackupmeta(ctx context.Context, provider v1alpha1.StoragePr
return volumeIDMap, nil
}

// getBackupVolSnapshots get a volue-snapshots map contains map[volumeId]{snapshot1, snapshot2, snapshot3}
func getBackupVolSnapshots(volumes map[string]string) (map[string][]*ec2.Snapshot, error) {
volWithTheirSnapshots := make(map[string][]*ec2.Snapshot)

// read all snapshots from aws
ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency)
if err != nil {
klog.Errorf("new a ec2 session failure.")
return nil, err
}

// init search filter.Values
// init volWithTheirSnapshots
volValues := make([]*string, 0)
for volumeId := range volumes {
volValues = append(volValues, aws.String(volumeId))
if volWithTheirSnapshots[volumeId] == nil {
volWithTheirSnapshots[volumeId] = make([]*ec2.Snapshot, 0)
}
}

filters := []*ec2.Filter{{Name: aws.String("volume-id"), Values: volValues}}
// describe snapshot is heavy operator, try to call only once
// api has limit with max 1000 snapshots
// search with filter volume id the backupmeta contains
var nextToken *string
for {
resp, err := ec2Session.EC2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{
OwnerIds: aws.StringSlice([]string{"self"}),
MaxResults: aws.Int64(DescribeSnapMaxReturnResult),
Filters: filters,
NextToken: nextToken,
})

if err != nil {
return nil, err
}

for i, s := range resp.Snapshots {
if *s.State == ec2.SnapshotStateCompleted {
if volWithTheirSnapshots[*s.VolumeId] == nil {
klog.Errorf("search with filter[volume-id] received unexpected result, volumeId:%s, snapshotId:%s", *s.VolumeId, *s.SnapshotId)
break
}
klog.Infof("the snapshot#%d %s created for volume %s", i, *s.SnapshotId, *s.VolumeId)
volWithTheirSnapshots[*s.VolumeId] = append(volWithTheirSnapshots[*s.VolumeId], s)
} else { // skip ongoing snapshots
klog.Infof("the snapshot#%d %s creating... skip it", i, *s.SnapshotId)
continue
}
}

// check if there's more to retrieve
if resp.NextToken == nil {
break
}
klog.Infof("the total number of snapshot is %d", len(resp.Snapshots))
nextToken = resp.NextToken
}

return volWithTheirSnapshots, nil
}

// calcBackupSize get a volue-snapshots backup size
func calcBackupSize(ctx context.Context, volumes map[string]string, snapshots map[string][]*ec2.Snapshot) (uint64, error) {
// calcBackupSize get a volume-snapshots backup size
func calcBackupSize(ctx context.Context, volumes map[string]string) (uint64, error) {
var backupSize uint64
var apiReqCount uint64

workerPool := util.NewWorkerPool(EbsApiConcurrency, "list snapshot size")
eg, _ := errgroup.WithContext(ctx)

for volumeId, id := range volumes {
volSnapshots := snapshots[volumeId]
for _, id := range volumes {
snapshotId := id
// sort snapshots by timestamp
workerPool.ApplyOnErrorGroup(eg, func() error {
// get prev snapshot backup
prevSnapshot, err := getPrevSnapshotId(snapshotId, volSnapshots)
snapSize, apiReq, err := calculateSnapshotSize(snapshotId)
if err != nil {
return err
}

// full/initial snapshot backup
if prevSnapshot == "" {
snapSize, apiReq, err := initialSnapshotSize(snapshotId)
if err != nil {
return err
}

atomic.AddUint64(&backupSize, snapSize)
atomic.AddUint64(&apiReqCount, apiReq)
return nil
}

snapSize, apiReq, err := changedBlocksSize(prevSnapshot, snapshotId)
if err != nil {
return err
}

atomic.AddUint64(&backupSize, snapSize)
atomic.AddUint64(&apiReqCount, apiReq)
return nil
Expand All @@ -257,9 +162,8 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, snapshots ma
return backupSize, nil
}

// initialSnapshotSize calculate size of an initial snapshot in bytes by listing its blocks.
// initial snapshot always a ful backup of volume
func initialSnapshotSize(snapshotId string) (uint64, uint64, error) {
// calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks.
func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) {
var snapshotSize uint64
var numApiReq uint64
ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
Expand Down Expand Up @@ -293,72 +197,3 @@ func initialSnapshotSize(snapshotId string) (uint64, uint64, error) {
klog.Infof("full backup snapshot size %d bytes, num of ListSnapshotBlocks request %d", snapshotSize, numApiReq)
return snapshotSize, numApiReq, nil
}

func getPrevSnapshotId(snapshotId string, volSnapshots []*ec2.Snapshot) (string, error) {
var prevSnapshotId string

sort.Slice(volSnapshots, func(i, j int) bool {
return volSnapshots[i].StartTime.Before(*volSnapshots[j].StartTime)
})

for i, snapshot := range volSnapshots {
if snapshotId == *snapshot.SnapshotId {
// the first snapshot for the volume
if i == 0 {
return "", nil
}
prevSnapshotId = *volSnapshots[i-1].SnapshotId
klog.Infof("the prevSnapshot index is %d, ID is %s", i, *snapshot.SnapshotId)
break
}
}
if len(prevSnapshotId) == 0 {
return "", fmt.Errorf("Could not find the prevousely snapshot id, current snapshotId: %s.", snapshotId)
}
return prevSnapshotId, nil
}

// changedBlocksSize calculates changed blocks total size in bytes between two snapshots with common ancestry.
func changedBlocksSize(preSnapshotId string, snapshotId string) (uint64, uint64, error) {
var numBlocks int
var snapshotSize uint64
var numApiReq uint64

klog.Infof("the calc snapshot size for %s, base on prev snapshot %s", snapshotId, preSnapshotId)
ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
if err != nil {
klog.Errorf("new a ebs session failure.")
return 0, numApiReq, err
}

var nextToken *string

for {
resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
FirstSnapshotId: aws.String(preSnapshotId),
MaxResults: aws.Int64(ListBlocksMaxReturnResult),
SecondSnapshotId: aws.String(snapshotId),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return 0, numApiReq, err
}
numBlocks += len(resp.ChangedBlocks)

// retrieve only changed block and blocks only existed in current snapshot (new add blocks)
for _, block := range resp.ChangedBlocks {
if block.SecondBlockToken != nil && resp.BlockSize != nil {
snapshotSize += uint64(*resp.BlockSize)
}
}

// check if there is more to retrieve
if resp.NextToken == nil {
break
}
nextToken = resp.NextToken
}
klog.Infof("the total size of snapshot %d, num of api ListChangedBlocks request %d, snapshot id %s", snapshotSize, numApiReq, snapshotId)
return snapshotSize, numApiReq, nil
}
2 changes: 1 addition & 1 deletion pkg/backup/util/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ type BatchDeleteObjectsResult struct {
Errors []ObjectError
}

// BatchDeleteObjects delete mutli objects
// BatchDeleteObjects delete multi objects
//
// Depending on storage type, it use function 'BatchDeleteObjectsOfS3' or 'BatchDeleteObjectsConcurrently'
func (b *StorageBackend) BatchDeleteObjects(ctx context.Context, objs []*blob.ListObject, opt v1alpha1.BatchDeleteOption) *BatchDeleteObjectsResult {
Expand Down

0 comments on commit 3d17117

Please sign in to comment.