From dc4ff837b1c5087a1aaf3930deaf7815a8a7b14f Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Tue, 25 Jun 2024 12:10:42 -0400 Subject: [PATCH] DBZ-7969 Fix test failures --- .../jdbc/JdbcSinkConnectorConfig.java | 56 +++++++++++++++++-- .../jdbc/dialect/GeneralDatabaseDialect.java | 28 +++++++++- 2 files changed, 79 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java b/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java index a4c89ffe..070c8ec3 100644 --- a/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java +++ b/src/main/java/io/debezium/connector/jdbc/JdbcSinkConnectorConfig.java @@ -70,6 +70,7 @@ public class JdbcSinkConnectorConfig { public static final String FIELD_INCLUDE_LIST = "field.include.list"; public static final String FIELD_EXCLUDE_LIST = "field.exclude.list"; public static final String USE_REDUCTION_BUFFER = "use.reduction.buffer"; + public static final String COLUMN_TYPE_RESOLUTION_MODE = "column.type.resolution.mode"; // todo add support for the ValueConverter contract @@ -319,6 +320,16 @@ public class JdbcSinkConnectorConfig { .withDescription( "A reduction buffer consolidates the execution of SQL statements by primary key to reduce the SQL load on the target database. When set to false (the default), each incoming event is applied as a logical SQL change. When set to true, incoming events that refer to the same row will be reduced to a single logical change based on the most recent row state."); + public static final Field COLUMN_TYPE_RESOLUTION_MODE_FIELD = Field.create(COLUMN_TYPE_RESOLUTION_MODE) + .withDisplayName("Specifies how a column's type is resolved") + .withEnum(ColumnTypeResolutionMode.class, ColumnTypeResolutionMode.LEGACY) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 2)) + .withWidth(ConfigDef.Width.SHORT) + .withImportance(ConfigDef.Importance.MEDIUM) + .withDescription("Controls how the column type is resolved when no length is specified in the event metadata. " + + "legacy: Uses the legacy behavior where non-length character and binary types prefer CLOB, TEXT, LONGTEXT, BLOB types, " + + "improved: Uses an improved behavior to prefer character-based texts like VARCHAR but with the dialect's maximum length instead of CLOB, TEXT, or LONGTEXT types."); + protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor() .connector( CONNECTION_URL_FIELD, @@ -344,7 +355,8 @@ public class JdbcSinkConnectorConfig { SQLSERVER_IDENTITY_INSERT_FIELD, BATCH_SIZE_FIELD, FIELD_INCLUDE_LIST_FIELD, - FIELD_EXCLUDE_LIST_FIELD) + FIELD_EXCLUDE_LIST_FIELD, + COLUMN_TYPE_RESOLUTION_MODE_FIELD) .create(); /** @@ -497,7 +509,39 @@ public static SchemaEvolutionMode parse(String value) { public String getValue() { return mode; } + } + + public enum ColumnTypeResolutionMode implements EnumeratedValue { + /** + * The legacy column type resolution that is based on Debezium 2.x behavior. + */ + LEGACY("legacy"), + + /** + * An improved mode that favors using maximum length character-based data types over the + * use of large object types such as CLOB, TEXT, LONGTEXT, etc. + */ + IMPROVED("improved"); + + private String mode; + + ColumnTypeResolutionMode(String mode) { + this.mode = mode; + } + public static ColumnTypeResolutionMode parse(String value) { + for (ColumnTypeResolutionMode option : ColumnTypeResolutionMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) { + return option; + } + } + return ColumnTypeResolutionMode.LEGACY; + } + + @Override + public String getValue() { + return mode; + } } private final Configuration config; @@ -516,11 +560,10 @@ public String getValue() { private final String databaseTimezone; private final String postgresPostgisSchema; private final boolean sqlServerIdentityInsert; - private FieldNameFilter fieldsFilter; - private final long batchSize; - private final boolean useReductionBuffer; + private final ColumnTypeResolutionMode columnTypeResolutionMode; + private FieldNameFilter fieldsFilter; public JdbcSinkConnectorConfig(Map props) { config = Configuration.from(props); @@ -540,6 +583,7 @@ public JdbcSinkConnectorConfig(Map props) { this.sqlServerIdentityInsert = config.getBoolean(SQLSERVER_IDENTITY_INSERT_FIELD); this.batchSize = config.getLong(BATCH_SIZE_FIELD); this.useReductionBuffer = config.getBoolean(USE_REDUCTION_BUFFER_FIELD); + this.columnTypeResolutionMode = ColumnTypeResolutionMode.parse(config.getString(COLUMN_TYPE_RESOLUTION_MODE_FIELD)); String fieldExcludeList = config.getString(FIELD_EXCLUDE_LIST); String fieldIncludeList = config.getString(FIELD_INCLUDE_LIST); @@ -641,6 +685,10 @@ public String getPostgresPostgisSchema() { return postgresPostgisSchema; } + public ColumnTypeResolutionMode getColumnTypeResolutionMode() { + return columnTypeResolutionMode; + } + /** makes {@link org.hibernate.cfg.Configuration} from connector config * * @return {@link org.hibernate.cfg.Configuration} diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index 66bf9a6c..9b70fd5c 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -5,11 +5,14 @@ */ package io.debezium.connector.jdbc.dialect; +import static io.debezium.connector.jdbc.JdbcSinkConnectorConfig.ColumnTypeResolutionMode; + import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Types; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; @@ -503,7 +506,30 @@ public boolean isNegativeScaleAllowed() { @Override public String getTypeName(int jdbcType) { - return ddlTypeRegistry.getTypeName(jdbcType, dialect); + // To remain consistent with Debezium 2.x releases, the behavior with how column types were + // resolved changed in Hibernate 6.3 to align more closely with JPA. This creates an issue + // for us as we were relying on Hibernate for column type resolution, and now column types + // are being resolved differently. This code aims to retain the Debezium 2.x resolution + // functionality. + if (ColumnTypeResolutionMode.LEGACY.equals(connectorConfig.getColumnTypeResolutionMode())) { + switch (jdbcType) { + case Types.VARCHAR: + case Types.NVARCHAR: + return getTypeName(Types.LONGVARCHAR); + default: + return ddlTypeRegistry.getTypeName(jdbcType, dialect); + } + } + else { + switch (jdbcType) { + case Types.VARCHAR: + return getTypeName(jdbcType, dialect.getMaxVarcharLength()); + case Types.NVARCHAR: + return getTypeName(jdbcType, dialect.getMaxNVarcharLength()); + default: + return ddlTypeRegistry.getTypeName(jdbcType, dialect); + } + } } @Override