Skip to content

Commit

Permalink
ebs br: enhance TC validation (#5659)
Browse files Browse the repository at this point in the history
  • Loading branch information
BornChanger committed Jun 12, 2024
1 parent 755e6c1 commit 9ef26f8
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/backup/backup/backup_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (bm *backupManager) validateBackup(backup *v1alpha1.Backup) error {
logBackupSubcommand := v1alpha1.ParseLogBackupSubcommand(backup)
var err error
if backup.Spec.BR == nil {
err = backuputil.ValidateBackup(backup, "", false)
err = backuputil.ValidateBackup(backup, "", nil)
} else {
backupNamespace := backup.GetNamespace()
if backup.Spec.BR.ClusterNamespace != "" {
Expand All @@ -216,7 +216,7 @@ func (bm *backupManager) validateBackup(backup *v1alpha1.Backup) error {
}

tikvImage := tc.TiKVImage()
err = backuputil.ValidateBackup(backup, tikvImage, tc.AcrossK8s())
err = backuputil.ValidateBackup(backup, tikvImage, tc)
}

if err != nil {
Expand Down
34 changes: 34 additions & 0 deletions pkg/backup/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,3 +463,37 @@ func TestVolumeBackupInitFailed(t *testing.T) {
g.Expect(err).Should(BeNil())
g.Expect(backup.Status.Phase).Should(Equal(v1alpha1.VolumeBackupInitializeFailed))
}

func TestVolumeBackupAgainstInvalidTC(t *testing.T) {
g := NewGomegaWithT(t)
helper := newHelper(t)
defer helper.Close()
deps := helper.Deps
var err error

bm := NewBackupManager(deps).(*backupManager)
backup := genVolumeBackup()
_, err = deps.Clientset.PingcapV1alpha1().Backups(backup.Namespace).Create(context.TODO(), backup, metav1.CreateOptions{})
g.Expect(err).Should(BeNil())
helper.CreateTC(backup.Spec.BR.ClusterNamespace, backup.Spec.BR.Cluster, false, false)

err = bm.syncBackupJob(backup)
g.Expect(err.Error()).Should(MatchRegexp("only support volume snapshot backup across k8s clusters"))
}

func TestVolumeBackupAgainstTCWithNoTiKV(t *testing.T) {
g := NewGomegaWithT(t)
helper := newHelper(t)
defer helper.Close()
deps := helper.Deps
var err error

bm := NewBackupManager(deps).(*backupManager)
backup := genVolumeBackup()
_, err = deps.Clientset.PingcapV1alpha1().Backups(backup.Namespace).Create(context.TODO(), backup, metav1.CreateOptions{})
g.Expect(err).Should(BeNil())
helper.CreateTCWithNoTiKV(backup.Spec.BR.ClusterNamespace, backup.Spec.BR.Cluster, true, false)

err = bm.syncBackupJob(backup)
g.Expect(err.Error()).Should(MatchRegexp("not support backup TiDB cluster with no tikv replica"))
}
46 changes: 46 additions & 0 deletions pkg/backup/testutils/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,52 @@ func (h *Helper) CreateTC(namespace, clusterName string, acrossK8s, recoverMode
g.Expect(err).Should(BeNil())
}

// CreateTCWithNoTiKV creates a TidbCluster with name `clusterName` in ns `namespace` with no TiKV nodes
func (h *Helper) CreateTCWithNoTiKV(namespace, clusterName string, acrossK8s, recoverMode bool) {
h.T.Helper()
g := NewGomegaWithT(h.T)
var err error

tc := &v1alpha1.TidbCluster{
Spec: v1alpha1.TidbClusterSpec{
AcrossK8s: acrossK8s,
RecoveryMode: recoverMode,
TLSCluster: &v1alpha1.TLSCluster{Enabled: true},
TiKV: &v1alpha1.TiKVSpec{
BaseImage: "pingcap/tikv",
Replicas: 3,
StorageVolumes: []v1alpha1.StorageVolume{
{MountPath: "/var/lib/raft", Name: "raft", StorageSize: "50Gi"},
{MountPath: "/var/lib/wal", Name: "wal", StorageSize: "50Gi"},
},
},
TiDB: &v1alpha1.TiDBSpec{
TLSClient: &v1alpha1.TiDBTLSClient{Enabled: true},
},
PD: &v1alpha1.PDSpec{
Replicas: 1,
},
},
Status: v1alpha1.TidbClusterStatus{
PD: v1alpha1.PDStatus{
Members: map[string]v1alpha1.PDMember{
"pd-0": {Name: "pd-0", Health: true},
},
},
},
}
tc.Namespace = namespace
tc.Name = clusterName
_, err = h.Deps.Clientset.PingcapV1alpha1().TidbClusters(tc.Namespace).Create(context.TODO(), tc, metav1.CreateOptions{})
g.Expect(err).Should(BeNil())
// make sure can read tc from lister
g.Eventually(func() error {
_, err := h.Deps.TiDBClusterLister.TidbClusters(tc.Namespace).Get(tc.Name)
return err
}, time.Second*10).Should(BeNil())
g.Expect(err).Should(BeNil())
}

func (h *Helper) CreateRestore(restore *v1alpha1.Restore) {
h.T.Helper()
g := NewGomegaWithT(h.T)
Expand Down
8 changes: 6 additions & 2 deletions pkg/backup/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func validateAccessConfig(config *v1alpha1.TiDBAccessConfig) string {
}

// ValidateBackup validates backup sepc
func ValidateBackup(backup *v1alpha1.Backup, tikvImage string, acrossK8s bool) error {
func ValidateBackup(backup *v1alpha1.Backup, tikvImage string, tc *v1alpha1.TidbCluster) error {
ns := backup.Namespace
name := backup.Name

Expand Down Expand Up @@ -542,9 +542,13 @@ func ValidateBackup(backup *v1alpha1.Backup, tikvImage string, acrossK8s bool) e
// validate volume snapshot backup
if backup.Spec.Mode == v1alpha1.BackupModeVolumeSnapshot {
// only support across k8s now. TODO compatible for single k8s
if !acrossK8s {
if tc == nil || !tc.AcrossK8s() {
return errors.New("only support volume snapshot backup across k8s clusters")
}

if len(tc.Status.TiKV.Stores) == 0 || tc.Spec.TiKV.Replicas == 0 {
return errors.New("not support backup TiDB cluster with no tikv replica")
}
}

if backup.Spec.BackoffRetryPolicy.MinRetryDuration != "" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/util/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func TestValidateBackup(t *testing.T) {
backup := new(v1alpha1.Backup)
match := func(sub string) {
t.Helper()
err := ValidateBackup(backup, "tikv:v4.0.8", false)
err := ValidateBackup(backup, "tikv:v4.0.8", nil)
if sub == "" {
g.Expect(err).Should(BeNil())
} else {
Expand Down

0 comments on commit 9ef26f8

Please sign in to comment.