br: add incremental backup size for volume snapshot backup (#5188) (#5211)

Signed-off-by: WangLe1321 <[email protected]>
Co-authored-by: WangLe1321 <[email protected]>
ti-chi-bot and WangLe1321 authored Jul 28, 2023
1 parent 7330207 commit 9738c4f
Showing 10 changed files with 253 additions and 28 deletions.
19 changes: 11 additions & 8 deletions cmd/backup-manager/app/backup/manager.go
@@ -319,21 +319,24 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
defer func() {
// Calculate the backup size for ebs backup job even if it fails
if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && !bm.Initialize {
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
fullBackupSize, incrementalBackupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
klog.Errorf("Failed to calc volume snapshot backup, err: %v", err)
return
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

backupSizeReadable := humanize.Bytes(uint64(fullBackupSize))
incrementalBackupSizeReadable := humanize.Bytes(uint64(incrementalBackupSize))
updateStatus := &controller.BackupUpdateStatus{
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
BackupSize: &fullBackupSize,
BackupSizeReadable: &backupSizeReadable,
IncrementalBackupSize: &incrementalBackupSize,
IncrementalBackupSizeReadable: &incrementalBackupSizeReadable,
}

bm.StatusUpdater.Update(backup, nil,
updateStatus)
if err := bm.StatusUpdater.Update(backup, nil, updateStatus); err != nil {
klog.Errorf("update backup size to status error: %v", err)
}
}
}()

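For orientation, here is a minimal sketch of how the new two-value result of util.CalcVolSnapBackupSize is meant to be consumed; it mirrors the defer block above rather than adding behavior, and the helper name reportVolSnapBackupSize is made up for illustration.

// Hypothetical helper, shown only to illustrate the new return values.
func reportVolSnapBackupSize(ctx context.Context, backup *v1alpha1.Backup) {
	fullSize, incrSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
	if err != nil {
		klog.Errorf("failed to calc volume snapshot backup size: %v", err)
		return
	}
	// BackupSize holds the full size of all snapshots in the backup, while
	// IncrementalBackupSize holds only the blocks changed since each volume's
	// previous snapshot, i.e. the data actually stored for this backup.
	klog.Infof("full backup size %s, incremental backup size %s",
		humanize.Bytes(uint64(fullSize)), humanize.Bytes(uint64(incrSize)))
}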
175 changes: 155 additions & 20 deletions cmd/backup-manager/app/util/backup_size.go
@@ -17,12 +17,15 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ebs"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
@@ -42,33 +45,35 @@ import (
// interface CalcVolSnapBackupSize called by backup and backup clean.

const (
// This value can be between 5 and 1,000; if MaxResults is given a value larger than 1,000, only 1,000 results are returned.
DescribeSnapMaxReturnResult = 1000
// This value can be between 100 and 10,000; the API charges ~$0.60 per 1 million requests
ListSnapMaxReturnResult = 10000
// This value can be between 1 and 50 due to aws service quota
EbsApiConcurrency = 40
)

// CalcVolSnapBackupSize get snapshots from backup meta and then calc the backup size of snapshots.
func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvider) (int64, error) {
func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvider) (fullBackupSize int64, incrementalBackupSize int64, err error) {
start := time.Now()
// retrieves all snapshots from backup meta file
volSnapshots, err := getSnapshotsFromBackupmeta(ctx, provider)
if err != nil {
return 0, err
return 0, 0, err
}

if err != nil {
return 0, err
return 0, 0, err
}

backupSize, err := calcBackupSize(ctx, volSnapshots)
fullBackupSize, incrementalBackupSize, err = calcBackupSize(ctx, volSnapshots)

if err != nil {
return 0, err
return 0, 0, err
}
elapsed := time.Since(start)
klog.Infof("calculate volume-snapshot backup size takes %v", elapsed)
return int64(backupSize), nil
return
}

// getSnapshotsFromBackupmeta read all snapshots from backupmeta
@@ -131,39 +136,58 @@ func getSnapshotsFromBackupmeta(ctx context.Context, provider v1alpha1.StoragePr
}

// calcBackupSize get a volume-snapshots backup size
func calcBackupSize(ctx context.Context, volumes map[string]string) (uint64, error) {
var backupSize uint64
var apiReqCount uint64
func calcBackupSize(ctx context.Context, volumes map[string]string) (fullBackupSize int64, incrementalBackupSize int64, err error) {
var apiReqCount, incrementalApiReqCount uint64

workerPool := util.NewWorkerPool(EbsApiConcurrency, "list snapshot size")
eg, _ := errgroup.WithContext(ctx)

for _, id := range volumes {
snapshotId := id
for vid, sid := range volumes {
snapshotId := sid
volumeId := vid
// sort snapshots by timestamp
workerPool.ApplyOnErrorGroup(eg, func() error {
snapSize, apiReq, err := calculateSnapshotSize(snapshotId)
snapSize, apiReq, err := calculateSnapshotSize(volumeId, snapshotId)
if err != nil {
return err
}
atomic.AddUint64(&backupSize, snapSize)
atomic.AddInt64(&fullBackupSize, int64(snapSize))
atomic.AddUint64(&apiReqCount, apiReq)

volSnapshots, err := getVolSnapshots(volumeId)
if err != nil {
return err
}
prevSnapshotId, existed := getPrevSnapshotId(snapshotId, volSnapshots)
if !existed {
// if there is no previous snapshot, this is the first snapshot, so use its full size as the incremental size
atomic.AddInt64(&incrementalBackupSize, int64(snapSize))
return nil
}
klog.Infof("get previous snapshot %s of snapshot %s, volume %s", prevSnapshotId, snapshotId, volumeId)
incrementalSnapSize, incrementalApiReq, err := calculateChangedBlocksSize(volumeId, prevSnapshotId, snapshotId)
if err != nil {
return err
}
atomic.AddInt64(&incrementalBackupSize, int64(incrementalSnapSize))
atomic.AddUint64(&incrementalApiReqCount, incrementalApiReq)
return nil
})
}

if err := eg.Wait(); err != nil {
klog.Errorf("failed to get snapshots size %d, number of api request %d", backupSize, apiReqCount)
return 0, err
klog.Errorf("failed to get snapshots size %d, number of api request %d", fullBackupSize, apiReqCount)
return 0, 0, err
}

// currently we do not count API request fees, since the cost is negligible; we still log the request count in case "negligible" turns out to be wrong
klog.Infof("backup size %d bytes, number of api request %d", backupSize, apiReqCount)
return backupSize, nil
klog.Infof("backup size %d bytes, number of api request %d, incremental backup size %d bytes, numbers of incremental size's api request %d",
fullBackupSize, apiReqCount, incrementalBackupSize, incrementalApiReqCount)
return
}

// calculateSnapshotSize calculates the size of a snapshot in bytes by listing its blocks.
func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) {
func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) {
var snapshotSize uint64
var numApiReq uint64
ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
@@ -174,7 +198,6 @@ func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) {

var nextToken *string
for {

resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
SnapshotId: aws.String(snapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
@@ -194,6 +217,118 @@ }
}
nextToken = resp.NextToken
}
klog.Infof("full backup snapshot size %d bytes, num of ListSnapshotBlocks request %d", snapshotSize, numApiReq)
klog.Infof("full snapshot size %s, num of ListSnapshotBlocks request %d, snapshot id %s, volume id %s",
humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId)
return snapshotSize, numApiReq, nil
}

// calculateChangedBlocksSize calculates changed blocks total size in bytes between two snapshots with common ancestry.
func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uint64, uint64, error) {
var numBlocks int
var snapshotSize uint64
var numApiReq uint64

klog.Infof("start to calculate incremental snapshot size for %s, base on prev snapshot %s, volume id %s",
snapshotId, preSnapshotId, volumeId)
ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
if err != nil {
klog.Errorf("new a ebs session failure.")
return 0, numApiReq, err
}

var nextToken *string

for {
resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
FirstSnapshotId: aws.String(preSnapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
SecondSnapshotId: aws.String(snapshotId),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return 0, numApiReq, err
}
numBlocks += len(resp.ChangedBlocks)

// count only blocks present in the current snapshot: changed blocks and newly added blocks
for _, block := range resp.ChangedBlocks {
if block.SecondBlockToken != nil && resp.BlockSize != nil {
snapshotSize += uint64(*resp.BlockSize)
}
}

// check if there is more to retrieve
if resp.NextToken == nil {
break
}
nextToken = resp.NextToken
}
klog.Infof("incremental snapshot size %s, num of api ListChangedBlocks request %d, snapshot id %s, volume id %s",
humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId)
return snapshotSize, numApiReq, nil
}
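// Illustrative arithmetic (not part of this change): ListChangedBlocks reports
// a BlockSize per response; assuming the typical 512 KiB (524,288-byte) EBS
// snapshot block size, a snapshot with 128 changed or newly added blocks would
// add 128 * 524288 = 67,108,864 bytes (~67 MB) to incrementalBackupSize, while
// its full size is still computed from ListSnapshotBlocks above.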

// getVolSnapshots returns all completed snapshots created for the given volume, sorted by StartTime
func getVolSnapshots(volumeId string) ([]*ec2.Snapshot, error) {
// read all snapshots from aws
ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency)
if err != nil {
klog.Errorf("new a ec2 session failure.")
return nil, err
}

filters := []*ec2.Filter{{Name: aws.String("volume-id"), Values: []*string{aws.String(volumeId)}}}
// DescribeSnapshots is a heavy call, so keep the number of requests low
// the API returns at most 1,000 snapshots per page
// filter by the volume id recorded in the backupmeta
var nextToken *string
var snapshots []*ec2.Snapshot
for {
resp, err := ec2Session.EC2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{
OwnerIds: aws.StringSlice([]string{"self"}),
MaxResults: aws.Int64(DescribeSnapMaxReturnResult),
Filters: filters,
NextToken: nextToken,
})

if err != nil {
return nil, err
}

for _, s := range resp.Snapshots {
if *s.State == ec2.SnapshotStateCompleted {
klog.Infof("get the snapshot %s created for volume %s", *s.SnapshotId, *s.VolumeId)
snapshots = append(snapshots, s)
} else { // skip ongoing snapshots
klog.Infof("the snapshot %s is creating... skip it, volume %s", *s.SnapshotId, *s.VolumeId)
continue
}
}

// check if there's more to retrieve
if resp.NextToken == nil {
break
}
klog.Infof("the total number of snapshot is %d", len(resp.Snapshots))
nextToken = resp.NextToken
}

sort.Slice(snapshots, func(i, j int) bool {
return snapshots[i].StartTime.Before(*snapshots[j].StartTime)
})
return snapshots, nil
}

func getPrevSnapshotId(snapshotId string, sortedVolSnapshots []*ec2.Snapshot) (string, bool) {
for i, snapshot := range sortedVolSnapshots {
if snapshotId == *snapshot.SnapshotId {
if i == 0 {
return "", false
} else {
return *sortedVolSnapshots[i-1].SnapshotId, true
}
}
}
return "", false
}
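To make the pairing logic above concrete, a small sketch (not part of this commit) of the getPrevSnapshotId contract on a StartTime-sorted snapshot list; the snapshot IDs are invented for illustration and the snippet relies on the file's existing fmt, aws, and ec2 imports.

// Example only: the oldest snapshot of a volume has no predecessor, so its full
// size doubles as its incremental size; every later snapshot is diffed against
// the snapshot immediately before it.
func exampleGetPrevSnapshotId() {
	sorted := []*ec2.Snapshot{
		{SnapshotId: aws.String("snap-1")}, // oldest
		{SnapshotId: aws.String("snap-2")},
		{SnapshotId: aws.String("snap-3")}, // newest, the one referenced in backupmeta
	}
	if prev, ok := getPrevSnapshotId("snap-3", sorted); ok {
		fmt.Println(prev) // prints "snap-2"
	}
	if _, ok := getPrevSnapshotId("snap-1", sorted); !ok {
		fmt.Println("snap-1 is the first snapshot, its full size is used as the incremental size")
	}
}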
23 changes: 23 additions & 0 deletions docs/api-references/docs.md
@@ -4307,6 +4307,29 @@ int64
</tr>
<tr>
<td>
<code>incrementalBackupSizeReadable</code></br>
<em>
string
</em>
</td>
<td>
<p>the difference with IncrementalBackupSize is that its format is human readable</p>
</td>
</tr>
<tr>
<td>
<code>incrementalBackupSize</code></br>
<em>
int64
</em>
</td>
<td>
<p>IncrementalBackupSize is the incremental data size of the backup; it is only used for volume snapshot backup,
where it is the real size of the volume snapshot backup</p>
</td>
</tr>
<tr>
<td>
<code>commitTs</code></br>
<em>
string
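A short, hypothetical sketch of reading the new status fields from a Backup object, assuming v1alpha1.BackupStatus exposes them under Go names (IncrementalBackupSize, IncrementalBackupSizeReadable) matching the API fields documented above; the helper name and output format are made up.

// printVolSnapBackupSizes is illustrative only; the incremental fields are
// populated only for volume snapshot backups.
func printVolSnapBackupSizes(backup *v1alpha1.Backup) {
	if backup.Spec.Mode != v1alpha1.BackupModeVolumeSnapshot {
		return
	}
	fmt.Printf("backup %s/%s: full %s (%d bytes), incremental %s (%d bytes)\n",
		backup.Namespace, backup.Name,
		backup.Status.BackupSizeReadable, backup.Status.BackupSize,
		backup.Status.IncrementalBackupSizeReadable, backup.Status.IncrementalBackupSize)
}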
11 changes: 11 additions & 0 deletions manifests/crd.yaml
@@ -40,6 +40,12 @@ spec:
jsonPath: .status.backupSizeReadable
name: BackupSize
type: string
- description: The real size of volume snapshot backup, only valid to volume snapshot
backup
jsonPath: .status.incrementalBackupSizeReadable
name: IncrementalBackupSize
priority: 10
type: string
- description: The commit ts of the backup
jsonPath: .status.commitTs
name: CommitTS
@@ -1445,6 +1451,11 @@ spec:
type: object
nullable: true
type: array
incrementalBackupSize:
format: int64
type: integer
incrementalBackupSizeReadable:
type: string
logCheckpointTs:
type: string
logSubCommandStatuses:
11 changes: 11 additions & 0 deletions manifests/crd/v1/pingcap.com_backups.yaml
@@ -40,6 +40,12 @@ spec:
jsonPath: .status.backupSizeReadable
name: BackupSize
type: string
- description: The real size of volume snapshot backup, only valid to volume snapshot
backup
jsonPath: .status.incrementalBackupSizeReadable
name: IncrementalBackupSize
priority: 10
type: string
- description: The commit ts of the backup
jsonPath: .status.commitTs
name: CommitTS
@@ -1445,6 +1451,11 @@ spec:
type: object
nullable: true
type: array
incrementalBackupSize:
format: int64
type: integer
incrementalBackupSizeReadable:
type: string
logCheckpointTs:
type: string
logSubCommandStatuses:
11 changes: 11 additions & 0 deletions manifests/crd/v1beta1/pingcap.com_backups.yaml
@@ -30,6 +30,12 @@ spec:
description: The data size of the backup
name: BackupSize
type: string
- JSONPath: .status.incrementalBackupSizeReadable
description: The real size of volume snapshot backup, only valid to volume snapshot
backup
name: IncrementalBackupSize
priority: 10
type: string
- JSONPath: .status.commitTs
description: The commit ts of the backup
name: CommitTS
@@ -1440,6 +1446,11 @@ spec:
type: object
nullable: true
type: array
incrementalBackupSize:
format: int64
type: integer
incrementalBackupSizeReadable:
type: string
logCheckpointTs:
type: string
logSubCommandStatuses: