Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg: add 2 operator related tags to restored ebs volume #5104

Merged
merged 8 commits into from
Jul 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/backup-manager/app/clean/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (bo *Options) deleteVolumeSnapshots(meta *bkutil.EBSBasedBRMeta) error {
return nil
}

// cleanBRRemoteBackupData clean the backup data from remote
// CleanBRRemoteBackupData clean the backup data from remote
func (bo *Options) CleanBRRemoteBackupData(ctx context.Context, backup *v1alpha1.Backup) error {
opt := backup.GetCleanOption()

Expand Down
44 changes: 44 additions & 0 deletions pkg/backup/restore/restore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,50 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error {
if !tc.AllTiKVsAreAvailable() {
return controller.RequeueErrorf("restore %s/%s: waiting for all TiKVs are available in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name)
} else {
sel, err := label.New().Instance(tc.Name).TiKV().Selector()
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: "BuildTiKVSelectorFailed",
Message: err.Error(),
}, nil)
return err
}

pvs, err := rm.deps.PVLister.List(sel)
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: "ListPVsFailed",
Message: err.Error(),
}, nil)
return err
}

s, reason, err := snapshotter.NewSnapshotterForRestore(restore.Spec.Mode, rm.deps)
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: reason,
Message: err.Error(),
}, nil)
return err
}

err = s.AddVolumeTags(pvs)
if err != nil {
rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreRetryFailed,
Status: corev1.ConditionTrue,
Reason: "AddVolumeTagFailed",
Message: err.Error(),
}, nil)
return err
}

return rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreTiKVComplete,
Status: corev1.ConditionTrue,
Expand Down
3 changes: 3 additions & 0 deletions pkg/backup/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type Snapshotter interface {

// ResetPvAvailableZone resets az of pv if the volumes restore to another az
ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume)

// AddVolumeTags adds operator-related tags to the volumes of the given PVs
AddVolumeTags(pvs []*corev1.PersistentVolume) error
}

type BaseSnapshotter struct {
Expand Down
40 changes: 37 additions & 3 deletions pkg/backup/snapshotter/snapshotter_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@ import (
"fmt"
"regexp"

"github.com/pingcap/tidb-operator/pkg/apis/label"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/constants"
"github.com/pingcap/tidb-operator/pkg/backup/util"
"github.com/pingcap/tidb-operator/pkg/controller"
corev1 "k8s.io/api/core/v1"
)

// Settings used by AddVolumeTags when tagging EBS volumes.
const (
// CloudAPIConcurrency is the concurrency value passed to util.NewEC2Session.
CloudAPIConcurrency = 8
// PVCTagKey is the tag key whose value AddVolumeTags sets to the PV's name.
PVCTagKey = "CSIVolumeName"
// PodTagKey is the tag key whose value AddVolumeTags sets to the PV's
// label.AnnPodNameKey annotation.
// NOTE(review): the key text refers to a PVC name while the value written is
// a pod name — confirm this pairing is intended.
PodTagKey = "kubernetes.io/created-for/pvc/name"
)

// AWSSnapshotter is the snapshotter for creating snapshots from volumes (during a backup)
// and volumes from snapshots (during a restore) on AWS EBS.
// It embeds BaseSnapshotter, to which Init and PrepareRestoreMetadata delegate.
type AWSSnapshotter struct {
BaseSnapshotter
}

// Init initializes the embedded BaseSnapshotter with deps and conf and sets
// volRegexp to the compiled pattern "vol-.*".
//
// BaseSnapshotter.Init is invoked exactly once and its error is returned to
// the caller; volRegexp is assigned regardless of that result.
func (s *AWSSnapshotter) Init(deps *controller.Dependencies, conf map[string]string) error {
	err := s.BaseSnapshotter.Init(deps, conf)
	s.volRegexp = regexp.MustCompile("vol-.*")
	return err
}

func (s *AWSSnapshotter) GetVolumeID(pv *corev1.PersistentVolume) (string, error) {
Expand Down Expand Up @@ -90,6 +98,32 @@ func (s *AWSSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS
return s.BaseSnapshotter.prepareRestoreMetadata(r, csb, s)
}

// AddVolumeTags tags the volume referenced by each PV's CSI volume handle with
// two tags: PVCTagKey set to the PV's name and PodTagKey set to the PV's
// label.AnnPodNameKey annotation.
//
// Nil entries in pvs are skipped. A PV without a CSI volume source yields an
// error, because there is no volume handle to tag. Errors from creating the
// EC2 session or from the tagging call are returned unchanged.
func (s *AWSSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
	resourcesTags := make(map[string]util.TagMap, len(pvs))

	for _, pv := range pvs {
		if pv == nil {
			continue
		}
		// Guard against non-CSI PVs: Spec.CSI is a pointer and would panic on
		// dereference when the PV uses another volume source.
		if pv.Spec.CSI == nil {
			return fmt.Errorf("pv %s has no CSI volume source, unable to add volume tags", pv.GetName())
		}

		volumeID := pv.Spec.CSI.VolumeHandle
		resourcesTags[volumeID] = util.TagMap{
			PVCTagKey: pv.GetName(),
			PodTagKey: pv.GetAnnotations()[label.AnnPodNameKey],
		}
	}

	ec2Session, err := util.NewEC2Session(CloudAPIConcurrency)
	if err != nil {
		return err
	}
	return ec2Session.AddTags(resourcesTags)
}

func (s *AWSSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {
if r.Spec.VolumeAZ == "" {
return
Expand Down
9 changes: 7 additions & 2 deletions pkg/backup/snapshotter/snapshotter_gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ type GCPSnapshotter struct {
}

// Init initializes the embedded BaseSnapshotter with deps and conf and sets
// volRegexp to a pattern matching paths of the form
// projects/<p>/(zones|regions)/<l>/disks/<d>.
//
// BaseSnapshotter.Init is invoked exactly once and its error is returned to
// the caller; volRegexp is assigned regardless of that result.
func (s *GCPSnapshotter) Init(deps *controller.Dependencies, conf map[string]string) error {
	err := s.BaseSnapshotter.Init(deps, conf)
	s.volRegexp = regexp.MustCompile(`^projects\/[^\/]+\/(zones|regions)\/[^\/]+\/disks\/[^\/]+$`)
	return err
}

func (s *GCPSnapshotter) GetVolumeID(pv *corev1.PersistentVolume) (string, error) {
Expand Down Expand Up @@ -102,3 +102,8 @@ func (s *GCPSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudS
// ResetPvAvailableZone is currently a no-op for GCPSnapshotter; r and pv are
// left untouched.
func (s *GCPSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {
// TODO: implement this once restoring snapshots to another AZ is supported on GCP
}

// AddVolumeTags is currently a no-op for GCPSnapshotter: it ignores pvs and
// always returns nil.
func (s *GCPSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
// TODO: implement this if volumes need to be tagged on GCP
return nil
}
5 changes: 5 additions & 0 deletions pkg/backup/snapshotter/snapshotter_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ func (s *NoneSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *Cloud
}

// ResetPvAvailableZone is a no-op for NoneSnapshotter.
func (s *NoneSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {}

// AddVolumeTags is a no-op for NoneSnapshotter: it ignores pvs and always
// returns nil.
func (s *NoneSnapshotter) AddVolumeTags(pvs []*corev1.PersistentVolume) error {
return nil
}
37 changes: 36 additions & 1 deletion pkg/backup/util/aws_ebs.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type EC2Session struct {
concurrency uint
}

type VolumeAZs map[string]string
// TagMap maps a tag key to its tag value; AddTags turns each entry into one
// ec2.Tag.
type TagMap map[string]string

func NewEC2Session(concurrency uint) (*EC2Session, error) {
// aws-sdk has builtin exponential backoff retry mechanism, see:
Expand Down Expand Up @@ -151,6 +151,41 @@ func (e *EC2Session) DeleteSnapshots(snapIDMap map[string]string) error {
return nil
}

// AddTags applies tags to EC2 resources.
//
// resourcesTags maps a resource ID to the TagMap (tag key -> tag value) to set
// on that resource. One CreateTags request is issued per resource, each from
// its own goroutine in an errgroup. AddTags waits for every request to return
// and then reports the first error encountered, or nil if all succeeded.
func (e *EC2Session) AddTags(resourcesTags map[string]TagMap) error {
	eg := new(errgroup.Group)
	for resourceID, tagMap := range resourcesTags {
		// Per-iteration copy so the goroutine below does not observe a later
		// iteration's value under pre-Go 1.22 loop-variable semantics.
		id := resourceID

		var tags []*ec2.Tag
		for key, value := range tagMap {
			// aws.String returns a pointer to a fresh copy of its argument.
			// Taking the address of the range variable instead would make
			// every tag share one pointer (and thus the last key/value
			// iterated) before Go 1.22.
			tags = append(tags, &ec2.Tag{Key: aws.String(key), Value: aws.String(value)})
		}

		// Create the input for adding the tags to this resource.
		input := &ec2.CreateTagsInput{
			Resources: []*string{aws.String(id)},
			Tags:      tags,
		}

		eg.Go(func() error {
			if _, err := e.EC2.CreateTags(input); err != nil {
				klog.Errorf("failed to create tags for resource id=%s, %v", id, err)
				return err
			}
			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		klog.Errorf("failed to create tags for all resources")
		return err
	}
	return nil
}

type EBSSession struct {
EBS ebsiface.EBSAPI
// aws operation concurrency
Expand Down