

[FLINK-36514] Fix unable to override exclude schema types in lenient mode

Signed-off-by: yuxiqian <[email protected]>
yuxiqian committed Oct 12, 2024
1 parent 4b13c49 commit f47c309
Showing 2 changed files with 103 additions and 2 deletions.
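For illustration, a hypothetical pipeline definition (connector names foo/bar are placeholders, mirroring the keys used in the new test below) showing the behavior this commit enables: with lenient schema change behavior, an explicitly specified exclude.schema.changes list (even an empty one) now overrides the default exclusion of DROP_TABLE and TRUNCATE_TABLE, while omitting the key keeps the default.

source:
  type: foo
sink:
  type: bar
  # Explicitly empty list: overrides the lenient-mode default that excludes
  # DROP_TABLE and TRUNCATE_TABLE. Omitting this key keeps the default exclusion.
  exclude.schema.changes: []
pipeline:
  schema.change.behavior: lenient
  parallelism: 1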
@@ -176,6 +176,7 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();
+ boolean excludedFieldNotPresent = sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES) == null;

Optional.ofNullable(sinkNode.get(INCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> includedSETypes.add(tag.asText())));
@@ -191,8 +192,7 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
.forEach(includedSETypes::add);
}

- if (excludedSETypes.isEmpty()
-         && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
+ if (excludedFieldNotPresent && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
// In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be
// overridden by manually specifying excluded types.
Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.cli.parser;

import org.apache.flink.cdc.common.configuration.Configuration;
+ import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -37,12 +38,15 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+ import java.util.Set;

import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
+ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_TABLE;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
+ import static org.apache.flink.cdc.common.event.SchemaChangeEventType.TRUNCATE_TABLE;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -190,6 +194,103 @@ void testUdfDefinition() throws Exception {
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
}

@Test
void testSchemaEvolutionTypesConfiguration() throws Exception {
testSchemaEvolutionTypesParsing(
"evolve",
null,
null,
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
testSchemaEvolutionTypesParsing(
"try_evolve",
null,
null,
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
testSchemaEvolutionTypesParsing(
"evolve",
"[column, table]",
"[drop]",
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
testSchemaEvolutionTypesParsing(
"lenient",
null,
null,
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
RENAME_COLUMN));
testSchemaEvolutionTypesParsing(
"lenient",
null,
"[]",
ImmutableSet.of(
ADD_COLUMN,
ALTER_COLUMN_TYPE,
CREATE_TABLE,
DROP_COLUMN,
DROP_TABLE,
RENAME_COLUMN,
TRUNCATE_TABLE));
}

private void testSchemaEvolutionTypesParsing(
String behavior, String included, String excluded, Set<SchemaChangeEventType> expected)
throws Exception {
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
PipelineDef pipelineDef =
parser.parse(
"source:\n"
+ " type: foo\n"
+ "sink:\n"
+ " type: bar\n"
+ (included != null
? String.format(" include.schema.changes: %s\n", included)
: "")
+ (excluded != null
? String.format(" exclude.schema.changes: %s\n", excluded)
: "")
+ "pipeline:\n"
+ " schema.change.behavior: "
+ behavior
+ "\n"
+ " parallelism: 1\n",
new Configuration());
assertThat(pipelineDef)
.isEqualTo(
new PipelineDef(
new SourceDef("foo", null, new Configuration()),
new SinkDef("bar", null, new Configuration(), expected),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("schema.change.behavior", behavior)
.put("parallelism", "1")
.build())));
}

private final PipelineDef fullDef =
new PipelineDef(
new SourceDef(
