Skip to content

Commit

Permalink
ebs br: add retry support for ListSnapshotBlocks() and ListChangedBlo…
Browse files Browse the repository at this point in the history
…cks() (#5232)

Signed-off-by: BornChanger <[email protected]>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
BornChanger and ti-chi-bot[bot] authored Sep 27, 2023
1 parent 3f8c788 commit 0d9aca8
Showing 1 changed file with 87 additions and 32 deletions.
119 changes: 87 additions & 32 deletions cmd/backup-manager/app/util/backup_size.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ebs"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -195,14 +196,14 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, level string
return
}

// calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks.
// calculateSnapshotSize calculate size of a snapshot in bytes by listing its blocks.
func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) {
var snapshotSize uint64
var numApiReq uint64

start := time.Now()

klog.Infof("start to calculate snapshot size for %s, volume id %s",
klog.Infof("start to calculate size for snapshot %s, volume %s",
snapshotId, volumeId)

ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
Expand All @@ -213,24 +214,51 @@ func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error)

var nextToken *string
for {
resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
SnapshotId: aws.String(snapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return 0, numApiReq, err
// Each retry interval is around 1 second, and no more than 60 times retry (~1 minute)
backoff := wait.Backoff{
Duration: time.Second,
Steps: 60,
Factor: 1.0,
Jitter: 0.1,
}
if resp.BlockSize != nil {
snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)

isAllListed := false

listBlocks := func() error {
resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
SnapshotId: aws.String(snapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return err
}
if resp.BlockSize != nil {
snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)
}

// check if there is more to retrieve
if resp.NextToken == nil {
isAllListed = true
}
nextToken = resp.NextToken

return nil
}

// check if there is more to retrieve
if resp.NextToken == nil {
isRetry := func(err error) bool {
return request.IsErrorThrottle(err)
}
err = retry.OnError(backoff, isRetry, listBlocks)

if err != nil {
return 0, numApiReq, errors.Annotatef(err, "ListSnapshotBlocks() failed against snapshot id %s, volume id %s", snapshotId, volumeId)
}

if isAllListed {
break
}
nextToken = resp.NextToken
}

elapsed := time.Since(start)
Expand Down Expand Up @@ -261,30 +289,57 @@ func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uin
var nextToken *string

for {
resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
FirstSnapshotId: aws.String(preSnapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
SecondSnapshotId: aws.String(snapshotId),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return 0, numApiReq, err
// Each retry interval is around 1 second, and no more than 60 times retry (~1 minute)
backoff := wait.Backoff{
Duration: time.Second,
Steps: 60,
Factor: 1.0,
Jitter: 0.1,
}
numBlocks += len(resp.ChangedBlocks)

// retrieve only changed block and blocks only existed in current snapshot (new add blocks)
for _, block := range resp.ChangedBlocks {
if block.SecondBlockToken != nil && resp.BlockSize != nil {
snapshotSize += uint64(*resp.BlockSize)
isAllChangeListed := false

listChangeBlocks := func() error {
resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
FirstSnapshotId: aws.String(preSnapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
SecondSnapshotId: aws.String(snapshotId),
NextToken: nextToken,
})
numApiReq += 1
if err != nil {
return err
}
numBlocks += len(resp.ChangedBlocks)

// retrieve only changed block and blocks only existed in current snapshot (new add blocks)
for _, block := range resp.ChangedBlocks {
if block.SecondBlockToken != nil && resp.BlockSize != nil {
snapshotSize += uint64(*resp.BlockSize)
}
}

// check if there is more to retrieve
if resp.NextToken == nil {
isAllChangeListed = true
}
nextToken = resp.NextToken

return nil
}

// check if there is more to retrieve
if resp.NextToken == nil {
isRetry := func(err error) bool {
return request.IsErrorThrottle(err)
}
err = retry.OnError(backoff, isRetry, listChangeBlocks)

if err != nil {
return 0, numApiReq, errors.Annotatef(err, "ListChangedBlocks() failed against volume id %s, preSnapshot id %s, snapshot id %s", volumeId, preSnapshotId, snapshotId)
}

if isAllChangeListed {
break
}
nextToken = resp.NextToken
}

elapsed := time.Since(start)
Expand Down

0 comments on commit 0d9aca8

Please sign in to comment.