diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java index 5c1c59d5e6..910032c468 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Column.java @@ -38,9 +38,16 @@ public abstract class Column implements Serializable { private static final long serialVersionUID = 1L; - protected static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'"; + protected static final String FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION = + "%s %s '%s'"; - protected static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s"; + protected static final String FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION = + "%s %s '%s'"; + + protected static final String FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION = + "%s %s '%s' '%s'"; + + protected static final String FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION = "%s %s"; protected final String name; @@ -48,10 +55,29 @@ public abstract class Column implements Serializable { protected final @Nullable String comment; + /** + * Save the literal value of the column's default value, For uncertain functions such as UUID(), + * the value is null, For the current time function such as CURRENT_TIMESTAMP(), the value is + * Unix Epoch time(1970-01-01 00:00:00). + */ + protected final @Nullable String defaultValueExpression; + protected Column(String name, DataType type, @Nullable String comment) { this.name = name; this.type = type; this.comment = comment; + this.defaultValueExpression = null; + } + + protected Column( + String name, + DataType type, + @Nullable String comment, + @Nullable String defaultValueExpression) { + this.name = name; + this.type = type; + this.comment = comment; + this.defaultValueExpression = defaultValueExpression; } /** Returns the name of this column. */ @@ -69,17 +95,41 @@ public String getComment() { return comment; } + @Nullable + public String getDefaultValueExpression() { + return defaultValueExpression; + } + /** Returns a string that summarizes this column for printing to a console. */ public String asSummaryString() { if (comment == null) { - return String.format( - FIELD_FORMAT_NO_DESCRIPTION, escapeIdentifier(name), type.asSummaryString()); + if (defaultValueExpression == null) { + return String.format( + FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION, + escapeIdentifier(name), + type.asSummaryString()); + } else { + return String.format( + FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION, + escapeIdentifier(name), + type.asSummaryString(), + defaultValueExpression); + } } else { - return String.format( - FIELD_FORMAT_WITH_DESCRIPTION, - escapeIdentifier(name), - type.asSummaryString(), - escapeSingleQuotes(comment)); + if (defaultValueExpression == null) { + return String.format( + FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION, + escapeIdentifier(name), + type.asSummaryString(), + escapeSingleQuotes(comment)); + } else { + return String.format( + FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION, + escapeIdentifier(name), + type.asSummaryString(), + escapeSingleQuotes(comment), + defaultValueExpression); + } } } @@ -103,12 +153,13 @@ public boolean equals(Object o) { Column column = (Column) o; return name.equals(column.name) && type.equals(column.type) - && Objects.equals(comment, column.comment); + && Objects.equals(comment, column.comment) + && Objects.equals(defaultValueExpression, column.defaultValueExpression); } @Override public int hashCode() { - return Objects.hash(name, type, comment); + return Objects.hash(name, type, comment, defaultValueExpression); } @Override @@ -116,6 +167,15 @@ public String toString() { return asSummaryString(); } + /** Creates a physical column. */ + public static PhysicalColumn physicalColumn( + String name, + DataType type, + @Nullable String comment, + @Nullable String defaultValueExpression) { + return new PhysicalColumn(name, type, comment, defaultValueExpression); + } + /** Creates a physical column. */ public static PhysicalColumn physicalColumn( String name, DataType type, @Nullable String comment) { diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java index 1f3e264279..bf711f791d 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/PhysicalColumn.java @@ -32,6 +32,11 @@ public PhysicalColumn(String name, DataType type, @Nullable String comment) { super(name, type, comment); } + public PhysicalColumn( + String name, DataType type, @Nullable String comment, @Nullable String defaultValue) { + super(name, type, comment, defaultValue); + } + @Override public boolean isPhysical() { return true; @@ -39,11 +44,11 @@ public boolean isPhysical() { @Override public Column copy(DataType newType) { - return new PhysicalColumn(name, newType, comment); + return new PhysicalColumn(name, newType, comment, defaultValueExpression); } @Override public Column copy(String newName) { - return new PhysicalColumn(newName, type, comment); + return new PhysicalColumn(newName, type, comment, defaultValueExpression); } } diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java index 62657be885..2db509e6e6 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/schema/Schema.java @@ -316,6 +316,21 @@ public Builder physicalColumn(String columnName, DataType type, String comment) return this; } + /** + * Declares a physical column that is appended to this schema. + * + * @param columnName column name + * @param type data type of the column + * @param comment description of the column + * @param defaultValue default value of the column + */ + public Builder physicalColumn( + String columnName, DataType type, String comment, String defaultValue) { + checkColumn(columnName, type); + columns.add(Column.physicalColumn(columnName, type, comment, defaultValue)); + return this; + } + /** * Declares a metadata column that is appended to this schema. * diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index ad025c6295..8ea6f60322 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -134,7 +134,11 @@ private Map buildFields(Schema schema) { } fieldSchemaMap.put( column.getName(), - new FieldSchema(column.getName(), typeString, column.getComment())); + new FieldSchema( + column.getName(), + typeString, + column.getDefaultValueExpression(), + column.getComment())); } return fieldSchemaMap; } @@ -170,6 +174,7 @@ private void applyAddColumnEvent(AddColumnEvent event) new FieldSchema( column.getName(), buildTypeString(column.getType()), + column.getDefaultValueExpression(), column.getComment()); schemaChangeManager.addColumn( tableId.getSchemaName(), tableId.getTableName(), addFieldSchema); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 6f3fb9a8cc..cde5aa0c51 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -253,7 +253,10 @@ public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) { return org.apache.flink.cdc.common.schema.Column.physicalColumn( - dbzColumn.name(), fromDbzColumn(dbzColumn), dbzColumn.comment()); + dbzColumn.name(), + fromDbzColumn(dbzColumn), + dbzColumn.comment(), + dbzColumn.defaultValueExpression().orElse(null)); } private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java index b7434bf94d..3f75813471 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java @@ -205,7 +205,11 @@ private Schema parseDDL(String ddlStatement, TableId tableId) { if (!column.isOptional()) { dataType = dataType.notNull(); } - tableBuilder.physicalColumn(colName, dataType, column.comment()); + tableBuilder.physicalColumn( + colName, + dataType, + column.comment(), + column.defaultValueExpression().orElse(null)); } List primaryKey = table.primaryKeyColumnNames(); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 5eb6ce0e59..076e57364c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -355,7 +355,7 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { tableId, Schema.newBuilder() .physicalColumn("id", DataTypes.INT().notNull()) - .physicalColumn("name", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink") .physicalColumn("description", DataTypes.VARCHAR(512)) .physicalColumn("weight", DataTypes.FLOAT()) .primaryKey(Collections.singletonList("id")) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index 5ee93ff62e..34b483d00a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -37,6 +37,8 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType; + /** A {@code MetadataApplier} that applies metadata changes to StarRocks. */ public class StarRocksMetadataApplier implements MetadataApplier { @@ -117,8 +119,9 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) { new StarRocksColumn.Builder() .setColumnName(column.getName()) .setOrdinalPosition(-1) - .setColumnComment(column.getComment()); - StarRocksUtils.toStarRocksDataType(column, false, builder); + .setColumnComment(column.getComment()) + .setDefaultValue(column.getDefaultValueExpression()); + toStarRocksDataType(column, false, builder); addColumns.add(builder.build()); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java index ccab99c7a3..50dab2ac45 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksUtils.java @@ -84,7 +84,8 @@ public static StarRocksTable toStarRocksTable( new StarRocksColumn.Builder() .setColumnName(column.getName()) .setOrdinalPosition(i) - .setColumnComment(column.getComment()); + .setColumnComment(column.getComment()) + .setDefaultValue(column.getDefaultValueExpression()); toStarRocksDataType(column, i < primaryKeyCount, builder); starRocksColumns.add(builder.build()); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java index 99335180d9..30fc86a608 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java @@ -172,7 +172,11 @@ private void registerNewSchema(TableId tableId, Schema newSchema) { /** Serializer for {@link SchemaManager}. */ public static class Serializer implements SimpleVersionedSerializer { - public static final int CURRENT_VERSION = 1; + /** + * Update history: from Version 3.0.0, set to 0, from version 3.1.1, updated to 1, from + * version 3.2.0, updated to 2. + */ + public static final int CURRENT_VERSION = 2; @Override public int getVersion() { @@ -214,6 +218,7 @@ public SchemaManager deserialize(int version, byte[] serialized) throws IOExcept switch (version) { case 0: case 1: + case 2: TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE; SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE; try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index 02abb8903e..8de38e1406 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -214,6 +214,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData break; } case 1: + case 2: { int length = in.readInt(); byte[] serializedSchemaManager = new byte[length]; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java index 7fab9c65ad..fa04fa50a5 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TableChangeInfo.java @@ -109,7 +109,7 @@ public static TableChangeInfo of( /** Serializer for {@link TableChangeInfo}. */ public static class Serializer implements SimpleVersionedSerializer { - public static final int CURRENT_VERSION = 1; + public static final int CURRENT_VERSION = 2; @Override public int getVersion() { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java index d6c80eadda..c8e60386e6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/ColumnSerializer.java @@ -46,6 +46,13 @@ public class ColumnSerializer extends TypeSerializerSingleton { private final MetadataColumnSerializer metadataColumnSerializer = MetadataColumnSerializer.INSTANCE; + private static int currentVersion = 2; + + /** Update {@link #currentVersion} as We did not directly include this version in the file. */ + public static void updateVersion(int version) { + currentVersion = version; + } + @Override public boolean isImmutableType() { return false; @@ -92,12 +99,16 @@ public void serialize(Column record, DataOutputView target) throws IOException { @Override public Column deserialize(DataInputView source) throws IOException { + return deserialize(currentVersion, source); + } + + public Column deserialize(int version, DataInputView source) throws IOException { ColumnType columnType = enumSerializer.deserialize(source); switch (columnType) { case METADATA: return metadataColumnSerializer.deserialize(source); case PHYSICAL: - return physicalColumnSerializer.deserialize(source); + return physicalColumnSerializer.deserialize(version, source); default: throw new IOException("Unknown column type: " + columnType); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java index e163d39ad7..b91c260f0d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/schema/PhysicalColumnSerializer.java @@ -42,6 +42,8 @@ public class PhysicalColumnSerializer extends TypeSerializerSingleton getTypeClass() { protected PhysicalColumn[] getTestData() { return new PhysicalColumn[] { Column.physicalColumn("col1", DataTypes.BIGINT()), - Column.physicalColumn("col1", DataTypes.BIGINT(), "comment") + Column.physicalColumn("col1", DataTypes.BIGINT(), "comment"), + Column.physicalColumn("col1", DataTypes.BIGINT(), "comment", "default value") }; } }