Skip to content

Commit

Permalink
br: refactor restore controller to support across k8s (pingcap#5010) (p…
Browse files Browse the repository at this point in the history
…ingcap#5132)

Signed-off-by: WangLe1321 <[email protected]>
Co-authored-by: WangLe1321 <[email protected]>
  • Loading branch information
ti-chi-bot and WangLe1321 authored Jul 4, 2023
1 parent 19ff032 commit e7cd31a
Show file tree
Hide file tree
Showing 24 changed files with 152 additions and 24 deletions.
1 change: 1 addition & 0 deletions cmd/backup-manager/app/cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewRestoreCommand() *cobra.Command {
cmd.Flags().StringVar(&ro.Mode, "mode", string(v1alpha1.RestoreModeSnapshot), "restore mode, which is pitr or snapshot(default)")
cmd.Flags().StringVar(&ro.PitrRestoredTs, "pitrRestoredTs", "0", "The pitr restored ts")
cmd.Flags().BoolVar(&ro.Prepare, "prepare", false, "Whether to prepare for restore")
cmd.Flags().StringVar(&ro.TargetAZ, "target-az", "", "For volume-snapshot restore, which az the volume snapshots restore to")
return cmd
}

Expand Down
3 changes: 3 additions & 0 deletions cmd/backup-manager/app/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Options struct {
backupUtil.GenericOptions
// Prepare to restore data. It's used in volume-snapshot mode.
Prepare bool
// TargetAZ indicates which az the volume snapshots restore to. It's used in volume-snapshot mode.
TargetAZ string
}

func (ro *Options) restoreData(
Expand Down Expand Up @@ -103,6 +105,7 @@ func (ro *Options) restoreData(
args = append(args, "--prepare")
csbPath = path.Join(util.BRBinPath, "csb_restore.json")
args = append(args, fmt.Sprintf("--output-file=%s", csbPath))
args = append(args, fmt.Sprintf("--target-az=%s", ro.TargetAZ))
progressStep = "Volume Restore"
} else {
progressStep = "Data Restore"
Expand Down
4 changes: 4 additions & 0 deletions cmd/backup-manager/app/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func validCmdFlagFunc(flag *pflag.Flag) {
if len(flag.Value.String()) > 0 {
return
}
// optional flag
if flag.Name == "target-az" {
return
}

cmdutil.CheckErr(fmt.Errorf(cmdHelpMsg, flag.Name))
}
Expand Down
26 changes: 26 additions & 0 deletions docs/api-references/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,19 @@ FederalVolumeRestorePhase
</tr>
<tr>
<td>
<code>volumeAZ</code></br>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>VolumeAZ indicates which AZ the volume snapshots restore to.
it is only valid for mode of volume-snapshot</p>
</td>
</tr>
<tr>
<td>
<code>tikvGCLifeTime</code></br>
<em>
string
Expand Down Expand Up @@ -13707,6 +13720,19 @@ FederalVolumeRestorePhase
</tr>
<tr>
<td>
<code>volumeAZ</code></br>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>VolumeAZ indicates which AZ the volume snapshots restore to.
it is only valid for mode of volume-snapshot</p>
</td>
</tr>
<tr>
<td>
<code>tikvGCLifeTime</code></br>
<em>
string
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14303,6 +14303,8 @@ spec:
type: string
useKMS:
type: boolean
volumeAZ:
type: string
type: object
status:
properties:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd/v1/pingcap.com_restores.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2081,6 +2081,8 @@ spec:
type: string
useKMS:
type: boolean
volumeAZ:
type: string
type: object
status:
properties:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd/v1beta1/pingcap.com_restores.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2080,6 +2080,8 @@ spec:
type: string
useKMS:
type: boolean
volumeAZ:
type: string
type: object
status:
properties:
Expand Down
2 changes: 2 additions & 0 deletions manifests/crd_v1beta1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14283,6 +14283,8 @@ spec:
type: string
useKMS:
type: boolean
volumeAZ:
type: string
type: object
status:
properties:
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/pingcap/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions pkg/apis/pingcap/v1alpha1/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,10 @@ func UpdateRestoreCondition(status *RestoreStatus, condition *RestoreCondition)
return false
}
condition.LastTransitionTime = metav1.Now()
status.Phase = condition.Type
// Try to find this Restore condition.
conditionIndex, oldCondition := GetRestoreCondition(status, condition.Type)

switch condition.Type {
case RestoreVolumeComplete, RestoreDataComplete:
// VolumeComplete and DataComplete are intermediately conditions,
// they can not represent the current phase of restore.
default:
status.Phase = condition.Type
}

if oldCondition == nil {
// We are adding new Restore condition.
status.Conditions = append(status.Conditions, *condition)
Expand Down Expand Up @@ -137,6 +130,12 @@ func IsRestoreVolumeComplete(restore *Restore) bool {
return condition != nil && condition.Status == corev1.ConditionTrue
}

// IsRestoreTiKVComplete returns true if all TiKVs run successfully during volume restore
func IsRestoreTiKVComplete(restore *Restore) bool {
_, condition := GetRestoreCondition(&restore.Status, RestoreTiKVComplete)
return condition != nil && condition.Status == corev1.ConditionTrue
}

// IsRestoreDataComplete returns true if a Restore for data consistency has successfully completed
func IsRestoreDataComplete(restore *Restore) bool {
_, condition := GetRestoreCondition(&restore.Status, RestoreDataComplete)
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2389,6 +2389,10 @@ type RestoreSpec struct {
// FederalVolumeRestorePhase indicates which phase to execute in federal volume restore
// +optional
FederalVolumeRestorePhase FederalVolumeRestorePhase `json:"federalVolumeRestorePhase,omitempty"`
// VolumeAZ indicates which AZ the volume snapshots restore to.
// it is only valid for mode of volume-snapshot
// +optional
VolumeAZ string `json:"volumeAZ,omitempty"`
// TikvGCLifeTime is to specify the safe gc life time for restore.
// The time limit during which data is retained for each GC, in the format of Go Duration.
// When a GC happens, the current time minus this value is the safe point.
Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ func (bm *backupManager) teardownVolumeBackup(backup *v1alpha1.Backup) (err erro
}
// if job exists but isn't running, we can't ensure GC and PD schedules are stopped during volume backup
// the volume snapshots are invalid, we should set backup failed
if !jobCompleteOrFailed {
if jobCompleteOrFailed {
backupCondition = v1alpha1.BackupFailed
}
err = bm.statusUpdater.Update(backup, &v1alpha1.BackupCondition{
Expand Down
4 changes: 2 additions & 2 deletions pkg/backup/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestBackupManagerBR(t *testing.T) {
helper.hasCondition(backup.Namespace, backup.Name, v1alpha1.BackupRetryTheFailed, "failed to fetch tidbcluster")

// create relate tc and try again should success and job created.
helper.CreateTC(backup.Spec.BR.ClusterNamespace, backup.Spec.BR.Cluster)
helper.CreateTC(backup.Spec.BR.ClusterNamespace, backup.Spec.BR.Cluster, false)
err = bm.syncBackupJob(backup)
g.Expect(err).Should(BeNil())
helper.hasCondition(backup.Namespace, backup.Name, v1alpha1.BackupScheduled, "")
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestClean(t *testing.T) {
_, err := deps.Clientset.PingcapV1alpha1().Backups(backup.Namespace).Create(context.TODO(), backup, metav1.CreateOptions{})
g.Expect(err).Should(BeNil())
helper.CreateSecret(backup)
helper.CreateTC(backup.Spec.BR.ClusterNamespace, backup.Spec.BR.Cluster)
helper.CreateTC(backup.Spec.BR.ClusterNamespace, backup.Spec.BR.Cluster, false)

statusUpdater := controller.NewRealBackupConditionUpdater(deps.Clientset, deps.BackupLister, deps.Recorder)
bc := NewBackupCleaner(deps, statusUpdater)
Expand Down
2 changes: 2 additions & 0 deletions pkg/backup/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ const (
KubeAnnBoundByController = "pv.kubernetes.io/bound-by-controller"
KubeAnnDynamicallyProvisioned = "pv.kubernetes.io/provisioned-by"

NodeAffinityCsiEbsAzKey = "topology.ebs.csi.aws.com/zone"

LocalTmp = "/tmp"
ClusterBackupMeta = "clustermeta"
ClusterRestoreMeta = "restoremeta"
Expand Down
33 changes: 26 additions & 7 deletions pkg/backup/restore/restore_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error {
)

if restore.Spec.BR == nil {
err = backuputil.ValidateRestore(restore, "")
err = backuputil.ValidateRestore(restore, "", false)
} else {
restoreNamespace = restore.GetNamespace()
if restore.Spec.BR.ClusterNamespace != "" {
Expand All @@ -94,7 +94,7 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error {
}

tikvImage := tc.TiKVImage()
err = backuputil.ValidateRestore(restore, tikvImage)
err = backuputil.ValidateRestore(restore, tikvImage, tc.Spec.AcrossK8s)
}

if err != nil {
Expand Down Expand Up @@ -134,8 +134,24 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error {
if !tc.PDAllMembersReady() {
return controller.RequeueErrorf("restore %s/%s: waiting for all PD members are ready in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name)
}
if v1alpha1.IsRestoreVolumeComplete(restore) && !v1alpha1.IsRestoreDataComplete(restore) && !tc.AllTiKVsAreAvailable() {
return controller.RequeueErrorf("restore %s/%s: waiting for all TiKVs are available in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name)

if v1alpha1.IsRestoreVolumeComplete(restore) && !v1alpha1.IsRestoreTiKVComplete(restore) {
if !tc.AllTiKVsAreAvailable() {
return controller.RequeueErrorf("restore %s/%s: waiting for all TiKVs are available in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name)
} else {
return rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{
Type: v1alpha1.RestoreTiKVComplete,
Status: corev1.ConditionTrue,
}, nil)
}
}

if restore.Spec.FederalVolumeRestorePhase == v1alpha1.FederalVolumeRestoreFinish {
if !v1alpha1.IsRestoreComplete(restore) {
return controller.RequeueErrorf("restore %s/%s: waiting for restore status complete in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name)
} else {
return nil
}
}
}

Expand Down Expand Up @@ -365,8 +381,8 @@ func (rm *restoreManager) volumeSnapshotRestore(r *v1alpha1.Restore, tc *v1alpha
return "", nil
}

if v1alpha1.IsRestoreDataComplete(r) {
klog.Infof("restore-manager prepares to deal with the phase DataComplete")
if r.Spec.FederalVolumeRestorePhase == v1alpha1.FederalVolumeRestoreFinish {
klog.Infof("restore-manager prepares to deal with the phase restore-finish")

// When restore is based on volume snapshot, we need to restart all TiKV pods
// after restore data is complete.
Expand Down Expand Up @@ -403,7 +419,7 @@ func (rm *restoreManager) volumeSnapshotRestore(r *v1alpha1.Restore, tc *v1alpha
return "", nil
}

if v1alpha1.IsRestoreVolumeComplete(r) {
if v1alpha1.IsRestoreVolumeComplete(r) && r.Spec.FederalVolumeRestorePhase == v1alpha1.FederalVolumeRestoreVolume {
klog.Infof("restore-manager prepares to deal with the phase VolumeComplete")

// TiKV volumes are ready, we can skip prepare restore metadata.
Expand Down Expand Up @@ -640,6 +656,9 @@ func (rm *restoreManager) makeRestoreJob(restore *v1alpha1.Restore) (*batchv1.Jo
args = append(args, fmt.Sprintf("--mode=%s", v1alpha1.RestoreModeVolumeSnapshot))
if !v1alpha1.IsRestoreVolumeComplete(restore) {
args = append(args, "--prepare")
if restore.Spec.VolumeAZ != "" {
args = append(args, fmt.Sprintf("--target-az=%s", restore.Spec.VolumeAZ))
}
}
default:
args = append(args, fmt.Sprintf("--mode=%s", v1alpha1.RestoreModeSnapshot))
Expand Down
4 changes: 2 additions & 2 deletions pkg/backup/restore/restore_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func TestBRRestore(t *testing.T) {
for i, restore := range genValidBRRestores() {
helper.createRestore(restore)
helper.CreateSecret(restore)
helper.CreateTC(restore.Spec.BR.ClusterNamespace, restore.Spec.BR.Cluster)
helper.CreateTC(restore.Spec.BR.ClusterNamespace, restore.Spec.BR.Cluster, false)

m := NewRestoreManager(deps)
err = m.Sync(restore)
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestBRRestoreByEBS(t *testing.T) {
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {

helper.CreateTC(tt.restore.Spec.BR.ClusterNamespace, tt.restore.Spec.BR.Cluster)
helper.CreateTC(tt.restore.Spec.BR.ClusterNamespace, tt.restore.Spec.BR.Cluster, true)
helper.CreateRestore(tt.restore)
m := NewRestoreManager(deps)
err := m.Sync(tt.restore)
Expand Down
4 changes: 4 additions & 0 deletions pkg/backup/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Snapshotter interface {
// SetVolumeID sets the cloud provider specific identifier
// for the PersistentVolume-PV.
SetVolumeID(pv *corev1.PersistentVolume, volumeID string) error

// ResetPvAvailableZone resets az of pv if the volumes restore to another az
ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume)
}

type BaseSnapshotter struct {
Expand Down Expand Up @@ -490,6 +493,7 @@ func (m *StoresMixture) ProcessCSBPVCsAndPVs(r *v1alpha1.Restore, csb *CloudSnap
}
// Clear out non-core metadata fields and status
resetMetadataAndStatus(r, backupClusterName, pvc, pv)
m.snapshotter.ResetPvAvailableZone(r, pv)

pvs = append(pvs, pv)
pvcs = append(pvcs, pvc)
Expand Down
26 changes: 26 additions & 0 deletions pkg/backup/snapshotter/snapshotter_aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,29 @@ func (s *AWSSnapshotter) SetVolumeID(pv *corev1.PersistentVolume, volumeID strin
func (s *AWSSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudSnapBackup) (string, error) {
return s.BaseSnapshotter.prepareRestoreMetadata(r, csb, s)
}

func (s *AWSSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {
if r.Spec.VolumeAZ == "" {
return
}

restoreAZ := r.Spec.VolumeAZ
if pv.Spec.NodeAffinity == nil {
return
}
if pv.Spec.NodeAffinity.Required == nil {
return
}
for i, nodeSelector := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
for j, field := range nodeSelector.MatchFields {
if field.Key == constants.NodeAffinityCsiEbsAzKey {
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchFields[j].Values = []string{restoreAZ}
}
}
for j, expr := range nodeSelector.MatchExpressions {
if expr.Key == constants.NodeAffinityCsiEbsAzKey && expr.Operator == corev1.NodeSelectorOpIn {
pv.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions[j].Values = []string{restoreAZ}
}
}
}
}
4 changes: 4 additions & 0 deletions pkg/backup/snapshotter/snapshotter_gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,7 @@ func (s *GCPSnapshotter) SetVolumeID(pv *corev1.PersistentVolume, volumeID strin
func (s *GCPSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudSnapBackup) (string, error) {
return s.BaseSnapshotter.prepareRestoreMetadata(r, csb, s)
}

func (s *GCPSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {
// TODO implement it if support to restore snapshots to another az on GCP
}
2 changes: 2 additions & 0 deletions pkg/backup/snapshotter/snapshotter_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,5 @@ func (s *NoneSnapshotter) SetVolumeID(pv *corev1.PersistentVolume, volumeID stri
func (s *NoneSnapshotter) PrepareRestoreMetadata(r *v1alpha1.Restore, csb *CloudSnapBackup) (string, error) {
return "", nil
}

func (s *NoneSnapshotter) ResetPvAvailableZone(r *v1alpha1.Restore, pv *corev1.PersistentVolume) {}
3 changes: 2 additions & 1 deletion pkg/backup/testutils/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,14 @@ func (h *Helper) CreateSecret(obj interface{}) {
}

// CreateTC creates a TidbCluster with name `clusterName` in ns `namespace`
func (h *Helper) CreateTC(namespace, clusterName string) {
func (h *Helper) CreateTC(namespace, clusterName string, acrossK8s bool) {
h.T.Helper()
g := NewGomegaWithT(h.T)
var err error

tc := &v1alpha1.TidbCluster{
Spec: v1alpha1.TidbClusterSpec{
AcrossK8s: acrossK8s,
TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
TiKV: &v1alpha1.TiKVSpec{
BaseImage: "pingcap/tikv",
Expand Down
9 changes: 8 additions & 1 deletion pkg/backup/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func ValidateBackup(backup *v1alpha1.Backup, tikvImage string, acrossK8s bool) e
}

// ValidateRestore checks whether a restore spec is valid.
func ValidateRestore(restore *v1alpha1.Restore, tikvImage string) error {
func ValidateRestore(restore *v1alpha1.Restore, tikvImage string, acrossK8s bool) error {
ns := restore.Namespace
name := restore.Name

Expand Down Expand Up @@ -614,6 +614,13 @@ func ValidateRestore(restore *v1alpha1.Restore, tikvImage string) error {
return err
}
}

if restore.Spec.Mode == v1alpha1.RestoreModeVolumeSnapshot {
// only support across k8s now. TODO compatible for single k8s
if !acrossK8s {
return errors.New("only support volume snapshot restore across k8s clusters")
}
}
}
return nil
}
Expand Down
Loading

0 comments on commit e7cd31a

Please sign in to comment.