From d5ff0ad8f1fd1a55286b2e439481a67f94b7a356 Mon Sep 17 00:00:00 2001 From: BornChanger <97348524+BornChanger@users.noreply.github.com> Date: Sat, 1 Jul 2023 10:08:12 +0800 Subject: [PATCH] pkg: add 2 operator related tags to restored ebs volume (#5104) --- cmd/backup-manager/app/clean/clean.go | 2 +- pkg/backup/restore/restore_manager.go | 44 ++++++++++++++++++++++ pkg/backup/snapshotter/snapshotter.go | 3 ++ pkg/backup/snapshotter/snapshotter_aws.go | 40 ++++++++++++++++++-- pkg/backup/snapshotter/snapshotter_gcp.go | 9 ++++- pkg/backup/snapshotter/snapshotter_none.go | 5 +++ pkg/backup/util/aws_ebs.go | 37 +++++++++++++++++- 7 files changed, 133 insertions(+), 7 deletions(-) diff --git a/cmd/backup-manager/app/clean/clean.go b/cmd/backup-manager/app/clean/clean.go index 5402e8e943..7fb588c7d0 100644 --- a/cmd/backup-manager/app/clean/clean.go +++ b/cmd/backup-manager/app/clean/clean.go @@ -135,7 +135,7 @@ func (bo *Options) deleteVolumeSnapshots(meta *bkutil.EBSBasedBRMeta) error { return nil } -// cleanBRRemoteBackupData clean the backup data from remote +// CleanBRRemoteBackupData clean the backup data from remote func (bo *Options) CleanBRRemoteBackupData(ctx context.Context, backup *v1alpha1.Backup) error { opt := backup.GetCleanOption() diff --git a/pkg/backup/restore/restore_manager.go b/pkg/backup/restore/restore_manager.go index e43f0a17c8..e9e3ef5158 100644 --- a/pkg/backup/restore/restore_manager.go +++ b/pkg/backup/restore/restore_manager.go @@ -139,6 +139,50 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error { if !tc.AllTiKVsAreAvailable() { return controller.RequeueErrorf("restore %s/%s: waiting for all TiKVs are available in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name) } else { + sel, err := label.New().Instance(tc.Name).TiKV().Selector() + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: 
"BuildTiKVSelectorFailed", + Message: err.Error(), + }, nil) + return err + } + + pvs, err := rm.deps.PVLister.List(sel) + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: "ListPVsFailed", + Message: err.Error(), + }, nil) + return err + } + + s, reason, err := snapshotter.NewSnapshotterForRestore(restore.Spec.Mode, rm.deps) + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: reason, + Message: err.Error(), + }, nil) + return err + } + + err = s.AddVolumeTags(pvs) + if err != nil { + rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: "AddVolumeTagFailed", + Message: err.Error(), + }, nil) + return err + } + return rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ Type: v1alpha1.RestoreTiKVComplete, Status: corev1.ConditionTrue, diff --git a/pkg/backup/snapshotter/snapshotter.go b/pkg/backup/snapshotter/snapshotter.go index 065aca3077..5ec67df43f 100644 --- a/pkg/backup/snapshotter/snapshotter.go +++ b/pkg/backup/snapshotter/snapshotter.go @@ -56,6 +56,9 @@ type Snapshotter interface { // ResetPvAvailableZone resets az of pv if the volumes restore to another az ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) + + // AddVolumeTags add operator related tags to volumes + AddVolumeTags(pvs []*corev1.PersistentVolume) error } type BaseSnapshotter struct { diff --git a/pkg/backup/snapshotter/snapshotter_aws.go b/pkg/backup/snapshotter/snapshotter_aws.go index 4394892eb8..5877e3dc45 100644 --- a/pkg/backup/snapshotter/snapshotter_aws.go +++ b/pkg/backup/snapshotter/snapshotter_aws.go @@ -18,22 +18,30 @@ import ( "fmt" "regexp" + "github.com/pingcap/tidb-operator/pkg/apis/label" "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1" 
"github.com/pingcap/tidb-operator/pkg/backup/constants"
+	"github.com/pingcap/tidb-operator/pkg/backup/util"
 	"github.com/pingcap/tidb-operator/pkg/controller"
 	corev1 "k8s.io/api/core/v1"
 )
 
-// The snapshotter for creating snapshots from volumes (during a backup)
+const (
+	CloudAPIConcurrency = 8
+	PVCTagKey           = "CSIVolumeName"
+	PodTagKey           = "kubernetes.io/created-for/pvc/name"
+)
+
+// AWSSnapshotter is the snapshotter for creating snapshots from volumes (during a backup)
 // and volumes from snapshots (during a restore) on AWS EBS.
 type AWSSnapshotter struct {
 	BaseSnapshotter
 }
 
 func (s *AWSSnapshotter) Init(deps *controller.Dependencies, conf map[string]string) error {
-	s.BaseSnapshotter.Init(deps, conf)
+	err := s.BaseSnapshotter.Init(deps, conf)
 	s.volRegexp = regexp.MustCompile("vol-.*")
-	return nil
+	return err
 }
 
 func (s *AWSSnapshotter) GetVolumeID(pv *corev1.PersistentVolume) (string, error) {
@@ -90,6 +98,32 @@ func (s *AWSSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS
 	return s.BaseSnapshotter.prepareRestoreMetadata(r, csb, s)
 }
 
+// AddVolumeTags tags the EBS volume backing each PV with the PV name and the
+// pod name recorded in the PV's annotations. It returns an error when a PV
+// has no CSI source, or when the EC2 session or the tagging request fails.
+func (s *AWSSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
+	resourcesTags := make(map[string]util.TagMap, len(pvs))
+	for _, pv := range pvs {
+		// Spec.CSI is nil for non-CSI volumes; fail cleanly instead of panicking.
+		if pv.Spec.CSI == nil {
+			return fmt.Errorf("pv %s is not a CSI volume, cannot add volume tags", pv.GetName())
+		}
+
+		// NOTE(review): PVCTagKey is set to the PV name and PodTagKey (a
+		// ".../pvc/name" key) to the pod name; confirm the pairing is intended.
+		resourcesTags[pv.Spec.CSI.VolumeHandle] = util.TagMap{
+			PVCTagKey: pv.GetName(),
+			PodTagKey: pv.GetAnnotations()[label.AnnPodNameKey],
+		}
+	}
+
+	ec2Session, err := util.NewEC2Session(CloudAPIConcurrency)
+	if err != nil {
+		return err
+	}
+	return ec2Session.AddTags(resourcesTags)
+}
+
 func (s *AWSSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {
 	if r.Spec.VolumeAZ == "" {
 		return
diff --git a/pkg/backup/snapshotter/snapshotter_gcp.go b/pkg/backup/snapshotter/snapshotter_gcp.go
index 8e68afb855..79049aea24 100644
--- a/pkg/backup/snapshotter/snapshotter_gcp.go
+++
b/pkg/backup/snapshotter/snapshotter_gcp.go @@ -32,9 +32,9 @@ type GCPSnapshotter struct { } func (s *GCPSnapshotter) Init(deps *controller.Dependencies, conf map[string]string) error { - s.BaseSnapshotter.Init(deps, conf) + err := s.BaseSnapshotter.Init(deps, conf) s.volRegexp = regexp.MustCompile(`^projects\/[^\/]+\/(zones|regions)\/[^\/]+\/disks\/[^\/]+$`) - return nil + return err } func (s *GCPSnapshotter) GetVolumeID(pv *corev1.PersistentVolume) (string, error) { @@ -102,3 +102,8 @@ func (s *GCPSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS func (s *GCPSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) { // TODO implement it if support to restore snapshots to another az on GCP } + +func (s *GCPSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error { + // TODO implement it if operator related tags need to be added to volumes on GCP + return nil +} diff --git a/pkg/backup/snapshotter/snapshotter_none.go b/pkg/backup/snapshotter/snapshotter_none.go index 15ff9e1394..28e55d7b3c 100644 --- a/pkg/backup/snapshotter/snapshotter_none.go +++ b/pkg/backup/snapshotter/snapshotter_none.go @@ -43,3 +43,8 @@ func (s *NoneSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *Cloud } func (s *NoneSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {} + +func (s *NoneSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error { + // no-op: NoneSnapshotter does not add any tags to volumes + return nil +} diff --git a/pkg/backup/util/aws_ebs.go b/pkg/backup/util/aws_ebs.go index 8a11c8fe45..291caeb061 100644 --- a/pkg/backup/util/aws_ebs.go +++ b/pkg/backup/util/aws_ebs.go @@ -105,7 +105,7 @@ type EC2Session struct { concurrency uint } -type VolumeAZs map[string]string +type TagMap map[string]string func NewEC2Session(concurrency uint) (*EC2Session, error) { // aws-sdk has builtin exponential backoff retry mechanism, see: @@ -151,6 +151,41 @@ func (e
*EC2Session) DeleteSnapshots(snapIDMap map[string]string) error {
 	return nil
 }
 
+// AddTags creates the given tags on every EC2 resource in resourcesTags, which
+// maps a resource ID to its tags. One CreateTags call is issued per resource,
+// all calls run concurrently, and the first error encountered is returned.
+func (e *EC2Session) AddTags(resourcesTags map[string]TagMap) error {
+	eg := new(errgroup.Group)
+	for resourceID, tagMap := range resourcesTags {
+		id := resourceID // per-iteration copy captured by the goroutine below
+		tags := make([]*ec2.Tag, 0, len(tagMap))
+		for key, value := range tagMap {
+			// aws.String returns a pointer to its own copy, so each tag keeps a
+			// distinct key/value instead of aliasing the shared range variables.
+			tags = append(tags, &ec2.Tag{Key: aws.String(key), Value: aws.String(value)})
+		}
+		// Create the input for adding the tag
+		input := &ec2.CreateTagsInput{
+			Resources: []*string{aws.String(id)},
+			Tags:      tags,
+		}
+
+		eg.Go(func() error {
+			if _, err := e.EC2.CreateTags(input); err != nil {
+				klog.Errorf("failed to create tags for resource id=%s, %v", id, err)
+				return err
+			}
+			return nil
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		klog.Errorf("failed to create tags for all resources")
+		return err
+	}
+	return nil
+}
+
 type EBSSession struct {
 	EBS ebsiface.EBSAPI
 	// aws operation concurrency