diff --git a/docs/api-references/docs.md b/docs/api-references/docs.md
index 9d5109973a..a467b9af07 100644
--- a/docs/api-references/docs.md
+++ b/docs/api-references/docs.md
@@ -1485,6 +1485,32 @@ For BR image, if it does not contain tag, Pod will use image ‘ToolImage:${
+<tr>
+<td>
+<code>warmup</code></br>
+<em>
+<a href="#restorewarmupmode">
+RestoreWarmupMode
+</a>
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>Warmup represents whether to initialize TiKV volumes after volume snapshot restore</p>
+</td>
+</tr>
+<tr>
+<td>
+<code>warmupImage</code></br>
+<em>
+string
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>WarmupImage represents using what image to initialize TiKV volumes</p>
+</td>
+</tr>
 <tr>
 <td>
 <code>podSecurityContext</code></br>
@@ -13959,6 +13985,32 @@ For BR image, if it does not contain tag, Pod will use image ‘ToolImage:${
+<tr>
+<td>
+<code>warmup</code></br>
+<em>
+<a href="#restorewarmupmode">
+RestoreWarmupMode
+</a>
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>Warmup represents whether to initialize TiKV volumes after volume snapshot restore</p>
+</td>
+</tr>
+<tr>
+<td>
+<code>warmupImage</code></br>
+<em>
+string
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>WarmupImage represents using what image to initialize TiKV volumes</p>
+</td>
+</tr>
 <tr>
 <td>
 <code>podSecurityContext</code></br>
@@ -14088,6 +14140,14 @@ RestoreConditionType
+<h3 id="restorewarmupmode">RestoreWarmupMode
+(<code>string</code> alias)</p></h3>
+<p>
+(<em>Appears on:</em>
+<a href="#restorespec">RestoreSpec</a>)
+</p>
+<p>
+<p>RestoreWarmupMode represents when to initialize TiKV volumes</p>
+</p>
 <h3 id="s3storageprovider">S3StorageProvider</h3>
 <p>
 (<em>Appears on:</em>
diff --git a/docs/api-references/federation-docs.md b/docs/api-references/federation-docs.md
index 30976b8eb8..26cb5659a9 100644
--- a/docs/api-references/federation-docs.md
+++ b/docs/api-references/federation-docs.md
@@ -1459,6 +1459,30 @@ string
 <p>PriorityClassName of Restore Job Pods</p>
 </td>
 </tr>
+<tr>
+<td>
+<code>warmup</code></br>
+<em>
+github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1.RestoreWarmupMode
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>Warmup represents whether to initialize TiKV volumes after volume snapshot restore</p>
+</td>
+</tr>
+<tr>
+<td>
+<code>warmupImage</code></br>
+<em>
+string
+</em>
+</td>
+<td>
+<em>(Optional)</em>
+<p>WarmupImage represents using what image to initialize TiKV volumes</p>
+</td>
+</tr>
 </tbody>
 </table>
 <h3 id="volumerestorememberstatus">VolumeRestoreMemberStatus</h3>
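Together, the two documentation diffs above surface the same pair of optional fields on `RestoreSpec` and on the federation `VolumeRestoreMemberSpec`. For orientation before the CRD and controller changes below, here is a minimal Go sketch of how a caller might set them; it is not part of this change, and the object names and the warmup image reference are placeholders (the image is assumed to ship the `/warmup_steps` binary that the warmup jobs invoke):

```go
package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
)

// buildRestoreWithWarmup sketches a volume-snapshot Restore that opts in to
// synchronous warmup. Names and image are illustrative, not part of this PR.
func buildRestoreWithWarmup() *v1alpha1.Restore {
	return &v1alpha1.Restore{
		ObjectMeta: metav1.ObjectMeta{Name: "restore-1", Namespace: "ns-1"},
		Spec: v1alpha1.RestoreSpec{
			Mode: v1alpha1.RestoreModeVolumeSnapshot,
			// "sync" warms volumes up before TiKV starts; "async" starts
			// TiKV first and warms up in parallel; "" skips warmup.
			Warmup:      v1alpha1.RestoreWarmupModeSync,
			WarmupImage: "registry.example.com/warmup:latest", // placeholder
		},
	}
}
```

Leaving `Warmup` empty keeps the previous behavior: no warmup jobs are created and TiKV starts as soon as its volumes are restored from snapshots.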
diff --git a/manifests/crd.yaml b/manifests/crd.yaml index cbf8b3fad5..ae3546d693 100644 --- a/manifests/crd.yaml +++ b/manifests/crd.yaml @@ -14339,6 +14339,10 @@ spec: type: boolean volumeAZ: type: string + warmup: + type: string + warmupImage: + type: string type: object status: properties: diff --git a/manifests/crd/federation/v1/federation.pingcap.com_volumerestores.yaml b/manifests/crd/federation/v1/federation.pingcap.com_volumerestores.yaml index 81cb1968e8..f1573c9149 100644 --- a/manifests/crd/federation/v1/federation.pingcap.com_volumerestores.yaml +++ b/manifests/crd/federation/v1/federation.pingcap.com_volumerestores.yaml @@ -945,6 +945,10 @@ spec: type: array toolImage: type: string + warmup: + type: string + warmupImage: + type: string type: object type: object status: diff --git a/manifests/crd/v1/pingcap.com_restores.yaml b/manifests/crd/v1/pingcap.com_restores.yaml index 16a185f26f..702a34cc3c 100644 --- a/manifests/crd/v1/pingcap.com_restores.yaml +++ b/manifests/crd/v1/pingcap.com_restores.yaml @@ -2087,6 +2087,10 @@ spec: type: boolean volumeAZ: type: string + warmup: + type: string + warmupImage: + type: string type: object status: properties: diff --git a/manifests/federation-crd.yaml b/manifests/federation-crd.yaml index 3ea6117888..aaa4b6e17a 100644 --- a/manifests/federation-crd.yaml +++ b/manifests/federation-crd.yaml @@ -2981,6 +2981,10 @@ spec: type: array toolImage: type: string + warmup: + type: string + warmupImage: + type: string type: object type: object status: diff --git a/pkg/apis/federation/pingcap/v1alpha1/openapi_generated.go b/pkg/apis/federation/pingcap/v1alpha1/openapi_generated.go index 3053bbb6b6..0b7ac0b0f2 100644 --- a/pkg/apis/federation/pingcap/v1alpha1/openapi_generated.go +++ b/pkg/apis/federation/pingcap/v1alpha1/openapi_generated.go @@ -722,6 +722,20 @@ func schema_apis_federation_pingcap_v1alpha1_VolumeRestoreMemberSpec(ref common. 
Format: "", }, }, + "warmup": { + SchemaProps: spec.SchemaProps{ + Description: "Warmup represents whether to initialize TiKV volumes after volume snapshot restore", + Type: []string{"string"}, + Format: "", + }, + }, + "warmupImage": { + SchemaProps: spec.SchemaProps{ + Description: "WarmupImage represents using what image to initialize TiKV volumes", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/federation/pingcap/v1alpha1/types.go b/pkg/apis/federation/pingcap/v1alpha1/types.go index e90c9195e4..697aa2b8e4 100644 --- a/pkg/apis/federation/pingcap/v1alpha1/types.go +++ b/pkg/apis/federation/pingcap/v1alpha1/types.go @@ -361,6 +361,12 @@ type VolumeRestoreMemberSpec struct { ServiceAccount string `json:"serviceAccount,omitempty"` // PriorityClassName of Restore Job Pods PriorityClassName string `json:"priorityClassName,omitempty"` + // Warmup represents whether to initialize TiKV volumes after volume snapshot restore + // +optional + Warmup pingcapv1alpha1.RestoreWarmupMode `json:"warmup,omitempty"` + // WarmupImage represents using what image to initialize TiKV volumes + // +optional + WarmupImage string `json:"warmupImage,omitempty"` } type VolumeRestoreMemberBackupInfo struct { @@ -441,6 +447,11 @@ const ( VolumeRestoreRunning VolumeRestoreConditionType = "Running" // VolumeRestoreVolumeComplete means all the restore members are volume complete VolumeRestoreVolumeComplete VolumeRestoreConditionType = "VolumeComplete" + // VolumeRestoreWarmUpStarted means the Restore has successfully started warmup pods to + // initialize volumes restored from snapshots + VolumeRestoreWarmUpStarted VolumeRestoreConditionType = "WarmUpStarted" + // VolumeRestoreWarmUpComplete means the Restore has successfully warmed up all TiKV volumes + VolumeRestoreWarmUpComplete VolumeRestoreConditionType = "WarmUpComplete" // VolumeRestoreTiKVComplete means all the restore members are tikv complete VolumeRestoreTiKVComplete VolumeRestoreConditionType = "TikvComplete" // VolumeRestoreDataComplete means all the restore members are data complete @@ -460,6 +471,8 @@ const ( VolumeRestoreStepRestoreVolume VolumeRestoreStepType = "RestoreVolume" // VolumeRestoreStepStartTiKV is start tikv step VolumeRestoreStepStartTiKV VolumeRestoreStepType = "StartTiKV" + // VolumeRestoreStepWarmUp is warm up tikv volumes step + VolumeRestoreStepWarmUp VolumeRestoreStepType = "WarmUp" // VolumeRestoreStepRestoreData is restore data step VolumeRestoreStepRestoreData VolumeRestoreStepType = "RestoreData" // VolumeRestoreStepRestartTiKV is restart tikv step diff --git a/pkg/apis/federation/pingcap/v1alpha1/volume_restore.go b/pkg/apis/federation/pingcap/v1alpha1/volume_restore.go index 5622f07d0a..f13408b60a 100644 --- a/pkg/apis/federation/pingcap/v1alpha1/volume_restore.go +++ b/pkg/apis/federation/pingcap/v1alpha1/volume_restore.go @@ -31,6 +31,16 @@ func IsVolumeRestoreVolumeComplete(volumeRestore *VolumeRestore) bool { return condition != nil && condition.Status == corev1.ConditionTrue } +func IsVolumeRestoreWarmUpStarted(volumeRestore *VolumeRestore) bool { + _, condition := GetVolumeRestoreCondition(&volumeRestore.Status, VolumeRestoreWarmUpStarted) + return condition != nil && condition.Status == corev1.ConditionTrue +} + +func IsVolumeRestoreWarmUpComplete(volumeRestore *VolumeRestore) bool { + _, condition := GetVolumeRestoreCondition(&volumeRestore.Status, VolumeRestoreWarmUpComplete) + return condition != nil && condition.Status == corev1.ConditionTrue +} + func 
IsVolumeRestoreTiKVComplete(volumeRestore *VolumeRestore) bool {
 	_, condition := GetVolumeRestoreCondition(&volumeRestore.Status, VolumeRestoreTiKVComplete)
 	return condition != nil && condition.Status == corev1.ConditionTrue
diff --git a/pkg/apis/label/label.go b/pkg/apis/label/label.go
index 3007555fd2..093f85bb87 100644
--- a/pkg/apis/label/label.go
+++ b/pkg/apis/label/label.go
@@ -58,6 +58,8 @@ const (
 	// RestoreLabelKey is restore key
 	RestoreLabelKey string = "tidb.pingcap.com/restore"
+	// RestoreWarmUpLabelKey defines which pod the restore warms up
+	RestoreWarmUpLabelKey string = "tidb.pingcap.com/warm-up-pod"
 	// BackupProtectionFinalizer is the name of finalizer on backups or federation backups
 	BackupProtectionFinalizer string = "tidb.pingcap.com/backup-protection"
@@ -168,6 +170,8 @@ const (
 	CleanJobLabelVal string = "clean"
 	// RestoreJobLabelVal is restore job label value
 	RestoreJobLabelVal string = "restore"
+	// RestoreWarmUpJobLabelVal is restore warmup job label value
+	RestoreWarmUpJobLabelVal string = "warmup"
 	// BackupJobLabelVal is backup job label value
 	BackupJobLabelVal string = "backup"
 	// BackupScheduleJobLabelVal is backup schedule job label value
@@ -352,6 +356,10 @@ func (l Label) RestoreJob() Label {
 	return l.Component(RestoreJobLabelVal)
 }
+func (l Label) RestoreWarmUpJob() Label {
+	return l.Component(RestoreWarmUpJobLabelVal)
+}
+
 // Backup assigns specific value to backup key in label
 func (l Label) Backup(val string) Label {
 	l[BackupLabelKey] = val
diff --git a/pkg/apis/pingcap/v1alpha1/openapi_generated.go b/pkg/apis/pingcap/v1alpha1/openapi_generated.go
index 923e9cb963..5ba67ea233 100644
--- a/pkg/apis/pingcap/v1alpha1/openapi_generated.go
+++ b/pkg/apis/pingcap/v1alpha1/openapi_generated.go
@@ -7560,6 +7560,20 @@ func schema_pkg_apis_pingcap_v1alpha1_RestoreSpec(ref common.ReferenceCallback)
 				},
 			},
 		},
+		"warmup": {
+			SchemaProps: spec.SchemaProps{
+				Description: "Warmup represents whether to initialize TiKV volumes after volume snapshot restore",
+				Type:        []string{"string"},
+				Format:      "",
+			},
+		},
+		"warmupImage": {
+			SchemaProps: spec.SchemaProps{
+				Description: "WarmupImage represents using what image to initialize TiKV volumes",
+				Type:        []string{"string"},
+				Format:      "",
+			},
+		},
 		"podSecurityContext": {
 			SchemaProps: spec.SchemaProps{
 				Description: "PodSecurityContext of the component",
diff --git a/pkg/apis/pingcap/v1alpha1/restore.go b/pkg/apis/pingcap/v1alpha1/restore.go
index ff71f79fa9..dd85349bdf 100644
--- a/pkg/apis/pingcap/v1alpha1/restore.go
+++ b/pkg/apis/pingcap/v1alpha1/restore.go
@@ -130,6 +130,18 @@ func IsRestoreVolumeComplete(restore *Restore) bool {
 	return condition != nil && condition.Status == corev1.ConditionTrue
 }
+// IsRestoreWarmUpStarted returns true if all the warmup jobs have successfully started
+func IsRestoreWarmUpStarted(restore *Restore) bool {
+	_, condition := GetRestoreCondition(&restore.Status, RestoreWarmUpStarted)
+	return condition != nil && condition.Status == corev1.ConditionTrue
+}
+
+// IsRestoreWarmUpComplete returns true if all the warmup jobs have successfully finished
+func IsRestoreWarmUpComplete(restore *Restore) bool {
+	_, condition := GetRestoreCondition(&restore.Status, RestoreWarmUpComplete)
+	return condition != nil && condition.Status == corev1.ConditionTrue
+}
+
 // IsRestoreTiKVComplete returns true if all TiKVs run successfully during volume restore
 func IsRestoreTiKVComplete(restore *Restore) bool {
 	_, condition := GetRestoreCondition(&restore.Status, RestoreTiKVComplete)
diff --git 
a/pkg/apis/pingcap/v1alpha1/types.go b/pkg/apis/pingcap/v1alpha1/types.go index dce70e1515..d9733214d4 100644 --- a/pkg/apis/pingcap/v1alpha1/types.go +++ b/pkg/apis/pingcap/v1alpha1/types.go @@ -2340,6 +2340,11 @@ const ( // RestoreVolumeComplete means the Restore has successfully executed part-1 and the // backup volumes have been rebuilded from the corresponding snapshot RestoreVolumeComplete RestoreConditionType = "VolumeComplete" + // RestoreWarmUpStarted means the Restore has successfully started warm up pods to + // initialize volumes restored from snapshots + RestoreWarmUpStarted RestoreConditionType = "WarmUpStarted" + // RestoreWarmUpComplete means the Restore has successfully warmed up all TiKV volumes + RestoreWarmUpComplete RestoreConditionType = "WarmUpComplete" // RestoreDataComplete means the Restore has successfully executed part-2 and the // data in restore volumes has been deal with consistency based on min_resolved_ts RestoreDataComplete RestoreConditionType = "DataComplete" @@ -2444,6 +2449,12 @@ type RestoreSpec struct { ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"` // TableFilter means Table filter expression for 'db.table' matching. BR supports this from v4.0.3. TableFilter []string `json:"tableFilter,omitempty"` + // Warmup represents whether to initialize TiKV volumes after volume snapshot restore + // +optional + Warmup RestoreWarmupMode `json:"warmup,omitempty"` + // WarmupImage represents using what image to initialize TiKV volumes + // +optional + WarmupImage string `json:"warmupImage,omitempty"` // PodSecurityContext of the component // +optional @@ -2465,6 +2476,16 @@ const ( FederalVolumeRestoreFinish FederalVolumeRestorePhase = "restore-finish" ) +// RestoreWarmupMode represents when to initialize TiKV volumes +type RestoreWarmupMode string + +const ( + // RestoreWarmupModeSync means initialize TiKV volumes before TiKV starts + RestoreWarmupModeSync RestoreWarmupMode = "sync" + // RestoreWarmupModeASync means initialize TiKV volumes after restore complete + RestoreWarmupModeASync RestoreWarmupMode = "async" +) + // RestoreStatus represents the current status of a tidb cluster restore. type RestoreStatus struct { // TimeStarted is the time at which the restore was started. 
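The two modes added above differ only in when TiKV may start relative to the warmup jobs: `sync` warms the restored volumes up before TiKV starts, while `async` starts TiKV immediately and warms up in parallel. A condensed, illustrative sketch of that gating rule, assuming the `IsRestoreWarmUpComplete` helper added in `restore.go` above (the real flow in `restore_manager.go` below also requeues and records status conditions):

```go
package example

import "github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"

// canStartTiKV condenses the gating rule implied by the two modes. It is
// illustrative only: the actual decision is made inside syncRestoreJob and
// volumeSnapshotRestore in restore_manager.go.
func canStartTiKV(r *v1alpha1.Restore) bool {
	switch r.Spec.Warmup {
	case v1alpha1.RestoreWarmupModeSync:
		// sync: TiKV must wait until every warmup job has finished.
		return v1alpha1.IsRestoreWarmUpComplete(r)
	case v1alpha1.RestoreWarmupModeASync:
		// async: TiKV starts immediately; the warmup jobs run against the
		// live pods and only gate the final Complete condition.
		return true
	default:
		// no warmup requested: start TiKV as soon as volumes are restored.
		return true
	}
}
```

Async is the only mode in which a Restore can reach its data-complete phases while warmup is still running, which is why the controller change further below keeps re-enqueuing a Complete Restore until the WarmUpComplete condition is set.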
diff --git a/pkg/backup/restore/restore_manager.go b/pkg/backup/restore/restore_manager.go index 7d6fc9db6c..1038e40d08 100644 --- a/pkg/backup/restore/restore_manager.go +++ b/pkg/backup/restore/restore_manager.go @@ -17,6 +17,9 @@ import ( "context" "encoding/json" "fmt" + "path/filepath" + "regexp" + "strconv" "strings" "time" @@ -110,7 +113,6 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error { if restore.Spec.BR != nil && restore.Spec.Mode == v1alpha1.RestoreModeVolumeSnapshot { err = rm.validateRestore(restore, tc) - if err != nil { rm.statusUpdater.Update(restore, &v1alpha1.RestoreCondition{ Type: v1alpha1.RestoreInvalid, @@ -136,6 +138,19 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error { } if v1alpha1.IsRestoreVolumeComplete(restore) && !v1alpha1.IsRestoreTiKVComplete(restore) { + if isWarmUpSync(restore) { + if !v1alpha1.IsRestoreWarmUpStarted(restore) { + return rm.warmUpTiKVVolumesSync(restore, tc) + } + if !v1alpha1.IsRestoreWarmUpComplete(restore) { + return rm.waitWarmUpJobsFinished(restore) + } + } else if isWarmUpAsync(restore) { + if !v1alpha1.IsRestoreWarmUpStarted(restore) { + return rm.warmUpTiKVVolumesAsync(restore, tc) + } + } + if !tc.AllTiKVsAreAvailable() { return controller.RequeueErrorf("restore %s/%s: waiting for all TiKVs are available in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name) } else { @@ -190,12 +205,21 @@ func (rm *restoreManager) syncRestoreJob(restore *v1alpha1.Restore) error { } } + if isWarmUpAsync(restore) && v1alpha1.IsRestoreWarmUpStarted(restore) && !v1alpha1.IsRestoreWarmUpComplete(restore) { + if err := rm.waitWarmUpJobsFinished(restore); err != nil { + return err + } + } + + if v1alpha1.IsRestoreTiKVComplete(restore) && restore.Spec.FederalVolumeRestorePhase == v1alpha1.FederalVolumeRestoreVolume { + return nil + } + if restore.Spec.FederalVolumeRestorePhase == v1alpha1.FederalVolumeRestoreFinish { if !v1alpha1.IsRestoreComplete(restore) { return controller.RequeueErrorf("restore %s/%s: waiting for restore status complete in tidbcluster %s/%s", ns, name, tc.Namespace, tc.Name) - } else { - return nil } + return nil } } @@ -506,6 +530,15 @@ func (rm *restoreManager) volumeSnapshotRestore(r *v1alpha1.Restore, tc *v1alpha return "", nil } + if isWarmUpSync(r) { + if v1alpha1.IsRestoreWarmUpComplete(r) { + return rm.startTiKV(r, tc) + } + if v1alpha1.IsRestoreWarmUpStarted(r) { + return "", nil + } + } + s, reason, err := snapshotter.NewSnapshotterForRestore(r.Spec.Mode, rm.deps) if err != nil { return reason, err @@ -520,21 +553,28 @@ func (rm *restoreManager) volumeSnapshotRestore(r *v1alpha1.Restore, tc *v1alpha if reason, err := s.PrepareRestoreMetadata(r, csb); err != nil { return reason, err } - - restoreMark := fmt.Sprintf("%s/%s", r.Namespace, r.Name) - if len(tc.GetAnnotations()) == 0 { - tc.Annotations = make(map[string]string) - } - tc.Annotations[label.AnnTiKVVolumesReadyKey] = restoreMark - if _, err := rm.deps.TiDBClusterControl.Update(tc); err != nil { - return "AddTCAnnWaitTiKVFailed", err + klog.Infof("Restore %s/%s prepare restore metadata finished", r.Namespace, r.Name) + if !isWarmUpSync(r) { + return rm.startTiKV(r, tc) } - return "", nil } return "", nil } +func (rm *restoreManager) startTiKV(r *v1alpha1.Restore, tc *v1alpha1.TidbCluster) (reason string, err error) { + restoreMark := fmt.Sprintf("%s/%s", r.Namespace, r.Name) + if len(tc.GetAnnotations()) == 0 { + tc.Annotations = make(map[string]string) + } + tc.Annotations[label.AnnTiKVVolumesReadyKey] = 
restoreMark + if _, err := rm.deps.TiDBClusterControl.Update(tc); err != nil { + return "AddTCAnnWaitTiKVFailed", err + } + klog.Infof("Restore %s/%s start TiKV", r.Namespace, r.Name) + return "", nil +} + func (rm *restoreManager) makeImportJob(restore *v1alpha1.Restore) (*batchv1.Job, string, error) { ns := restore.GetNamespace() name := restore.GetName() @@ -881,6 +921,365 @@ func (rm *restoreManager) makeRestoreJob(restore *v1alpha1.Restore) (*batchv1.Jo return job, "", nil } +type pvcInfo struct { + pvc *corev1.PersistentVolumeClaim + volumeName string + number int +} + +func (rm *restoreManager) warmUpTiKVVolumesSync(r *v1alpha1.Restore, tc *v1alpha1.TidbCluster) error { + warmUpImage := r.Spec.WarmupImage + if warmUpImage == "" { + rm.statusUpdater.Update(r, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: "NoWarmupImage", + Message: "warmup image is empty", + }, nil) + return fmt.Errorf("warmup image is empty") + } + + klog.Infof("Restore %s/%s start to warm up TiKV synchronously", r.Namespace, r.Name) + ns := tc.Namespace + sel, err := label.New().Instance(tc.Name).TiKV().Selector() + if err != nil { + return err + } + pvcs, err := rm.deps.PVCLister.PersistentVolumeClaims(ns).List(sel) + if err != nil { + return err + } + + stsName := controller.TiKVMemberName(tc.Name) + reStr := fmt.Sprintf(`^(.+)-%s-(\d+)$`, stsName) + re := regexp.MustCompile(reStr) + pvcInfoMap := make(map[int][]*pvcInfo, len(pvcs)) + for _, pvc := range pvcs { + subMatches := re.FindStringSubmatch(pvc.Name) + if len(subMatches) != 3 { + return fmt.Errorf("pvc name %s doesn't match regex %s", pvc.Name, reStr) + } + volumeName := subMatches[1] + numberStr := subMatches[2] + number, err := strconv.Atoi(numberStr) + if err != nil { + return fmt.Errorf("parse index %s of pvc %s to int: %s", numberStr, pvc.Name, err.Error()) + } + pvcInfoMap[number] = append(pvcInfoMap[number], &pvcInfo{ + pvc: pvc, + volumeName: volumeName, + number: number, + }) + klog.Infof("Restore %s/%s warmup get pvc %s/%s", r.Namespace, r.Name, pvc.Namespace, pvc.Name) + } + + volumesCount := len(tc.Spec.TiKV.StorageVolumes) + 1 + for number, podPVCs := range pvcInfoMap { + if len(podPVCs) != volumesCount { + return fmt.Errorf("expected pvc count %d, got pvc count %d, not equal", volumesCount, len(podPVCs)) + } + warmUpJobName := fmt.Sprintf("%s-%s-%d-warm-up", r.Name, stsName, number) + _, err := rm.deps.JobLister.Jobs(ns).Get(warmUpJobName) + if err == nil { + klog.Infof("Restore %s/%s warmup job %s/%s exists, pass it", r.Namespace, r.Name, ns, warmUpJobName) + continue + } else if !errors.IsNotFound(err) { + return fmt.Errorf("get warm up job %s/%s error: %s", ns, warmUpJobName, err.Error()) + } + + warmUpJob, err := rm.makeSyncWarmUpJob(r, tc, podPVCs, warmUpJobName, warmUpImage) + if err != nil { + return err + } + if err = rm.deps.JobControl.CreateJob(r, warmUpJob); err != nil { + return err + } + klog.Infof("Restore %s/%s creates warmup job %s/%s successfully", r.Namespace, r.Name, ns, warmUpJobName) + } + return rm.statusUpdater.Update(r, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreWarmUpStarted, + Status: corev1.ConditionTrue, + }, nil) +} + +func (rm *restoreManager) warmUpTiKVVolumesAsync(r *v1alpha1.Restore, tc *v1alpha1.TidbCluster) error { + warmUpImage := r.Spec.WarmupImage + if warmUpImage == "" { + rm.statusUpdater.Update(r, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreRetryFailed, + Status: corev1.ConditionTrue, + Reason: "NoWarmupImage", + Message: 
"warmup image is empty", + }, nil) + return fmt.Errorf("warmup image is empty") + } + + klog.Infof("Restore %s/%s start to warm up TiKV asynchronously", r.Namespace, r.Name) + sel, err := label.New().Instance(tc.Name).TiKV().Selector() + if err != nil { + return err + } + tikvPods, err := rm.deps.PodLister.Pods(tc.Namespace).List(sel) + if err != nil { + return err + } + if int32(len(tikvPods)) != tc.Spec.TiKV.Replicas { + return fmt.Errorf("wait all TiKV pods started to warm up volumes") + } + for _, pod := range tikvPods { + if pod.Status.Phase != corev1.PodRunning { + return fmt.Errorf("wait TiKV pod %s/%s running", pod.Namespace, pod.Name) + } + } + + tikvMountPaths := []string{constants.TiKVDataVolumeMountPath} + for _, vol := range tc.Spec.TiKV.StorageVolumes { + tikvMountPaths = append(tikvMountPaths, vol.MountPath) + } + for _, pod := range tikvPods { + ns, podName := pod.Namespace, pod.Name + warmUpJobName := fmt.Sprintf("%s-%s-warm-up", r.Name, podName) + _, err := rm.deps.JobLister.Jobs(ns).Get(warmUpJobName) + if err == nil { + klog.Infof("Restore %s/%s warmup job %s/%s of tikv pod %s/%s exists, pass it", r.Namespace, r.Name, ns, warmUpJobName, ns, podName) + continue + } else if !errors.IsNotFound(err) { + return fmt.Errorf("get warm up job %s/%s of tikv pod %s/%s error: %s", ns, warmUpJobName, ns, podName, err.Error()) + } + + warmUpJob, err := rm.makeAsyncWarmUpJob(r, pod, tikvMountPaths, warmUpJobName, warmUpImage) + if err != nil { + return err + } + if err = rm.deps.JobControl.CreateJob(r, warmUpJob); err != nil { + return err + } + klog.Infof("Restore %s/%s creates warmup job %s/%s for tikv pod %s/%s successfully", r.Namespace, r.Name, ns, warmUpJobName, ns, podName) + } + + return rm.statusUpdater.Update(r, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreWarmUpStarted, + Status: corev1.ConditionTrue, + }, nil) +} + +func (rm *restoreManager) makeSyncWarmUpJob(r *v1alpha1.Restore, tc *v1alpha1.TidbCluster, pvcs []*pvcInfo, warmUpJobName, warmUpImage string) (*batchv1.Job, error) { + podVolumes := make([]corev1.Volume, 0, len(pvcs)) + podVolumeMounts := make([]corev1.VolumeMount, 0, len(pvcs)) + for _, pvc := range pvcs { + podVolumes = append(podVolumes, corev1.Volume{ + Name: pvc.volumeName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: pvc.pvc.Name, + }, + }, + }) + + mountPath := fmt.Sprintf("/var/lib/%s", pvc.volumeName) + podVolumeMounts = append(podVolumeMounts, corev1.VolumeMount{ + Name: pvc.volumeName, + MountPath: mountPath, + }) + } + + nodeSelector := make(map[string]string, len(tc.Spec.TiKV.NodeSelector)) + for k, v := range tc.Spec.NodeSelector { + nodeSelector[k] = v + } + for k, v := range tc.Spec.TiKV.NodeSelector { + nodeSelector[k] = v + } + + tolerations := make([]corev1.Toleration, 0, len(tc.Spec.TiKV.Tolerations)) + for _, toleration := range tc.Spec.TiKV.Tolerations { + tolerations = append(tolerations, *toleration.DeepCopy()) + } + if len(tolerations) == 0 { + // if the tolerations of tikv is empty, use the tolerations of tidb cluster + for _, toleration := range tc.Spec.Tolerations { + tolerations = append(tolerations, *toleration.DeepCopy()) + } + } + + args := []string{"--fs", constants.TiKVDataVolumeMountPath} + + fioPaths := make([]string, 0, len(podVolumeMounts)) + for _, volumeMount := range podVolumeMounts { + if volumeMount.MountPath == constants.TiKVDataVolumeMountPath { + continue + } + fioPaths = append(fioPaths, volumeMount.MountPath) + } + if len(fioPaths) > 
0 { + args = append(args, "--block") + args = append(args, fioPaths...) + } + resourceRequirements := getWarmUpResourceRequirements(tc) + + warmUpPod := &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: podVolumes, + RestartPolicy: corev1.RestartPolicyNever, + Affinity: tc.Spec.TiKV.Affinity.DeepCopy(), + NodeSelector: nodeSelector, + Tolerations: tolerations, + Containers: []corev1.Container{ + { + Name: "warm-up", + Image: warmUpImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/warmup_steps"}, + Args: args, + Resources: *resourceRequirements, + VolumeMounts: podVolumeMounts, + SecurityContext: &corev1.SecurityContext{ + Privileged: pointer.BoolPtr(true), + }, + }, + }, + }, + } + + jobLabel := label.NewRestore().RestoreWarmUpJob().Restore(r.Name) + warmUpJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: warmUpJobName, + Namespace: tc.Namespace, + Labels: jobLabel, + OwnerReferences: []metav1.OwnerReference{ + controller.GetRestoreOwnerRef(r), + }, + }, + Spec: batchv1.JobSpec{ + Template: *warmUpPod, + BackoffLimit: pointer.Int32Ptr(4), + }, + } + return warmUpJob, nil +} + +func (rm *restoreManager) makeAsyncWarmUpJob(r *v1alpha1.Restore, tikvPod *corev1.Pod, warmUpPaths []string, warmUpJobName, warmUpImage string) (*batchv1.Job, error) { + ns, podName := tikvPod.Namespace, tikvPod.Name + var tikvContainer *corev1.Container + for _, container := range tikvPod.Spec.Containers { + if container.Name == v1alpha1.TiKVMemberType.String() { + tikvContainer = container.DeepCopy() + break + } + } + if tikvContainer == nil { + return nil, fmt.Errorf("not found TiKV container in pod %s/%s", ns, podName) + } + + warmUpVolumeMounts := make([]corev1.VolumeMount, 0, len(warmUpPaths)) + for _, mountPath := range warmUpPaths { + for _, volumeMount := range tikvContainer.VolumeMounts { + if strings.TrimRight(volumeMount.MountPath, string(filepath.Separator)) == strings.TrimRight(mountPath, string(filepath.Separator)) { + warmUpVolumeMounts = append(warmUpVolumeMounts, *volumeMount.DeepCopy()) + break + } + } + } + + warmUpVolumes := make([]corev1.Volume, 0, len(warmUpVolumeMounts)) + for _, volumeMount := range warmUpVolumeMounts { + for _, volume := range tikvPod.Spec.Volumes { + if volumeMount.Name == volume.Name { + warmUpVolumes = append(warmUpVolumes, *volume.DeepCopy()) + break + } + } + } + + args := []string{"--fs", constants.TiKVDataVolumeMountPath} + fioPaths := make([]string, 0, len(warmUpPaths)) + for _, warmUpPath := range warmUpPaths { + if warmUpPath == constants.TiKVDataVolumeMountPath { + continue + } + fioPaths = append(fioPaths, warmUpPath) + } + if len(fioPaths) > 0 { + args = append(args, "--block") + args = append(args, fioPaths...) 
+ } + + warmUpPod := &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Volumes: warmUpVolumes, + RestartPolicy: corev1.RestartPolicyNever, + NodeName: tikvPod.Spec.NodeName, + Containers: []corev1.Container{ + { + Name: "warm-up", + Image: warmUpImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/warmup_steps"}, + Args: args, + VolumeMounts: warmUpVolumeMounts, + SecurityContext: &corev1.SecurityContext{ + Privileged: pointer.BoolPtr(true), + }, + }, + }, + }, + } + + jobLabel := label.NewRestore().RestoreWarmUpJob().Restore(r.Name) + jobLabel[label.RestoreWarmUpLabelKey] = podName + warmUpJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: warmUpJobName, + Namespace: ns, + Labels: jobLabel, + OwnerReferences: []metav1.OwnerReference{ + controller.GetRestoreOwnerRef(r), + }, + }, + Spec: batchv1.JobSpec{ + Template: *warmUpPod, + BackoffLimit: pointer.Int32Ptr(4), + }, + } + return warmUpJob, nil +} + +func (rm *restoreManager) waitWarmUpJobsFinished(r *v1alpha1.Restore) error { + if r.Spec.Warmup == "" { + return nil + } + + sel, err := label.NewRestore().RestoreWarmUpJob().Restore(r.Name).Selector() + if err != nil { + return err + } + + jobs, err := rm.deps.JobLister.List(sel) + if err != nil { + return err + } + for _, job := range jobs { + jobFinished := false + for _, condition := range job.Status.Conditions { + if condition.Type == batchv1.JobFailed || condition.Type == batchv1.JobComplete { + jobFinished = true + } + } + if !jobFinished { + if isWarmUpAsync(r) { + return nil + } + return fmt.Errorf("wait warmup job %s/%s finished", job.Namespace, job.Name) + } + } + + return rm.statusUpdater.Update(r, &v1alpha1.RestoreCondition{ + Type: v1alpha1.RestoreWarmUpComplete, + Status: corev1.ConditionTrue, + }, nil) +} + func (rm *restoreManager) ensureRestorePVCExist(restore *v1alpha1.Restore) (string, error) { ns := restore.GetNamespace() name := restore.GetName() @@ -928,6 +1327,35 @@ func (rm *restoreManager) ensureRestorePVCExist(restore *v1alpha1.Restore) (stri return "", nil } +func getWarmUpResourceRequirements(tc *v1alpha1.TidbCluster) *corev1.ResourceRequirements { + tikvResourceRequirements := tc.Spec.TiKV.ResourceRequirements.DeepCopy() + warmUpResourceRequirements := &corev1.ResourceRequirements{ + Requests: make(corev1.ResourceList, 4), + Limits: make(corev1.ResourceList, 4), + } + if quantity, ok := tikvResourceRequirements.Limits[corev1.ResourceCPU]; ok { + warmUpResourceRequirements.Limits[corev1.ResourceCPU] = quantity + } + if quantity, ok := tikvResourceRequirements.Limits[corev1.ResourceMemory]; ok { + warmUpResourceRequirements.Limits[corev1.ResourceMemory] = quantity + } + if quantity, ok := tikvResourceRequirements.Requests[corev1.ResourceCPU]; ok { + warmUpResourceRequirements.Requests[corev1.ResourceCPU] = quantity + } + if quantity, ok := tikvResourceRequirements.Requests[corev1.ResourceMemory]; ok { + warmUpResourceRequirements.Requests[corev1.ResourceMemory] = quantity + } + return warmUpResourceRequirements +} + +func isWarmUpSync(r *v1alpha1.Restore) bool { + return r.Spec.Warmup == v1alpha1.RestoreWarmupModeSync +} + +func isWarmUpAsync(r *v1alpha1.Restore) bool { + return r.Spec.Warmup == v1alpha1.RestoreWarmupModeASync +} + var _ backup.RestoreManager = &restoreManager{} type FakeRestoreManager struct { diff --git a/pkg/controller/restore/restore_controller.go b/pkg/controller/restore/restore_controller.go index db4c27556e..4a6e617bed 100644 --- a/pkg/controller/restore/restore_controller.go +++ 
b/pkg/controller/restore/restore_controller.go @@ -159,6 +159,13 @@ func (c *Controller) updateRestore(cur interface{}) { } if v1alpha1.IsRestoreComplete(newRestore) { + if newRestore.Spec.Warmup == v1alpha1.RestoreWarmupModeASync { + if !v1alpha1.IsRestoreWarmUpComplete(newRestore) { + c.enqueueRestore(newRestore) + return + } + } + klog.V(4).Infof("restore %s/%s is Complete, skipping.", ns, name) return } @@ -195,17 +202,6 @@ func (c *Controller) updateRestore(cur interface{}) { } if v1alpha1.IsRestoreVolumeComplete(newRestore) { - tc, err := c.getTC(newRestore) - if err != nil { - klog.Errorf("Fail to get tidbcluster for restore %s/%s, %v", ns, name, err) - return - } - - if _, ok := tc.Annotations[label.AnnTiKVVolumesReadyKey]; ok { - klog.V(4).Infof("restore %s/%s is already VolumeComplete, skipping.", ns, name) - return - } - c.enqueueRestore(newRestore) return } diff --git a/pkg/fedvolumebackup/restore/restore_manager.go b/pkg/fedvolumebackup/restore/restore_manager.go index 8ff974e2da..a7c068ccba 100644 --- a/pkg/fedvolumebackup/restore/restore_manager.go +++ b/pkg/fedvolumebackup/restore/restore_manager.go @@ -120,6 +120,7 @@ func (rm *restoreManager) syncRestore(volumeRestore *v1alpha1.VolumeRestore) err if memberCreated { return nil } + rm.syncWarmUpStatus(volumeRestore, restoreMembers) if err := rm.waitRestoreVolumeComplete(volumeRestore, restoreMembers); err != nil { return err } @@ -147,9 +148,15 @@ func (rm *restoreManager) syncRestore(volumeRestore *v1alpha1.VolumeRestore) err if err := rm.waitRestoreComplete(volumeRestore, restoreMembers); err != nil { return err } + v1alpha1.FinishVolumeRestoreStep(&volumeRestore.Status, v1alpha1.VolumeRestoreStepRestartTiKV) + + if isWarmUpAsync(volumeRestore) && !v1alpha1.IsVolumeRestoreWarmUpComplete(volumeRestore) { + klog.Infof("VolumeRestore %s/%s data planes all complete, but warmup doesn't complete, wait warmup complete", ns, name) + return nil + } klog.Infof("VolumeRestore %s/%s restore complete", ns, name) - rm.setVolumeRestoreComplete(&volumeRestore.Status, restoreMembers) + rm.setVolumeRestoreComplete(&volumeRestore.Status) return nil } @@ -251,7 +258,7 @@ func (rm *restoreManager) waitRestoreVolumeComplete(volumeRestore *v1alpha1.Volu } // restore volume complete if !v1alpha1.IsVolumeRestoreVolumeComplete(volumeRestore) { - rm.setVolumeRestoreVolumeComplete(&volumeRestore.Status) + rm.setVolumeRestoreVolumeComplete(volumeRestore) } for _, restoreMember := range restoreMembers { @@ -269,6 +276,28 @@ func (rm *restoreManager) waitRestoreVolumeComplete(volumeRestore *v1alpha1.Volu return nil } +func (rm *restoreManager) syncWarmUpStatus(volumeRestore *v1alpha1.VolumeRestore, restoreMembers []*volumeRestoreMember) { + for _, member := range restoreMembers { + if !pingcapv1alpha1.IsRestoreWarmUpStarted(member.restore) { + return + } + } + if !v1alpha1.IsVolumeRestoreWarmUpStarted(volumeRestore) { + rm.setVolumeRestoreWarmUpStarted(&volumeRestore.Status) + return + } + + for _, member := range restoreMembers { + if !pingcapv1alpha1.IsRestoreWarmUpComplete(member.restore) { + return + } + } + if !v1alpha1.IsVolumeRestoreWarmUpComplete(volumeRestore) { + rm.setVolumeRestoreWarmUpComplete(volumeRestore) + return + } +} + func (rm *restoreManager) executeRestoreDataPhase(ctx context.Context, volumeRestore *v1alpha1.VolumeRestore, restoreMembers []*volumeRestoreMember) (memberUpdated bool, err error) { if len(restoreMembers) != len(volumeRestore.Spec.Clusters) { return false, controller.RequeueErrorf("expect %d restore members but get 
%d when restore data", len(volumeRestore.Spec.Clusters), len(restoreMembers)) @@ -326,7 +355,7 @@ func (rm *restoreManager) waitRestoreDataComplete(volumeRestore *v1alpha1.Volume k8sClusterName := restoreMember.k8sClusterName if pingcapv1alpha1.IsRestoreDataComplete(restoreMember.restore) { if !v1alpha1.IsVolumeRestoreDataComplete(volumeRestore) { - rm.setVolumeRestoreDataComplete(&volumeRestore.Status) + rm.setVolumeRestoreDataComplete(&volumeRestore.Status, restoreMember.restore.Status.CommitTs) } return nil } @@ -413,13 +442,42 @@ func (rm *restoreManager) setVolumeRestoreRunning(volumeRestoreStatus *v1alpha1. } -func (rm *restoreManager) setVolumeRestoreVolumeComplete(volumeRestoreStatus *v1alpha1.VolumeRestoreStatus) { +func (rm *restoreManager) setVolumeRestoreVolumeComplete(volumeRestore *v1alpha1.VolumeRestore) { + volumeRestoreStatus := &volumeRestore.Status v1alpha1.UpdateVolumeRestoreCondition(volumeRestoreStatus, &v1alpha1.VolumeRestoreCondition{ Type: v1alpha1.VolumeRestoreVolumeComplete, Status: corev1.ConditionTrue, }) v1alpha1.FinishVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepRestoreVolume) - v1alpha1.StartVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepStartTiKV) + + if isWarmUpSync(volumeRestore) { + v1alpha1.StartVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepWarmUp) + } else if isWarmUpAsync(volumeRestore) { + v1alpha1.StartVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepStartTiKV) + v1alpha1.StartVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepWarmUp) + } else { + v1alpha1.StartVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepStartTiKV) + } +} + +func (rm *restoreManager) setVolumeRestoreWarmUpStarted(volumeRestoreStatus *v1alpha1.VolumeRestoreStatus) { + v1alpha1.UpdateVolumeRestoreCondition(volumeRestoreStatus, &v1alpha1.VolumeRestoreCondition{ + Type: v1alpha1.VolumeRestoreWarmUpStarted, + Status: corev1.ConditionTrue, + }) +} + +func (rm *restoreManager) setVolumeRestoreWarmUpComplete(volumeRestore *v1alpha1.VolumeRestore) { + volumeRestoreStatus := &volumeRestore.Status + v1alpha1.UpdateVolumeRestoreCondition(volumeRestoreStatus, &v1alpha1.VolumeRestoreCondition{ + Type: v1alpha1.VolumeRestoreWarmUpComplete, + Status: corev1.ConditionTrue, + }) + v1alpha1.FinishVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepWarmUp) + + if isWarmUpSync(volumeRestore) { + v1alpha1.StartVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepStartTiKV) + } } func (rm *restoreManager) setVolumeRestoreTiKVComplete(volumeRestoreStatus *v1alpha1.VolumeRestoreStatus) { @@ -430,7 +488,8 @@ func (rm *restoreManager) setVolumeRestoreTiKVComplete(volumeRestoreStatus *v1al v1alpha1.FinishVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepStartTiKV) } -func (rm *restoreManager) setVolumeRestoreDataComplete(volumeRestoreStatus *v1alpha1.VolumeRestoreStatus) { +func (rm *restoreManager) setVolumeRestoreDataComplete(volumeRestoreStatus *v1alpha1.VolumeRestoreStatus, commitTs string) { + volumeRestoreStatus.CommitTs = commitTs v1alpha1.UpdateVolumeRestoreCondition(volumeRestoreStatus, &v1alpha1.VolumeRestoreCondition{ Type: v1alpha1.VolumeRestoreDataComplete, Status: corev1.ConditionTrue, @@ -438,21 +497,13 @@ func (rm *restoreManager) setVolumeRestoreDataComplete(volumeRestoreStatus *v1al v1alpha1.FinishVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepRestoreData) } -func (rm *restoreManager) setVolumeRestoreComplete(volumeRestoreStatus 
*v1alpha1.VolumeRestoreStatus, restoreMembers []*volumeRestoreMember) {
-	for _, restoreMember := range restoreMembers {
-		if pingcapv1alpha1.IsRestoreDataComplete(restoreMember.restore) {
-			volumeRestoreStatus.CommitTs = restoreMember.restore.Status.CommitTs
-			break
-		}
-	}
-
+func (rm *restoreManager) setVolumeRestoreComplete(volumeRestoreStatus *v1alpha1.VolumeRestoreStatus) {
 	volumeRestoreStatus.TimeCompleted = metav1.Now()
 	volumeRestoreStatus.TimeTaken = volumeRestoreStatus.TimeCompleted.Sub(volumeRestoreStatus.TimeStarted.Time).Round(time.Second).String()
 	v1alpha1.UpdateVolumeRestoreCondition(volumeRestoreStatus, &v1alpha1.VolumeRestoreCondition{
 		Type:   v1alpha1.VolumeRestoreComplete,
 		Status: corev1.ConditionTrue,
 	})
-	v1alpha1.FinishVolumeRestoreStep(volumeRestoreStatus, v1alpha1.VolumeRestoreStepRestartTiKV)
 }
 func (rm *restoreManager) setVolumeRestoreFailed(volumeRestoreStatus *v1alpha1.VolumeRestoreStatus, reason, message string) {
@@ -498,6 +549,8 @@ func (rm *restoreManager) buildRestoreMember(volumeRestoreName string, memberClu
 			ImagePullSecrets:  template.ImagePullSecrets,
 			ServiceAccount:    template.ServiceAccount,
 			PriorityClassName: template.PriorityClassName,
+			Warmup:            template.Warmup,
+			WarmupImage:       template.WarmupImage,
 		},
 	}
 	return restoreMember
@@ -507,6 +560,14 @@ func (rm *restoreManager) generateRestoreMemberName(volumeRestoreName, k8sCluste
 	return fmt.Sprintf("fed-%s-%s", volumeRestoreName, k8sClusterName)
 }
+func isWarmUpSync(volumeRestore *v1alpha1.VolumeRestore) bool {
+	return volumeRestore.Spec.Template.Warmup == pingcapv1alpha1.RestoreWarmupModeSync
+}
+
+func isWarmUpAsync(volumeRestore *v1alpha1.VolumeRestore) bool {
+	return volumeRestore.Spec.Template.Warmup == pingcapv1alpha1.RestoreWarmupModeASync
+}
+
 type volumeRestoreMember struct {
 	restore        *pingcapv1alpha1.Restore
 	k8sClusterName string
diff --git a/pkg/fedvolumebackup/restore/restore_test.go b/pkg/fedvolumebackup/restore/restore_test.go
index 4103505b16..7ab50f2733 100644
--- a/pkg/fedvolumebackup/restore/restore_test.go
+++ b/pkg/fedvolumebackup/restore/restore_test.go
@@ -76,8 +76,9 @@ func newHelper(t *testing.T, restoreName, restoreNamespace string) *helper {
 	return h
 }
-func (h *helper) createVolumeRestore(ctx context.Context) *v1alpha1.VolumeRestore {
+func (h *helper) createVolumeRestore(ctx context.Context, warmupMode pingcapv1alpha1.RestoreWarmupMode) *v1alpha1.VolumeRestore {
 	volumeRestore := generateVolumeRestore(h.restoreName, h.restoreNamespace)
+	volumeRestore.Spec.Template.Warmup = warmupMode
 	volumeRestore, err := h.deps.Clientset.FederationV1alpha1().VolumeRestores(h.restoreNamespace).Create(ctx, volumeRestore, metav1.CreateOptions{})
 	h.g.Expect(err).To(gomega.BeNil())
 	h.g.Expect(volumeRestore.Status.TimeStarted.Unix() < 0).To(gomega.BeTrue())
@@ -105,12 +106,40 @@ func (h *helper) assertRunRestoreVolume(ctx context.Context, volumeRestore *v1al
 func (h *helper) assertRestoreVolumeComplete(volumeRestore *v1alpha1.VolumeRestore) {
 	h.g.Expect(volumeRestore.Status.Phase).To(gomega.Equal(v1alpha1.VolumeRestoreVolumeComplete))
-	h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(2))
+	if volumeRestore.Spec.Template.Warmup == pingcapv1alpha1.RestoreWarmupModeASync {
+		// async means start tikv and warm up simultaneously
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(3))
+	} else {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(2))
+	}
+}
+
+func (h *helper) assertRestoreWarmUpStarted(volumeRestore *v1alpha1.VolumeRestore) {
+	h.g.Expect(volumeRestore.Status.Phase).To(gomega.Equal(v1alpha1.VolumeRestoreWarmUpStarted))
+	if volumeRestore.Spec.Template.Warmup == pingcapv1alpha1.RestoreWarmupModeASync {
+		// async means start tikv and warm up simultaneously
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(3))
+	} else {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(2))
+	}
+}
+
+func (h *helper) assertRestoreWarmUpComplete(volumeRestore *v1alpha1.VolumeRestore) {
+	h.g.Expect(v1alpha1.IsVolumeRestoreWarmUpComplete(volumeRestore)).To(gomega.Equal(true))
+	if volumeRestore.Spec.Template.Warmup == pingcapv1alpha1.RestoreWarmupModeSync {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(3))
+	} else if volumeRestore.Spec.Template.Warmup == pingcapv1alpha1.RestoreWarmupModeASync {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(5))
+	}
 }
 
 func (h *helper) assertRunRestoreData(ctx context.Context, volumeRestore *v1alpha1.VolumeRestore) {
 	h.g.Expect(volumeRestore.Status.Phase).To(gomega.Equal(v1alpha1.VolumeRestoreTiKVComplete))
-	h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(3))
+	if volumeRestore.Spec.Template.Warmup != "" {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(4))
+	} else {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(3))
+	}
 	// in setDataPlaneVolumeComplete function, we set restore member1 with minimal commit ts
 	// so restore member1 should execute restore data phase
@@ -128,8 +157,13 @@ func (h *helper) assertRunRestoreData(ctx context.Context, volumeRestore *v1alph
 }
 
 func (h *helper) assertRunRestoreFinish(ctx context.Context, volumeRestore *v1alpha1.VolumeRestore) {
+	h.g.Expect(volumeRestore.Status.CommitTs).To(gomega.Equal("121"))
 	h.g.Expect(volumeRestore.Status.Phase).To(gomega.Equal(v1alpha1.VolumeRestoreDataComplete))
-	h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(4))
+	if volumeRestore.Spec.Template.Warmup == "" {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(4))
+	} else {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(5))
+	}
 	restoreMember1, err := h.dataPlaneClient1.PingcapV1alpha1().Restores(fakeTcNamespace1).Get(ctx, h.restoreMemberName1, metav1.GetOptions{})
 	h.g.Expect(err).To(gomega.BeNil())
 	h.g.Expect(restoreMember1.Spec.FederalVolumeRestorePhase).To(gomega.Equal(pingcapv1alpha1.FederalVolumeRestoreFinish))
@@ -143,6 +177,11 @@ func (h *helper) assertRunRestoreFinish(ctx context.Context, volumeRestore *v1al
 func (h *helper) assertRestoreComplete(volumeRestore *v1alpha1.VolumeRestore) {
 	h.g.Expect(volumeRestore.Status.Phase).To(gomega.Equal(v1alpha1.VolumeRestoreComplete))
+	if volumeRestore.Spec.Template.Warmup == "" {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(4))
+	} else {
+		h.g.Expect(len(volumeRestore.Status.Steps)).To(gomega.Equal(5))
+	}
 	h.g.Expect(volumeRestore.Status.CommitTs).To(gomega.Equal("121"))
 	h.g.Expect(volumeRestore.Status.TimeCompleted.Unix() > 0).To(gomega.BeTrue())
 	h.g.Expect(len(volumeRestore.Status.TimeTaken) > 0).To(gomega.BeTrue())
@@ -150,7 +189,9 @@ func (h *helper) assertRestoreComplete(volumeRestore *v1alpha1.VolumeRestore) {
 func (h *helper) assertRestoreFailed(volumeRestore *v1alpha1.VolumeRestore) {
 	h.g.Expect(volumeRestore.Status.Phase).To(gomega.Equal(v1alpha1.VolumeRestoreFailed))
-	h.g.Expect(volumeRestore.Status.CommitTs).To(gomega.BeEmpty())
+	if !v1alpha1.IsVolumeRestoreDataComplete(volumeRestore) {
+		h.g.Expect(volumeRestore.Status.CommitTs).To(gomega.BeEmpty())
+	}
h.g.Expect(volumeRestore.Status.TimeCompleted.Unix() > 0).To(gomega.BeTrue()) h.g.Expect(len(volumeRestore.Status.TimeTaken) > 0).To(gomega.BeTrue()) } @@ -216,6 +257,64 @@ func (h *helper) setDataPlaneTikvComplete(ctx context.Context) { h.g.Expect(err).To(gomega.BeNil()) } +func (h *helper) setDataPlaneWarmUpStarted(ctx context.Context) { + restoreMember1, err := h.dataPlaneClient1.PingcapV1alpha1().Restores(fakeTcNamespace1).Get(ctx, h.restoreMemberName1, metav1.GetOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + pingcapv1alpha1.UpdateRestoreCondition(&restoreMember1.Status, &pingcapv1alpha1.RestoreCondition{ + Type: pingcapv1alpha1.RestoreWarmUpStarted, + Status: corev1.ConditionTrue, + }) + _, err = h.dataPlaneClient1.PingcapV1alpha1().Restores(fakeTcNamespace1).UpdateStatus(ctx, restoreMember1, metav1.UpdateOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + + restoreMember2, err := h.dataPlaneClient2.PingcapV1alpha1().Restores(fakeTcNamespace2).Get(ctx, h.restoreMemberName2, metav1.GetOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + pingcapv1alpha1.UpdateRestoreCondition(&restoreMember2.Status, &pingcapv1alpha1.RestoreCondition{ + Type: pingcapv1alpha1.RestoreWarmUpStarted, + Status: corev1.ConditionTrue, + }) + _, err = h.dataPlaneClient2.PingcapV1alpha1().Restores(fakeTcNamespace2).UpdateStatus(ctx, restoreMember2, metav1.UpdateOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + + restoreMember3, err := h.dataPlaneClient3.PingcapV1alpha1().Restores(fakeTcNamespace3).Get(ctx, h.restoreMemberName3, metav1.GetOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + pingcapv1alpha1.UpdateRestoreCondition(&restoreMember3.Status, &pingcapv1alpha1.RestoreCondition{ + Type: pingcapv1alpha1.RestoreWarmUpStarted, + Status: corev1.ConditionTrue, + }) + _, err = h.dataPlaneClient3.PingcapV1alpha1().Restores(fakeTcNamespace3).UpdateStatus(ctx, restoreMember3, metav1.UpdateOptions{}) + h.g.Expect(err).To(gomega.BeNil()) +} + +func (h *helper) setDataPlaneWarmUpComplete(ctx context.Context) { + restoreMember1, err := h.dataPlaneClient1.PingcapV1alpha1().Restores(fakeTcNamespace1).Get(ctx, h.restoreMemberName1, metav1.GetOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + pingcapv1alpha1.UpdateRestoreCondition(&restoreMember1.Status, &pingcapv1alpha1.RestoreCondition{ + Type: pingcapv1alpha1.RestoreWarmUpComplete, + Status: corev1.ConditionTrue, + }) + _, err = h.dataPlaneClient1.PingcapV1alpha1().Restores(fakeTcNamespace1).UpdateStatus(ctx, restoreMember1, metav1.UpdateOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + + restoreMember2, err := h.dataPlaneClient2.PingcapV1alpha1().Restores(fakeTcNamespace2).Get(ctx, h.restoreMemberName2, metav1.GetOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + pingcapv1alpha1.UpdateRestoreCondition(&restoreMember2.Status, &pingcapv1alpha1.RestoreCondition{ + Type: pingcapv1alpha1.RestoreWarmUpComplete, + Status: corev1.ConditionTrue, + }) + _, err = h.dataPlaneClient2.PingcapV1alpha1().Restores(fakeTcNamespace2).UpdateStatus(ctx, restoreMember2, metav1.UpdateOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + + restoreMember3, err := h.dataPlaneClient3.PingcapV1alpha1().Restores(fakeTcNamespace3).Get(ctx, h.restoreMemberName3, metav1.GetOptions{}) + h.g.Expect(err).To(gomega.BeNil()) + pingcapv1alpha1.UpdateRestoreCondition(&restoreMember3.Status, &pingcapv1alpha1.RestoreCondition{ + Type: pingcapv1alpha1.RestoreWarmUpComplete, + Status: corev1.ConditionTrue, + }) + _, err = h.dataPlaneClient3.PingcapV1alpha1().Restores(fakeTcNamespace3).UpdateStatus(ctx, restoreMember3, 
metav1.UpdateOptions{}) + h.g.Expect(err).To(gomega.BeNil()) +} + func (h *helper) setDataPlaneDataComplete(ctx context.Context) { restoreMember1, err := h.dataPlaneClient1.PingcapV1alpha1().Restores(fakeTcNamespace1).Get(ctx, h.restoreMemberName1, metav1.GetOptions{}) h.g.Expect(err).To(gomega.BeNil()) @@ -274,7 +373,7 @@ func TestVolumeRestore(t *testing.T) { ctx := context.Background() h := newHelper(t, restoreName, restoreNamespace) - volumeRestore := h.createVolumeRestore(ctx) + volumeRestore := h.createVolumeRestore(ctx, "") // run restore volume phase err := h.rm.Sync(volumeRestore) @@ -318,13 +417,137 @@ func TestVolumeRestore(t *testing.T) { h.assertRestoreComplete(volumeRestore) } +func TestVolumeRestoreWithWarmUpSync(t *testing.T) { + restoreName := "restore-1" + restoreNamespace := "ns-1" + ctx := context.Background() + h := newHelper(t, restoreName, restoreNamespace) + + volumeRestore := h.createVolumeRestore(ctx, pingcapv1alpha1.RestoreWarmupModeSync) + + // run restore volume phase + err := h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRunRestoreVolume(ctx, volumeRestore) + + // wait restore volume phase + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + + // data plane volume complete, still need to wait tikv ready + h.setDataPlaneVolumeComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + h.assertRestoreVolumeComplete(volumeRestore) + + // warmup start + h.setDataPlaneWarmUpStarted(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + h.assertRestoreWarmUpStarted(volumeRestore) + + // warmup complete + h.setDataPlaneWarmUpComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + h.assertRestoreWarmUpComplete(volumeRestore) + + // data plane tikv complete, run restore data phase + h.setDataPlaneTikvComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRunRestoreData(ctx, volumeRestore) + + // wait restore data phase + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + + // data plane data complete, run restore finish phase + h.setDataPlaneDataComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRunRestoreFinish(ctx, volumeRestore) + + // wait restore complete + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + + // data plane complete, restore complete + h.setDataPlaneComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRestoreComplete(volumeRestore) +} + +func TestVolumeRestoreWithWarmUpAsync(t *testing.T) { + restoreName := "restore-1" + restoreNamespace := "ns-1" + ctx := context.Background() + h := newHelper(t, restoreName, restoreNamespace) + + volumeRestore := h.createVolumeRestore(ctx, pingcapv1alpha1.RestoreWarmupModeASync) + + // run restore volume phase + err := h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRunRestoreVolume(ctx, volumeRestore) + + // wait restore volume phase + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + + // data plane volume complete, still need to wait tikv ready + h.setDataPlaneVolumeComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + h.assertRestoreVolumeComplete(volumeRestore) + + // warmup start + h.setDataPlaneWarmUpStarted(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + 
h.assertRestoreWarmUpStarted(volumeRestore) + + // data plane tikv complete, run restore data phase + h.setDataPlaneTikvComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRunRestoreData(ctx, volumeRestore) + + // wait restore data phase + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + + // data plane data complete, run restore finish phase + h.setDataPlaneDataComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRunRestoreFinish(ctx, volumeRestore) + + // wait restore complete + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.HaveOccurred()) + + // data plane complete, wait warmup complete + h.setDataPlaneComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + + // warmup complete, and restore complete + h.setDataPlaneWarmUpComplete(ctx) + err = h.rm.Sync(volumeRestore) + h.g.Expect(err).To(gomega.BeNil()) + h.assertRestoreWarmUpComplete(volumeRestore) + h.assertRestoreComplete(volumeRestore) +} + func TestVolumeRestore_RestoreVolumeFailed(t *testing.T) { restoreName := "restore-1" restoreNamespace := "ns-1" ctx := context.Background() h := newHelper(t, restoreName, restoreNamespace) - volumeRestore := h.createVolumeRestore(ctx) + volumeRestore := h.createVolumeRestore(ctx, "") // run restore volume phase err := h.rm.Sync(volumeRestore) @@ -344,7 +567,7 @@ func TestVolumeRestore_RestoreDataFailed(t *testing.T) { ctx := context.Background() h := newHelper(t, restoreName, restoreNamespace) - volumeRestore := h.createVolumeRestore(ctx) + volumeRestore := h.createVolumeRestore(ctx, "") // run restore volume phase err := h.rm.Sync(volumeRestore) @@ -380,7 +603,7 @@ func TestVolumeRestore_RestoreFinishFailed(t *testing.T) { ctx := context.Background() h := newHelper(t, restoreName, restoreNamespace) - volumeRestore := h.createVolumeRestore(ctx) + volumeRestore := h.createVolumeRestore(ctx, "") // run restore volume phase err := h.rm.Sync(volumeRestore)
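As a worked example of the PVC-name parsing that `warmUpTiKVVolumesSync` relies on: for each TiKV ordinal it groups all PVCs whose names match `^(.+)-<stsName>-(\d+)$` and creates one warmup job per ordinal. A runnable sketch under assumed names (a TidbCluster called `basic`, whose TiKV StatefulSet name from `controller.TiKVMemberName` is `basic-tikv`; the PVC names are made up):

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

func main() {
	// Same pattern as warmUpTiKVVolumesSync; stsName is the TiKV
	// StatefulSet name, e.g. "basic-tikv" (illustrative).
	stsName := "basic-tikv"
	re := regexp.MustCompile(fmt.Sprintf(`^(.+)-%s-(\d+)$`, stsName))

	for _, pvcName := range []string{"tikv-basic-tikv-0", "data0-basic-tikv-2"} {
		m := re.FindStringSubmatch(pvcName)
		if len(m) != 3 {
			fmt.Printf("pvc %s does not match\n", pvcName)
			continue
		}
		ordinal, err := strconv.Atoi(m[2])
		if err != nil {
			panic(err)
		}
		// The volume name groups the PVCs of one TiKV pod; the ordinal
		// selects which pod (and hence which warmup job) they belong to.
		fmt.Printf("pvc=%s volume=%s ordinal=%d\n", pvcName, m[1], ordinal)
	}
}
```

Per pod, the data volume is then warmed with the `--fs` strategy and any extra storage volumes are appended after `--block`, matching the argument list assembled in `makeSyncWarmUpJob` above.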