Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-6999 Fix multi-column add alter table statements
Browse files Browse the repository at this point in the history
  • Loading branch information
Naros committed Oct 9, 2023
1 parent f6c5762 commit 55957ea
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,41 @@ public interface DatabaseDialect {
*/
String getCreateTableStatement(SinkRecordDescriptor record, TableId tableId);

/**
* Gets the prefix used before adding column-clauses in {@code ALTER TABLE} statements.
*
* @return the alter table column-clauses prefix
*/
String getAlterTablePrefix();

/**
* Gets the suffix used after adding the column-clauses in {@code ALTER TABLE} statements.
*
* @return the alter table column-clauses suffix
*/
String getAlterTableSuffix();

/**
* Gets the prefix used before adding each column-clause to {@code ALTER TABLE} statements.
*
* @return the alter table prefix just before each column-clause
*/
String getAlterTableColumnPrefix();

/**
* Gets the suffix used after adding each column-clause to {@code ALTER TABLE statements}.
*
* @return the alter table suffix just after each column-clause
*/
String getAlterTableColumnSuffix();

/**
* Gets the field delimiter used when constructing {@code ALTER TABLE} statements.
*
* @return the field delimiter for alter table SQL statement
*/
String getAlterTableColumnDelimiter();

/**
* Construct a {@code ALTER TABLE} statement specific for this dialect.
*
Expand All @@ -96,15 +131,6 @@ public interface DatabaseDialect {
*/
String getAlterTableStatement(TableDescriptor table, SinkRecordDescriptor record, Set<String> missingFields);

/**
* Gets the field delimeter used when contructing {@code ALTER TABLE} statements.
*
* @return the field delimeter for alter table SQL statement
*/
default String getAlterTableStatementFieldDelimiter() {
return " ";
}

/**
* Construct a {@code INSERT INTO} statement specific for this dialect.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,24 +294,52 @@ public String getCreateTableStatement(SinkRecordDescriptor record, TableId table
return builder.build();
}

@Override
public String getAlterTablePrefix() {
return "ADD (";
}

@Override
public String getAlterTableSuffix() {
return ")";
}

@Override
public String getAlterTableColumnPrefix() {
return "";
}

@Override
public String getAlterTableColumnSuffix() {
return "";
}

@Override
public String getAlterTableColumnDelimiter() {
return ", ";
}

@Override
public String getAlterTableStatement(TableDescriptor table, SinkRecordDescriptor record, Set<String> missingFields) {
final SqlStatementBuilder builder = new SqlStatementBuilder();
builder.append("ALTER TABLE ");
builder.append(getQualifiedTableName(table.getId()));
builder.append(" ");
builder.appendList(getAlterTableStatementFieldDelimiter(), missingFields, (name) -> {
builder.append(getAlterTablePrefix());
builder.appendList(getAlterTableColumnDelimiter(), missingFields, (name) -> {
final FieldDescriptor field = record.getFields().get(name);
final StringBuilder addColumnSpec = new StringBuilder();
addColumnSpec.append("ADD ");
addColumnSpec.append(getAlterTableColumnPrefix());
addColumnSpec.append(" ");
addColumnSpec.append(toIdentifier(columnNamingStrategy.resolveColumnName(field.getColumnName())));
addColumnSpec.append(" ").append(field.getTypeName());
addColumnDefaultValue(field, addColumnSpec);

addColumnSpec.append(field.getSchema().isOptional() ? " NULL" : " NOT NULL");
addColumnSpec.append(getAlterTableColumnSuffix());
return addColumnSpec.toString();
});

builder.append(getAlterTableSuffix());
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@ public int getMaxNVarcharLengthInKey() {
return 255;
}

@Override
public String getAlterTablePrefix() {
return "";
}

@Override
public String getAlterTableSuffix() {
return "";
}

@Override
public String getAlterTableColumnPrefix() {
return "ADD COLUMN";
}

@Override
public String getAlterTableColumnDelimiter() {
return " ";
}

@Override
public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
final SqlStatementBuilder builder = new SqlStatementBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,6 @@ protected void registerTypes() {
registerType(PointType.INSTANCE);
}

@Override
public String getAlterTableStatementFieldDelimiter() {
return ",";
}

@Override
public int getMaxVarcharLengthInKey() {
return 255;
Expand All @@ -144,6 +139,11 @@ public String getFormattedTimestampWithTimeZone(String value) {
return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime));
}

@Override
public String getAlterTablePrefix() {
return "ADD COLUMN (";
}

@Override
public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
final SqlStatementBuilder builder = new SqlStatementBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ public TableDescriptor readTable(Connection connection, TableId tableId) throws
return super.readTable(connection, tableId);
}

@Override
public String getAlterTablePrefix() {
return "";
}

@Override
public String getAlterTableSuffix() {
return "";
}

@Override
public String getAlterTableColumnPrefix() {
return "ADD COLUMN ";
}

@Override
public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
final SqlStatementBuilder builder = new SqlStatementBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ public String getInsertStatement(TableDescriptor table, SinkRecordDescriptor rec
return wrapWithIdentityInsert(table, insertStatement);
}

@Override
public String getAlterTablePrefix() {
return "ADD ";
}

@Override
public String getAlterTableSuffix() {
return "";
}

private String wrapWithIdentityInsert(TableDescriptor table, String sqlStatement) {

if (!table.hasAutoGeneratedIdentityColumn() || !getConfig().isSqlServerIdentityInsert()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,28 @@ public void testTableCreatedThenAlteredWithNewColumn(SinkRecordFactory factory)
.recordSchema(SchemaBuilder.struct()
.field("id", Schema.INT8_SCHEMA)
.field("name", Schema.OPTIONAL_STRING_SCHEMA)
.field("age", Schema.OPTIONAL_INT32_SCHEMA))
.field("age", Schema.OPTIONAL_INT32_SCHEMA)
.field("weight", Schema.OPTIONAL_INT32_SCHEMA))
.sourceSchema(factory.basicSourceSchema())
.key("id", (byte) 1)
.before("id", (byte) 1)
.before("name", "John Doe")
.after("id", (byte) 1)
.after("name", "John Doe")
.after("age", 25)
.after("weight", 150)
.source("ts_ms", (int) Instant.now().getEpochSecond())
.build();
consume(updateRecord);

final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createRecord));
tableAssert.hasNumberOfRows(2).hasNumberOfColumns(4);
tableAssert.hasNumberOfRows(2).hasNumberOfColumns(5);

getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER);
getSink().assertColumnType(tableAssert, "name", ValueType.TEXT);
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$", null);
getSink().assertColumnType(tableAssert, "age", ValueType.NUMBER, null, 25);
getSink().assertColumnType(tableAssert, "weight", ValueType.NUMBER, null, 150);
}

@ParameterizedTest
Expand Down

0 comments on commit 55957ea

Please sign in to comment.