From 1bb611ffc34e81e4253e17949960ff10de821783 Mon Sep 17 00:00:00 2001 From: BornChanger Date: Wed, 28 Jun 2023 21:01:32 +0800 Subject: [PATCH] pkg: add 2 operator related tags to restored ebs volume Signed-off-by: BornChanger --- cmd/backup-manager/app/clean/clean.go | 2 +- pkg/backup/restore/restore_manager.go | 44 ++++++++++++++++++++++ pkg/backup/snapshotter/snapshotter.go | 3 ++ pkg/backup/snapshotter/snapshotter_aws.go | 39 ++++++++++++++++++- pkg/backup/snapshotter/snapshotter_gcp.go | 5 +++ pkg/backup/snapshotter/snapshotter_none.go | 5 +++ pkg/backup/util/aws_ebs.go | 36 +++++++++++++++++- 7 files changed, 131 insertions(+), 3 deletions(-) diff --git a/cmd/backup-manager/app/clean/clean.go b/cmd/backup-manager/app/clean/clean.go index 5402e8e943..7fb588c7d0 100644 --- a/cmd/backup-manager/app/clean/clean.go +++ b/cmd/backup-manager/app/clean/clean.go @@ -135,7 +135,7 @@ func (bo *Options) deleteVolumeSnapshots(meta *bkutil.EBSBasedBRMeta) error { return nil } -// cleanBRRemoteBackupData clean the backup data from remote +// CleanBRRemoteBackupData clean the backup data from remote func (bo *Options) CleanBRRemoteBackupData(ctx context.Context, backup *v1alpha1.Backup) error { opt := backup.GetCleanOption() diff --git a/pkg/backup/restore/restore_manager.go b/pkg/backup/restore/restore_manager.go index e43f0a17c8..90e3fdcc02 100644 --- a/pkg/backup/restore/restore_manager.go +++ b/pkg/backup/restore/restore_manager.go @@ -139,6 +139,50 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error { if !tc.AllTiKVsAreAvailable() { return controller.RequeueErrorf("restore %s/%s: waiting for all TiKVs are available in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name) } else { + sel, err := label.New().Instance(tc.Name).TiKV().Selector() + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: "BuildTiKVSelectorFailed", + Message: err.Error(), + }, nil) + return err + } + + pvs, err := rm.deps.PVLister.List(sel) + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: "ListTiKVPodsFailed", + Message: err.Error(), + }, nil) + return err + } + + s, reason, err := snapshotter.NewSnapshotterForRestore(restore.Spec.Mode, rm.deps) + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: reason, + Message: err.Error(), + }, nil) + return err + } + + err = s.AddVolumeTags(pvs) + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: "AddVolumeTagFailed", + Message: err.Error(), + }, nil) + return err + } + return rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ Type: v1alpha1.RestoreTiKVComplete, Status: corev1.ConditionTrue, diff --git a/pkg/backup/snapshotter/snapshotter.go b/pkg/backup/snapshotter/snapshotter.go index 065aca3077..5ec67df43f 100644 --- a/pkg/backup/snapshotter/snapshotter.go +++ b/pkg/backup/snapshotter/snapshotter.go @@ -56,6 +56,9 @@ type Snapshotter interface { // ResetPvAvailableZone resets az of pv if the volumes restore to another az ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) + + // AddVolumeTags add operator related tags to volumes + AddVolumeTags(pvs []*corev1.PersistentVolume) error } type BaseSnapshotter struct { diff --git a/pkg/backup/snapshotter/snapshotter_aws.go b/pkg/backup/snapshotter/snapshotter_aws.go index 4394892eb8..4c8e877ead 100644 --- a/pkg/backup/snapshotter/snapshotter_aws.go +++ b/pkg/backup/snapshotter/snapshotter_aws.go @@ -16,6 +16,7 @@ package snapshotter import ( "errors" "fmt" + bkutil "github.com/pingcap/tidb-operator/pkg/backup/util" "regexp" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" @@ -24,7 +25,15 @@ import ( corev1 "k8s.io/api/core/v1" ) -// The snapshotter for creating snapshots from volumes (during a backup) +const ( + CloudAPIConcurrency = 8 + SourcePodNameKey = "tidb.pingcap.com/pod-name" + + PVCTagKey = "CSIVolumeName" + PodTagKey = "kubernetes.io/created-for/pvc/name" +) + +// AWSSnapshotter is the snapshotter for creating snapshots from volumes (during a backup) // and volumes from snapshots (during a restore) on AWS EBS. type AWSSnapshotter struct { BaseSnapshotter @@ -90,6 +99,34 @@ func (s *AWSSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS return s.BaseSnapshotter.prepareRestoreMetadata(r, csb, s) } +type TagMap map[string]string + +func (s *AWSSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error { + resourcesTags := make(map[string]bkutil.TagMap) + + for _, pv := range pvs { + podName := pv.GetAnnotations()[SourcePodNameKey] + pvcName := pv.GetName() + volId := pv.Spec.CSI.VolumeHandle + + tags := make(map[string]string) + tags[PVCTagKey] = pvcName + tags[PodTagKey] = podName + + resourcesTags[volId] = tags + } + ec2Session, err := bkutil.NewEC2Session(CloudAPIConcurrency) + if err != nil { + return err + } + if err = ec2Session.AddTags(resourcesTags); err != nil { + return err + } + + return nil + +} + func (s *AWSSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) { if r.Spec.VolumeAZ == "" { return diff --git a/pkg/backup/snapshotter/snapshotter_gcp.go b/pkg/backup/snapshotter/snapshotter_gcp.go index 8e68afb855..b55ac8e45b 100644 --- a/pkg/backup/snapshotter/snapshotter_gcp.go +++ b/pkg/backup/snapshotter/snapshotter_gcp.go @@ -102,3 +102,8 @@ func (s *GCPSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS func (s *GCPSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) { // TODO implement it if support to restore snapshots to another az on GCP } + +func (s *GCPSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error { + // TODO implement it if support to restore snapshots to another az on GCP + return nil +} diff --git a/pkg/backup/snapshotter/snapshotter_none.go b/pkg/backup/snapshotter/snapshotter_none.go index 15ff9e1394..28e55d7b3c 100644 --- a/pkg/backup/snapshotter/snapshotter_none.go +++ b/pkg/backup/snapshotter/snapshotter_none.go @@ -43,3 +43,8 @@ func (s *NoneSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *Cloud } func (s *NoneSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {} + +func (s *NoneSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error { + // TODO implement it if support to restore snapshots to another az on GCP + return nil +} diff --git a/pkg/backup/util/aws_ebs.go b/pkg/backup/util/aws_ebs.go index 8a11c8fe45..d29c4e9a30 100644 --- a/pkg/backup/util/aws_ebs.go +++ b/pkg/backup/util/aws_ebs.go @@ -105,7 +105,7 @@ type EC2Session struct { concurrency uint } -type VolumeAZs map[string]string +type TagMap map[string]string func NewEC2Session(concurrency uint) (*EC2Session, error) { // aws-sdk has builtin exponential backoff retry mechanism, see: @@ -151,6 +151,40 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) error { return nil } +func (e *EC2Session) AddTags(resourcesTags map[string]TagMap) error { + + eg := new(errgroup.Group) + for resourceID := range resourcesTags { + tagMap := resourcesTags[resourceID] + var tags []*ec2.Tag + for tag := range tagMap { + value := tagMap[tag] + tags = append(tags, &ec2.Tag{Key: &tag, Value: &value}) + + // Create the input for adding the tag + input := &ec2.CreateTagsInput{ + Resources: []*string{aws.String(resourceID)}, + Tags: tags, + } + + eg.Go(func() error { + _, err := e.EC2.CreateTags(input) + if err != nil { + klog.Errorf("failed to create tags for resource id=%s", resourceID, err) + return err + } + return nil + }) + } + } + + if err := eg.Wait(); err != nil { + klog.Errorf("failed to create tags for all resources") + return err + } + return nil +} + type EBSSession struct { EBS ebsiface.EBSAPI // aws operation concurrency