Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-6958 Revisit the support for Postgres special character in column…
Browse files Browse the repository at this point in the history
… name
  • Loading branch information
mfvitale authored and Naros committed Oct 4, 2023
1 parent d1384c3 commit f6c5762
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public String getCreateTableStatement(SinkRecordDescriptor record, TableId table
// First handle key columns
builder.appendLists(", ", record.getKeyFieldNames(), record.getNonKeyFieldNames(), (name) -> {
final FieldDescriptor field = record.getFields().get(name);
final String columnName = toIdentifier(columnNamingStrategy.resolveColumnName(field.getColumnName()));
final String columnName = toIdentifier(resolveColumnName(field));

final String columnType = field.getTypeName();

Expand Down Expand Up @@ -633,7 +633,7 @@ protected void addColumnDefaultValue(FieldDescriptor field, StringBuilder column

protected String columnQueryBindingFromField(String fieldName, TableDescriptor table, SinkRecordDescriptor record) {
final FieldDescriptor field = record.getFields().get(fieldName);
final String columnName = resolveColumnNameFromField(field.getColumnName());
final String columnName = resolveColumnName(field);
final ColumnDescriptor column = table.getColumnByName(columnName);
if (column == null) {
throw new DebeziumException("Failed to find column " + columnName + " in table " + table.getId().getTableName());
Expand Down Expand Up @@ -680,8 +680,7 @@ private Object getColumnValueForKafkaKeyMode(String columnName, SinkRecordDescri

protected String columnNameFromField(String fieldName, SinkRecordDescriptor record) {
final FieldDescriptor field = record.getFields().get(fieldName);
final String columnName = getColumnNamingStrategy().resolveColumnName(field.getColumnName());
return getIdentifierHelper().toIdentifier(columnName, getConfig().isQuoteIdentifiers()).render(dialect);
return toIdentifier(resolveColumnName(field));
}

protected String columnNameFromField(String fieldName, String prefix, SinkRecordDescriptor record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.SinkRecordDescriptor;
import io.debezium.connector.jdbc.SinkRecordDescriptor.FieldDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.dialect.DatabaseDialectProvider;
import io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect;
Expand Down Expand Up @@ -196,15 +195,4 @@ protected String resolveColumnNameFromField(String fieldName) {
}
return columnName;
}

@Override
protected String resolveColumnName(FieldDescriptor field) {
final String columnName = getColumnNamingStrategy().resolveColumnName(field.getColumnName());
final String columnIdentifier = toIdentifier(columnName);
if (columnIdentifier.startsWith("\"") && columnIdentifier.endsWith("\"")) {
return columnName;
}
return super.resolveColumnName(field);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,13 @@ public void testInsertModeInsertWithPrimaryKeyModeUpperCaseColumnNameWithoutQuot
consume(createSimpleRecord2);

DataSourceWithLetterCase dataSourceWithLetterCase = new DataSourceWithLetterCase(dataSource(), LetterCase.TABLE_DEFAULT, LOWER_CASE_STRICT, LOWER_CASE_STRICT);
// The case-insensitive conflict issue will cause getting zero rows, because the postgres always keep the origin column name as "NICK_NAME$",
// but the table assert will treat it as lower according the definition.
final String[] columnsToExclude = new String[]{ "nick_name$" };
final TableAssert tableAssert = TestHelper.assertTable(dataSourceWithLetterCase, destinationTableName(createSimpleRecord1), null, columnsToExclude);
tableAssert.exists().hasNumberOfRows(2).hasNumberOfColumns(2);

final TableAssert tableAssert = TestHelper.assertTable(dataSourceWithLetterCase, destinationTableName(createSimpleRecord1), null, null);
tableAssert.exists().hasNumberOfRows(2).hasNumberOfColumns(3);

getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1, (byte) 2);
getSink().assertColumnType(tableAssert, "name", ValueType.TEXT, "John Doe", "John Doe");
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
}

private static Schema buildGeoTypeSchema(String type) {
Expand Down

0 comments on commit f6c5762

Please sign in to comment.