diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 8179fdbffe..aa75073373 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -176,6 +176,7 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
     private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
         List<String> includedSETypes = new ArrayList<>();
         List<String> excludedSETypes = new ArrayList<>();
+        boolean excludedFieldNotPresent = sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES) == null;
 
         Optional.ofNullable(sinkNode.get(INCLUDE_SCHEMA_EVOLUTION_TYPES))
                 .ifPresent(e -> e.forEach(tag -> includedSETypes.add(tag.asText())));
@@ -191,8 +192,7 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
                     .forEach(includedSETypes::add);
         }
 
-        if (excludedSETypes.isEmpty()
-                && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
+        if (excludedFieldNotPresent && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
             // In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be
             // overridden by manually specifying excluded types.
             Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 2ecf458707..49e984781a 100644
--- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.cdc.cli.parser;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -37,12 +38,15 @@
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Set;
 
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
 import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
 import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -190,6 +194,99 @@ void testUdfDefinition() throws Exception {
         assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
     }
 
+    @Test
+    void testSchemaEvolutionTypesConfiguration() throws Exception {
+        testSchemaEvolutionTypesParsing(
+                "evolve",
+                null,
+                null,
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        DROP_COLUMN,
+                        DROP_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+        testSchemaEvolutionTypesParsing(
+                "try_evolve",
+                null,
+                null,
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        DROP_COLUMN,
+                        DROP_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+        testSchemaEvolutionTypesParsing(
+                "evolve",
+                "[column, table]",
+                "[drop]",
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+        testSchemaEvolutionTypesParsing(
+                "lenient",
+                null,
+                null,
+                ImmutableSet.of(
+                        ADD_COLUMN, ALTER_COLUMN_TYPE, CREATE_TABLE, DROP_COLUMN, RENAME_COLUMN));
+        testSchemaEvolutionTypesParsing(
+                "lenient",
+                null,
+                "[]",
+                ImmutableSet.of(
+                        ADD_COLUMN,
+                        ALTER_COLUMN_TYPE,
+                        CREATE_TABLE,
+                        DROP_COLUMN,
+                        DROP_TABLE,
+                        RENAME_COLUMN,
+                        TRUNCATE_TABLE));
+    }
+
+    private void testSchemaEvolutionTypesParsing(
+            String behavior, String included, String excluded, Set<SchemaChangeEventType> expected)
+            throws Exception {
+        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef =
+                parser.parse(
+                        "source:\n"
+                                + "  type: foo\n"
+                                + "sink:\n"
+                                + "  type: bar\n"
+                                + (included != null
+                                        ? String.format("  include.schema.changes: %s\n", included)
+                                        : "")
+                                + (excluded != null
+                                        ? String.format("  exclude.schema.changes: %s\n", excluded)
+                                        : "")
+                                + "pipeline:\n"
+                                + "  schema.change.behavior: "
+                                + behavior
+                                + "\n"
+                                + "  parallelism: 1\n",
+                        new Configuration());
+        assertThat(pipelineDef)
+                .isEqualTo(
+                        new PipelineDef(
+                                new SourceDef("foo", null, new Configuration()),
+                                new SinkDef("bar", null, new Configuration(), expected),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Configuration.fromMap(
+                                        ImmutableMap.<String, String>builder()
+                                                .put("schema.change.behavior", behavior)
+                                                .put("parallelism", "1")
+                                                .build())));
+    }
+
     private final PipelineDef fullDef =
             new PipelineDef(
                     new SourceDef(