[FLINK-36162] Remove flinkStateSnapshotReference and namespace from FlinkStateSnapshot jobReference
gyfora committed Aug 30, 2024
1 parent 2ee9f20 commit d928226
Showing 34 changed files with 149 additions and 613 deletions.
2 changes: 1 addition & 1 deletion docs/content/docs/concepts/controller-flow.md
@@ -98,7 +98,7 @@ It’s very important to understand that the Observer phase records a point-in-t
The `AbstractFlinkResourceReconciler` encapsulates the core reconciliation flow for all Flink resource types. Let’s take a look at the high level flow before we go into specifics for session, application and session job resources.

1. Check if the resource is ready for reconciliation or if there are any pending operations that should not be interrupted (manual savepoints for example)
2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `flinkStateSnapshotReference` provided in the spec.
2. If this is the first deployment attempt for the resource, we simply deploy it. It’s important to note here that this is the only deploy operation where we use the `initialSavepointPath` provided in the spec.
3. Next we determine if the desired spec changed and the type of change: `IGNORE, SCALE, UPGRADE`. Only for scale and upgrade type changes do we need to execute further reconciliation logic.
4. If we have upgrade/scale spec changes we execute the upgrade logic specific for the resource type
5. If we did not receive any spec change we still have to ensure that the currently deployed resources are fully reconciled:
9 changes: 4 additions & 5 deletions docs/content/docs/custom-resource/job-management.md
@@ -247,17 +247,16 @@ Users have two options to restore a job from a target savepoint / checkpoint

### Redeploy using the savepointRedeployNonce

It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `flinkStateSnapshotReference` in the job spec:
It is possible to redeploy a `FlinkDeployment` or `FlinkSessionJob` resource from a target savepoint by using the combination of `savepointRedeployNonce` and `initialSavepointPath` in the job spec:

```yaml
job:
  flinkStateSnapshotReference:
    path: file://redeploy-target-savepoint
  initialSavepointPath: file://redeploy-target-savepoint
  # If not set previously, set to 1, otherwise increment, e.g. 2
  savepointRedeployNonce: 1
```

When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `flinkStateSnapshotReference`. The savepoint path must not be empty.
When changing the `savepointRedeployNonce` the operator will redeploy the job to the savepoint defined in the `initialSavepointPath`. The savepoint path must not be empty.
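For a later redeployment, the nonce just has to change to a new non-null value. A minimal sketch (the savepoint path below is a placeholder):

```yaml
job:
  # placeholder path pointing at the next target savepoint
  initialSavepointPath: file://another-redeploy-target-savepoint
  savepointRedeployNonce: 2  # incremented from the previous value (1)
```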

{{< hint warning >}}
Rollbacks are not supported after redeployments.
@@ -271,7 +270,7 @@ However, this also means that savepoint history is lost and the operator won't c
1. Locate the latest checkpoint/savepoint metafile in your configured checkpoint/savepoint directory.
2. Delete the `FlinkDeployment` resource for your application
3. Check that you have the current savepoint, and that your `FlinkDeployment` is deleted completely
4. Modify your `FlinkDeployment` JobSpec and set `flinkStateSnapshotReference.path` to your last checkpoint location
4. Modify your `FlinkDeployment` JobSpec and set `initialSavepointPath` to your last checkpoint location
5. Recreate the deployment

These steps ensure that the operator will start completely fresh from the user-defined savepoint path and can hopefully fully recover.
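As a rough sketch of step 4, the relevant part of the recreated spec could look like the following (the path is a placeholder for your own checkpoint `_metadata` location):

```yaml
spec:
  job:
    # placeholder: point this at the latest checkpoint/savepoint metafile located in step 1
    initialSavepointPath: file:///flink-data/checkpoints/<job-id>/chk-42/_metadata
    upgradeMode: savepoint
```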
17 changes: 2 additions & 15 deletions docs/content/docs/custom-resource/reference.md
@@ -90,17 +90,6 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| flinkConfiguration | java.util.Map<java.lang.String,java.lang.String> | Flink configuration overrides for the Flink deployment or Flink session job. |
| deploymentName | java.lang.String | The name of the target session cluster deployment. |

### FlinkStateSnapshotReference
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference

**Description**: Reference for a FlinkStateSnapshot.

| Parameter | Type | Docs |
| ----------| ---- | ---- |
| namespace | java.lang.String | Namespace of the snapshot resource. |
| name | java.lang.String | Name of the snapshot resource. |
| path | java.lang.String | If a path is given, all other fields will be ignored, and this will be used as the initial savepoint path. |

### FlinkStateSnapshotSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotSpec

@@ -172,7 +161,6 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| ----------| ---- | ---- |
| kind | org.apache.flink.kubernetes.operator.api.spec.JobKind | Kind of the Flink resource, FlinkDeployment or FlinkSessionJob. |
| name | java.lang.String | Name of the Flink resource. |
| namespace | java.lang.String | Namespace of the Flink resource. If empty, the operator will use the namespace of the snapshot. |

### JobSpec
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobSpec
@@ -188,11 +176,10 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| state | org.apache.flink.kubernetes.operator.api.spec.JobState | Desired state for the job. |
| savepointTriggerNonce | java.lang.Long | Nonce used to manually trigger savepoint for the running job. In order to trigger a savepoint, change the number to a different non-null value. |
| initialSavepointPath | java.lang.String | Savepoint path used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
| flinkStateSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | Snapshot reference used by the job the first time it is deployed or during savepoint redeployments (triggered by changing the savepointRedeployNonce). |
| checkpointTriggerNonce | java.lang.Long | Nonce used to manually trigger checkpoint for the running job. In order to trigger a checkpoint, change the number to a different non-null value. |
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
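For illustration, a job spec combining several of the fields above might look like this minimal sketch (values are placeholders, not defaults):

```yaml
job:
  jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
  parallelism: 2
  state: running
  upgradeMode: savepoint
  initialSavepointPath: file:///flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
  allowNonRestoredState: false
  savepointTriggerNonce: 1    # change to a different non-null value to trigger a manual savepoint
  checkpointTriggerNonce: 1   # change to a different non-null value to trigger a manual checkpoint
  savepointRedeployNonce: 1   # change to a different non-null value to redeploy from initialSavepointPath
```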

### JobState
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
@@ -418,7 +405,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| state | java.lang.String | Last observed state of the job. |
| startTime | java.lang.String | Start time of the job. |
| updateTime | java.lang.String | Update time of the job. |
| upgradeSnapshotReference | org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference | |
| upgradeSavepointPath | java.lang.String | |
| savepointInfo | org.apache.flink.kubernetes.operator.api.status.SavepointInfo | Information about pending and last savepoint for the job. |
| checkpointInfo | org.apache.flink.kubernetes.operator.api.status.CheckpointInfo | Information about pending and last checkpoint for the job. |

10 changes: 5 additions & 5 deletions docs/content/docs/custom-resource/snapshots.md
@@ -38,7 +38,7 @@ If you set this to false, the operator will keep using the deprecated status fie
To create a savepoint or checkpoint, exactly one of the spec fields `savepoint` or `checkpoint` must be present.
Furthermore, in the case of a savepoint you can signal to the operator that the savepoint already exists by using the `alreadyExists` field, and the operator will mark it as a successful snapshot in the next reconciliation phase.

You can also instruct the Operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot CR by using `flinkStateSnapshotReference` in the job spec.
You can also instruct the Operator to start a new FlinkDeployment/FlinkSessionJob from an existing snapshot by using `initialSavepointPath` in the job spec.

## Examples

@@ -78,11 +78,11 @@ spec:
### Start job from existing snapshot
To start a job from an existing snapshot, you need to extract its path and then use:
```yaml
job:
  flinkStateSnapshotReference:
    namespace: flink # not required if it's in the same namespace
    name: example-savepoint
  initialSavepointPath: [savepoint_path]
```
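The path itself can be read from the snapshot resource's status. For example, a completed `FlinkStateSnapshot` created as `example-savepoint` above would carry something along these lines (illustrative excerpt):

```yaml
status:
  # copy this value into job.initialSavepointPath
  path: file:///flink-data/savepoints/savepoint-45305c-d867504446e0
```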
{{< hint warning >}}
@@ -131,7 +131,7 @@ This feature is not available for checkpoints.
## Triggering snapshots

Upgrade savepoints are triggered automatically by the system during the upgrade process as we have seen in the previous sections.
In this case, the savepoint path will also be recorded in the `upgradeSnapshotReference` job status field, which the operator will use when restarting the job.
In this case, the savepoint path will also be recorded in the `upgradeSavepointPath` job status field, which the operator will use when restarting the job.
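As a rough illustration, the relevant part of the resource status would then look something like this (the path is a placeholder):

```yaml
status:
  jobStatus:
    upgradeSavepointPath: file:///flink-data/savepoints/savepoint-000000-aec3dd08e76d
```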

For backup, job forking and other purposes, savepoints and checkpoints can be triggered manually or periodically by the operator; however, generally speaking these will not be used during upgrades and are not required for correct operation.

7 changes: 3 additions & 4 deletions docs/content/docs/operations/upgrade.md
@@ -148,20 +148,19 @@ Here is a reference example of upgrading a `basic-checkpoint-ha-example` deploym
```
5. Restore the job:

Deploy the previously deleted job using this [FlinkDeployment](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.flinkStateSnapshotReference.path` to the savepoint location obtained in step 1.
Deploy the previously deleted job using this [FlinkDeployment](https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml) with `v1beta1` and explicitly set the `job.initialSavepointPath` to the savepoint location obtained in step 1.

```
spec:
  ...
  job:
    flinkStateSnapshotReference:
      path: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
    initialSavepointPath: /flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata
    upgradeMode: savepoint
  ...
```
Alternatively, we may use this command to edit and deploy the manifest:
```sh
wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.flinkStateSnapshotReference.path" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f -
wget -qO - https://raw.githubusercontent.com/apache/flink-kubernetes-operator/main/examples/basic-checkpoint-ha.yaml| yq w - "spec.job.initialSavepointPath" "/flink-data/savepoints/savepoint-000000-aec3dd08e76d/_metadata"| kubectl apply -f -
```
Finally, verify that `deploy/basic-checkpoint-ha-example` log has:
```
10 changes: 4 additions & 6 deletions e2e-tests/test_snapshot.sh
@@ -56,10 +56,10 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIME
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job":{"state": "suspended"}}}'
wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${TIMEOUT} || exit 1

location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.upgradeSnapshotReference.path')
location=$(kubectl get $APPLICATION_IDENTIFIER -o yaml | yq '.status.jobStatus.upgradeSavepointPath')
if [ "$location" == "" ]; then echo "Legacy savepoint location was empty"; exit 1; fi
echo "Removing upgradeSnapshotReference and setting lastSavepoint"
kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch '{"status":{"jobStatus":{"upgradeSnapshotReference":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'$location'", "triggerNonce": 0}}}}}'
echo "Removing upgradeSavepointPath and setting lastSavepoint"
kubectl patch flinkdep ${CLUSTER_ID} --type=merge --subresource status --patch '{"status":{"jobStatus":{"upgradeSavepointPath":null,"savepointInfo":{"lastSavepoint":{"timeStamp": 0, "location": "'$location'", "triggerNonce": 0}}}}}'

# Delete operator Pod to clear CR state cache
kubectl delete pod -n $(get_operator_pod_namespace) $(get_operator_pod_name)
@@ -151,13 +151,11 @@ wait_for_status $APPLICATION_IDENTIFIER '.status.lifecycleState' "SUSPENDED" ${T
echo "Waiting for upgrade savepoint..."
snapshot=$(wait_for_snapshot $CLUSTER_ID "savepoint" "upgrade" ${TIMEOUT})
if [ "$snapshot" == "" ]; then echo "Could not find snapshot"; exit 1; fi
echo "Found upgrade snapshot: $snapshot"
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSnapshotReference.name' "$snapshot" ${TIMEOUT} || exit 1

location=$(kubectl get flinksnp/$snapshot -o yaml | yq '.status.path')
if [ "$location" == "" ]; then echo "Upgrade savepoint location was empty"; exit 1; fi


wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.upgradeSavepointPath' "$location" ${TIMEOUT} || exit 1

echo "Restarting deployment..."
kubectl patch flinkdep ${CLUSTER_ID} --type merge --patch '{"spec":{"job": {"state": "running" } } }'
4 changes: 1 addition & 3 deletions examples/snapshot/job-from-savepoint.yaml
@@ -64,6 +64,4 @@ spec:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    flinkStateSnapshotReference:
      name: example-savepoint
      namespace: flink
    initialSavepointPath: file:///flink-data/savepoints/savepoint-45305c-d867504446e0

This file was deleted.

@@ -44,16 +44,9 @@ public class JobReference {
/** Name of the Flink resource. */
private String name;

/**
* Namespace of the Flink resource. If empty, the operator will use the namespace of the
* snapshot.
*/
private String namespace;

public static JobReference fromFlinkResource(AbstractFlinkResource<?, ?> flinkResource) {
var result = new JobReference();
result.setName(flinkResource.getMetadata().getName());
result.setNamespace(flinkResource.getMetadata().getNamespace());

if (flinkResource instanceof FlinkDeployment) {
result.setKind(JobKind.FLINK_DEPLOYMENT);
@@ -71,6 +64,6 @@ public String toString() {
} else if (kind == JobKind.FLINK_SESSION_JOB) {
kindString = CrdConstants.KIND_SESSION_JOB;
}
return String.format("%s/%s (%s)", namespace, name, kindString);
return String.format("%s (%s)", name, kindString);
}
}
@@ -72,16 +72,8 @@ public class JobSpec implements Diffable<JobSpec> {
* redeployments (triggered by changing the savepointRedeployNonce).
*/
@SpecDiff(DiffType.IGNORE)
@Deprecated
private String initialSavepointPath;

/**
* Snapshot reference used by the job the first time it is deployed or during savepoint
* redeployments (triggered by changing the savepointRedeployNonce).
*/
@SpecDiff(DiffType.IGNORE)
private FlinkStateSnapshotReference flinkStateSnapshotReference;

/**
* Nonce used to manually trigger checkpoint for the running job. In order to trigger a
* checkpoint, change the number to a different non-null value.
@@ -100,9 +92,8 @@ public class JobSpec implements Diffable<JobSpec> {

/**
* Nonce used to trigger a full redeployment of the job from the savepoint path specified in
* initialSavepointPath or the path/FlinkStateSnapshot reference in flinkStateSnapshotReference.
* In order to trigger redeployment, change the number to a different non-null value. Rollback
* is not possible after redeployment.
* initialSavepointPath. In order to trigger redeployment, change the number to a different
* non-null value. Rollback is not possible after redeployment.
*/
@SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
private Long savepointRedeployNonce;
@@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.api.status;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import io.fabric8.crd.generator.annotation.PrinterColumn;
@@ -51,7 +50,7 @@ public class JobStatus {
/** Update time of the job. */
private String updateTime;

private FlinkStateSnapshotReference upgradeSnapshotReference;
private String upgradeSavepointPath;

/** Information about pending and last savepoint for the job. */
@Deprecated private SavepointInfo savepointInfo = new SavepointInfo();
@@ -23,7 +23,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
import org.apache.flink.kubernetes.operator.api.spec.FlinkStateSnapshotReference;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -454,10 +453,7 @@ private void observeLatestCheckpoint(
flinkService
.getLastCheckpoint(JobID.fromHexString(jobID), observeConfig)
.ifPresent(
snapshot ->
jobStatus.setUpgradeSnapshotReference(
FlinkStateSnapshotReference.fromPath(
snapshot.getLocation())));
snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()));
} catch (Exception e) {
LOG.error("Could not observe latest checkpoint information.", e);
throw new ReconciliationException(e);