Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35242] Supports per-SE type configuration & "lenient" evolution behavior #3339

Merged
merged 7 commits into from
Aug 8, 2024

Conversation

yuxiqian
Copy link
Contributor

@yuxiqian yuxiqian commented May 21, 2024

This closes FLINK-35242.

  • Adds TRY_EVOLVE behavior that tolerates exception during metadata applying process
  • Supports configuring various behavior for each schema evolution type with include.schema.changes and exclude.schema.changes sink option
  • Adds schema evolution IT cases

With this change, Schema Operator now stores both "upstream" schema (which always keep up with the structure coming from pipeline source after the transformation) and "evolved" schema (the schema that actually applied to sink). They might differ when:

  • Schema change events are partially or fully ignored (by fine-grained options / IGNORE behavior)
  • Some schema change events failed to apply but pipeline keeps executing (in TRY_EVOLVE mode)

If these cases occur, SchemaOperator needs to cast upstream schema to match the actual evolved version (by adding / removing columns, converting types, etc.) with tolerance (fill in null if meaningful casts are not possible.)

case EXCEPTION:
return exceptionOnSchemaChange(input, parallelism);
return exceptionOnSchemaChange(
Copy link
Contributor

@hk-lrzy hk-lrzy May 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have some questions about the behavior for IGNORE and EXCEPTION.
Now when we setting the behavior as IGNORE or EXCEPTION, the job will be failed, should it be fixed it or create a new PR for it?

Copy link
Contributor Author

@yuxiqian yuxiqian May 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing it out! I think it would be better fix it in this PR since SchemaOperator & Registry will be greatly modified to implement FLINK-35242.

(Seems #3352 / FLINK-35432 is about an irrelevant problem about MySQL ddl parsing, and should be fine to be reviewed and merged independently.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it, and i also point it in the issue with JIRA / https://issues.apache.org/jira/browse/FLINK-35436 and have another #3355

I think we can merge it into this PR.

@yuxiqian yuxiqian force-pushed the FLINK-35242 branch 2 times, most recently from 4e4dbf5 to a3c9efe Compare May 24, 2024 07:14
@yuxiqian yuxiqian marked this pull request as ready for review May 24, 2024 08:46
@yuxiqian
Copy link
Contributor Author

@PatrickRen @leonardBang PTAL

@yuxiqian yuxiqian force-pushed the FLINK-35242 branch 3 times, most recently from 6b13fe7 to 6959c47 Compare May 27, 2024 01:15
@yuxiqian yuxiqian changed the title [FLINK-35242] Optimize schema evolution & add SE IT cases [FLINK-35242] Support per-type configuration & tolerance SE behaviors May 31, 2024
@yuxiqian yuxiqian changed the title [FLINK-35242] Support per-type configuration & tolerance SE behaviors [FLINK-35242] Supports per-SE type configuration & tolerance evolution behavior May 31, 2024
@yuxiqian yuxiqian force-pushed the FLINK-35242 branch 3 times, most recently from 7470fc4 to 079d531 Compare June 19, 2024 11:06
@yuxiqian yuxiqian force-pushed the FLINK-35242 branch 3 times, most recently from a252a61 to 9ac25ef Compare June 26, 2024 08:25
@yuxiqian
Copy link
Contributor Author

yuxiqian commented Jun 26, 2024

Just added LENIENT schema evolve behavior & rebased to master. Could @loserwang1024 please take a look at this?

Since this PR has grown too big, hopefully this could be reviewed & merged soon to avoid any conflicts hassle in the future. cc @leonardBang

@yuxiqian yuxiqian changed the title [FLINK-35242] Supports per-SE type configuration & tolerance evolution behavior [FLINK-35242] Supports per-SE type configuration & "lenient" evolution behavior Jul 8, 2024
@leonardBang
Copy link
Contributor

@yuxiqian Thanks for the great work, could you rebase to latest master?

@yuxiqian
Copy link
Contributor Author

Done, rebased with master branch

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @yuxiqian for the great work, I finished the first round review and left some comments and I think you can start to address my comments.

Comment on lines +26 to +39
public static final SchemaChangeEventType[] ADD = {SchemaChangeEventType.ADD_COLUMN};

public static final SchemaChangeEventType[] ALTER = {SchemaChangeEventType.ALTER_COLUMN_TYPE};

public static final SchemaChangeEventType[] CREATE = {SchemaChangeEventType.CREATE_TABLE};

public static final SchemaChangeEventType[] DROP = {SchemaChangeEventType.DROP_COLUMN};

public static final SchemaChangeEventType[] RENAME = {SchemaChangeEventType.RENAME_COLUMN};

public static final SchemaChangeEventType[] TABLE = {SchemaChangeEventType.CREATE_TABLE};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you share the basis of the categories ? current hierarchy confuse me a bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For class names like AddColumnEvent, I've splitten them like add.column, and puts them into both add and column family.

import java.util.Map;

/** A collection class for handling metrics in {@link SchemaOperator}. */
public class SchemaOperatorMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can split the metric logic to another PR in next time, it's totally independent with SE behavior. But, I like the idea to report metrics for SchemaOperator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder, I'll take notice on this next time.

@yuxiqian
Copy link
Contributor Author

yuxiqian commented Aug 7, 2024

Squashed & Rebased with master.

Since this also changes SchemaManager serialization format, could @lvyanquan please take a look? (Serialization version isn't updated since it's expected that no version will be released between this PR and FLINK-34638.

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @yuxiqian for the great work, the PR generally looks good to me, I just left some minor comments

… evolution behavior & Add schema operator metrics

# Conflicts:
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java
#	flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java
Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @yuxiqian for the great work, LGTM

@leonardBang leonardBang merged commit 5ed9e05 into apache:master Aug 8, 2024
20 checks passed
qiaozongmi pushed a commit to qiaozongmi/flink-cdc that referenced this pull request Sep 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants