From 000e6aff8bb647c2c3c08953fe89db00476c5e71 Mon Sep 17 00:00:00 2001 From: Drew Sessler Date: Fri, 30 Aug 2024 17:00:57 -0400 Subject: [PATCH] Take snapshots of pgdata using a dedicated volume. Whenever a backup finishes successfully, do a delta restore into a dedicated volume and then snapshot that volume. Add/adjust tests for snapshots. Co-authored-by: Anthony Landreth --- ...ator.crunchydata.com_postgresclusters.yaml | 1 + .../controller/postgrescluster/controller.go | 6 +- .../postgrescluster/helpers_test.go | 55 + .../controller/postgrescluster/pgbackrest.go | 31 +- .../postgrescluster/pgbackrest_test.go | 23 +- .../controller/postgrescluster/snapshots.go | 541 +++++-- .../postgrescluster/snapshots_test.go | 1410 +++++++++++++---- internal/naming/annotations.go | 9 +- internal/naming/annotations_test.go | 8 +- internal/naming/labels.go | 3 + internal/naming/names.go | 9 + internal/naming/selectors.go | 12 + internal/pgbackrest/config.go | 36 + internal/pgbackrest/config_test.go | 30 + .../v1beta1/postgrescluster_types.go | 1 + 15 files changed, 1736 insertions(+), 439 deletions(-) diff --git a/config/crd/bases/postgres-operator.crunchydata.com_postgresclusters.yaml b/config/crd/bases/postgres-operator.crunchydata.com_postgresclusters.yaml index 1c25b57b1..4f79a8012 100644 --- a/config/crd/bases/postgres-operator.crunchydata.com_postgresclusters.yaml +++ b/config/crd/bases/postgres-operator.crunchydata.com_postgresclusters.yaml @@ -4330,6 +4330,7 @@ spec: volumeSnapshotClassName: description: Name of the VolumeSnapshotClass that should be used by VolumeSnapshots + minLength: 1 type: string required: - volumeSnapshotClassName diff --git a/internal/controller/postgrescluster/controller.go b/internal/controller/postgrescluster/controller.go index 802fc36ca..d459d30a1 100644 --- a/internal/controller/postgrescluster/controller.go +++ b/internal/controller/postgrescluster/controller.go @@ -168,6 +168,7 @@ func (r *Reconciler) Reconcile( err error backupsSpecFound bool backupsReconciliationAllowed bool + dedicatedSnapshotPVC *corev1.PersistentVolumeClaim ) patchClusterStatus := func() error { @@ -364,7 +365,10 @@ func (r *Reconciler) Reconcile( } } if err == nil { - err = r.reconcileVolumeSnapshots(ctx, cluster, instances, clusterVolumes) + dedicatedSnapshotPVC, err = r.reconcileDedicatedSnapshotVolume(ctx, cluster, clusterVolumes) + } + if err == nil { + err = r.reconcileVolumeSnapshots(ctx, cluster, dedicatedSnapshotPVC) } if err == nil { err = r.reconcilePGBouncer(ctx, cluster, instances, primaryCertificate, rootCA) diff --git a/internal/controller/postgrescluster/helpers_test.go b/internal/controller/postgrescluster/helpers_test.go index 589e9b1a2..0536b466d 100644 --- a/internal/controller/postgrescluster/helpers_test.go +++ b/internal/controller/postgrescluster/helpers_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -21,6 +22,7 @@ import ( "github.com/crunchydata/postgres-operator/internal/controller/runtime" "github.com/crunchydata/postgres-operator/internal/initialize" + "github.com/crunchydata/postgres-operator/internal/naming" "github.com/crunchydata/postgres-operator/internal/testing/require" "github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1" ) @@ -99,6 +101,7 @@ func testVolumeClaimSpec() corev1.PersistentVolumeClaimSpec { }, } } + func testCluster() *v1beta1.PostgresCluster { // Defines a
base cluster spec that can be used by tests to generate a // cluster with an expected number of instances @@ -138,6 +141,58 @@ func testCluster() *v1beta1.PostgresCluster { return cluster.DeepCopy() } +func testBackupJob(cluster *v1beta1.PostgresCluster) *batchv1.Job { + job := batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: batchv1.SchemeGroupVersion.String(), + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "backup-job-1", + Namespace: cluster.Namespace, + Labels: map[string]string{ + naming.LabelCluster: cluster.Name, + naming.LabelPGBackRestBackup: "", + naming.LabelPGBackRestRepo: "repo1", + }, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "test"}}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } + + return job.DeepCopy() +} + +func testRestoreJob(cluster *v1beta1.PostgresCluster) *batchv1.Job { + job := batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: batchv1.SchemeGroupVersion.String(), + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "restore-job-1", + Namespace: cluster.Namespace, + Labels: naming.PGBackRestRestoreJobLabels(cluster.Name), + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "test", Image: "test"}}, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } + + return job.DeepCopy() +} + // setupManager creates the runtime manager used during controller testing func setupManager(t *testing.T, cfg *rest.Config, controllerSetup func(mgr manager.Manager)) (context.Context, context.CancelFunc) { diff --git a/internal/controller/postgrescluster/pgbackrest.go b/internal/controller/postgrescluster/pgbackrest.go index 670ece55b..218880b26 100644 --- a/internal/controller/postgrescluster/pgbackrest.go +++ b/internal/controller/postgrescluster/pgbackrest.go @@ -32,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/crunchydata/postgres-operator/internal/config" + "github.com/crunchydata/postgres-operator/internal/feature" "github.com/crunchydata/postgres-operator/internal/initialize" "github.com/crunchydata/postgres-operator/internal/logging" "github.com/crunchydata/postgres-operator/internal/naming" @@ -197,7 +198,7 @@ func (r *Reconciler) applyRepoVolumeIntent(ctx context.Context, // getPGBackRestResources returns the existing pgBackRest resources that should utilized by the // PostgresCluster controller during reconciliation. Any items returned are verified to be owned // by the PostgresCluster controller and still applicable per the current PostgresCluster spec. -// Additionally, and resources identified that no longer correspond to any current configuration +// Additionally, any resources identified that no longer correspond to any current configuration // are deleted. func (r *Reconciler) getPGBackRestResources(ctx context.Context, postgresCluster *v1beta1.PostgresCluster, @@ -374,6 +375,15 @@ func (r *Reconciler) cleanupRepoResources(ctx context.Context, if !backupsSpecFound { break } + + // If the restore job has the PGBackRestBackupJobCompletion annotation, it is + // used for volume snapshots and should not be deleted (volume snapshots code + // will clean it up when appropriate). 
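For context, that annotation is stamped onto the dedicated snapshot volume's restore Job at creation time by dedicatedSnapshotVolumeRestore, later in this patch. A condensed view of the producer side, using the same names:

	// The restore Job records which backup it replayed by carrying the
	// backup Job's completion time, formatted as RFC 3339.
	restoreJob.Annotations[naming.PGBackRestBackupJobCompletion] =
		backupJob.Status.CompletionTime.Format(time.RFC3339)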
+ if _, ok := owned.GetAnnotations()[naming.PGBackRestBackupJobCompletion]; ok { + ownedNoDelete = append(ownedNoDelete, owned) + delete = false + } + // When a cluster is prepared for restore, the system identifier is removed from status // and the cluster is therefore no longer bootstrapped. Only once the restore Job is // complete will the cluster then be bootstrapped again, which means by the time we @@ -762,7 +772,7 @@ func (r *Reconciler) generateRepoVolumeIntent(postgresCluster *v1beta1.PostgresC } // generateBackupJobSpecIntent generates a JobSpec for a pgBackRest backup job -func generateBackupJobSpecIntent(postgresCluster *v1beta1.PostgresCluster, +func generateBackupJobSpecIntent(ctx context.Context, postgresCluster *v1beta1.PostgresCluster, repo v1beta1.PGBackRestRepo, serviceAccountName string, labels, annotations map[string]string, opts ...string) *batchv1.JobSpec { @@ -771,6 +781,11 @@ func generateBackupJobSpecIntent(postgresCluster *v1beta1.PostgresCluster, "--stanza=" + pgbackrest.DefaultStanzaName, "--repo=" + repoIndex, } + // If VolumeSnapshots are enabled, use archive-copy and archive-check options + if postgresCluster.Spec.Backups.Snapshots != nil && feature.Enabled(ctx, feature.VolumeSnapshots) { + cmdOpts = append(cmdOpts, "--archive-copy=y", "--archive-check=y") + } + cmdOpts = append(cmdOpts, opts...) container := corev1.Container{ @@ -1634,6 +1649,9 @@ func (r *Reconciler) reconcilePostgresClusterDataSource(ctx context.Context, return errors.WithStack(err) } + // TODO(snapshots): If pgdata is being sourced by a VolumeSnapshot then don't perform a typical restore job; + // we only want to replay the WAL. + // reconcile the pgBackRest restore Job to populate the cluster's data directory if err := r.reconcileRestoreJob(ctx, cluster, sourceCluster, pgdata, pgwal, pgtablespaces, dataSource, instanceName, instanceSetName, configHash, pgbackrest.DefaultStanzaName); err != nil { @@ -2362,7 +2380,7 @@ func (r *Reconciler) reconcileManualBackup(ctx context.Context, backupJob.ObjectMeta.Labels = labels backupJob.ObjectMeta.Annotations = annotations - spec := generateBackupJobSpecIntent(postgresCluster, repo, + spec := generateBackupJobSpecIntent(ctx, postgresCluster, repo, serviceAccount.GetName(), labels, annotations, backupOpts...) backupJob.Spec = *spec @@ -2523,7 +2541,7 @@ func (r *Reconciler) reconcileReplicaCreateBackup(ctx context.Context, backupJob.ObjectMeta.Labels = labels backupJob.ObjectMeta.Annotations = annotations - spec := generateBackupJobSpecIntent(postgresCluster, replicaCreateRepo, + spec := generateBackupJobSpecIntent(ctx, postgresCluster, replicaCreateRepo, serviceAccount.GetName(), labels, annotations) backupJob.Spec = *spec @@ -2886,8 +2904,7 @@ func (r *Reconciler) reconcilePGBackRestCronJob( labels := naming.Merge( cluster.Spec.Metadata.GetLabelsOrNil(), cluster.Spec.Backups.PGBackRest.Metadata.GetLabelsOrNil(), - naming.PGBackRestCronJobLabels(cluster.Name, repo.Name, backupType), - ) + naming.PGBackRestCronJobLabels(cluster.Name, repo.Name, backupType)) objectmeta := naming.PGBackRestCronJob(cluster, backupType, repo.Name) // Look for an existing CronJob by the associated Labels. If one exists, @@ -2951,7 +2968,7 @@ func (r *Reconciler) reconcilePGBackRestCronJob( // set backup type (i.e. "full", "diff", "incr") backupOpts := []string{"--type=" + backupType} - jobSpec := generateBackupJobSpecIntent(cluster, repo, + jobSpec := generateBackupJobSpecIntent(ctx, cluster, repo, serviceAccount.GetName(), labels, annotations, backupOpts...) 
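Combined with the archive options added in generateBackupJobSpecIntent above, a scheduled full backup against repo1 ends up with pgBackRest options along these lines (a sketch of the assembled list, not a verbatim dump):

	cmdOpts := []string{
		"--stanza=" + pgbackrest.DefaultStanzaName,
		"--repo=1",          // index parsed from the repo name, e.g. "repo1"
		"--archive-copy=y",  // copy the WAL needed for consistency into the backup
		"--archive-check=y", // confirm WAL archiving works before declaring success
		"--type=full",       // appended from backupOpts for this CronJob
	}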
// Suspend cronjobs when shutdown or read-only. Any jobs that have already diff --git a/internal/controller/postgrescluster/pgbackrest_test.go b/internal/controller/postgrescluster/pgbackrest_test.go index 73b605075..8e34dabb5 100644 --- a/internal/controller/postgrescluster/pgbackrest_test.go +++ b/internal/controller/postgrescluster/pgbackrest_test.go @@ -2438,8 +2438,9 @@ func TestCopyConfigurationResources(t *testing.T) { } func TestGenerateBackupJobIntent(t *testing.T) { + ctx := context.Background() t.Run("empty", func(t *testing.T) { - spec := generateBackupJobSpecIntent( + spec := generateBackupJobSpecIntent(ctx, &v1beta1.PostgresCluster{}, v1beta1.PGBackRestRepo{}, "", nil, nil, @@ -2512,7 +2513,7 @@ volumes: ImagePullPolicy: corev1.PullAlways, }, } - job := generateBackupJobSpecIntent( + job := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, @@ -2527,7 +2528,7 @@ volumes: cluster.Spec.Backups = v1beta1.Backups{ PGBackRest: v1beta1.PGBackRestArchive{}, } - job := generateBackupJobSpecIntent( + job := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, @@ -2544,7 +2545,7 @@ volumes: }, }, } - job := generateBackupJobSpecIntent( + job := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, @@ -2583,7 +2584,7 @@ volumes: }, }, } - job := generateBackupJobSpecIntent( + job := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, @@ -2596,7 +2597,7 @@ volumes: cluster.Spec.Backups.PGBackRest.Jobs = &v1beta1.BackupJobs{ PriorityClassName: initialize.String("some-priority-class"), } - job := generateBackupJobSpecIntent( + job := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, @@ -2614,7 +2615,7 @@ volumes: cluster.Spec.Backups.PGBackRest.Jobs = &v1beta1.BackupJobs{ Tolerations: tolerations, } - job := generateBackupJobSpecIntent( + job := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, @@ -2628,14 +2629,14 @@ volumes: t.Run("Undefined", func(t *testing.T) { cluster.Spec.Backups.PGBackRest.Jobs = nil - spec := generateBackupJobSpecIntent( + spec := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, ) assert.Assert(t, spec.TTLSecondsAfterFinished == nil) cluster.Spec.Backups.PGBackRest.Jobs = &v1beta1.BackupJobs{} - spec = generateBackupJobSpecIntent( + spec = generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, ) assert.Assert(t, spec.TTLSecondsAfterFinished == nil) @@ -2646,7 +2647,7 @@ volumes: TTLSecondsAfterFinished: initialize.Int32(0), } - spec := generateBackupJobSpecIntent( + spec := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, ) if assert.Check(t, spec.TTLSecondsAfterFinished != nil) { @@ -2659,7 +2660,7 @@ volumes: TTLSecondsAfterFinished: initialize.Int32(100), } - spec := generateBackupJobSpecIntent( + spec := generateBackupJobSpecIntent(ctx, cluster, v1beta1.PGBackRestRepo{}, "", nil, nil, ) if assert.Check(t, spec.TTLSecondsAfterFinished != nil) { diff --git a/internal/controller/postgrescluster/snapshots.go b/internal/controller/postgrescluster/snapshots.go index 6e5d3878f..4f5eff817 100644 --- a/internal/controller/postgrescluster/snapshots.go +++ b/internal/controller/postgrescluster/snapshots.go @@ -6,6 +6,8 @@ package postgrescluster import ( "context" + "fmt" + "strings" "time" "github.com/pkg/errors" @@ -16,8 +18,12 @@ import ( volumesnapshotv1 
"github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" + "github.com/crunchydata/postgres-operator/internal/config" "github.com/crunchydata/postgres-operator/internal/feature" + "github.com/crunchydata/postgres-operator/internal/initialize" "github.com/crunchydata/postgres-operator/internal/naming" + "github.com/crunchydata/postgres-operator/internal/pgbackrest" + "github.com/crunchydata/postgres-operator/internal/postgres" "github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1" ) @@ -25,106 +31,121 @@ import ( // reconcileVolumeSnapshots creates and manages VolumeSnapshots if the proper VolumeSnapshot CRDs // are installed and VolumeSnapshots are enabled for the PostgresCluster. A VolumeSnapshot of the -// primary instance's pgdata volume will be created whenever a backup is completed. +// primary instance's pgdata volume will be created whenever a backup is completed. The steps to +// create snapshots include the following sequence: +// 1. We find the latest completed backup job and check the timestamp. +// 2. If the timestamp is later than what's on the dedicated snapshot PVC, a restore job runs in +// the dedicated snapshot volume. +// 3. When the restore job completes, an annotation is updated on the PVC. If the restore job +// fails, we don't run it again. +// 4. When the PVC annotation is updated, we see if there's a volume snapshot with an earlier +// timestamp. +// 5. If there are no snapshots at all, we take a snapshot and put the backup job's completion +// timestamp on the snapshot annotation. +// 6. If an earlier snapshot is found, we take a new snapshot, annotate it and delete the old +// snapshot. +// 7. When the snapshot job completes, we delete the restore job. func (r *Reconciler) reconcileVolumeSnapshots(ctx context.Context, - postgrescluster *v1beta1.PostgresCluster, instances *observedInstances, - clusterVolumes []corev1.PersistentVolumeClaim) error { + postgrescluster *v1beta1.PostgresCluster, pvc *corev1.PersistentVolumeClaim) error { - // Get feature gate state - volumeSnapshotsFeatureEnabled := feature.Enabled(ctx, feature.VolumeSnapshots) + // If VolumeSnapshots feature gate is disabled. Do nothing and return early. + if !feature.Enabled(ctx, feature.VolumeSnapshots) { + return nil + } // Check if the Kube cluster has VolumeSnapshots installed. If VolumeSnapshots - // are not installed we need to return early. If user is attempting to use + // are not installed, we need to return early. If user is attempting to use // VolumeSnapshots, return an error, otherwise return nil. - volumeSnapshotsExist, err := r.GroupVersionKindExists("snapshot.storage.k8s.io/v1", "VolumeSnapshot") + volumeSnapshotKindExists, err := r.GroupVersionKindExists("snapshot.storage.k8s.io/v1", "VolumeSnapshot") if err != nil { return err } - if !*volumeSnapshotsExist { - if postgrescluster.Spec.Backups.Snapshots != nil && volumeSnapshotsFeatureEnabled { + if !*volumeSnapshotKindExists { + if postgrescluster.Spec.Backups.Snapshots != nil { return errors.New("VolumeSnapshots are not installed/enabled in this Kubernetes cluster; cannot create snapshot.") } else { return nil } } - // Get all snapshots for this cluster + // If user is attempting to use snapshots and has tablespaces enabled, we + // need to create a warning event indicating that the two features are not + // currently compatible and return early. 
+ if postgrescluster.Spec.Backups.Snapshots != nil && + clusterUsingTablespaces(ctx, postgrescluster) { + r.Recorder.Event(postgrescluster, corev1.EventTypeWarning, "IncompatibleFeatures", + "VolumeSnapshots not currently compatible with TablespaceVolumes; cannot create snapshot.") + return nil + } + + // Get all snapshots for the cluster. snapshots, err := r.getSnapshotsForCluster(ctx, postgrescluster) if err != nil { return err } // If snapshots are disabled, delete any existing snapshots and return early. - if postgrescluster.Spec.Backups.Snapshots == nil || !volumeSnapshotsFeatureEnabled { - for i := range snapshots.Items { - if err == nil { - err = errors.WithStack(client.IgnoreNotFound( - r.deleteControlled(ctx, postgrescluster, &snapshots.Items[i]))) - } - } - - return err + if postgrescluster.Spec.Backups.Snapshots == nil { + return r.deleteSnapshots(ctx, postgrescluster, snapshots) } - // Check snapshots for errors; if present, create an event. If there - // are multiple snapshots with errors, create event for the latest error. - latestSnapshotWithError := getLatestSnapshotWithError(snapshots) - if latestSnapshotWithError != nil { + // If we got here, then the snapshots are enabled (feature gate is enabled and the + // cluster has a Spec.Backups.Snapshots section defined). + + // Check snapshots for errors; if present, create an event. If there are + // multiple snapshots with errors, create an event for the latest error and + // delete any older snapshots with errors. + snapshotWithLatestError := getSnapshotWithLatestError(snapshots) + if snapshotWithLatestError != nil { r.Recorder.Event(postgrescluster, corev1.EventTypeWarning, "VolumeSnapshotError", - *latestSnapshotWithError.Status.Error.Message) + *snapshotWithLatestError.Status.Error.Message) + for _, snapshot := range snapshots.Items { + if snapshot.Status.Error != nil && + snapshot.Status.Error.Time.Before(snapshotWithLatestError.Status.Error.Time) { + err = r.deleteControlled(ctx, postgrescluster, &snapshot) + if err != nil { + return err + } + } + } } - // Get all backup jobs for this cluster - jobs := &batchv1.JobList{} - selectJobs, err := naming.AsSelector(naming.ClusterBackupJobs(postgrescluster.Name)) - if err == nil { - err = errors.WithStack( - r.Client.List(ctx, jobs, - client.InNamespace(postgrescluster.Namespace), - client.MatchingLabelsSelector{Selector: selectJobs}, - )) - } - if err != nil { + // Get the pvc backup job completion annotation. If it does not exist, there has not been + // a successful restore yet, so return early. + pvcUpdateTimeStamp, pvcAnnotationExists := pvc.GetAnnotations()[naming.PGBackRestBackupJobCompletion] + if !pvcAnnotationExists { return err } - // Find most recently completed backup job - backupJob := getLatestCompleteBackupJob(jobs) - - // Return early if no completed backup job found - if backupJob == nil { - return nil - } - - // Find snapshot associated with latest backup - snapshotFound := false - snapshotIdx := 0 + // Check to see if a snapshot exists for the latest backup that has been restored into + // the dedicated pvc.
+ var snapshotForPvcUpdateIdx int + snapshotFoundForPvcUpdate := false for idx, snapshot := range snapshots.Items { - if snapshot.GetAnnotations()[naming.PGBackRestBackupJobId] == string(backupJob.UID) { - snapshotFound = true - snapshotIdx = idx + if snapshot.GetAnnotations()[naming.PGBackRestBackupJobCompletion] == pvcUpdateTimeStamp { + snapshotForPvcUpdateIdx = idx + snapshotFoundForPvcUpdate = true } } - // If snapshot exists for latest backup and it is Ready, delete all other snapshots. - // If it exists, but is not ready, do nothing. If it does not exist, create a snapshot. - if snapshotFound { - if *snapshots.Items[snapshotIdx].Status.ReadyToUse { - // Snapshot found and ready. We only keep one snapshot, so delete any other snapshots. - for idx := range snapshots.Items { - if idx != snapshotIdx { - err = r.deleteControlled(ctx, postgrescluster, &snapshots.Items[idx]) - if err != nil { - return err - } + // If a snapshot exists for the latest backup that has been restored into the dedicated pvc + // and the snapshot is Ready, delete all other snapshots. + if snapshotFoundForPvcUpdate && snapshots.Items[snapshotForPvcUpdateIdx].Status.ReadyToUse != nil && + *snapshots.Items[snapshotForPvcUpdateIdx].Status.ReadyToUse { + for idx, snapshot := range snapshots.Items { + if idx != snapshotForPvcUpdateIdx { + err = r.deleteControlled(ctx, postgrescluster, &snapshot) + if err != nil { + return err } } } - } else { - // Snapshot not found. Create snapshot. + } + + // If a snapshot for the latest backup/restore does not exist, create a snapshot. + if !snapshotFoundForPvcUpdate { var snapshot *volumesnapshotv1.VolumeSnapshot - snapshot, err = r.generateVolumeSnapshotOfPrimaryPgdata(postgrescluster, - instances, clusterVolumes, backupJob) + snapshot, err = r.generateSnapshotOfDedicatedSnapshotVolume(postgrescluster, pvc) if err == nil { err = errors.WithStack(r.apply(ctx, snapshot)) } @@ -133,49 +154,268 @@ func (r *Reconciler) reconcileVolumeSnapshots(ctx context.Context, return err } -// generateVolumeSnapshotOfPrimaryPgdata will generate a VolumeSnapshot of a -// PostgresCluster's primary instance's pgdata PersistentVolumeClaim and -// annotate it with the provided backup job's UID. -func (r *Reconciler) generateVolumeSnapshotOfPrimaryPgdata( - postgrescluster *v1beta1.PostgresCluster, instances *observedInstances, - clusterVolumes []corev1.PersistentVolumeClaim, backupJob *batchv1.Job, -) (*volumesnapshotv1.VolumeSnapshot, error) { +// +kubebuilder:rbac:groups="",resources="persistentvolumeclaims",verbs={get} +// +kubebuilder:rbac:groups="",resources="persistentvolumeclaims",verbs={create,delete,patch} + +// reconcileDedicatedSnapshotVolume reconciles the PersistentVolumeClaim that holds a +// copy of the pgdata and is dedicated for clean snapshots of the database. It creates +// and manages the volume as well as the restore jobs that bring the volume data forward +// after a successful backup. +func (r *Reconciler) reconcileDedicatedSnapshotVolume( + ctx context.Context, cluster *v1beta1.PostgresCluster, + clusterVolumes []corev1.PersistentVolumeClaim, +) (*corev1.PersistentVolumeClaim, error) { + + // If VolumeSnapshots feature gate is disabled, do nothing and return early. 
+ if !feature.Enabled(ctx, feature.VolumeSnapshots) { + return nil, nil + } + + // Set appropriate labels for the dedicated snapshot volume + labelMap := map[string]string{ + naming.LabelCluster: cluster.Name, + naming.LabelRole: naming.RoleSnapshot, + naming.LabelData: naming.DataPostgres, + } - // Find primary instance - primaryInstance := &Instance{} - for _, instance := range instances.forCluster { - if isPrimary, known := instance.IsPrimary(); isPrimary && known { - primaryInstance = instance + // If volume already exists, use existing name. Otherwise, generate a name. + var pvc *corev1.PersistentVolumeClaim + existingPVCName, err := getPGPVCName(labelMap, clusterVolumes) + if err != nil { + return nil, errors.WithStack(err) + } + if existingPVCName != "" { + pvc = &corev1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{ + Namespace: cluster.GetNamespace(), + Name: existingPVCName, + }} + } else { + pvc = &corev1.PersistentVolumeClaim{ObjectMeta: naming.ClusterDedicatedSnapshotVolume(cluster)} + } + pvc.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("PersistentVolumeClaim")) + + // If snapshots are disabled, delete the PVC if it exists and return early. + // Check the client cache first using Get. + if cluster.Spec.Backups.Snapshots == nil { + key := client.ObjectKeyFromObject(pvc) + err := errors.WithStack(r.Client.Get(ctx, key, pvc)) + if err == nil { + err = errors.WithStack(r.deleteControlled(ctx, cluster, pvc)) + } + return nil, client.IgnoreNotFound(err) + } + + // If we've got this far, snapshots are enabled, so we should create/update/get + // the dedicated snapshot volume. + pvc, err = r.createDedicatedSnapshotVolume(ctx, cluster, labelMap, pvc) + if err != nil { + return pvc, err + } + + // Determine if we need to run a restore job, based on the most recent backup + // and an annotation on the PVC. + + // Find the most recently completed backup job. + backupJob, err := r.getLatestCompleteBackupJob(ctx, cluster) + if err != nil { + return pvc, err + } + + // Return early if no complete backup job is found. + if backupJob == nil { + return pvc, nil + } + + // Return early if the pvc is annotated with a timestamp newer than or equal to the latest backup job. + // If the annotation value cannot be parsed, we want to proceed with a restore. + pvcAnnotationTimestampString := pvc.GetAnnotations()[naming.PGBackRestBackupJobCompletion] + if pvcAnnotationTime, err := time.Parse(time.RFC3339, pvcAnnotationTimestampString); err == nil { + if backupJob.Status.CompletionTime.Compare(pvcAnnotationTime) <= 0 { + return pvc, nil } } - // Return error if primary instance not found - if primaryInstance.Name == "" { - return nil, errors.New("Could not find primary instance. Cannot create volume snapshot.") + + // If we've made it here, the pvc has not been restored with the latest backup. + // Find the dedicated snapshot volume restore job if it exists. Since we delete + // successful restores after we annotate the PVC and stop making restore jobs + // if a failed DSV restore job exists, there should only ever be one DSV restore + // job in existence at a time. + // TODO(snapshots): Should this function throw an error or something if multiple + // DSV restores somehow exist?
+ restoreJob, err := r.getDedicatedSnapshotVolumeRestoreJob(ctx, cluster) + if err != nil { + return pvc, err } - // Find pvc associated with primary instance - primaryPvc := corev1.PersistentVolumeClaim{} - for _, pvc := range clusterVolumes { - pvcInstance := pvc.GetLabels()[naming.LabelInstance] - pvcRole := pvc.GetLabels()[naming.LabelRole] - if pvcRole == naming.RolePostgresData && pvcInstance == primaryInstance.Name { - primaryPvc = pvc + // If we don't find a restore job, we run one. + if restoreJob == nil { + err = r.dedicatedSnapshotVolumeRestore(ctx, cluster, pvc, backupJob) + return pvc, err + } + + // If we've made it here, we have found a restore job. If the restore job was + // successful, set/update the annotation on the PVC and delete the restore job. + if restoreJob.Status.Succeeded == 1 { + if pvc.GetAnnotations() == nil { + pvc.Annotations = map[string]string{} } + pvc.Annotations[naming.PGBackRestBackupJobCompletion] = restoreJob.GetAnnotations()[naming.PGBackRestBackupJobCompletion] + annotations := fmt.Sprintf(`{"metadata":{"annotations":{"%s": "%s"}}}`, + naming.PGBackRestBackupJobCompletion, pvc.Annotations[naming.PGBackRestBackupJobCompletion]) + + patch := client.RawPatch(client.Merge.Type(), []byte(annotations)) + err = r.handlePersistentVolumeClaimError(cluster, + errors.WithStack(r.patch(ctx, pvc, patch))) + + if err != nil { + return pvc, err + } + + err = r.Client.Delete(ctx, restoreJob, client.PropagationPolicy(metav1.DeletePropagationBackground)) + return pvc, errors.WithStack(err) + } + + // If the restore job failed, create a warning event. + if restoreJob.Status.Failed == 1 { + r.Recorder.Event(cluster, corev1.EventTypeWarning, + "DedicatedSnapshotVolumeRestoreJobError", "restore job failed, check the logs") + return pvc, nil + } + + // If we made it here, the restore job is still running and we should do nothing. + return pvc, err +} + +// createDedicatedSnapshotVolume creates/updates/gets the dedicated snapshot volume. +// It expects that the volume name and GVK have already been set on the pvc that is passed in. +func (r *Reconciler) createDedicatedSnapshotVolume(ctx context.Context, + cluster *v1beta1.PostgresCluster, labelMap map[string]string, + pvc *corev1.PersistentVolumeClaim, +) (*corev1.PersistentVolumeClaim, error) { + var err error + + // An InstanceSet must be chosen to scale resources for the dedicated snapshot volume. + // TODO: We've chosen the first InstanceSet for the time being, but might want to consider + // making the choice configurable. + instanceSpec := cluster.Spec.InstanceSets[0] + + pvc.Annotations = naming.Merge( + cluster.Spec.Metadata.GetAnnotationsOrNil(), + instanceSpec.Metadata.GetAnnotationsOrNil()) + + pvc.Labels = naming.Merge( + cluster.Spec.Metadata.GetLabelsOrNil(), + instanceSpec.Metadata.GetLabelsOrNil(), + labelMap, + ) + + err = errors.WithStack(r.setControllerReference(cluster, pvc)) + if err != nil { + return pvc, err + } + + pvc.Spec = instanceSpec.DataVolumeClaimSpec + + // Set the snapshot volume to the same size as the pgdata volume. The size should scale with auto-grow. + r.setVolumeSize(ctx, cluster, pvc, instanceSpec.Name) + + // Clear any set limit before applying PVC. This is needed to allow the limit + // value to change later.
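Concretely, if the chosen instance set pins both requests and limits, only the request survives into the applied object, leaving room for a larger limit on a later reconcile (illustrative sizes):

	// Before: Requests[storage] = 1Gi, Limits[storage] = 1Gi
	// After:  Requests[storage] = 1Gi, Limits          = nil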
+ pvc.Spec.Resources.Limits = nil + + err = r.handlePersistentVolumeClaimError(cluster, + errors.WithStack(r.apply(ctx, pvc))) + if err != nil { + return pvc, err + } + + return pvc, err +} + +// dedicatedSnapshotVolumeRestore creates a Job that performs a restore into the dedicated +// snapshot volume. +// This function is very similar to reconcileRestoreJob, but specifically tailored to the +// dedicated snapshot volume. +func (r *Reconciler) dedicatedSnapshotVolumeRestore(ctx context.Context, + cluster *v1beta1.PostgresCluster, dedicatedSnapshotVolume *corev1.PersistentVolumeClaim, + backupJob *batchv1.Job, +) error { + + pgdata := postgres.DataDirectory(cluster) + repoName := backupJob.GetLabels()[naming.LabelPGBackRestRepo] + + opts := []string{ + "--stanza=" + pgbackrest.DefaultStanzaName, + "--pg1-path=" + pgdata, + "--repo=" + regexRepoIndex.FindString(repoName), + "--delta", } - // Return error if primary pvc not found - if primaryPvc.Name == "" { - return nil, errors.New("Could not find primary's pgdata pvc. Cannot create volume snapshot.") + + cmd := pgbackrest.DedicatedSnapshotVolumeRestoreCommand(pgdata, strings.Join(opts, " ")) + + // Create the volume resources required for the Postgres data directory. + dataVolumeMount := postgres.DataVolumeMount() + dataVolume := corev1.Volume{ + Name: dataVolumeMount.Name, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: dedicatedSnapshotVolume.GetName(), + }, + }, } + volumes := []corev1.Volume{dataVolume} + volumeMounts := []corev1.VolumeMount{dataVolumeMount} + + _, configHash, err := pgbackrest.CalculateConfigHashes(cluster) + if err != nil { + return err + } + + // A DataSource is required to avoid a nil pointer exception. + fakeDataSource := &v1beta1.PostgresClusterDataSource{RepoName: ""} + + restoreJob := &batchv1.Job{} + instanceName := cluster.Status.StartupInstance + + if err := r.generateRestoreJobIntent(cluster, configHash, instanceName, cmd, + volumeMounts, volumes, fakeDataSource, restoreJob); err != nil { + return errors.WithStack(err) + } + + // Attempt the restore exactly once. If the restore job fails, we prompt the user to investigate. + restoreJob.Spec.BackoffLimit = initialize.Int32(0) + restoreJob.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever - // generate VolumeSnapshot - snapshot, err := r.generateVolumeSnapshot(postgrescluster, primaryPvc, + // Add pgBackRest configs to template. + pgbackrest.AddConfigToRestorePod(cluster, cluster, &restoreJob.Spec.Template.Spec) + + // Add nss_wrapper init container and add nss_wrapper env vars to the pgbackrest restore container. + addNSSWrapper( + config.PGBackRestContainerImage(cluster), + cluster.Spec.ImagePullPolicy, + &restoreJob.Spec.Template) + + addTMPEmptyDir(&restoreJob.Spec.Template) + + restoreJob.Annotations[naming.PGBackRestBackupJobCompletion] = backupJob.Status.CompletionTime.Format(time.RFC3339) + return errors.WithStack(r.apply(ctx, restoreJob)) +} + +// generateSnapshotOfDedicatedSnapshotVolume will generate a VolumeSnapshot of +// the dedicated snapshot PersistentVolumeClaim and annotate it with the +// backup job completion timestamp carried on the volume's annotations.
+func (r *Reconciler) generateSnapshotOfDedicatedSnapshotVolume( + postgrescluster *v1beta1.PostgresCluster, + dedicatedSnapshotVolume *corev1.PersistentVolumeClaim, +) (*volumesnapshotv1.VolumeSnapshot, error) { + + snapshot, err := r.generateVolumeSnapshot(postgrescluster, *dedicatedSnapshotVolume, postgrescluster.Spec.Backups.Snapshots.VolumeSnapshotClassName) if err == nil { - // Add annotation for associated backup job's UID if snapshot.Annotations == nil { snapshot.Annotations = map[string]string{} } - snapshot.Annotations[naming.PGBackRestBackupJobId] = string(backupJob.UID) + snapshot.Annotations[naming.PGBackRestBackupJobCompletion] = dedicatedSnapshotVolume.GetAnnotations()[naming.PGBackRestBackupJobCompletion] } return snapshot, err @@ -185,8 +425,8 @@ func (r *Reconciler) generateVolumeSnapshotOfPrimaryPgdata( // PersistentVolumeClaim and VolumeSnapshotClassName and will set the provided // PostgresCluster as the owner. func (r *Reconciler) generateVolumeSnapshot(postgrescluster *v1beta1.PostgresCluster, - pvc corev1.PersistentVolumeClaim, - volumeSnapshotClassName string) (*volumesnapshotv1.VolumeSnapshot, error) { + pvc corev1.PersistentVolumeClaim, volumeSnapshotClassName string, +) (*volumesnapshotv1.VolumeSnapshot, error) { snapshot := &volumesnapshotv1.VolumeSnapshot{ TypeMeta: metav1.TypeMeta{ @@ -209,10 +449,57 @@ func (r *Reconciler) generateVolumeSnapshot(postgrescluster *v1beta1.PostgresClu return snapshot, err } -// getLatestCompleteBackupJob takes a JobList and returns a pointer to the -// most recently completed backup job. If no completed backup job exists -// then it returns nil. -func getLatestCompleteBackupJob(jobs *batchv1.JobList) *batchv1.Job { +// getDedicatedSnapshotVolumeRestoreJob finds a dedicated snapshot volume (DSV) +// restore job if one exists. Since we delete successful restore jobs and stop +// creating new restore jobs when one fails, there should only ever be one DSV +// restore job present at a time. If a DSV restore cannot be found, we return nil. 
+func (r *Reconciler) getDedicatedSnapshotVolumeRestoreJob(ctx context.Context, + postgrescluster *v1beta1.PostgresCluster) (*batchv1.Job, error) { + + // Get all restore jobs for this cluster + jobs := &batchv1.JobList{} + selectJobs, err := naming.AsSelector(naming.ClusterRestoreJobs(postgrescluster.Name)) + if err == nil { + err = errors.WithStack( + r.Client.List(ctx, jobs, + client.InNamespace(postgrescluster.Namespace), + client.MatchingLabelsSelector{Selector: selectJobs}, + )) + } + if err != nil { + return nil, err + } + + // Get restore job that has PGBackRestBackupJobCompletion annotation + for _, job := range jobs.Items { + _, annotationExists := job.GetAnnotations()[naming.PGBackRestBackupJobCompletion] + if annotationExists { + return &job, nil + } + } + + return nil, nil +} + +// getLatestCompleteBackupJob finds the most recently completed +// backup job for a cluster +func (r *Reconciler) getLatestCompleteBackupJob(ctx context.Context, + postgrescluster *v1beta1.PostgresCluster) (*batchv1.Job, error) { + + // Get all backup jobs for this cluster + jobs := &batchv1.JobList{} + selectJobs, err := naming.AsSelector(naming.ClusterBackupJobs(postgrescluster.Name)) + if err == nil { + err = errors.WithStack( + r.Client.List(ctx, jobs, + client.InNamespace(postgrescluster.Namespace), + client.MatchingLabelsSelector{Selector: selectJobs}, + )) + } + if err != nil { + return nil, err + } + zeroTime := metav1.NewTime(time.Time{}) latestCompleteBackupJob := batchv1.Job{ Status: batchv1.JobStatus{ @@ -228,37 +515,39 @@ func getLatestCompleteBackupJob(jobs *batchv1.JobList) *batchv1.Job { } if latestCompleteBackupJob.Status.CompletionTime.Equal(&zeroTime) { - return nil + return nil, nil } - return &latestCompleteBackupJob + return &latestCompleteBackupJob, nil } -// getLatestSnapshotWithError takes a VolumeSnapshotList and returns a pointer to the -// most recently created snapshot that has an error. If no snapshot errors exist +// getSnapshotWithLatestError takes a VolumeSnapshotList and returns a pointer to the +// snapshot that has most recently had an error. If no snapshot errors exist // then it returns nil. -func getLatestSnapshotWithError(snapshots *volumesnapshotv1.VolumeSnapshotList) *volumesnapshotv1.VolumeSnapshot { +func getSnapshotWithLatestError(snapshots *volumesnapshotv1.VolumeSnapshotList) *volumesnapshotv1.VolumeSnapshot { zeroTime := metav1.NewTime(time.Time{}) - latestSnapshotWithError := volumesnapshotv1.VolumeSnapshot{ + snapshotWithLatestError := volumesnapshotv1.VolumeSnapshot{ Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &zeroTime, + Error: &volumesnapshotv1.VolumeSnapshotError{ + Time: &zeroTime, + }, }, } for _, snapshot := range snapshots.Items { if snapshot.Status.Error != nil && - latestSnapshotWithError.Status.CreationTime.Before(snapshot.Status.CreationTime) { - latestSnapshotWithError = snapshot + snapshotWithLatestError.Status.Error.Time.Before(snapshot.Status.Error.Time) { + snapshotWithLatestError = snapshot } } - if latestSnapshotWithError.Status.CreationTime.Equal(&zeroTime) { + if snapshotWithLatestError.Status.Error.Time.Equal(&zeroTime) { return nil } - return &latestSnapshotWithError + return &snapshotWithLatestError } -// getSnapshotsForCluster gets all the VolumeSnapshots for a given postgrescluster +// getSnapshotsForCluster gets all the VolumeSnapshots for a given postgrescluster. 
func (r *Reconciler) getSnapshotsForCluster(ctx context.Context, cluster *v1beta1.PostgresCluster) ( *volumesnapshotv1.VolumeSnapshotList, error) { @@ -276,7 +565,7 @@ func (r *Reconciler) getSnapshotsForCluster(ctx context.Context, cluster *v1beta return snapshots, err } -// getLatestReadySnapshot takes a VolumeSnapshotList and returns the latest ready VolumeSnapshot +// getLatestReadySnapshot takes a VolumeSnapshotList and returns the latest ready VolumeSnapshot. func getLatestReadySnapshot(snapshots *volumesnapshotv1.VolumeSnapshotList) *volumesnapshotv1.VolumeSnapshot { zeroTime := metav1.NewTime(time.Time{}) latestReadySnapshot := volumesnapshotv1.VolumeSnapshot{ @@ -285,7 +574,7 @@ func getLatestReadySnapshot(snapshots *volumesnapshotv1.VolumeSnapshotList) *vol }, } for _, snapshot := range snapshots.Items { - if *snapshot.Status.ReadyToUse && + if snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse && latestReadySnapshot.Status.CreationTime.Before(snapshot.Status.CreationTime) { latestReadySnapshot = snapshot } @@ -297,3 +586,29 @@ func getLatestReadySnapshot(snapshots *volumesnapshotv1.VolumeSnapshotList) *vol return &latestReadySnapshot } + +// deleteSnapshots takes a postgrescluster and a snapshot list and deletes all snapshots +// in the list that are controlled by the provided postgrescluster. +func (r *Reconciler) deleteSnapshots(ctx context.Context, + postgrescluster *v1beta1.PostgresCluster, snapshots *volumesnapshotv1.VolumeSnapshotList) error { + + for i := range snapshots.Items { + err := errors.WithStack(client.IgnoreNotFound( + r.deleteControlled(ctx, postgrescluster, &snapshots.Items[i]))) + if err != nil { + return err + } + } + return nil +} + +// clusterUsingTablespaces determines if the TablespaceVolumes feature is enabled and the given +// cluster has tablespace volumes in place.
+func clusterUsingTablespaces(ctx context.Context, postgrescluster *v1beta1.PostgresCluster) bool { + for _, instanceSet := range postgrescluster.Spec.InstanceSets { + if len(instanceSet.TablespaceVolumes) > 0 { + return feature.Enabled(ctx, feature.TablespaceVolumes) + } + } + return false +} diff --git a/internal/controller/postgrescluster/snapshots_test.go b/internal/controller/postgrescluster/snapshots_test.go index 1442877ed..455b1b158 100644 --- a/internal/controller/postgrescluster/snapshots_test.go +++ b/internal/controller/postgrescluster/snapshots_test.go @@ -7,50 +7,66 @@ package postgrescluster import ( "context" "testing" + "time" "github.com/pkg/errors" "gotest.tools/v3/assert" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/crunchydata/postgres-operator/internal/controller/runtime" "github.com/crunchydata/postgres-operator/internal/feature" "github.com/crunchydata/postgres-operator/internal/initialize" "github.com/crunchydata/postgres-operator/internal/naming" + "github.com/crunchydata/postgres-operator/internal/testing/cmp" + "github.com/crunchydata/postgres-operator/internal/testing/events" "github.com/crunchydata/postgres-operator/internal/testing/require" "github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1" volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" ) -func TestReconcileSnapshots(t *testing.T) { +func TestReconcileVolumeSnapshots(t *testing.T) { ctx := context.Background() cfg, cc := setupKubernetes(t) require.ParallelCapacity(t, 1) discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) assert.NilError(t, err) + recorder := events.NewRecorder(t, runtime.Scheme) r := &Reconciler{ Client: cc, Owner: client.FieldOwner(t.Name()), DiscoveryClient: discoveryClient, + Recorder: recorder, } ns := setupNamespace(t, cc) + // Enable snapshots feature gate + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.VolumeSnapshots: true, + })) + ctx = feature.NewContext(ctx, gate) + t.Run("SnapshotsDisabledDeleteSnapshots", func(t *testing.T) { + // Create cluster (without snapshots spec) cluster := testCluster() cluster.Namespace = ns.Name cluster.ObjectMeta.UID = "the-uid-123" + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) - instances := newObservedInstances(cluster, nil, nil) - volumes := []corev1.PersistentVolumeClaim{} - + // Create a snapshot pvc := &corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ - Name: "instance1-abc-def", + Name: "dedicated-snapshot-volume", }, } volumeSnapshotClassName := "my-snapshotclass" @@ -59,10 +75,7 @@ func TestReconcileSnapshots(t *testing.T) { err = errors.WithStack(r.apply(ctx, snapshot)) assert.NilError(t, err) - err = r.reconcileVolumeSnapshots(ctx, cluster, instances, volumes) - assert.NilError(t, err) - - // Get all snapshots for this cluster + // Get all snapshots for this cluster and assert 1 exists selectSnapshots, err := naming.AsSelector(naming.Cluster(cluster.Name)) assert.NilError(t, err) snapshots := &volumesnapshotv1.VolumeSnapshotList{} @@ -72,31 +85,98 @@ func TestReconcileSnapshots(t *testing.T) { client.MatchingLabelsSelector{Selector: 
selectSnapshots}, )) assert.NilError(t, err) + assert.Equal(t, len(snapshots.Items), 1) + + // Reconcile snapshots + err = r.reconcileVolumeSnapshots(ctx, cluster, pvc) + assert.NilError(t, err) + + // Get all snapshots for this cluster and assert 0 exist + assert.NilError(t, err) + snapshots = &volumesnapshotv1.VolumeSnapshotList{} + err = errors.WithStack( + r.Client.List(ctx, snapshots, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectSnapshots}, + )) + assert.NilError(t, err) assert.Equal(t, len(snapshots.Items), 0) }) - t.Run("SnapshotsEnabledNoJobsNoSnapshots", func(t *testing.T) { + t.Run("SnapshotsEnabledTablespacesEnabled", func(t *testing.T) { + // Enable both tablespaces and snapshots feature gates gate := feature.NewGate() assert.NilError(t, gate.SetFromMap(map[string]bool{ - feature.VolumeSnapshots: true, + feature.TablespaceVolumes: true, + feature.VolumeSnapshots: true, })) ctx := feature.NewContext(ctx, gate) + // Create a cluster with snapshots and tablespaces enabled + volumeSnapshotClassName := "my-snapshotclass" cluster := testCluster() cluster.Namespace = ns.Name - cluster.ObjectMeta.UID = "the-uid-123" + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: volumeSnapshotClassName, + } + cluster.Spec.InstanceSets[0].TablespaceVolumes = []v1beta1.TablespaceVolume{{ + Name: "volume-1", + }} + + // Create pvc for reconcile + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dedicated-snapshot-volume", + }, + } + + // Reconcile + err = r.reconcileVolumeSnapshots(ctx, cluster, pvc) + assert.NilError(t, err) + + // Assert warning event was created and has expected attributes + if assert.Check(t, len(recorder.Events) > 0) { + assert.Equal(t, recorder.Events[0].Type, "Warning") + assert.Equal(t, recorder.Events[0].Regarding.Kind, "PostgresCluster") + assert.Equal(t, recorder.Events[0].Regarding.Name, "hippo") + assert.Equal(t, recorder.Events[0].Reason, "IncompatibleFeatures") + assert.Assert(t, cmp.Contains(recorder.Events[0].Note, "VolumeSnapshots not currently compatible with TablespaceVolumes")) + } + }) + + t.Run("SnapshotsEnabledNoPvcAnnotation", func(t *testing.T) { + // Create a volume snapshot class volumeSnapshotClassName := "my-snapshotclass" + volumeSnapshotClass := &volumesnapshotv1.VolumeSnapshotClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumeSnapshotClassName, + }, + DeletionPolicy: "Delete", + } + assert.NilError(t, r.Client.Create(ctx, volumeSnapshotClass)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, volumeSnapshotClass)) }) + + // Create a cluster with snapshots enabled + cluster := testCluster() + cluster.Namespace = ns.Name cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ VolumeSnapshotClassName: volumeSnapshotClassName, } + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) - instances := newObservedInstances(cluster, nil, nil) - volumes := []corev1.PersistentVolumeClaim{} + // Create pvc for reconcile + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dedicated-snapshot-volume", + }, + } - err := r.reconcileVolumeSnapshots(ctx, cluster, instances, volumes) + // Reconcile + err = r.reconcileVolumeSnapshots(ctx, cluster, pvc) assert.NilError(t, err) - // Get all snapshots for this cluster + // Assert no snapshots exist selectSnapshots, err := naming.AsSelector(naming.Cluster(cluster.Name)) assert.NilError(t, err) snapshots := 
&volumesnapshotv1.VolumeSnapshotList{} @@ -108,335 +188,802 @@ func TestReconcileSnapshots(t *testing.T) { assert.NilError(t, err) assert.Equal(t, len(snapshots.Items), 0) }) -} - -func TestGenerateVolumeSnapshotOfPrimaryPgdata(t *testing.T) { - // ctx := context.Background() - _, cc := setupKubernetes(t) - require.ParallelCapacity(t, 1) - r := &Reconciler{ - Client: cc, - Owner: client.FieldOwner(t.Name()), - } - ns := setupNamespace(t, cc) + t.Run("SnapshotsEnabledReadySnapshotsExist", func(t *testing.T) { + // Create a volume snapshot class + volumeSnapshotClassName := "my-snapshotclass" + volumeSnapshotClass := &volumesnapshotv1.VolumeSnapshotClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: volumeSnapshotClassName, + }, + DeletionPolicy: "Delete", + } + assert.NilError(t, r.Client.Create(ctx, volumeSnapshotClass)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, volumeSnapshotClass)) }) - t.Run("NoPrimary", func(t *testing.T) { + // Create a cluster with snapshots enabled cluster := testCluster() cluster.Namespace = ns.Name - instances := newObservedInstances(cluster, nil, nil) - volumes := []corev1.PersistentVolumeClaim{} - backupJob := &batchv1.Job{} + cluster.ObjectMeta.UID = "the-uid-123" + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: volumeSnapshotClassName, + } + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) - snapshot, err := r.generateVolumeSnapshotOfPrimaryPgdata(cluster, instances, volumes, backupJob) - assert.Error(t, err, "Could not find primary instance. Cannot create volume snapshot.") - assert.Check(t, snapshot == nil) - }) + // Create pvc with annotation + pvcName := initialize.String("dedicated-snapshot-volume") + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: *pvcName, + Annotations: map[string]string{ + naming.PGBackRestBackupJobCompletion: "backup-timestamp", + }, + }, + } - t.Run("NoVolume", func(t *testing.T) { - cluster := testCluster() - cluster.Namespace = ns.Name - instances := newObservedInstances(cluster, - []appsv1.StatefulSet{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "instance1-abc", - Labels: map[string]string{ - "postgres-operator.crunchydata.com/instance-set": "00", - }, - }, + // Create snapshot with annotation matching the pvc annotation + snapshot1 := &volumesnapshotv1.VolumeSnapshot{ + TypeMeta: metav1.TypeMeta{ + APIVersion: volumesnapshotv1.SchemeGroupVersion.String(), + Kind: "VolumeSnapshot", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "first-snapshot", + Namespace: ns.Name, + Annotations: map[string]string{ + naming.PGBackRestBackupJobCompletion: "backup-timestamp", + }, + Labels: map[string]string{ + naming.LabelCluster: "hippo", }, }, - []corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "some-pod-name", - Labels: map[string]string{ - "postgres-operator.crunchydata.com/instance-set": "00", - "postgres-operator.crunchydata.com/instance": "instance1-abc", - "postgres-operator.crunchydata.com/role": "master", - }, - }, + Spec: volumesnapshotv1.VolumeSnapshotSpec{ + Source: volumesnapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: pvcName, }, - }) - volumes := []corev1.PersistentVolumeClaim{} - backupJob := &batchv1.Job{} - - snapshot, err := r.generateVolumeSnapshotOfPrimaryPgdata(cluster, instances, volumes, backupJob) - assert.Error(t, err, "Could not find primary's pgdata pvc. 
Cannot create volume snapshot.") - assert.Check(t, snapshot == nil) - }) + }, + } + err := errors.WithStack(r.setControllerReference(cluster, snapshot1)) + assert.NilError(t, err) + err = r.apply(ctx, snapshot1) + assert.NilError(t, err) - t.Run("Success", func(t *testing.T) { - cluster := testCluster() - cluster.Namespace = ns.Name - cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ - VolumeSnapshotClassName: "my-volume-snapshot-class", + // Update snapshot status + truePtr := initialize.Bool(true) + snapshot1.Status = &volumesnapshotv1.VolumeSnapshotStatus{ + ReadyToUse: truePtr, } - cluster.ObjectMeta.UID = "the-uid-123" - instances := newObservedInstances(cluster, - []appsv1.StatefulSet{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "instance1-abc", - Labels: map[string]string{ - "postgres-operator.crunchydata.com/instance-set": "00", - }, - }, + err = r.Client.Status().Update(ctx, snapshot1) + assert.NilError(t, err) + + // Create second snapshot with different annotation value + snapshot2 := &volumesnapshotv1.VolumeSnapshot{ + TypeMeta: metav1.TypeMeta{ + APIVersion: volumesnapshotv1.SchemeGroupVersion.String(), + Kind: "VolumeSnapshot", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "second-snapshot", + Namespace: ns.Name, + Annotations: map[string]string{ + naming.PGBackRestBackupJobCompletion: "older-backup-timestamp", + }, + Labels: map[string]string{ + naming.LabelCluster: "hippo", }, }, - []corev1.Pod{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "some-pod-name", - Labels: map[string]string{ - "postgres-operator.crunchydata.com/instance-set": "00", - "postgres-operator.crunchydata.com/instance": "instance1-abc", - "postgres-operator.crunchydata.com/role": "master", - }, - }, + Spec: volumesnapshotv1.VolumeSnapshotSpec{ + Source: volumesnapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: pvcName, }, }, - ) - volumes := []corev1.PersistentVolumeClaim{{ + } + err = errors.WithStack(r.setControllerReference(cluster, snapshot2)) + assert.NilError(t, err) + err = r.apply(ctx, snapshot2) + assert.NilError(t, err) + + // Update second snapshot's status + snapshot2.Status = &volumesnapshotv1.VolumeSnapshotStatus{ + ReadyToUse: truePtr, + } + err = r.Client.Status().Update(ctx, snapshot2) + assert.NilError(t, err) + + // Reconcile + err = r.reconcileVolumeSnapshots(ctx, cluster, pvc) + assert.NilError(t, err) + + // Assert first snapshot exists and second snapshot was deleted + selectSnapshots, err := naming.AsSelector(naming.Cluster(cluster.Name)) + assert.NilError(t, err) + snapshots := &volumesnapshotv1.VolumeSnapshotList{} + err = errors.WithStack( + r.Client.List(ctx, snapshots, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectSnapshots}, + )) + assert.NilError(t, err) + assert.Equal(t, len(snapshots.Items), 1) + assert.Equal(t, snapshots.Items[0].Name, "first-snapshot") + + // Cleanup + err = r.deleteControlled(ctx, cluster, snapshot1) + assert.NilError(t, err) + }) + + t.Run("SnapshotsEnabledCreateSnapshot", func(t *testing.T) { + // Create a volume snapshot class + volumeSnapshotClassName := "my-snapshotclass" + volumeSnapshotClass := &volumesnapshotv1.VolumeSnapshotClass{ ObjectMeta: metav1.ObjectMeta{ - Name: "instance1-abc-def", - Labels: map[string]string{ - naming.LabelRole: naming.RolePostgresData, - naming.LabelInstanceSet: "instance1", - naming.LabelInstance: "instance1-abc"}, + Name: volumeSnapshotClassName, }, - }} - backupJob := &batchv1.Job{ + DeletionPolicy: "Delete", + } + assert.NilError(t, 
r.Client.Create(ctx, volumeSnapshotClass)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, volumeSnapshotClass)) }) + + // Create a cluster with snapshots enabled + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: volumeSnapshotClassName, + } + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) + + // Create pvc with annotation + pvcName := initialize.String("dedicated-snapshot-volume") + pvc := &corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ - Name: "backup1", - UID: "the-uid-456", + Name: *pvcName, + Annotations: map[string]string{ + naming.PGBackRestBackupJobCompletion: "another-backup-timestamp", + }, }, } - snapshot, err := r.generateVolumeSnapshotOfPrimaryPgdata(cluster, instances, volumes, backupJob) + // Reconcile + err = r.reconcileVolumeSnapshots(ctx, cluster, pvc) + assert.NilError(t, err) + + // Assert that a snapshot was created + selectSnapshots, err := naming.AsSelector(naming.Cluster(cluster.Name)) + assert.NilError(t, err) + snapshots := &volumesnapshotv1.VolumeSnapshotList{} + err = errors.WithStack( + r.Client.List(ctx, snapshots, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectSnapshots}, + )) assert.NilError(t, err) - assert.Equal(t, snapshot.Annotations[naming.PGBackRestBackupJobId], "the-uid-456") + assert.Equal(t, len(snapshots.Items), 1) + assert.Equal(t, snapshots.Items[0].Annotations[naming.PGBackRestBackupJobCompletion], + "another-backup-timestamp") }) } -func TestGenerateVolumeSnapshot(t *testing.T) { - // ctx := context.Background() - _, cc := setupKubernetes(t) - require.ParallelCapacity(t, 1) +func TestReconcileDedicatedSnapshotVolume(t *testing.T) { + ctx := context.Background() + cfg, cc := setupKubernetes(t) + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) + assert.NilError(t, err) + recorder := events.NewRecorder(t, runtime.Scheme) r := &Reconciler{ - Client: cc, - Owner: client.FieldOwner(t.Name()), - } - ns := setupNamespace(t, cc) - - cluster := testCluster() - cluster.Namespace = ns.Name - - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: "instance1-abc-def", - }, + Client: cc, + Owner: client.FieldOwner(t.Name()), + DiscoveryClient: discoveryClient, + Recorder: recorder, } - volumeSnapshotClassName := "my-snapshot" - snapshot, err := r.generateVolumeSnapshot(cluster, *pvc, volumeSnapshotClassName) - assert.NilError(t, err) - assert.Equal(t, *snapshot.Spec.VolumeSnapshotClassName, "my-snapshot") - assert.Equal(t, *snapshot.Spec.Source.PersistentVolumeClaimName, "instance1-abc-def") - assert.Equal(t, snapshot.Labels[naming.LabelCluster], "hippo") - assert.Equal(t, snapshot.ObjectMeta.OwnerReferences[0].Name, "hippo") -} + // Enable snapshots feature gate + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.VolumeSnapshots: true, + })) + ctx = feature.NewContext(ctx, gate) -func TestGetLatestCompleteBackupJob(t *testing.T) { - t.Run("NoJobs", func(t *testing.T) { - jobList := &batchv1.JobList{} - latestCompleteBackupJob := getLatestCompleteBackupJob(jobList) - assert.Check(t, latestCompleteBackupJob == nil) - }) + t.Run("SnapshotsDisabledDeletePvc", func(t *testing.T) { + // Create cluster without snapshots spec + ns := setupNamespace(t, cc) + cluster := testCluster() + cluster.Namespace = ns.Name 
+ cluster.ObjectMeta.UID = "the-uid-123" + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) - t.Run("NoCompleteJobs", func(t *testing.T) { - jobList := &batchv1.JobList{ - Items: []batchv1.Job{ - { - Status: batchv1.JobStatus{ - Succeeded: 0, - }, - }, - { - Status: batchv1.JobStatus{ - Succeeded: 0, - }, - }, + // Create a dedicated snapshot volume + pvc := &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{ + Kind: "PersistentVolumeClaim", + APIVersion: corev1.SchemeGroupVersion.String(), }, - } - latestCompleteBackupJob := getLatestCompleteBackupJob(jobList) - assert.Check(t, latestCompleteBackupJob == nil) - }) - - t.Run("OneCompleteBackupJob", func(t *testing.T) { - currentTime := metav1.Now() - jobList := &batchv1.JobList{ - Items: []batchv1.Job{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "backup1", - UID: "something-here", - }, - Status: batchv1.JobStatus{ - Succeeded: 1, - CompletionTime: &currentTime, - }, - }, - { - Status: batchv1.JobStatus{ - Succeeded: 0, - }, + ObjectMeta: metav1.ObjectMeta{ + Name: "dedicated-snapshot-volume", + Namespace: ns.Name, + Labels: map[string]string{ + naming.LabelCluster: cluster.Name, + naming.LabelRole: naming.RoleSnapshot, + naming.LabelData: naming.DataPostgres, }, }, + Spec: testVolumeClaimSpec(), } - latestCompleteBackupJob := getLatestCompleteBackupJob(jobList) - assert.Check(t, latestCompleteBackupJob.UID == "something-here") - }) + err = errors.WithStack(r.setControllerReference(cluster, pvc)) + assert.NilError(t, err) + err = r.apply(ctx, pvc) + assert.NilError(t, err) - t.Run("TwoCompleteBackupJobs", func(t *testing.T) { - currentTime := metav1.Now() - earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) - assert.Check(t, earlierTime.Before(&currentTime)) + // Assert that the pvc was created + selectPvcs, err := naming.AsSelector(naming.Cluster(cluster.Name)) + assert.NilError(t, err) + pvcs := &corev1.PersistentVolumeClaimList{} + err = errors.WithStack( + r.Client.List(ctx, pvcs, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectPvcs}, + )) + assert.NilError(t, err) + assert.Equal(t, len(pvcs.Items), 1) - jobList := &batchv1.JobList{ - Items: []batchv1.Job{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "backup2", - UID: "newer-one", - }, - Status: batchv1.JobStatus{ - Succeeded: 1, - CompletionTime: &currentTime, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "backup1", - UID: "older-one", - }, - Status: batchv1.JobStatus{ - Succeeded: 1, - CompletionTime: &earlierTime, - }, - }, - }, - } - latestCompleteBackupJob := getLatestCompleteBackupJob(jobList) - assert.Check(t, latestCompleteBackupJob.UID == "newer-one") - }) -} + // Create volumes for reconcile + clusterVolumes := []corev1.PersistentVolumeClaim{*pvc} -func TestGetLatestSnapshotWithError(t *testing.T) { - t.Run("NoSnapshots", func(t *testing.T) { - snapshotList := &volumesnapshotv1.VolumeSnapshotList{} - latestSnapshotWithError := getLatestSnapshotWithError(snapshotList) - assert.Check(t, latestSnapshotWithError == nil) + // Reconcile + returned, err := r.reconcileDedicatedSnapshotVolume(ctx, cluster, clusterVolumes) + assert.NilError(t, err) + assert.Check(t, returned == nil) + + // Assert that the pvc has been deleted or marked for deletion + key, fetched := client.ObjectKeyFromObject(pvc), &corev1.PersistentVolumeClaim{} + if err := r.Client.Get(ctx, key, fetched); err == nil { + assert.Assert(t, fetched.DeletionTimestamp != nil, "expected deleted") + }
else { + assert.Assert(t, apierrors.IsNotFound(err), "expected NotFound, got %v", err) + } }) - t.Run("NoSnapshotsWithErrors", func(t *testing.T) { - snapshotList := &volumesnapshotv1.VolumeSnapshotList{ - Items: []volumesnapshotv1.VolumeSnapshot{ - { - Status: &volumesnapshotv1.VolumeSnapshotStatus{ - ReadyToUse: initialize.Bool(true), - }, - }, - { - Status: &volumesnapshotv1.VolumeSnapshotStatus{ - ReadyToUse: initialize.Bool(false), - }, - }, - }, + t.Run("SnapshotsEnabledCreatePvcNoBackupNoRestore", func(t *testing.T) { + // Create cluster with snapshots enabled + ns := setupNamespace(t, cc) + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: "my-snapshotclass", } - latestSnapshotWithError := getLatestSnapshotWithError(snapshotList) - assert.Check(t, latestSnapshotWithError == nil) + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) + + // Create volumes for reconcile + clusterVolumes := []corev1.PersistentVolumeClaim{} + + // Reconcile + pvc, err := r.reconcileDedicatedSnapshotVolume(ctx, cluster, clusterVolumes) + assert.NilError(t, err) + assert.Assert(t, pvc != nil) + + // Assert pvc was created + selectPvcs, err := naming.AsSelector(naming.Cluster(cluster.Name)) + assert.NilError(t, err) + pvcs := &corev1.PersistentVolumeClaimList{} + err = errors.WithStack( + r.Client.List(ctx, pvcs, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectPvcs}, + )) + assert.NilError(t, err) + assert.Equal(t, len(pvcs.Items), 1) }) - t.Run("OneSnapshotWithError", func(t *testing.T) { + t.Run("SnapshotsEnabledBackupExistsCreateRestore", func(t *testing.T) { + // Create cluster with snapshots enabled + ns := setupNamespace(t, cc) + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: "my-snapshotclass", + } + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) + + // Create successful backup job + backupJob := testBackupJob(cluster) + err = errors.WithStack(r.setControllerReference(cluster, backupJob)) + assert.NilError(t, err) + err = r.apply(ctx, backupJob) + assert.NilError(t, err) + currentTime := metav1.Now() - snapshotList := &volumesnapshotv1.VolumeSnapshotList{ - Items: []volumesnapshotv1.VolumeSnapshot{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "good-snapshot", - UID: "the-uid-123", - }, - Status: &volumesnapshotv1.VolumeSnapshotStatus{ - ReadyToUse: initialize.Bool(true), - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "bad-snapshot", - UID: "the-uid-456", - }, - Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &currentTime, - ReadyToUse: initialize.Bool(false), - Error: &volumesnapshotv1.VolumeSnapshotError{}, - }, - }, - }, + backupJob.Status = batchv1.JobStatus{ + Succeeded: 1, + CompletionTime: &currentTime, } - latestSnapshotWithError := getLatestSnapshotWithError(snapshotList) - assert.Equal(t, latestSnapshotWithError.ObjectMeta.Name, "bad-snapshot") + err = r.Client.Status().Update(ctx, backupJob) + assert.NilError(t, err) + + // Create instance set and volumes for reconcile + sts := &appsv1.StatefulSet{} + generateInstanceStatefulSetIntent(ctx, cluster, &cluster.Spec.InstanceSets[0], "pod-service", "service-account", sts, 1) +
clusterVolumes := []corev1.PersistentVolumeClaim{} + + // Reconcile + pvc, err := r.reconcileDedicatedSnapshotVolume(ctx, cluster, clusterVolumes) + assert.NilError(t, err) + assert.Assert(t, pvc != nil) + + // Assert restore job with annotation was created + restoreJobs := &batchv1.JobList{} + selectJobs, err := naming.AsSelector(naming.ClusterRestoreJobs(cluster.Name)) + assert.NilError(t, err) + err = errors.WithStack( + r.Client.List(ctx, restoreJobs, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectJobs}, + )) + assert.NilError(t, err) + assert.Equal(t, len(restoreJobs.Items), 1) + assert.Assert(t, restoreJobs.Items[0].Annotations[naming.PGBackRestBackupJobCompletion] != "") }) - t.Run("TwoSnapshotsWithErrors", func(t *testing.T) { + t.Run("SnapshotsEnabledSuccessfulRestoreExists", func(t *testing.T) { + // Create cluster with snapshots enabled + ns := setupNamespace(t, cc) + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: "my-snapshotclass", + } + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) + + // Create times for jobs currentTime := metav1.Now() earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) - snapshotList := &volumesnapshotv1.VolumeSnapshotList{ - Items: []volumesnapshotv1.VolumeSnapshot{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "first-bad-snapshot", - UID: "the-uid-123", - }, - Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &earlierTime, - ReadyToUse: initialize.Bool(false), - Error: &volumesnapshotv1.VolumeSnapshotError{}, - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "second-bad-snapshot", - UID: "the-uid-456", - }, - Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &currentTime, - ReadyToUse: initialize.Bool(false), - Error: &volumesnapshotv1.VolumeSnapshotError{}, - }, - }, + + // Create successful backup job + backupJob := testBackupJob(cluster) + err = errors.WithStack(r.setControllerReference(cluster, backupJob)) + assert.NilError(t, err) + err = r.apply(ctx, backupJob) + assert.NilError(t, err) + + backupJob.Status = batchv1.JobStatus{ + Succeeded: 1, + CompletionTime: &earlierTime, + } + err = r.Client.Status().Update(ctx, backupJob) + assert.NilError(t, err) + + // Create successful restore job + restoreJob := testRestoreJob(cluster) + restoreJob.Annotations = map[string]string{ + naming.PGBackRestBackupJobCompletion: backupJob.Status.CompletionTime.Format(time.RFC3339), + } + err = errors.WithStack(r.setControllerReference(cluster, restoreJob)) + assert.NilError(t, err) + err = r.apply(ctx, restoreJob) + assert.NilError(t, err) + + restoreJob.Status = batchv1.JobStatus{ + Succeeded: 1, + CompletionTime: &currentTime, + } + err = r.Client.Status().Update(ctx, restoreJob) + assert.NilError(t, err) + + // Create instance set and volumes for reconcile + sts := &appsv1.StatefulSet{} + generateInstanceStatefulSetIntent(ctx, cluster, &cluster.Spec.InstanceSets[0], "pod-service", "service-account", sts, 1) + clusterVolumes := []corev1.PersistentVolumeClaim{} + + // Reconcile + pvc, err := r.reconcileDedicatedSnapshotVolume(ctx, cluster, clusterVolumes) + assert.NilError(t, err) + assert.Assert(t, pvc != nil) + + // Assert restore job was deleted + restoreJobs := &batchv1.JobList{} + selectJobs, err := naming.AsSelector(naming.ClusterRestoreJobs(cluster.Name)) + assert.NilError(t,
err) + err = errors.WithStack( + r.Client.List(ctx, restoreJobs, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectJobs}, + )) + assert.NilError(t, err) + assert.Equal(t, len(restoreJobs.Items), 0) + + // Assert pvc was annotated + assert.Equal(t, pvc.GetAnnotations()[naming.PGBackRestBackupJobCompletion], backupJob.Status.CompletionTime.Format(time.RFC3339)) + }) + + t.Run("SnapshotsEnabledFailedRestoreExists", func(t *testing.T) { + // Create cluster with snapshots enabled + ns := setupNamespace(t, cc) + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: "my-snapshotclass", + } + assert.NilError(t, r.Client.Create(ctx, cluster)) + t.Cleanup(func() { assert.Check(t, r.Client.Delete(ctx, cluster)) }) + + // Create times for jobs + currentTime := metav1.Now() + earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) + + // Create successful backup job + backupJob := testBackupJob(cluster) + err = errors.WithStack(r.setControllerReference(cluster, backupJob)) + assert.NilError(t, err) + err = r.apply(ctx, backupJob) + assert.NilError(t, err) + + backupJob.Status = batchv1.JobStatus{ + Succeeded: 1, + CompletionTime: &earlierTime, + } + err = r.Client.Status().Update(ctx, backupJob) + assert.NilError(t, err) + + // Create failed restore job + restoreJob := testRestoreJob(cluster) + restoreJob.Annotations = map[string]string{ + naming.PGBackRestBackupJobCompletion: backupJob.Status.CompletionTime.Format(time.RFC3339), + } + err = errors.WithStack(r.setControllerReference(cluster, restoreJob)) + assert.NilError(t, err) + err = r.apply(ctx, restoreJob) + assert.NilError(t, err) + + restoreJob.Status = batchv1.JobStatus{ + Succeeded: 0, + Failed: 1, + CompletionTime: &currentTime, + } + err = r.Client.Status().Update(ctx, restoreJob) + assert.NilError(t, err) + + // Setup instances and volumes for reconcile + sts := &appsv1.StatefulSet{} + generateInstanceStatefulSetIntent(ctx, cluster, &cluster.Spec.InstanceSets[0], "pod-service", "service-account", sts, 1) + clusterVolumes := []corev1.PersistentVolumeClaim{} + + // Reconcile + pvc, err := r.reconcileDedicatedSnapshotVolume(ctx, cluster, clusterVolumes) + assert.NilError(t, err) + assert.Assert(t, pvc != nil) + + // Assert warning event was created and has expected attributes + if assert.Check(t, len(recorder.Events) > 0) { + assert.Equal(t, recorder.Events[0].Type, "Warning") + assert.Equal(t, recorder.Events[0].Regarding.Kind, "PostgresCluster") + assert.Equal(t, recorder.Events[0].Regarding.Name, "hippo") + assert.Equal(t, recorder.Events[0].Reason, "DedicatedSnapshotVolumeRestoreJobError") + assert.Assert(t, cmp.Contains(recorder.Events[0].Note, "restore job failed, check the logs")) + } + }) +} + +func TestCreateDedicatedSnapshotVolume(t *testing.T) { + ctx := context.Background() + _, cc := setupKubernetes(t) + + r := &Reconciler{ + Client: cc, + Owner: client.FieldOwner(t.Name()), + } + + ns := setupNamespace(t, cc) + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + + labelMap := map[string]string{ + naming.LabelCluster: cluster.Name, + naming.LabelRole: naming.RoleSnapshot, + naming.LabelData: naming.DataPostgres, + } + pvc := &corev1.PersistentVolumeClaim{ObjectMeta: naming.ClusterDedicatedSnapshotVolume(cluster)} + pvc.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("PersistentVolumeClaim")) + + pvc, err :=
r.createDedicatedSnapshotVolume(ctx, cluster, labelMap, pvc) + assert.NilError(t, err) + assert.Assert(t, metav1.IsControlledBy(pvc, cluster)) + assert.Equal(t, pvc.Spec.Resources.Requests[corev1.ResourceStorage], resource.MustParse("1Gi")) +} + +func TestDedicatedSnapshotVolumeRestore(t *testing.T) { + ctx := context.Background() + _, cc := setupKubernetes(t) + + r := &Reconciler{ + Client: cc, + Owner: client.FieldOwner(t.Name()), + } + + ns := setupNamespace(t, cc) + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dedicated-snapshot-volume", + }, + } + + sts := &appsv1.StatefulSet{} + generateInstanceStatefulSetIntent(ctx, cluster, &cluster.Spec.InstanceSets[0], "pod-service", "service-account", sts, 1) + currentTime := metav1.Now() + backupJob := testBackupJob(cluster) + backupJob.Status.CompletionTime = &currentTime + + err := r.dedicatedSnapshotVolumeRestore(ctx, cluster, pvc, backupJob) + assert.NilError(t, err) + + // Assert a restore job was created that has the correct annotation + jobs := &batchv1.JobList{} + selectJobs, err := naming.AsSelector(naming.ClusterRestoreJobs(cluster.Name)) + assert.NilError(t, err) + err = errors.WithStack( + r.Client.List(ctx, jobs, + client.InNamespace(cluster.Namespace), + client.MatchingLabelsSelector{Selector: selectJobs}, + )) + assert.NilError(t, err) + assert.Equal(t, len(jobs.Items), 1) + assert.Equal(t, jobs.Items[0].Annotations[naming.PGBackRestBackupJobCompletion], + backupJob.Status.CompletionTime.Format(time.RFC3339)) +} + +func TestGenerateSnapshotOfDedicatedSnapshotVolume(t *testing.T) { + _, cc := setupKubernetes(t) + require.ParallelCapacity(t, 1) + + r := &Reconciler{ + Client: cc, + Owner: client.FieldOwner(t.Name()), + } + ns := setupNamespace(t, cc) + + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.Spec.Backups.Snapshots = &v1beta1.VolumeSnapshots{ + VolumeSnapshotClassName: "my-snapshot", + } + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + naming.PGBackRestBackupJobCompletion: "backup-completion-timestamp", }, + Name: "dedicated-snapshot-volume", + }, + } + + snapshot, err := r.generateSnapshotOfDedicatedSnapshotVolume(cluster, pvc) + assert.NilError(t, err) + assert.Equal(t, snapshot.GetAnnotations()[naming.PGBackRestBackupJobCompletion], + "backup-completion-timestamp") +} + +func TestGenerateVolumeSnapshot(t *testing.T) { + _, cc := setupKubernetes(t) + require.ParallelCapacity(t, 1) + + r := &Reconciler{ + Client: cc, + Owner: client.FieldOwner(t.Name()), + } + ns := setupNamespace(t, cc) + + cluster := testCluster() + cluster.Namespace = ns.Name + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dedicated-snapshot-volume", + }, + } + volumeSnapshotClassName := "my-snapshot" + + snapshot, err := r.generateVolumeSnapshot(cluster, *pvc, volumeSnapshotClassName) + assert.NilError(t, err) + assert.Equal(t, *snapshot.Spec.VolumeSnapshotClassName, "my-snapshot") + assert.Equal(t, *snapshot.Spec.Source.PersistentVolumeClaimName, "dedicated-snapshot-volume") + assert.Equal(t, snapshot.Labels[naming.LabelCluster], "hippo") + assert.Equal(t, snapshot.ObjectMeta.OwnerReferences[0].Name, "hippo") +} + +func TestGetDedicatedSnapshotVolumeRestoreJob(t *testing.T) { + ctx := context.Background() + _, cc := setupKubernetes(t) + require.ParallelCapacity(t, 1) + + r := &Reconciler{ + Client: cc, + Owner:
client.FieldOwner(t.Name()), + } + ns := setupNamespace(t, cc) + + cluster := testCluster() + cluster.Namespace = ns.Name + + t.Run("NoRestoreJobs", func(t *testing.T) { + dsvRestoreJob, err := r.getDedicatedSnapshotVolumeRestoreJob(ctx, cluster) + assert.NilError(t, err) + assert.Check(t, dsvRestoreJob == nil) + }) + + t.Run("NoDsvRestoreJobs", func(t *testing.T) { + job1 := testRestoreJob(cluster) + job1.Namespace = ns.Name + + err := r.apply(ctx, job1) + assert.NilError(t, err) + + dsvRestoreJob, err := r.getDedicatedSnapshotVolumeRestoreJob(ctx, cluster) + assert.NilError(t, err) + assert.Check(t, dsvRestoreJob == nil) + }) + + t.Run("DsvRestoreJobExists", func(t *testing.T) { + job2 := testRestoreJob(cluster) + job2.Name = "restore-job-2" + job2.Namespace = ns.Name + job2.Annotations = map[string]string{ + naming.PGBackRestBackupJobCompletion: "backup-timestamp", + } + + err := r.apply(ctx, job2) + assert.NilError(t, err) + + job3 := testRestoreJob(cluster) + job3.Name = "restore-job-3" + job3.Namespace = ns.Name + + err = r.apply(ctx, job3) + assert.NilError(t, err) + + dsvRestoreJob, err := r.getDedicatedSnapshotVolumeRestoreJob(ctx, cluster) + assert.NilError(t, err) + assert.Assert(t, dsvRestoreJob != nil) + assert.Equal(t, dsvRestoreJob.Name, "restore-job-2") + }) +} + +func TestGetLatestCompleteBackupJob(t *testing.T) { + ctx := context.Background() + _, cc := setupKubernetes(t) + // require.ParallelCapacity(t, 1) + + r := &Reconciler{ + Client: cc, + Owner: client.FieldOwner(t.Name()), + } + ns := setupNamespace(t, cc) + + cluster := testCluster() + cluster.Namespace = ns.Name + + t.Run("NoJobs", func(t *testing.T) { + latestCompleteBackupJob, err := r.getLatestCompleteBackupJob(ctx, cluster) + assert.NilError(t, err) + assert.Check(t, latestCompleteBackupJob == nil) + }) + + t.Run("NoCompleteJobs", func(t *testing.T) { + job1 := testBackupJob(cluster) + job1.Namespace = ns.Name + + err := r.apply(ctx, job1) + assert.NilError(t, err) + + latestCompleteBackupJob, err := r.getLatestCompleteBackupJob(ctx, cluster) + assert.NilError(t, err) + assert.Check(t, latestCompleteBackupJob == nil) + }) + + t.Run("OneCompleteBackupJob", func(t *testing.T) { + currentTime := metav1.Now() + + job1 := testBackupJob(cluster) + job1.Namespace = ns.Name + + err := r.apply(ctx, job1) + assert.NilError(t, err) + + job2 := testBackupJob(cluster) + job2.Namespace = ns.Name + job2.Name = "backup-job-2" + + err = r.apply(ctx, job2) + assert.NilError(t, err) + + // Get job1 and update Status. + err = r.Client.Get(ctx, client.ObjectKeyFromObject(job1), job1) + assert.NilError(t, err) + + job1.Status = batchv1.JobStatus{ + Succeeded: 1, + CompletionTime: &currentTime, + } + err = r.Client.Status().Update(ctx, job1) + assert.NilError(t, err) + + latestCompleteBackupJob, err := r.getLatestCompleteBackupJob(ctx, cluster) + assert.NilError(t, err) + assert.Check(t, latestCompleteBackupJob.Name == "backup-job-1") + }) + + t.Run("TwoCompleteBackupJobs", func(t *testing.T) { + currentTime := metav1.Now() + earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) + assert.Check(t, earlierTime.Before(&currentTime)) + + job1 := testBackupJob(cluster) + job1.Namespace = ns.Name + + err := r.apply(ctx, job1) + assert.NilError(t, err) + + job2 := testBackupJob(cluster) + job2.Namespace = ns.Name + job2.Name = "backup-job-2" + + err = r.apply(ctx, job2) + assert.NilError(t, err) + + // Get job1 and update Status.
+ err = r.Client.Get(ctx, client.ObjectKeyFromObject(job1), job1) + assert.NilError(t, err) + + job1.Status = batchv1.JobStatus{ + Succeeded: 1, + CompletionTime: &currentTime, + } + err = r.Client.Status().Update(ctx, job1) + assert.NilError(t, err) + + // Get job2 and update Status. + err = r.Client.Get(ctx, client.ObjectKeyFromObject(job2), job2) + assert.NilError(t, err) + + job2.Status = batchv1.JobStatus{ + Succeeded: 1, + CompletionTime: &earlierTime, } - latestSnapshotWithError := getLatestSnapshotWithError(snapshotList) - assert.Equal(t, latestSnapshotWithError.ObjectMeta.Name, "second-bad-snapshot") + err = r.Client.Status().Update(ctx, job2) + assert.NilError(t, err) + + latestCompleteBackupJob, err := r.getLatestCompleteBackupJob(ctx, cluster) + assert.NilError(t, err) + assert.Check(t, latestCompleteBackupJob.Name == "backup-job-1") }) } -func TestGetLatestReadySnapshot(t *testing.T) { +func TestGetSnapshotWithLatestError(t *testing.T) { t.Run("NoSnapshots", func(t *testing.T) { snapshotList := &volumesnapshotv1.VolumeSnapshotList{} - latestReadySnapshot := getLatestReadySnapshot(snapshotList) - assert.Check(t, latestReadySnapshot == nil) + snapshotWithLatestError := getSnapshotWithLatestError(snapshotList) + assert.Check(t, snapshotWithLatestError == nil) }) - t.Run("NoReadySnapshots", func(t *testing.T) { + t.Run("NoSnapshotsWithErrors", func(t *testing.T) { snapshotList := &volumesnapshotv1.VolumeSnapshotList{ Items: []volumesnapshotv1.VolumeSnapshot{ { Status: &volumesnapshotv1.VolumeSnapshotStatus{ - ReadyToUse: initialize.Bool(false), + ReadyToUse: initialize.Bool(true), }, }, { @@ -446,11 +993,11 @@ func TestGetLatestReadySnapshot(t *testing.T) { }, }, } - latestSnapshotWithError := getLatestReadySnapshot(snapshotList) - assert.Check(t, latestSnapshotWithError == nil) + snapshotWithLatestError := getSnapshotWithLatestError(snapshotList) + assert.Check(t, snapshotWithLatestError == nil) }) - t.Run("OneReadySnapshot", func(t *testing.T) { + t.Run("OneSnapshotWithError", func(t *testing.T) { currentTime := metav1.Now() earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) snapshotList := &volumesnapshotv1.VolumeSnapshotList{ @@ -461,7 +1008,7 @@ func TestGetLatestReadySnapshot(t *testing.T) { UID: "the-uid-123", }, Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &earlierTime, + CreationTime: &currentTime, ReadyToUse: initialize.Bool(true), }, }, @@ -471,45 +1018,51 @@ func TestGetLatestReadySnapshot(t *testing.T) { UID: "the-uid-456", }, Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &currentTime, - ReadyToUse: initialize.Bool(false), + ReadyToUse: initialize.Bool(false), + Error: &volumesnapshotv1.VolumeSnapshotError{ + Time: &earlierTime, + }, }, }, }, } - latestReadySnapshot := getLatestReadySnapshot(snapshotList) - assert.Equal(t, latestReadySnapshot.ObjectMeta.Name, "good-snapshot") + snapshotWithLatestError := getSnapshotWithLatestError(snapshotList) + assert.Equal(t, snapshotWithLatestError.ObjectMeta.Name, "bad-snapshot") }) - t.Run("TwoReadySnapshots", func(t *testing.T) { + t.Run("TwoSnapshotsWithErrors", func(t *testing.T) { currentTime := metav1.Now() earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) snapshotList := &volumesnapshotv1.VolumeSnapshotList{ Items: []volumesnapshotv1.VolumeSnapshot{ { ObjectMeta: metav1.ObjectMeta{ - Name: "first-good-snapshot", + Name: "first-bad-snapshot", UID: "the-uid-123", }, Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &earlierTime, - ReadyToUse: initialize.Bool(true), +
ReadyToUse: initialize.Bool(false), + Error: &volumesnapshotv1.VolumeSnapshotError{ + Time: &earlierTime, + }, }, }, { ObjectMeta: metav1.ObjectMeta{ - Name: "second-good-snapshot", + Name: "second-bad-snapshot", UID: "the-uid-456", }, Status: &volumesnapshotv1.VolumeSnapshotStatus{ - CreationTime: &currentTime, - ReadyToUse: initialize.Bool(true), + ReadyToUse: initialize.Bool(false), + Error: &volumesnapshotv1.VolumeSnapshotError{ + Time: &currentTime, + }, }, }, }, } - latestReadySnapshot := getLatestReadySnapshot(snapshotList) - assert.Equal(t, latestReadySnapshot.ObjectMeta.Name, "second-good-snapshot") + snapshotWithLatestError := getSnapshotWithLatestError(snapshotList) + assert.Equal(t, snapshotWithLatestError.ObjectMeta.Name, "second-bad-snapshot") }) } @@ -642,3 +1195,260 @@ func TestGetSnapshotsForCluster(t *testing.T) { assert.Equal(t, len(snapshots.Items), 2) }) } + +func TestGetLatestReadySnapshot(t *testing.T) { + t.Run("NoSnapshots", func(t *testing.T) { + snapshotList := &volumesnapshotv1.VolumeSnapshotList{} + latestReadySnapshot := getLatestReadySnapshot(snapshotList) + assert.Assert(t, latestReadySnapshot == nil) + }) + + t.Run("NoReadySnapshots", func(t *testing.T) { + snapshotList := &volumesnapshotv1.VolumeSnapshotList{ + Items: []volumesnapshotv1.VolumeSnapshot{ + { + Status: &volumesnapshotv1.VolumeSnapshotStatus{ + ReadyToUse: initialize.Bool(false), + }, + }, + { + Status: &volumesnapshotv1.VolumeSnapshotStatus{ + ReadyToUse: initialize.Bool(false), + }, + }, + }, + } + latestReadySnapshot := getLatestReadySnapshot(snapshotList) + assert.Assert(t, latestReadySnapshot == nil) + }) + + t.Run("OneReadySnapshot", func(t *testing.T) { + currentTime := metav1.Now() + earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) + snapshotList := &volumesnapshotv1.VolumeSnapshotList{ + Items: []volumesnapshotv1.VolumeSnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "good-snapshot", + UID: "the-uid-123", + }, + Status: &volumesnapshotv1.VolumeSnapshotStatus{ + CreationTime: &earlierTime, + ReadyToUse: initialize.Bool(true), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bad-snapshot", + UID: "the-uid-456", + }, + Status: &volumesnapshotv1.VolumeSnapshotStatus{ + CreationTime: &currentTime, + ReadyToUse: initialize.Bool(false), + }, + }, + }, + } + latestReadySnapshot := getLatestReadySnapshot(snapshotList) + assert.Equal(t, latestReadySnapshot.ObjectMeta.Name, "good-snapshot") + }) + + t.Run("TwoReadySnapshots", func(t *testing.T) { + currentTime := metav1.Now() + earlierTime := metav1.NewTime(currentTime.AddDate(-1, 0, 0)) + snapshotList := &volumesnapshotv1.VolumeSnapshotList{ + Items: []volumesnapshotv1.VolumeSnapshot{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "first-good-snapshot", + UID: "the-uid-123", + }, + Status: &volumesnapshotv1.VolumeSnapshotStatus{ + CreationTime: &earlierTime, + ReadyToUse: initialize.Bool(true), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "second-good-snapshot", + UID: "the-uid-456", + }, + Status: &volumesnapshotv1.VolumeSnapshotStatus{ + CreationTime: &currentTime, + ReadyToUse: initialize.Bool(true), + }, + }, + }, + } + latestReadySnapshot := getLatestReadySnapshot(snapshotList) + assert.Equal(t, latestReadySnapshot.ObjectMeta.Name, "second-good-snapshot") + }) +} + +func TestDeleteSnapshots(t *testing.T) { + ctx := context.Background() + cfg, cc := setupKubernetes(t) + discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg) + assert.NilError(t, err) + + r := &Reconciler{ + Client: cc, + Owner:
client.FieldOwner(t.Name()), + DiscoveryClient: discoveryClient, + } + ns := setupNamespace(t, cc) + + cluster := testCluster() + cluster.Namespace = ns.Name + cluster.ObjectMeta.UID = "the-uid-123" + assert.NilError(t, r.Client.Create(ctx, cluster)) + + rhinoCluster := testCluster() + rhinoCluster.Name = "rhino" + rhinoCluster.Namespace = ns.Name + rhinoCluster.ObjectMeta.UID = "the-uid-456" + assert.NilError(t, r.Client.Create(ctx, rhinoCluster)) + + t.Cleanup(func() { + assert.Check(t, r.Client.Delete(ctx, cluster)) + assert.Check(t, r.Client.Delete(ctx, rhinoCluster)) + }) + + t.Run("NoSnapshots", func(t *testing.T) { + snapshotList := &volumesnapshotv1.VolumeSnapshotList{} + err := r.deleteSnapshots(ctx, cluster, snapshotList) + assert.NilError(t, err) + }) + + t.Run("NoSnapshotsControlledByHippo", func(t *testing.T) { + pvcName := initialize.String("dedicated-snapshot-volume") + snapshot1 := &volumesnapshotv1.VolumeSnapshot{ + TypeMeta: metav1.TypeMeta{ + APIVersion: volumesnapshotv1.SchemeGroupVersion.String(), + Kind: "VolumeSnapshot", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "first-snapshot", + Namespace: ns.Name, + }, + Spec: volumesnapshotv1.VolumeSnapshotSpec{ + Source: volumesnapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: pvcName, + }, + }, + } + err := errors.WithStack(r.setControllerReference(rhinoCluster, snapshot1)) + assert.NilError(t, err) + err = r.apply(ctx, snapshot1) + assert.NilError(t, err) + + snapshotList := &volumesnapshotv1.VolumeSnapshotList{ + Items: []volumesnapshotv1.VolumeSnapshot{ + *snapshot1, + }, + } + err = r.deleteSnapshots(ctx, cluster, snapshotList) + assert.NilError(t, err) + existingSnapshots := &volumesnapshotv1.VolumeSnapshotList{} + err = errors.WithStack( + r.Client.List(ctx, existingSnapshots, + client.InNamespace(ns.Name), + )) + assert.NilError(t, err) + assert.Equal(t, len(existingSnapshots.Items), 1) + }) + + t.Run("OneSnapshotControlledByHippo", func(t *testing.T) { + pvcName := initialize.String("dedicated-snapshot-volume") + snapshot1 := &volumesnapshotv1.VolumeSnapshot{ + TypeMeta: metav1.TypeMeta{ + APIVersion: volumesnapshotv1.SchemeGroupVersion.String(), + Kind: "VolumeSnapshot", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "first-snapshot", + Namespace: ns.Name, + }, + Spec: volumesnapshotv1.VolumeSnapshotSpec{ + Source: volumesnapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: pvcName, + }, + }, + } + err := errors.WithStack(r.setControllerReference(rhinoCluster, snapshot1)) + assert.NilError(t, err) + err = r.apply(ctx, snapshot1) + assert.NilError(t, err) + + snapshot2 := &volumesnapshotv1.VolumeSnapshot{ + TypeMeta: metav1.TypeMeta{ + APIVersion: volumesnapshotv1.SchemeGroupVersion.String(), + Kind: "VolumeSnapshot", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "second-snapshot", + Namespace: ns.Name, + }, + Spec: volumesnapshotv1.VolumeSnapshotSpec{ + Source: volumesnapshotv1.VolumeSnapshotSource{ + PersistentVolumeClaimName: pvcName, + }, + }, + } + err = errors.WithStack(r.setControllerReference(cluster, snapshot2)) + assert.NilError(t, err) + err = r.apply(ctx, snapshot2) + assert.NilError(t, err) + + snapshotList := &volumesnapshotv1.VolumeSnapshotList{ + Items: []volumesnapshotv1.VolumeSnapshot{ + *snapshot1, *snapshot2, + }, + } + err = r.deleteSnapshots(ctx, cluster, snapshotList) + assert.NilError(t, err) + existingSnapshots := &volumesnapshotv1.VolumeSnapshotList{} + err = errors.WithStack( + r.Client.List(ctx, existingSnapshots, + client.InNamespace(ns.Name), + )) +
assert.NilError(t, err) + assert.Equal(t, len(existingSnapshots.Items), 1) + assert.Equal(t, existingSnapshots.Items[0].Name, "first-snapshot") + }) +} + +func TestClusterUsingTablespaces(t *testing.T) { + ctx := context.Background() + cluster := testCluster() + + t.Run("NoVolumesFeatureEnabled", func(t *testing.T) { + // Enable Tablespaces feature gate + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.TablespaceVolumes: true, + })) + ctx := feature.NewContext(ctx, gate) + + assert.Assert(t, !clusterUsingTablespaces(ctx, cluster)) + }) + + t.Run("VolumesInPlaceFeatureDisabled", func(t *testing.T) { + cluster.Spec.InstanceSets[0].TablespaceVolumes = []v1beta1.TablespaceVolume{{ + Name: "volume-1", + }} + + assert.Assert(t, !clusterUsingTablespaces(ctx, cluster)) + }) + + t.Run("VolumesInPlaceAndFeatureEnabled", func(t *testing.T) { + // Enable Tablespaces feature gate + gate := feature.NewGate() + assert.NilError(t, gate.SetFromMap(map[string]bool{ + feature.TablespaceVolumes: true, + })) + ctx := feature.NewContext(ctx, gate) + + assert.Assert(t, clusterUsingTablespaces(ctx, cluster)) + }) +} diff --git a/internal/naming/annotations.go b/internal/naming/annotations.go index 17ecf6794..2179a5f08 100644 --- a/internal/naming/annotations.go +++ b/internal/naming/annotations.go @@ -21,10 +21,11 @@ const ( // ID associated with a specific manual backup Job. PGBackRestBackup = annotationPrefix + "pgbackrest-backup" - // PGBackRestBackupJobId is the annotation that is added to a VolumeSnapshot to identify the - // backup job that is associated with it (a backup is always taken right before a - // VolumeSnapshot is taken). - PGBackRestBackupJobId = annotationPrefix + "pgbackrest-backup-job-id" + // PGBackRestBackupJobCompletion is the annotation that is added to restore jobs, pvcs, and + // VolumeSnapshots that are involved in the volume snapshot creation process. The annotation + // holds an RFC3339 formatted timestamp that corresponds to the completion time of the associated + // backup job.
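+ // For example: "2024-01-01T12:34:56Z".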
+ PGBackRestBackupJobCompletion = annotationPrefix + "pgbackrest-backup-job-completion" // PGBackRestConfigHash is an annotation used to specify the hash value associated with a // repo configuration as needed to detect configuration changes that invalidate running Jobs diff --git a/internal/naming/annotations_test.go b/internal/naming/annotations_test.go index 9430acf37..318dd5ab5 100644 --- a/internal/naming/annotations_test.go +++ b/internal/naming/annotations_test.go @@ -12,13 +12,15 @@ import ( ) func TestAnnotationsValid(t *testing.T) { + assert.Assert(t, nil == validation.IsQualifiedName(AuthorizeBackupRemovalAnnotation)) + assert.Assert(t, nil == validation.IsQualifiedName(AutoCreateUserSchemaAnnotation)) + assert.Assert(t, nil == validation.IsQualifiedName(CrunchyBridgeClusterAdoptionAnnotation)) assert.Assert(t, nil == validation.IsQualifiedName(Finalizer)) assert.Assert(t, nil == validation.IsQualifiedName(PatroniSwitchover)) assert.Assert(t, nil == validation.IsQualifiedName(PGBackRestBackup)) - assert.Assert(t, nil == validation.IsQualifiedName(PGBackRestBackupJobId)) + assert.Assert(t, nil == validation.IsQualifiedName(PGBackRestBackupJobCompletion)) assert.Assert(t, nil == validation.IsQualifiedName(PGBackRestConfigHash)) - assert.Assert(t, nil == validation.IsQualifiedName(PGBackRestRestore)) assert.Assert(t, nil == validation.IsQualifiedName(PGBackRestIPVersion)) + assert.Assert(t, nil == validation.IsQualifiedName(PGBackRestRestore)) assert.Assert(t, nil == validation.IsQualifiedName(PostgresExporterCollectorsAnnotation)) - assert.Assert(t, nil == validation.IsQualifiedName(CrunchyBridgeClusterAdoptionAnnotation)) } diff --git a/internal/naming/labels.go b/internal/naming/labels.go index cc9c9716f..f25993122 100644 --- a/internal/naming/labels.go +++ b/internal/naming/labels.go @@ -108,6 +108,9 @@ const ( // RoleMonitoring is the LabelRole applied to Monitoring resources RoleMonitoring = "monitoring" + + // RoleSnapshot is the LabelRole applied to Snapshot resources. + RoleSnapshot = "snapshot" ) const ( diff --git a/internal/naming/names.go b/internal/naming/names.go index fe3a7a9ab..369591de9 100644 --- a/internal/naming/names.go +++ b/internal/naming/names.go @@ -249,6 +249,15 @@ func ClusterReplicaService(cluster *v1beta1.PostgresCluster) metav1.ObjectMeta { } } +// ClusterDedicatedSnapshotVolume returns the ObjectMeta for the dedicated Snapshot +// volume for a cluster. +func ClusterDedicatedSnapshotVolume(cluster *v1beta1.PostgresCluster) metav1.ObjectMeta { + return metav1.ObjectMeta{ + Namespace: cluster.GetNamespace(), + Name: cluster.GetName() + "-snapshot", + } +} + // ClusterVolumeSnapshot returns the ObjectMeta, including a random name, for a // new pgdata VolumeSnapshot. func ClusterVolumeSnapshot(cluster *v1beta1.PostgresCluster) metav1.ObjectMeta { diff --git a/internal/naming/selectors.go b/internal/naming/selectors.go index e842e602d..94dbc3a9f 100644 --- a/internal/naming/selectors.go +++ b/internal/naming/selectors.go @@ -35,6 +35,18 @@ func Cluster(cluster string) metav1.LabelSelector { } } +// ClusterRestoreJobs selects all existing restore jobs in a cluster. +func ClusterRestoreJobs(cluster string) metav1.LabelSelector { + return metav1.LabelSelector{ + MatchLabels: map[string]string{ + LabelCluster: cluster, + }, + MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: LabelPGBackRestRestore, Operator: metav1.LabelSelectorOpExists}, + }, + } +} + // ClusterBackupJobs selects things for all existing backup jobs in cluster. 
func ClusterBackupJobs(cluster string) metav1.LabelSelector { return metav1.LabelSelector{ diff --git a/internal/pgbackrest/config.go b/internal/pgbackrest/config.go index 09c56c027..f42444a01 100644 --- a/internal/pgbackrest/config.go +++ b/internal/pgbackrest/config.go @@ -263,6 +263,42 @@ mv "${pgdata}" "${pgdata}_bootstrap"` return append([]string{"bash", "-ceu", "--", restoreScript, "-", pgdata}, args...) } +// DedicatedSnapshotVolumeRestoreCommand returns the command for performing a pgBackRest delta restore +// into a dedicated snapshot volume. In addition to calling the pgBackRest restore command with any +// pgBackRest options provided, the script also removes the patroni.dynamic.json file if present. This +// ensures the configuration of the cluster being restored from is not used when bootstrapping a +// new cluster, and the configuration for the new cluster is used instead. +func DedicatedSnapshotVolumeRestoreCommand(pgdata string, args ...string) []string { + + // The postmaster.pid file is removed, if it exists, before attempting a restore. + // This allows the restore to be tried more than once without causing an + // error due to the presence of the file in subsequent attempts. + + // Wrap the pgbackrest restore command in backup_label checks. If the pre/post + // backup_labels differ, the restore moved the database forward, so return 0 + // so that the Job succeeds and we know to proceed with the snapshot. + // Otherwise return 1; the Job will fail, and we will not proceed with the snapshot. + restoreScript := `declare -r pgdata="$1" opts="$2" +BACKUP_LABEL=$([[ ! -e "${pgdata}/backup_label" ]] || md5sum "${pgdata}/backup_label") +echo "Starting pgBackRest delta restore" + +install --directory --mode=0700 "${pgdata}" +rm -f "${pgdata}/postmaster.pid" +bash -xc "pgbackrest restore ${opts}" +rm -f "${pgdata}/patroni.dynamic.json" + +BACKUP_LABEL_POST=$([[ ! -e "${pgdata}/backup_label" ]] || md5sum "${pgdata}/backup_label") +if [[ "${BACKUP_LABEL}" != "${BACKUP_LABEL_POST}" ]] +then + exit 0 +fi +echo Database was not advanced by restore. No snapshot will be taken. +echo Check that your last backup was successful. +exit 1` + + return append([]string{"bash", "-ceu", "--", restoreScript, "-", pgdata}, args...)
+} + // populatePGInstanceConfigurationMap returns options representing the pgBackRest configuration for // a PostgreSQL instance func populatePGInstanceConfigurationMap( diff --git a/internal/pgbackrest/config_test.go b/internal/pgbackrest/config_test.go index 8c6d053a1..b74bf9a4a 100644 --- a/internal/pgbackrest/config_test.go +++ b/internal/pgbackrest/config_test.go @@ -365,6 +365,36 @@ func TestRestoreCommandTDE(t *testing.T) { assert.Assert(t, strings.Contains(string(b), "encryption_key_command = 'echo testValue'"), "expected encryption_key_command setting, got:\n%s", b) } + +func TestDedicatedSnapshotVolumeRestoreCommand(t *testing.T) { + shellcheck := require.ShellCheck(t) + + pgdata := "/pgdata/pg13" + opts := []string{ + "--stanza=" + DefaultStanzaName, "--pg1-path=" + pgdata, + "--repo=1"} + command := DedicatedSnapshotVolumeRestoreCommand(pgdata, strings.Join(opts, " ")) + + assert.DeepEqual(t, command[:3], []string{"bash", "-ceu", "--"}) + assert.Assert(t, len(command) > 3) + + dir := t.TempDir() + file := filepath.Join(dir, "script.bash") + assert.NilError(t, os.WriteFile(file, []byte(command[3]), 0o600)) + + cmd := exec.Command(shellcheck, "--enable=all", file) + output, err := cmd.CombinedOutput() + assert.NilError(t, err, "%q\n%s", cmd.Args, output) +} + +func TestDedicatedSnapshotVolumeRestoreCommandPrettyYAML(t *testing.T) { + b, err := yaml.Marshal(DedicatedSnapshotVolumeRestoreCommand("/dir", "--options")) + + assert.NilError(t, err) + assert.Assert(t, strings.Contains(string(b), "\n- |"), + "expected literal block scalar, got:\n%s", b) +} + func TestServerConfig(t *testing.T) { cluster := &v1beta1.PostgresCluster{} cluster.UID = "shoe" diff --git a/pkg/apis/postgres-operator.crunchydata.com/v1beta1/postgrescluster_types.go b/pkg/apis/postgres-operator.crunchydata.com/v1beta1/postgrescluster_types.go index e7b3377bf..d43197ce1 100644 --- a/pkg/apis/postgres-operator.crunchydata.com/v1beta1/postgrescluster_types.go +++ b/pkg/apis/postgres-operator.crunchydata.com/v1beta1/postgrescluster_types.go @@ -694,5 +694,6 @@ func NewPostgresCluster() *PostgresCluster { type VolumeSnapshots struct { // Name of the VolumeSnapshotClass that should be used by VolumeSnapshots // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 VolumeSnapshotClassName string `json:"volumeSnapshotClassName"` }
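
A note on the bookkeeping that ties this patch together: the dedicated snapshot PVC, its delta-restore Jobs, and the resulting VolumeSnapshots all carry the pgbackrest-backup-job-completion annotation, so each stage can be matched to the backup that produced it. The sketch below restates, in one place, the comparison the new tests assert on. It is illustrative only: dedicatedVolumeIsCurrent is a hypothetical helper, not code from this patch, and the inlined annotation key assumes the operator's usual annotation prefix.

package snapshots

import (
	"time"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// Mirrors naming.PGBackRestBackupJobCompletion; inlined here so the sketch
// stands alone. The prefix is an assumption, not taken from this patch.
const pgBackRestBackupJobCompletion = "postgres-operator.crunchydata.com/pgbackrest-backup-job-completion"

// dedicatedVolumeIsCurrent reports whether the dedicated snapshot volume
// already reflects the latest successful backup: the PVC is current when its
// annotation equals the RFC3339-formatted completion time of that backup job,
// in which case no new delta restore (and no new snapshot) is needed.
func dedicatedVolumeIsCurrent(pvc *corev1.PersistentVolumeClaim, backupJob *batchv1.Job) bool {
	if backupJob == nil || backupJob.Status.CompletionTime == nil {
		return false // no completed backup yet, so there is nothing to compare
	}
	want := backupJob.Status.CompletionTime.Format(time.RFC3339)
	return pvc.GetAnnotations()[pgBackRestBackupJobCompletion] == want
}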