Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] Combined prs #5208

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ spec:
{{- if .Values.brFederationManager.kubeClientBurst }}
- -kube-client-burst={{ .Values.brFederationManager.kubeClientBurst }}
{{- end }}
{{- if .Values.brFederationManager.federationKubeconfigSecretKey }}
- -federation-kubeconfig-path=/etc/br-federation/federation-kubeconfig/{{ .Values.brFederationManager.federationKubeconfigSecretKey }}
{{- end }}
env:
- name: NAMESPACE
valueFrom:
Expand Down
5 changes: 4 additions & 1 deletion charts/br-federation/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ brFederationManager:
serviceAccount: br-federation-manager

# Secret name of the kubeconfig for the federation Kubernetes clusters
# The data item key is the cluster name, and the data item value is the base64 encoded kubeconfig
federationKubeconfigSecret: br-federation-kubeconfig
# The key of the data item in the secret that holds the kubeconfig file; the data item value is the base64-encoded kubeconfig.
# If you have multiple Kubernetes clusters, merge them into a single kubeconfig.
# The context name in the kubeconfig is used as the k8sClusterName in the volume backup/restore CR.
federationKubeconfigSecretKey: kubeconfig

logLevel: 2
replicas: 1
Expand Down
19 changes: 11 additions & 8 deletions cmd/backup-manager/app/backup/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,21 +319,24 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
defer func() {
// Calculate the backup size for ebs backup job even if it fails
if bm.Mode == string(v1alpha1.BackupModeVolumeSnapshot) && !bm.Initialize {
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)
fullBackupSize, incrementalBackupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider, backup.Spec.CalcSizeLevel)
if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
klog.Errorf("Failed to calc volume snapshot backup, err: %v", err)
return
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

backupSizeReadable := humanize.Bytes(uint64(fullBackupSize))
incrementalBackupSizeReadable := humanize.Bytes(uint64(incrementalBackupSize))
updateStatus := &controller.BackupUpdateStatus{
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
BackupSize: &fullBackupSize,
BackupSizeReadable: &backupSizeReadable,
IncrementalBackupSize: &incrementalBackupSize,
IncrementalBackupSizeReadable: &incrementalBackupSizeReadable,
}

bm.StatusUpdater.Update(backup, nil,
updateStatus)
if err := bm.StatusUpdater.Update(backup, nil, updateStatus); err != nil {
klog.Errorf("update backup size to status error: %v", err)
}
}
}()

Expand Down
7 changes: 4 additions & 3 deletions cmd/backup-manager/app/clean/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ func (bo *Options) deleteSnapshotsAndBackupMeta(ctx context.Context, backup *v1a
}()

contents, err := os.ReadFile(metaFile)

if errors.Is(err, os.ErrNotExist) {
klog.Warningf("read metadata file %s failed, err: %s, a mannual check or delete aciton require.", metaFile, err)
return nil
klog.Errorf("read metadata file %s failed, err: %s, a manual check or delete action required.", metaFile, err)
return err
} else if err != nil { // will retry it
klog.Warningf("read metadata file %s failed, err: %s", metaFile, err)
klog.Errorf("read metadata file %s failed, err: %s", metaFile, err)
return err
}

Expand Down
192 changes: 167 additions & 25 deletions cmd/backup-manager/app/util/backup_size.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/dustin/go-humanize"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ebs"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
Expand All @@ -42,33 +46,39 @@ import (
// interface CalcVolSnapBackupSize called by backup and backup clean.

const (
// DescribeSnapMaxReturnResult is the page size passed to DescribeSnapshots.
// It can be between 5 and 1,000; if MaxResults is given a value larger than 1,000, only 1,000 results are returned.
DescribeSnapMaxReturnResult = 1000
// ListSnapMaxReturnResult is the page size passed to ListSnapshotBlocks and ListChangedBlocks.
// It can be between 100 and 10,000, and requests are charged at ~0.6$ per 1 million requests.
ListSnapMaxReturnResult = 10000
// EbsApiConcurrency is the size of the worker pool used to calculate snapshot sizes.
// This value can be between 1 and 50 due to aws service quota.
EbsApiConcurrency = 40

// CalculateFullSize, CalculateIncremental and CalculateAll are the accepted values of the
// "level" argument of the backup size calculation: full size only, incremental size only, or both.
CalculateFullSize = "full"
CalculateIncremental = "incremental"
CalculateAll = "all"
)

// CalcVolSnapBackupSize get snapshots from backup meta and then calc the backup size of snapshots.
func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvider) (int64, error) {
func CalcVolSnapBackupSize(ctx context.Context, provider v1alpha1.StorageProvider, level string) (fullBackupSize int64, incrementalBackupSize int64, err error) {
start := time.Now()
// retrieves all snapshots from backup meta file
volSnapshots, err := getSnapshotsFromBackupmeta(ctx, provider)
if err != nil {
return 0, err
return 0, 0, err
}

if err != nil {
return 0, err
return 0, 0, err
}

backupSize, err := calcBackupSize(ctx, volSnapshots)
fullBackupSize, incrementalBackupSize, err = calcBackupSize(ctx, volSnapshots, level)

if err != nil {
return 0, err
return 0, 0, err
}
elapsed := time.Since(start)
klog.Infof("calculate volume-snapshot backup size takes %v", elapsed)
return int64(backupSize), nil
return
}

// getSnapshotsFromBackupmeta read all snapshots from backupmeta
Expand Down Expand Up @@ -131,39 +141,63 @@ func getSnapshotsFromBackupmeta(ctx context.Context, provider v1alpha1.StoragePr
}

// calcBackupSize get a volume-snapshots backup size
func calcBackupSize(ctx context.Context, volumes map[string]string) (uint64, error) {
var backupSize uint64
var apiReqCount uint64
func calcBackupSize(ctx context.Context, volumes map[string]string, level string) (fullBackupSize int64, incrementalBackupSize int64, err error) {
var apiReqCount, incrementalApiReqCount uint64

workerPool := util.NewWorkerPool(EbsApiConcurrency, "list snapshot size")
eg, _ := errgroup.WithContext(ctx)

for _, id := range volumes {
snapshotId := id
for vid, sid := range volumes {
snapshotId := sid
volumeId := vid
// sort snapshots by timestamp
workerPool.ApplyOnErrorGroup(eg, func() error {
snapSize, apiReq, err := calculateSnapshotSize(snapshotId)
if err != nil {
return err
var snapSize uint64
if level == CalculateAll || level == CalculateFullSize {
snapSize, apiReq, err := calculateSnapshotSize(volumeId, snapshotId)
if err != nil {
return err
}
atomic.AddInt64(&fullBackupSize, int64(snapSize))
atomic.AddUint64(&apiReqCount, apiReq)
}

if level == CalculateAll || level == CalculateIncremental {
volSnapshots, err := getVolSnapshots(volumeId)
if err != nil {
return err
}
prevSnapshotId, existed := getPrevSnapshotId(snapshotId, volSnapshots)
if !existed {
// if there is no previous snapshot, means it's the first snapshot, uses its full size as incremental size
atomic.AddInt64(&incrementalBackupSize, int64(snapSize))
return nil
}
klog.Infof("get previous snapshot %s of snapshot %s, volume %s", prevSnapshotId, snapshotId, volumeId)
incrementalSnapSize, incrementalApiReq, err := calculateChangedBlocksSize(volumeId, prevSnapshotId, snapshotId)
if err != nil {
return err
}
atomic.AddInt64(&incrementalBackupSize, int64(incrementalSnapSize))
atomic.AddUint64(&incrementalApiReqCount, incrementalApiReq)
}
atomic.AddUint64(&backupSize, snapSize)
atomic.AddUint64(&apiReqCount, apiReq)
return nil
})
}

if err := eg.Wait(); err != nil {
klog.Errorf("failed to get snapshots size %d, number of api request %d", backupSize, apiReqCount)
return 0, err
klog.Errorf("failed to get snapshots size %d, number of api request %d", fullBackupSize, apiReqCount)
return 0, 0, err
}

// currently, we do not count api request fees, since it is very few of cost, however, we print it in log in case "very few" is not correct
klog.Infof("backup size %d bytes, number of api request %d", backupSize, apiReqCount)
return backupSize, nil
klog.Infof("backup size %d bytes, number of api request %d, incremental backup size %d bytes, numbers of incremental size's api request %d",
fullBackupSize, apiReqCount, incrementalBackupSize, incrementalApiReqCount)
return
}

// calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks.
func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) {
func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) {
var snapshotSize uint64
var numApiReq uint64
ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
Expand All @@ -174,7 +208,6 @@ func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) {

var nextToken *string
for {

resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
SnapshotId: aws.String(snapshotId),
MaxResults: aws.Int64(ListSnapMaxReturnResult),
Expand All @@ -194,6 +227,115 @@ func calculateSnapshotSize(snapshotId string) (uint64, uint64, error) {
}
nextToken = resp.NextToken
}
klog.Infof("full backup snapshot size %d bytes, num of ListSnapshotBlocks request %d", snapshotSize, numApiReq)
klog.Infof("full backup snapshot size %d bytes, num of ListSnapshotBlocks request %d, snapshot id %s, volume id %s", humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId)
return snapshotSize, numApiReq, nil
}

// calculateChangedBlocksSize calculates changed blocks total size in bytes between two
// snapshots with common ancestry, by paging through the EBS ListChangedBlocks API.
//
// preSnapshotId is passed as the first (base) snapshot and snapshotId as the second
// snapshot; volumeId is only used in log messages.
//
// It returns the accumulated size in bytes, the number of ListChangedBlocks requests
// issued, and an error if the EBS session cannot be created or a request fails. On
// error the returned size is 0.
func calculateChangedBlocksSize(volumeId, preSnapshotId string, snapshotId string) (uint64, uint64, error) {
	var snapshotSize uint64
	var numApiReq uint64

	klog.Infof("the calc snapshot size for %s, base on prev snapshot %s, volume id %s", snapshotId, preSnapshotId, volumeId)
	ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
	if err != nil {
		klog.Errorf("new a ebs session failure.")
		return 0, numApiReq, err
	}

	var nextToken *string
	for {
		resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
			FirstSnapshotId:  aws.String(preSnapshotId),
			MaxResults:       aws.Int64(ListSnapMaxReturnResult),
			SecondSnapshotId: aws.String(snapshotId),
			NextToken:        nextToken,
		})
		numApiReq++
		if err != nil {
			return 0, numApiReq, err
		}

		// retrieve only changed block and blocks only existed in current snapshot (new add blocks):
		// an entry without SecondBlockToken is not counted.
		if resp.BlockSize != nil {
			blockSize := uint64(*resp.BlockSize)
			for _, block := range resp.ChangedBlocks {
				if block != nil && block.SecondBlockToken != nil {
					snapshotSize += blockSize
				}
			}
		}

		// check if there is more to retrieve
		if resp.NextToken == nil {
			break
		}
		nextToken = resp.NextToken
	}

	// humanize.Bytes returns a string, so it must be formatted with %s rather than %d
	klog.Infof("the total size of snapshot %s, num of api ListChangedBlocks request %d, snapshot id %s, volume id %s", humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId)
	return snapshotSize, numApiReq, nil
}

// getVolSnapshots returns the completed snapshots of the given volume that are owned
// by the caller ("self"), sorted by start time from oldest to newest.
//
// Snapshots are fetched with DescribeSnapshots filtered by volume id, page by page
// (DescribeSnapMaxReturnResult per page). Snapshots that are not in the completed
// state are skipped, as are entries without a snapshot id or start time, because they
// can neither be ordered nor matched by id.
//
// It returns an error if the EC2 session cannot be created or a request fails.
func getVolSnapshots(volumeId string) ([]*ec2.Snapshot, error) {
	// read all snapshots from aws
	ec2Session, err := util.NewEC2Session(util.CloudAPIConcurrency)
	if err != nil {
		klog.Errorf("new a ec2 session failure.")
		return nil, err
	}

	// describe snapshot is heavy operator, try to call only once
	// api has limit with max 1000 snapshots
	// search with filter volume id the backupmeta contains
	filters := []*ec2.Filter{{Name: aws.String("volume-id"), Values: []*string{aws.String(volumeId)}}}

	var nextToken *string
	var snapshots []*ec2.Snapshot
	for {
		resp, err := ec2Session.EC2.DescribeSnapshots(&ec2.DescribeSnapshotsInput{
			OwnerIds:   aws.StringSlice([]string{"self"}),
			MaxResults: aws.Int64(DescribeSnapMaxReturnResult),
			Filters:    filters,
			NextToken:  nextToken,
		})
		if err != nil {
			return nil, err
		}

		for _, s := range resp.Snapshots {
			// the SDK models every field as a pointer; guard the ones dereferenced later
			if s == nil || s.SnapshotId == nil || s.StartTime == nil {
				klog.Warningf("skip a snapshot without id or start time, volume %s", volumeId)
				continue
			}
			if aws.StringValue(s.State) != ec2.SnapshotStateCompleted {
				// skip ongoing snapshots
				klog.Infof("the snapshot %s is creating... skip it, volume %s", *s.SnapshotId, aws.StringValue(s.VolumeId))
				continue
			}
			klog.Infof("get the snapshot %s created for volume %s", *s.SnapshotId, aws.StringValue(s.VolumeId))
			snapshots = append(snapshots, s)
		}

		// check if there's more to retrieve
		if resp.NextToken == nil {
			break
		}
		// NOTE(review): this logs the size of the current page, and only when more pages follow
		klog.Infof("the total number of snapshot is %d", len(resp.Snapshots))
		nextToken = resp.NextToken
	}

	// StartTime is non-nil for every kept snapshot, see the guard above
	sort.Slice(snapshots, func(i, j int) bool {
		return snapshots[i].StartTime.Before(*snapshots[j].StartTime)
	})
	return snapshots, nil
}

// getPrevSnapshotId looks up snapshotId in sortedVolSnapshots and returns the id of
// the snapshot that directly precedes it in the slice.
//
// The second result is false when there is no usable predecessor: snapshotId is the
// first element, it is not present in the slice, or the preceding entry carries no id.
// Entries that are nil or have no snapshot id never match.
func getPrevSnapshotId(snapshotId string, sortedVolSnapshots []*ec2.Snapshot) (string, bool) {
	for i, snapshot := range sortedVolSnapshots {
		if snapshot == nil || snapshot.SnapshotId == nil || *snapshot.SnapshotId != snapshotId {
			continue
		}
		if i == 0 {
			return "", false
		}
		prev := sortedVolSnapshots[i-1]
		if prev == nil || prev.SnapshotId == nil {
			return "", false
		}
		return *prev.SnapshotId, true
	}
	return "", false
}
15 changes: 6 additions & 9 deletions cmd/br-federation-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"path/filepath"
"reflect"
"syscall"

Expand Down Expand Up @@ -219,18 +218,16 @@ func main() {
}

func initFederationKubeClients(cliCfg *controller.BrFedCLIConfig) (map[string]fedversioned.Interface, error) {
files, err := os.ReadDir(cliCfg.FederationKubeConfigPath)
kubeConfig, err := clientcmd.LoadFromFile(cliCfg.FederationKubeConfigPath)
if err != nil {
return nil, err
}

clients := make(map[string]fedversioned.Interface)
for _, f := range files {
if f.IsDir() || f.Name() == "..data" {
continue
}

cfg, err := clientcmd.BuildConfigFromFlags("", filepath.Join(cliCfg.FederationKubeConfigPath, f.Name()))
for contextName := range kubeConfig.Contexts {
cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: cliCfg.FederationKubeConfigPath},
&clientcmd.ConfigOverrides{CurrentContext: contextName}).ClientConfig()
if err != nil {
return nil, err // return error if any kube client init failed
}
Expand All @@ -243,7 +240,7 @@ func initFederationKubeClients(cliCfg *controller.BrFedCLIConfig) (map[string]fe
if err != nil {
return nil, err
}
clients[f.Name()] = cli
clients[contextName] = cli
}

return clients, nil
Expand Down
Loading
Loading