From 7fdb308d2dd91cb3b7c2a63c0d32d84df29bbec7 Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Wed, 4 Oct 2023 15:46:24 -0400 Subject: [PATCH] DBZ-6999 Fix multi-column add alter table statements --- .../jdbc/dialect/DatabaseDialect.java | 44 +++++++++++++++---- .../jdbc/dialect/GeneralDatabaseDialect.java | 34 ++++++++++++-- .../jdbc/dialect/db2/Db2DatabaseDialect.java | 20 +++++++++ .../dialect/mysql/MySqlDatabaseDialect.java | 10 ++--- .../postgres/PostgresDatabaseDialect.java | 15 +++++++ .../sqlserver/SqlServerDatabaseDialect.java | 10 +++++ .../AbstractJdbcSinkSchemaEvolutionTest.java | 10 +++-- 7 files changed, 123 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java index 0670ae81..cb282cf1 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java @@ -86,6 +86,41 @@ public interface DatabaseDialect { */ String getCreateTableStatement(SinkRecordDescriptor record, TableId tableId); + /** + * Gets the prefix used before adding column-clauses in {@code ALTER TABLE} statements. + * + * @return the alter table column-clauses prefix + */ + String getAlterTablePrefix(); + + /** + * Gets the suffix used after adding the column-clauses in {@code ALTER TABLE} statements. + * + * @return the alter table column-clauses suffix + */ + String getAlterTableSuffix(); + + /** + * Gets the prefix used before adding each column-clause to {@code ALTER TABLE} statements. + * + * @return the alter table prefix just before each column-clause + */ + String getAlterTableColumnPrefix(); + + /** + * Gets the suffix used after adding each column-clause to {@code ALTER TABLE statements}. + * + * @return the alter table suffix just after each column-clause + */ + String getAlterTableColumnSuffix(); + + /** + * Gets the field delimiter used when constructing {@code ALTER TABLE} statements. + * + * @return the field delimiter for alter table SQL statement + */ + String getAlterTableColumnDelimiter(); + /** * Construct a {@code ALTER TABLE} statement specific for this dialect. * @@ -97,15 +132,6 @@ public interface DatabaseDialect { */ String getAlterTableStatement(TableDescriptor table, SinkRecordDescriptor record, Set missingFields); - /** - * Gets the field delimeter used when contructing {@code ALTER TABLE} statements. - * - * @return the field delimeter for alter table SQL statement - */ - default String getAlterTableStatementFieldDelimiter() { - return " "; - } - /** * Construct a {@code INSERT INTO} statement specific for this dialect. * diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index 65384995..8fa8a9b5 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -287,24 +287,52 @@ public String getCreateTableStatement(SinkRecordDescriptor record, TableId table return builder.build(); } + @Override + public String getAlterTablePrefix() { + return "ADD ("; + } + + @Override + public String getAlterTableSuffix() { + return ")"; + } + + @Override + public String getAlterTableColumnPrefix() { + return ""; + } + + @Override + public String getAlterTableColumnSuffix() { + return ""; + } + + @Override + public String getAlterTableColumnDelimiter() { + return ", "; + } + @Override public String getAlterTableStatement(TableDescriptor table, SinkRecordDescriptor record, Set missingFields) { final SqlStatementBuilder builder = new SqlStatementBuilder(); builder.append("ALTER TABLE "); builder.append(getQualifiedTableName(table.getId())); builder.append(" "); - builder.appendList(getAlterTableStatementFieldDelimiter(), missingFields, (name) -> { + builder.append(getAlterTablePrefix()); + builder.appendList(getAlterTableColumnDelimiter(), missingFields, (name) -> { final FieldDescriptor field = record.getFields().get(name); final StringBuilder addColumnSpec = new StringBuilder(); - addColumnSpec.append("ADD "); + addColumnSpec.append(getAlterTableColumnPrefix()); + addColumnSpec.append(" "); addColumnSpec.append(toIdentifier(columnNamingStrategy.resolveColumnName(name))); addColumnSpec.append(" ").append(field.getTypeName()); addColumnDefaultValue(field, addColumnSpec); addColumnSpec.append(field.getSchema().isOptional() ? " NULL" : " NOT NULL"); + addColumnSpec.append(getAlterTableColumnSuffix()); return addColumnSpec.toString(); }); - + builder.append(getAlterTableSuffix()); return builder.build(); } diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java index 8c3af0b3..0a487346 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/db2/Db2DatabaseDialect.java @@ -87,6 +87,26 @@ public int getMaxNVarcharLengthInKey() { return 255; } + @Override + public String getAlterTablePrefix() { + return ""; + } + + @Override + public String getAlterTableSuffix() { + return ""; + } + + @Override + public String getAlterTableColumnPrefix() { + return "ADD COLUMN"; + } + + @Override + public String getAlterTableColumnDelimiter() { + return " "; + } + @Override public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) { final SqlStatementBuilder builder = new SqlStatementBuilder(); diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java index c9176233..d0e3be34 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java @@ -111,11 +111,6 @@ protected void registerTypes() { registerType(MapToJsonType.INSTANCE); } - @Override - public String getAlterTableStatementFieldDelimiter() { - return ","; - } - @Override public int getMaxVarcharLengthInKey() { return 255; @@ -142,6 +137,11 @@ public String getFormattedTimestampWithTimeZone(String value) { return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime)); } + @Override + public String getAlterTablePrefix() { + return "ADD COLUMN ("; + } + @Override public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) { final SqlStatementBuilder builder = new SqlStatementBuilder(); diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java index 553ad828..f76641d7 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java @@ -78,6 +78,21 @@ public TableDescriptor readTable(Connection connection, TableId tableId) throws return super.readTable(connection, tableId); } + @Override + public String getAlterTablePrefix() { + return ""; + } + + @Override + public String getAlterTableSuffix() { + return ""; + } + + @Override + public String getAlterTableColumnPrefix() { + return "ADD COLUMN "; + } + @Override public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) { final SqlStatementBuilder builder = new SqlStatementBuilder(); diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java index 80a0dfb4..a5e79866 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/sqlserver/SqlServerDatabaseDialect.java @@ -47,6 +47,16 @@ private SqlServerDatabaseDialect(JdbcSinkConnectorConfig config, SessionFactory super(config, sessionFactory); } + @Override + public String getAlterTablePrefix() { + return "ADD "; + } + + @Override + public String getAlterTableSuffix() { + return ""; + } + @Override protected Optional getDatabaseTimeZoneQuery() { return Optional.of("SELECT CURRENT_TIMEZONE()"); diff --git a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java index 65d838ee..75563407 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/AbstractJdbcSinkSchemaEvolutionTest.java @@ -185,7 +185,8 @@ public void testTableCreatedThenAlteredWithNewColumn(SinkRecordFactory factory) .recordSchema(SchemaBuilder.struct() .field("id", Schema.INT8_SCHEMA) .field("name", Schema.OPTIONAL_STRING_SCHEMA) - .field("age", Schema.OPTIONAL_INT32_SCHEMA)) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .field("weight", Schema.OPTIONAL_INT32_SCHEMA)) .sourceSchema(factory.basicSourceSchema()) .key("id", (byte) 1) .before("id", (byte) 1) @@ -193,16 +194,19 @@ public void testTableCreatedThenAlteredWithNewColumn(SinkRecordFactory factory) .after("id", (byte) 1) .after("name", "John Doe") .after("age", 25) + .after("weight", 150) .source("ts_ms", (int) Instant.now().getEpochSecond()) .build(); consume(updateRecord); final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createRecord)); - tableAssert.hasNumberOfRows(2).hasNumberOfColumns(3); + tableAssert.hasNumberOfRows(2).hasNumberOfColumns(5); getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER); getSink().assertColumnType(tableAssert, "name", ValueType.TEXT); - getSink().assertColumnType(tableAssert, "age", ValueType.NUMBER, 25, (Number) null); + getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$", null); + getSink().assertColumnType(tableAssert, "age", ValueType.NUMBER, null, 25); + getSink().assertColumnType(tableAssert, "weight", ValueType.NUMBER, null, 150); } @ParameterizedTest