Skip to content

Commit

Permalink
Take snapshots of pgdata using a dedicated volume. Whenever a backup …
Browse files Browse the repository at this point in the history
…finishes successfully, do a delta restore into dedicated volume and then snapshot the volume.

Add/adjust tests for snapshots.

Co-authored-by: Anthony Landreth <[email protected]>
  • Loading branch information
dsessler7 committed Oct 3, 2024
1 parent 6707a99 commit 000e6af
Show file tree
Hide file tree
Showing 15 changed files with 1,736 additions and 439 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4330,6 +4330,7 @@ spec:
volumeSnapshotClassName:
description: Name of the VolumeSnapshotClass that should be
used by VolumeSnapshots
minLength: 1
type: string
required:
- volumeSnapshotClassName
Expand Down
6 changes: 5 additions & 1 deletion internal/controller/postgrescluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (r *Reconciler) Reconcile(
err error
backupsSpecFound bool
backupsReconciliationAllowed bool
dedicatedSnapshotPVC *corev1.PersistentVolumeClaim
)

patchClusterStatus := func() error {
Expand Down Expand Up @@ -364,7 +365,10 @@ func (r *Reconciler) Reconcile(
}
}
if err == nil {
err = r.reconcileVolumeSnapshots(ctx, cluster, instances, clusterVolumes)
dedicatedSnapshotPVC, err = r.reconcileDedicatedSnapshotVolume(ctx, cluster, clusterVolumes)
}
if err == nil {
err = r.reconcileVolumeSnapshots(ctx, cluster, dedicatedSnapshotPVC)
}
if err == nil {
err = r.reconcilePGBouncer(ctx, cluster, instances, primaryCertificate, rootCA)
Expand Down
55 changes: 55 additions & 0 deletions internal/controller/postgrescluster/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -21,6 +22,7 @@ import (

"github.com/crunchydata/postgres-operator/internal/controller/runtime"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/internal/testing/require"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)
Expand Down Expand Up @@ -99,6 +101,7 @@ func testVolumeClaimSpec() corev1.PersistentVolumeClaimSpec {
},
}
}

func testCluster() *v1beta1.PostgresCluster {
// Defines a base cluster spec that can be used by tests to generate a
// cluster with an expected number of instances
Expand Down Expand Up @@ -138,6 +141,58 @@ func testCluster() *v1beta1.PostgresCluster {
return cluster.DeepCopy()
}

// testBackupJob returns a deep copy of a minimal pgBackRest backup Job in the
// cluster's namespace. The labels mark it as a backup Job for "repo1" of the
// given cluster so reconciliation code under test will recognize it.
func testBackupJob(cluster *v1beta1.PostgresCluster) *batchv1.Job {
	backupLabels := map[string]string{
		naming.LabelCluster:          cluster.Name,
		naming.LabelPGBackRestBackup: "",
		naming.LabelPGBackRestRepo:   "repo1",
	}

	job := batchv1.Job{
		TypeMeta: metav1.TypeMeta{
			APIVersion: batchv1.SchemeGroupVersion.String(),
			Kind:       "Job",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "backup-job-1",
			Namespace: cluster.Namespace,
			Labels:    backupLabels,
		},
	}

	// A Job spec must have at least one container and a terminal restart policy
	// to be valid; keep both as trivial as possible for tests.
	job.Spec.Template = corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			Containers:    []corev1.Container{{Name: "test", Image: "test"}},
			RestartPolicy: corev1.RestartPolicyNever,
		},
	}

	return job.DeepCopy()
}

// testRestoreJob returns a deep copy of a minimal pgBackRest restore Job in the
// cluster's namespace, carrying the standard restore Job labels for the given
// cluster so reconciliation code under test will recognize it.
func testRestoreJob(cluster *v1beta1.PostgresCluster) *batchv1.Job {
	meta := metav1.ObjectMeta{
		Name:      "restore-job-1",
		Namespace: cluster.Namespace,
		Labels:    naming.PGBackRestRestoreJobLabels(cluster.Name),
	}

	job := batchv1.Job{
		TypeMeta: metav1.TypeMeta{
			APIVersion: batchv1.SchemeGroupVersion.String(),
			Kind:       "Job",
		},
		ObjectMeta: meta,
	}

	// A Job spec must have at least one container and a terminal restart policy
	// to be valid; keep both as trivial as possible for tests.
	job.Spec.Template = corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			Containers:    []corev1.Container{{Name: "test", Image: "test"}},
			RestartPolicy: corev1.RestartPolicyNever,
		},
	}

	return job.DeepCopy()
}

// setupManager creates the runtime manager used during controller testing
func setupManager(t *testing.T, cfg *rest.Config,
controllerSetup func(mgr manager.Manager)) (context.Context, context.CancelFunc) {
Expand Down
31 changes: 24 additions & 7 deletions internal/controller/postgrescluster/pgbackrest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/crunchydata/postgres-operator/internal/config"
"github.com/crunchydata/postgres-operator/internal/feature"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/naming"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (r *Reconciler) applyRepoVolumeIntent(ctx context.Context,
// getPGBackRestResources returns the existing pgBackRest resources that should utilized by the
// PostgresCluster controller during reconciliation. Any items returned are verified to be owned
// by the PostgresCluster controller and still applicable per the current PostgresCluster spec.
// Additionally, and resources identified that no longer correspond to any current configuration
// Additionally, any resources identified that no longer correspond to any current configuration
// are deleted.
func (r *Reconciler) getPGBackRestResources(ctx context.Context,
postgresCluster *v1beta1.PostgresCluster,
Expand Down Expand Up @@ -374,6 +375,15 @@ func (r *Reconciler) cleanupRepoResources(ctx context.Context,
if !backupsSpecFound {
break
}

// If the restore job has the PGBackRestBackupJobCompletion annotation, it is
// used for volume snapshots and should not be deleted (volume snapshots code
// will clean it up when appropriate).
if _, ok := owned.GetAnnotations()[naming.PGBackRestBackupJobCompletion]; ok {
ownedNoDelete = append(ownedNoDelete, owned)
delete = false
}

// When a cluster is prepared for restore, the system identifier is removed from status
// and the cluster is therefore no longer bootstrapped. Only once the restore Job is
// complete will the cluster then be bootstrapped again, which means by the time we
Expand Down Expand Up @@ -762,7 +772,7 @@ func (r *Reconciler) generateRepoVolumeIntent(postgresCluster *v1beta1.PostgresC
}

// generateBackupJobSpecIntent generates a JobSpec for a pgBackRest backup job
func generateBackupJobSpecIntent(postgresCluster *v1beta1.PostgresCluster,
func generateBackupJobSpecIntent(ctx context.Context, postgresCluster *v1beta1.PostgresCluster,
repo v1beta1.PGBackRestRepo, serviceAccountName string,
labels, annotations map[string]string, opts ...string) *batchv1.JobSpec {

Expand All @@ -771,6 +781,11 @@ func generateBackupJobSpecIntent(postgresCluster *v1beta1.PostgresCluster,
"--stanza=" + pgbackrest.DefaultStanzaName,
"--repo=" + repoIndex,
}
// If VolumeSnapshots are enabled, use archive-copy and archive-check options
if postgresCluster.Spec.Backups.Snapshots != nil && feature.Enabled(ctx, feature.VolumeSnapshots) {
cmdOpts = append(cmdOpts, "--archive-copy=y", "--archive-check=y")
}

cmdOpts = append(cmdOpts, opts...)

container := corev1.Container{
Expand Down Expand Up @@ -1634,6 +1649,9 @@ func (r *Reconciler) reconcilePostgresClusterDataSource(ctx context.Context,
return errors.WithStack(err)
}

// TODO(snapshots): If pgdata is being sourced by a VolumeSnapshot then don't perform a typical restore job;
// we only want to replay the WAL.

// reconcile the pgBackRest restore Job to populate the cluster's data directory
if err := r.reconcileRestoreJob(ctx, cluster, sourceCluster, pgdata, pgwal, pgtablespaces,
dataSource, instanceName, instanceSetName, configHash, pgbackrest.DefaultStanzaName); err != nil {
Expand Down Expand Up @@ -2362,7 +2380,7 @@ func (r *Reconciler) reconcileManualBackup(ctx context.Context,
backupJob.ObjectMeta.Labels = labels
backupJob.ObjectMeta.Annotations = annotations

spec := generateBackupJobSpecIntent(postgresCluster, repo,
spec := generateBackupJobSpecIntent(ctx, postgresCluster, repo,
serviceAccount.GetName(), labels, annotations, backupOpts...)

backupJob.Spec = *spec
Expand Down Expand Up @@ -2523,7 +2541,7 @@ func (r *Reconciler) reconcileReplicaCreateBackup(ctx context.Context,
backupJob.ObjectMeta.Labels = labels
backupJob.ObjectMeta.Annotations = annotations

spec := generateBackupJobSpecIntent(postgresCluster, replicaCreateRepo,
spec := generateBackupJobSpecIntent(ctx, postgresCluster, replicaCreateRepo,
serviceAccount.GetName(), labels, annotations)

backupJob.Spec = *spec
Expand Down Expand Up @@ -2886,8 +2904,7 @@ func (r *Reconciler) reconcilePGBackRestCronJob(
labels := naming.Merge(
cluster.Spec.Metadata.GetLabelsOrNil(),
cluster.Spec.Backups.PGBackRest.Metadata.GetLabelsOrNil(),
naming.PGBackRestCronJobLabels(cluster.Name, repo.Name, backupType),
)
naming.PGBackRestCronJobLabels(cluster.Name, repo.Name, backupType))
objectmeta := naming.PGBackRestCronJob(cluster, backupType, repo.Name)

// Look for an existing CronJob by the associated Labels. If one exists,
Expand Down Expand Up @@ -2951,7 +2968,7 @@ func (r *Reconciler) reconcilePGBackRestCronJob(
// set backup type (i.e. "full", "diff", "incr")
backupOpts := []string{"--type=" + backupType}

jobSpec := generateBackupJobSpecIntent(cluster, repo,
jobSpec := generateBackupJobSpecIntent(ctx, cluster, repo,
serviceAccount.GetName(), labels, annotations, backupOpts...)

// Suspend cronjobs when shutdown or read-only. Any jobs that have already
Expand Down
23 changes: 12 additions & 11 deletions internal/controller/postgrescluster/pgbackrest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,8 +2438,9 @@ func TestCopyConfigurationResources(t *testing.T) {
}

func TestGenerateBackupJobIntent(t *testing.T) {
ctx := context.Background()
t.Run("empty", func(t *testing.T) {
spec := generateBackupJobSpecIntent(
spec := generateBackupJobSpecIntent(ctx,
&v1beta1.PostgresCluster{}, v1beta1.PGBackRestRepo{},
"",
nil, nil,
Expand Down Expand Up @@ -2512,7 +2513,7 @@ volumes:
ImagePullPolicy: corev1.PullAlways,
},
}
job := generateBackupJobSpecIntent(
job := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{},
"",
nil, nil,
Expand All @@ -2527,7 +2528,7 @@ volumes:
cluster.Spec.Backups = v1beta1.Backups{
PGBackRest: v1beta1.PGBackRestArchive{},
}
job := generateBackupJobSpecIntent(
job := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{},
"",
nil, nil,
Expand All @@ -2544,7 +2545,7 @@ volumes:
},
},
}
job := generateBackupJobSpecIntent(
job := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{},
"",
nil, nil,
Expand Down Expand Up @@ -2583,7 +2584,7 @@ volumes:
},
},
}
job := generateBackupJobSpecIntent(
job := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{},
"",
nil, nil,
Expand All @@ -2596,7 +2597,7 @@ volumes:
cluster.Spec.Backups.PGBackRest.Jobs = &v1beta1.BackupJobs{
PriorityClassName: initialize.String("some-priority-class"),
}
job := generateBackupJobSpecIntent(
job := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{},
"",
nil, nil,
Expand All @@ -2614,7 +2615,7 @@ volumes:
cluster.Spec.Backups.PGBackRest.Jobs = &v1beta1.BackupJobs{
Tolerations: tolerations,
}
job := generateBackupJobSpecIntent(
job := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{},
"",
nil, nil,
Expand All @@ -2628,14 +2629,14 @@ volumes:
t.Run("Undefined", func(t *testing.T) {
cluster.Spec.Backups.PGBackRest.Jobs = nil

spec := generateBackupJobSpecIntent(
spec := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{}, "", nil, nil,
)
assert.Assert(t, spec.TTLSecondsAfterFinished == nil)

cluster.Spec.Backups.PGBackRest.Jobs = &v1beta1.BackupJobs{}

spec = generateBackupJobSpecIntent(
spec = generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{}, "", nil, nil,
)
assert.Assert(t, spec.TTLSecondsAfterFinished == nil)
Expand All @@ -2646,7 +2647,7 @@ volumes:
TTLSecondsAfterFinished: initialize.Int32(0),
}

spec := generateBackupJobSpecIntent(
spec := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{}, "", nil, nil,
)
if assert.Check(t, spec.TTLSecondsAfterFinished != nil) {
Expand All @@ -2659,7 +2660,7 @@ volumes:
TTLSecondsAfterFinished: initialize.Int32(100),
}

spec := generateBackupJobSpecIntent(
spec := generateBackupJobSpecIntent(ctx,
cluster, v1beta1.PGBackRestRepo{}, "", nil, nil,
)
if assert.Check(t, spec.TTLSecondsAfterFinished != nil) {
Expand Down
Loading

0 comments on commit 000e6af

Please sign in to comment.