Skip to content

Commit

Permalink
[FLINK-34638][cdc-common] Support column with default value
Browse files Browse the repository at this point in the history
  • Loading branch information
lvyanquan authored and leonardBang committed Aug 7, 2024
1 parent f06cc1f commit 4561a8a
Show file tree
Hide file tree
Showing 16 changed files with 174 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,46 @@ public abstract class Column implements Serializable {

private static final long serialVersionUID = 1L;

protected static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'";
protected static final String FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION =
"%s %s '%s'";

protected static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s";
protected static final String FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
"%s %s '%s'";

protected static final String FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
"%s %s '%s' '%s'";

protected static final String FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION = "%s %s";

protected final String name;

protected final DataType type;

protected final @Nullable String comment;

/**
* Save the literal value of the column's default value, For uncertain functions such as UUID(),
* the value is null, For the current time function such as CURRENT_TIMESTAMP(), the value is
* Unix Epoch time(1970-01-01 00:00:00).
*/
protected final @Nullable String defaultValueExpression;

protected Column(String name, DataType type, @Nullable String comment) {
this.name = name;
this.type = type;
this.comment = comment;
this.defaultValueExpression = null;
}

protected Column(
String name,
DataType type,
@Nullable String comment,
@Nullable String defaultValueExpression) {
this.name = name;
this.type = type;
this.comment = comment;
this.defaultValueExpression = defaultValueExpression;
}

/** Returns the name of this column. */
Expand All @@ -69,17 +95,41 @@ public String getComment() {
return comment;
}

@Nullable
public String getDefaultValueExpression() {
return defaultValueExpression;
}

/** Returns a string that summarizes this column for printing to a console. */
public String asSummaryString() {
if (comment == null) {
return String.format(
FIELD_FORMAT_NO_DESCRIPTION, escapeIdentifier(name), type.asSummaryString());
if (defaultValueExpression == null) {
return String.format(
FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString());
} else {
return String.format(
FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString(),
defaultValueExpression);
}
} else {
return String.format(
FIELD_FORMAT_WITH_DESCRIPTION,
escapeIdentifier(name),
type.asSummaryString(),
escapeSingleQuotes(comment));
if (defaultValueExpression == null) {
return String.format(
FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString(),
escapeSingleQuotes(comment));
} else {
return String.format(
FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString(),
escapeSingleQuotes(comment),
defaultValueExpression);
}
}
}

Expand All @@ -103,19 +153,29 @@ public boolean equals(Object o) {
Column column = (Column) o;
return name.equals(column.name)
&& type.equals(column.type)
&& Objects.equals(comment, column.comment);
&& Objects.equals(comment, column.comment)
&& Objects.equals(defaultValueExpression, column.defaultValueExpression);
}

@Override
public int hashCode() {
return Objects.hash(name, type, comment);
return Objects.hash(name, type, comment, defaultValueExpression);
}

@Override
public String toString() {
return asSummaryString();
}

/** Creates a physical column. */
public static PhysicalColumn physicalColumn(
String name,
DataType type,
@Nullable String comment,
@Nullable String defaultValueExpression) {
return new PhysicalColumn(name, type, comment, defaultValueExpression);
}

/** Creates a physical column. */
public static PhysicalColumn physicalColumn(
String name, DataType type, @Nullable String comment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@ public PhysicalColumn(String name, DataType type, @Nullable String comment) {
super(name, type, comment);
}

public PhysicalColumn(
String name, DataType type, @Nullable String comment, @Nullable String defaultValue) {
super(name, type, comment, defaultValue);
}

@Override
public boolean isPhysical() {
return true;
}

@Override
public Column copy(DataType newType) {
return new PhysicalColumn(name, newType, comment);
return new PhysicalColumn(name, newType, comment, defaultValueExpression);
}

@Override
public Column copy(String newName) {
return new PhysicalColumn(newName, type, comment);
return new PhysicalColumn(newName, type, comment, defaultValueExpression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,21 @@ public Builder physicalColumn(String columnName, DataType type, String comment)
return this;
}

/**
* Declares a physical column that is appended to this schema.
*
* @param columnName column name
* @param type data type of the column
* @param comment description of the column
* @param defaultValue default value of the column
*/
public Builder physicalColumn(
String columnName, DataType type, String comment, String defaultValue) {
checkColumn(columnName, type);
columns.add(Column.physicalColumn(columnName, type, comment, defaultValue));
return this;
}

/**
* Declares a metadata column that is appended to this schema.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ private Map<String, FieldSchema> buildFields(Schema schema) {
}
fieldSchemaMap.put(
column.getName(),
new FieldSchema(column.getName(), typeString, column.getComment()));
new FieldSchema(
column.getName(),
typeString,
column.getDefaultValueExpression(),
column.getComment()));
}
return fieldSchemaMap;
}
Expand Down Expand Up @@ -170,6 +174,7 @@ private void applyAddColumnEvent(AddColumnEvent event)
new FieldSchema(
column.getName(),
buildTypeString(column.getType()),
column.getDefaultValueExpression(),
column.getComment());
schemaChangeManager.addColumn(
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx)

private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(), fromDbzColumn(dbzColumn), dbzColumn.comment());
dbzColumn.name(),
fromDbzColumn(dbzColumn),
dbzColumn.comment(),
dbzColumn.defaultValueExpression().orElse(null));
}

private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
if (!column.isOptional()) {
dataType = dataType.notNull();
}
tableBuilder.physicalColumn(colName, dataType, column.comment());
tableBuilder.physicalColumn(
colName,
dataType,
column.comment(),
column.defaultValueExpression().orElse(null));
}

List<String> primaryKey = table.primaryKeyColumnNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
tableId,
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("description", DataTypes.VARCHAR(512))
.physicalColumn("weight", DataTypes.FLOAT())
.primaryKey(Collections.singletonList("id"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;

/** A {@code MetadataApplier} that applies metadata changes to StarRocks. */
public class StarRocksMetadataApplier implements MetadataApplier {

Expand Down Expand Up @@ -117,8 +119,9 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) {
new StarRocksColumn.Builder()
.setColumnName(column.getName())
.setOrdinalPosition(-1)
.setColumnComment(column.getComment());
StarRocksUtils.toStarRocksDataType(column, false, builder);
.setColumnComment(column.getComment())
.setDefaultValue(column.getDefaultValueExpression());
toStarRocksDataType(column, false, builder);
addColumns.add(builder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public static StarRocksTable toStarRocksTable(
new StarRocksColumn.Builder()
.setColumnName(column.getName())
.setOrdinalPosition(i)
.setColumnComment(column.getComment());
.setColumnComment(column.getComment())
.setDefaultValue(column.getDefaultValueExpression());
toStarRocksDataType(column, i < primaryKeyCount, builder);
starRocksColumns.add(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ private void registerNewSchema(TableId tableId, Schema newSchema) {
/** Serializer for {@link SchemaManager}. */
public static class Serializer implements SimpleVersionedSerializer<SchemaManager> {

public static final int CURRENT_VERSION = 1;
/**
* Update history: from Version 3.0.0, set to 0, from version 3.1.1, updated to 1, from
* version 3.2.0, updated to 2.
*/
public static final int CURRENT_VERSION = 2;

@Override
public int getVersion() {
Expand Down Expand Up @@ -214,6 +218,7 @@ public SchemaManager deserialize(int version, byte[] serialized) throws IOExcept
switch (version) {
case 0:
case 1:
case 2:
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
break;
}
case 1:
case 2:
{
int length = in.readInt();
byte[] serializedSchemaManager = new byte[length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static TableChangeInfo of(
/** Serializer for {@link TableChangeInfo}. */
public static class Serializer implements SimpleVersionedSerializer<TableChangeInfo> {

public static final int CURRENT_VERSION = 1;
public static final int CURRENT_VERSION = 2;

@Override
public int getVersion() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public class ColumnSerializer extends TypeSerializerSingleton<Column> {
private final MetadataColumnSerializer metadataColumnSerializer =
MetadataColumnSerializer.INSTANCE;

private static int currentVersion = 2;

/** Update {@link #currentVersion} as We did not directly include this version in the file. */
public static void updateVersion(int version) {
currentVersion = version;
}

@Override
public boolean isImmutableType() {
return false;
Expand Down Expand Up @@ -92,12 +99,16 @@ public void serialize(Column record, DataOutputView target) throws IOException {

@Override
public Column deserialize(DataInputView source) throws IOException {
return deserialize(currentVersion, source);
}

public Column deserialize(int version, DataInputView source) throws IOException {
ColumnType columnType = enumSerializer.deserialize(source);
switch (columnType) {
case METADATA:
return metadataColumnSerializer.deserialize(source);
case PHYSICAL:
return physicalColumnSerializer.deserialize(source);
return physicalColumnSerializer.deserialize(version, source);
default:
throw new IOException("Unknown column type: " + columnType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class PhysicalColumnSerializer extends TypeSerializerSingleton<PhysicalCo
private final DataTypeSerializer dataTypeSerializer = new DataTypeSerializer();
private final StringSerializer stringSerializer = StringSerializer.INSTANCE;

private static final int CURRENT_VERSION = 2;

@Override
public boolean isImmutableType() {
return false;
Expand All @@ -57,7 +59,8 @@ public PhysicalColumn copy(PhysicalColumn from) {
return Column.physicalColumn(
stringSerializer.copy(from.getName()),
dataTypeSerializer.copy(from.getType()),
stringSerializer.copy(from.getComment()));
stringSerializer.copy(from.getComment()),
stringSerializer.copy(from.getDefaultValueExpression()));
}

@Override
Expand All @@ -75,14 +78,37 @@ public void serialize(PhysicalColumn record, DataOutputView target) throws IOExc
stringSerializer.serialize(record.getName(), target);
dataTypeSerializer.serialize(record.getType(), target);
stringSerializer.serialize(record.getComment(), target);
stringSerializer.serialize(record.getDefaultValueExpression(), target);
}

@Override
public PhysicalColumn deserialize(DataInputView source) throws IOException {
String name = stringSerializer.deserialize(source);
DataType dataType = dataTypeSerializer.deserialize(source);
String comment = stringSerializer.deserialize(source);
return Column.physicalColumn(name, dataType, comment);
return deserialize(CURRENT_VERSION, source);
}

public PhysicalColumn deserialize(int version, DataInputView source) throws IOException {
switch (version) {
case 0:
case 1:
{
String name = stringSerializer.deserialize(source);
DataType dataType = dataTypeSerializer.deserialize(source);
String comment = stringSerializer.deserialize(source);
return Column.physicalColumn(name, dataType, comment);
}
case 2:
{
String name = stringSerializer.deserialize(source);
DataType dataType = dataTypeSerializer.deserialize(source);
String comment = stringSerializer.deserialize(source);
String defaultValue = stringSerializer.deserialize(source);
return Column.physicalColumn(name, dataType, comment, defaultValue);
}
default:
{
throw new IOException("Unrecognized serialization version " + version);
}
}
}

@Override
Expand Down
Loading

0 comments on commit 4561a8a

Please sign in to comment.