Skip to content

Commit

Permalink
Add support for unaligned checkpoints for flink runner (#31186)
Browse files Browse the repository at this point in the history
  • Loading branch information
AyWa authored May 7, 2024
1 parent 45fe4f9 commit bb51380
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,13 @@ private static void configureCheckpointing(
: ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
}

if (options.getUnalignedCheckpointEnabled()) {
flinkStreamEnv.getCheckpointConfig().enableUnalignedCheckpoints();
}
flinkStreamEnv
.getCheckpointConfig()
.setForceUnalignedCheckpoints(options.getForceUnalignedCheckpointEnabled());

long minPauseBetweenCheckpoints = options.getMinPauseBetweenCheckpoints();
if (minPauseBetweenCheckpoints != -1) {
flinkStreamEnv
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,22 @@ public interface FlinkPipelineOptions

void setFinishBundleBeforeCheckpointing(boolean finishBundleBeforeCheckpointing);

@Description(
"If set, Unaligned checkpoints contain in-flight data (i.e., data stored in buffers) as part of the "
+ "checkpoint state, allowing checkpoint barriers to overtake these buffers. Thus, the checkpoint duration "
+ "becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the "
+ "stream of data anymore")
@Default.Boolean(false)
boolean getUnalignedCheckpointEnabled();

void setUnalignedCheckpointEnabled(boolean unalignedCheckpointEnabled);

@Description("Forces unaligned checkpoints, particularly allowing them for iterative jobs.")
@Default.Boolean(false)
boolean getForceUnalignedCheckpointEnabled();

void setForceUnalignedCheckpointEnabled(boolean forceUnalignedCheckpointEnabled);

@Description(
"Shuts down sources which have been idle for the configured time of milliseconds. Once a source has been "
+ "shut down, checkpointing is not possible anymore. Shutting down the sources eventually leads to pipeline "
Expand Down

0 comments on commit bb51380

Please sign in to comment.