diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java index 04e5f59815..feb5121bc0 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java @@ -40,7 +40,9 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -348,4 +350,73 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche }); return oldSchema.copy(columns); } + + public static Optional transformSchemaChangeEvent( + boolean hasAsterisk, List referencedColumns, SchemaChangeEvent event) { + Optional evolvedSchemaChangeEvent = Optional.empty(); + if (event instanceof AddColumnEvent) { + // Send add column events to downstream iff there's an asterisk + if (hasAsterisk) { + List addedColumns = + ((AddColumnEvent) event) + .getAddedColumns().stream() + .map( + e -> { + if (AddColumnEvent.ColumnPosition.LAST.equals( + e.getPosition())) { + return new AddColumnEvent + .ColumnWithPosition( + e.getAddColumn(), + AddColumnEvent.ColumnPosition.AFTER, + referencedColumns.get( + referencedColumns.size() + - 1)); + } else if (AddColumnEvent.ColumnPosition.FIRST + .equals(e.getPosition())) { + return new AddColumnEvent + .ColumnWithPosition( + e.getAddColumn(), + AddColumnEvent.ColumnPosition + .BEFORE, + referencedColumns.get(0)); + } else { + return e; + } + }) + .collect(Collectors.toList()); + evolvedSchemaChangeEvent = + Optional.of(new AddColumnEvent(event.tableId(), addedColumns)); + } + } else if (event instanceof AlterColumnTypeEvent) { + AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) event; + if (hasAsterisk) { + // In wildcard mode, all alter column type events should be sent to downstream + evolvedSchemaChangeEvent = Optional.of(event); + } else { + // Or, we need to filter out those referenced columns and reconstruct + // SchemaChangeEvents + Map newDataTypeMap = + alterColumnTypeEvent.getTypeMapping().entrySet().stream() + .filter(e -> referencedColumns.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + if (!newDataTypeMap.isEmpty()) { + evolvedSchemaChangeEvent = + Optional.of( + new AlterColumnTypeEvent( + alterColumnTypeEvent.tableId(), newDataTypeMap)); + } + } + } else if (event instanceof RenameColumnEvent) { + if (hasAsterisk) { + evolvedSchemaChangeEvent = Optional.of(event); + } + } else if (event instanceof DropColumnEvent) { + if (hasAsterisk) { + evolvedSchemaChangeEvent = Optional.of(event); + } + } else { + evolvedSchemaChangeEvent = Optional.of(event); + } + return evolvedSchemaChangeEvent; + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index fdf447a630..30c78d202f 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -293,7 +293,7 @@ private Optional cacheSchema(SchemaChangeEvent event) throws if (event instanceof CreateTableEvent) { return Optional.of(new CreateTableEvent(tableId, projectedSchema)); } else { - return TransformParser.transformSchemaChangeEvent( + return SchemaUtils.transformSchemaChangeEvent( hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index 37ed0e7e3d..5d87980684 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -269,7 +269,7 @@ private Optional cacheChangeSchema(SchemaChangeEvent event) { SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event); Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema(); Optional schemaChangeEvent = - TransformParser.transformSchemaChangeEvent( + SchemaUtils.transformSchemaChangeEvent( hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event); if (schemaChangeEvent.isPresent()) { preTransformedSchema = diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 72e880b9de..38121f35d9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -18,11 +18,6 @@ package org.apache.flink.cdc.runtime.parser; import org.apache.flink.api.common.io.ParseException; -import org.apache.flink.cdc.common.event.AddColumnEvent; -import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; -import org.apache.flink.cdc.common.event.DropColumnEvent; -import org.apache.flink.cdc.common.event.RenameColumnEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn; @@ -612,73 +607,4 @@ private static boolean hasAsterisk(SqlNode sqlNode) { return false; } } - - public static Optional transformSchemaChangeEvent( - boolean hasAsterisk, List referencedColumns, SchemaChangeEvent event) { - Optional evolvedSchemaChangeEvent = Optional.empty(); - if (event instanceof AddColumnEvent) { - // Send add column events to downstream iff there's an asterisk - if (hasAsterisk) { - List addedColumns = - ((AddColumnEvent) event) - .getAddedColumns().stream() - .map( - e -> { - if (AddColumnEvent.ColumnPosition.LAST.equals( - e.getPosition())) { - return new AddColumnEvent - .ColumnWithPosition( - e.getAddColumn(), - AddColumnEvent.ColumnPosition.AFTER, - referencedColumns.get( - referencedColumns.size() - - 1)); - } else if (AddColumnEvent.ColumnPosition.FIRST - .equals(e.getPosition())) { - return new AddColumnEvent - .ColumnWithPosition( - e.getAddColumn(), - AddColumnEvent.ColumnPosition - .BEFORE, - referencedColumns.get(0)); - } else { - return e; - } - }) - .collect(Collectors.toList()); - evolvedSchemaChangeEvent = - Optional.of(new AddColumnEvent(event.tableId(), addedColumns)); - } - } else if (event instanceof AlterColumnTypeEvent) { - AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) event; - if (hasAsterisk) { - // In wildcard mode, all alter column type events should be sent to downstream - evolvedSchemaChangeEvent = Optional.of(event); - } else { - // Or, we need to filter out those referenced columns and reconstruct - // SchemaChangeEvents - Map newDataTypeMap = - alterColumnTypeEvent.getTypeMapping().entrySet().stream() - .filter(e -> referencedColumns.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - if (!newDataTypeMap.isEmpty()) { - evolvedSchemaChangeEvent = - Optional.of( - new AlterColumnTypeEvent( - alterColumnTypeEvent.tableId(), newDataTypeMap)); - } - } - } else if (event instanceof RenameColumnEvent) { - if (hasAsterisk) { - evolvedSchemaChangeEvent = Optional.of(event); - } - } else if (event instanceof DropColumnEvent) { - if (hasAsterisk) { - evolvedSchemaChangeEvent = Optional.of(event); - } - } else { - evolvedSchemaChangeEvent = Optional.of(event); - } - return evolvedSchemaChangeEvent; - } }