Skip to content

Commit

Permalink
pkg: add 2 operator related tags to restored ebs volume (#5104) (#5139)
Browse files Browse the repository at this point in the history
Signed-off-by: BornChanger <[email protected]>
Co-authored-by: BornChanger <[email protected]>
  • Loading branch information
ti-chi-bot and BornChanger committed Jul 4, 2023
1 parent 3d17117 commit dc52f41
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 7 deletions.
2 changes: 1 addition & 1 deletion cmd/backup-manager/app/clean/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (bo *Options) deleteVolumeSnapshots(meta *bkutil.EBSBasedBRMeta) error {
return nil
}

// cleanBRRemoteBackupData clean the backup data from remote
// CleanBRRemoteBackupData clean the backup data from remote
func (bo *Options) CleanBRRemoteBackupData(ctx context.Context, backup *v1alpha1.Backup) error {
opt := backup.GetCleanOption()

Expand Down
44 changes: 44 additions & 0 deletions pkg/backup/restore/restore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,50 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error {
if !tc.AllTiKVsAreAvailable() {
return controller.RequeueErrorf("restore %s/%s: waiting for all TiKVs are available in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name)
} else {
sel, err := label.New().Instance(tc.Name).TiKV().Selector()
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: "BuildTiKVSelectorFailed",
Message: err.Error(),
}, nil)
return err
}

pvs, err := rm.deps.PVLister.List(sel)
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: "ListPVsFailed",
Message: err.Error(),
}, nil)
return err
}

s, reason, err := snapshotter.NewSnapshotterForRestore(restore.Spec.Mode, rm.deps)
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: reason,
Message: err.Error(),
}, nil)
return err
}

err = s.AddVolumeTags(pvs)
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: "AddVolumeTagFailed",
Message: err.Error(),
}, nil)
return err
}

return rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreTiKVComplete,
Status: corev1.ConditionTrue,
Expand Down
3 changes: 3 additions & 0 deletions pkg/backup/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type Snapshotter interface {

// ResetPvAvailableZone resets az of pv if the volumes restore to another az
ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume)

// AddVolumeTags add operator related tags to volumes
AddVolumeTags(pvs []*corev1.PersistentVolume) error
}

type BaseSnapshotter struct {
Expand Down
40 changes: 37 additions & 3 deletions pkg/backup/snapshotter/snapshotter_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@ import (
"fmt"
"regexp"

"github.com/pingcap/tidb-operator/pkg/apis/label"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/constants"
"github.com/pingcap/tidb-operator/pkg/backup/util"
"github.com/pingcap/tidb-operator/pkg/controller"
corev1 "k8s.io/api/core/v1"
)

// The snapshotter for creating snapshots from volumes (during a backup)
const (
CloudAPIConcurrency = 8
PVCTagKey = "CSIVolumeName"
PodTagKey = "kubernetes.io/created-for/pvc/name"
)

// AWSSnapshotter is the snapshotter for creating snapshots from volumes (during a backup)
// and volumes from snapshots (during a restore) on AWS EBS.
type AWSSnapshotter struct {
BaseSnapshotter
}

func (s *AWSSnapshotter) Init(deps *controller.Dependencies, conf map[string]string) error {
s.BaseSnapshotter.Init(deps, conf)
err := s.BaseSnapshotter.Init(deps, conf)
s.volRegexp = regexp.MustCompile("vol-.*")
return nil
return err
}

func (s *AWSSnapshotter) GetVolumeID(pv *corev1.PersistentVolume) (string, error) {
Expand Down Expand Up @@ -90,6 +98,32 @@ func (s *AWSSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS
return s.BaseSnapshotter.prepareRestoreMetadata(r, csb, s)
}

func (s *AWSSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
resourcesTags := make(map[string]util.TagMap)

for _, pv := range pvs {
podName := pv.GetAnnotations()[label.AnnPodNameKey]
pvcName := pv.GetName()
volId := pv.Spec.CSI.VolumeHandle

tags := make(map[string]string)
tags[PVCTagKey] = pvcName
tags[PodTagKey] = podName

resourcesTags[volId] = tags
}
ec2Session, err := util.NewEC2Session(CloudAPIConcurrency)
if err != nil {
return err
}
if err = ec2Session.AddTags(resourcesTags); err != nil {
return err
}

return nil

}

func (s *AWSSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {
if r.Spec.VolumeAZ == "" {
return
Expand Down
9 changes: 7 additions & 2 deletions pkg/backup/snapshotter/snapshotter_gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type GCPSnapshotter struct {
}

func (s *GCPSnapshotter) Init(deps *controller.Dependencies, conf map[string]string) error {
s.BaseSnapshotter.Init(deps, conf)
err := s.BaseSnapshotter.Init(deps, conf)
s.volRegexp = regexp.MustCompile(`^projects\/[^\/]+\/(zones|regions)\/[^\/]+\/disks\/[^\/]+$`)
return nil
return err
}

func (s *GCPSnapshotter) GetVolumeID(pv *corev1.PersistentVolume) (string, error) {
Expand Down Expand Up @@ -102,3 +102,8 @@ func (s *GCPSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS
func (s *GCPSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {
// TODO implement it if support to restore snapshots to another az on GCP
}

func (s *GCPSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
// TODO implement it if support to restore snapshots to another az on GCP
return nil
}
5 changes: 5 additions & 0 deletions pkg/backup/snapshotter/snapshotter_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ func (s *NoneSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *Cloud
}

func (s *NoneSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {}

func (s *NoneSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
// TODO implement it if support to restore snapshots to another az on GCP
return nil
}
37 changes: 36 additions & 1 deletion pkg/backup/util/aws_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type EC2Session struct {
concurrency uint
}

type VolumeAZs map[string]string
type TagMap map[string]string

func NewEC2Session(concurrency uint) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
Expand Down Expand Up @@ -151,6 +151,41 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) error {
return nil
}

func (e *EC2Session) AddTags(resourcesTags map[string]TagMap) error {

eg := new(errgroup.Group)
for resourceID := range resourcesTags {
id := resourceID
tagMap := resourcesTags[resourceID]
var tags []*ec2.Tag
for tag := range tagMap {
value := tagMap[tag]
tags = append(tags, &ec2.Tag{Key: &tag, Value: &value})
}

// Create the input for adding the tag
input := &ec2.CreateTagsInput{
Resources: []*string{aws.String(id)},
Tags: tags,
}

eg.Go(func() error {
_, err := e.EC2.CreateTags(input)
if err != nil {
klog.Errorf("failed to create tags for resource id=%s, %v", id, err)
return err
}
return nil
})
}

if err := eg.Wait(); err != nil {
klog.Errorf("failed to create tags for all resources")
return err
}
return nil
}

type EBSSession struct {
EBS ebsiface.EBSAPI
// aws operation concurrency
Expand Down

0 comments on commit dc52f41

Please sign in to comment.