diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index 7041a5ea..5042b988 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -262,7 +262,7 @@ public String getCreateTableStatement(SinkRecordDescriptor record, TableId table // First handle key columns builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> { final FieldDescriptor field = record.getFields().get(name); - final String columnName = toIdentifier(columnNamingStrategy.resolveColumnName(field.getColumnName())); + final String columnName = toIdentifier(resolveColumnName(field)); final String columnType = field.getTypeName(); @@ -633,7 +633,7 @@ protected void addColumnDefaultValue(FieldDescriptor field, StringBuilder column protected String columnQueryBindingFromField(String fieldName, TableDescriptor table, SinkRecordDescriptor record) { final FieldDescriptor field = record.getFields().get(fieldName); - final String columnName = resolveColumnNameFromField(field.getColumnName()); + final String columnName = resolveColumnName(field); final ColumnDescriptor column = table.getColumnByName(columnName); if (column == null) { throw new DebeziumException("Failed to find column " + columnName + " in table " + table.getId().getTableName()); @@ -680,8 +680,7 @@ private Object getColumnValueForKafkaKeyMode(String columnName, SinkRecordDescri protected String columnNameFromField(String fieldName, SinkRecordDescriptor record) { final FieldDescriptor field = record.getFields().get(fieldName); - final String columnName = getColumnNamingStrategy().resolveColumnName(field.getColumnName()); - return getIdentifierHelper().toIdentifier(columnName, getConfig().isQuoteIdentifiers()).render(dialect); + return toIdentifier(resolveColumnName(field)); } protected String columnNameFromField(String fieldName, String prefix, SinkRecordDescriptor record) { diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java index 1c09be7e..57f5d604 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java @@ -18,7 +18,6 @@ import io.debezium.connector.jdbc.JdbcSinkConnectorConfig; import io.debezium.connector.jdbc.SinkRecordDescriptor; -import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor; import io.debezium.connector.jdbc.dialect.DatabaseDialect; import io.debezium.connector.jdbc.dialect.DatabaseDialectProvider; import io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect; @@ -196,15 +195,4 @@ protected String resolveColumnNameFromField(String fieldName) { } return columnName; } - - @Override - protected String resolveColumnName(FieldDescriptor field) { - final String columnName = getColumnNamingStrategy().resolveColumnName(field.getColumnName()); - final String columnIdentifier = toIdentifier(columnName); - if (columnIdentifier.startsWith("\"") && columnIdentifier.endsWith("\"")) { - return columnName; - } - return super.resolveColumnName(field); - } - } diff --git a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java index 7190aceb..21ddbc52 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java @@ -183,14 +183,13 @@ public void testInsertModeInsertWithPrimaryKeyModeUpperCaseColumnNameWithoutQuot consume(createSimpleRecord2); DataSourceWithLetterCase dataSourceWithLetterCase = new DataSourceWithLetterCase(dataSource(), LetterCase.TABLE_DEFAULT, LOWER_CASE_STRICT, LOWER_CASE_STRICT); - // The case-insensitive conflict issue will cause getting zero rows, because the postgres always keep the origin column name as "NICK_NAME$", - // but the table assert will treat it as lower according the definition. - final String[] columnsToExclude = new String[]{ "nick_name$" }; - final TableAssert tableAssert = TestHelper.assertTable(dataSourceWithLetterCase, destinationTableName(createSimpleRecord1), null, columnsToExclude); - tableAssert.exists().hasNumberOfRows(2).hasNumberOfColumns(2); + + final TableAssert tableAssert = TestHelper.assertTable(dataSourceWithLetterCase, destinationTableName(createSimpleRecord1), null, null); + tableAssert.exists().hasNumberOfRows(2).hasNumberOfColumns(3); getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1, (byte) 2); getSink().assertColumnType(tableAssert, "name", ValueType.TEXT, "John Doe", "John Doe"); + getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$"); } private static Schema buildGeoTypeSchema(String type) {