Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35813][runtime] Do not clear state field in TSO until operator was closed #3469

Merged
merged 1 commit into from
Aug 6, 2024

Conversation

yuxiqian
Copy link
Contributor

@yuxiqian yuxiqian commented Jul 11, 2024

This closes FLINK-35813.

Currently, transform schema operator clears its state field after its finish method was called. However, checkpoints / savepoints could be triggered between finish and close, and any following checkpoint requests in snapshotState method will cause an NPE.

For example, with a bounded pipeline job like this:

source:
  type: values
sink:
  type: values
transform:
   ...

would cause null pointer exception after job finishes:

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: Flink CDC Event Source: values -> Transform:Schema (1/1)#0. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:281)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:347)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1242)
	... 14 more
Caused by: java.lang.NullPointerException
	at org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator.snapshotState(TransformSchemaOperator.java:137)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:234)
	... 25 more

By postponing clearing state to close() should resolve this problem.

@yuxiqian
Copy link
Contributor Author

@aiwenmo Please take a look

Copy link
Contributor

@aiwenmo aiwenmo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@leonardBang leonardBang merged commit 44dafe3 into apache:master Aug 6, 2024
10 of 17 checks passed
qiaozongmi pushed a commit to qiaozongmi/flink-cdc that referenced this pull request Sep 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants