diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/BytesType.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/BytesType.java new file mode 100644 index 00000000..64b5a776 --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/BytesType.java @@ -0,0 +1,60 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.jdbc.dialect.postgres; + +import java.nio.ByteBuffer; +import java.sql.Types; + +import org.apache.kafka.connect.data.Schema; +import org.hibernate.engine.jdbc.Size; +import org.hibernate.query.Query; + +import io.debezium.connector.jdbc.dialect.DatabaseDialect; +import io.debezium.connector.jdbc.type.AbstractType; +import io.debezium.connector.jdbc.type.Type; +import io.debezium.connector.jdbc.util.ByteArrayUtils; + +/** + * An implementation of {@link Type} for {@code BYTES} column types. + * + * @author Bertrand Paquet + */ +class BytesType extends AbstractType { + + public static final BytesType INSTANCE = new BytesType(); + + @Override + public String[] getRegistrationKeys() { + return new String[]{ "BYTES" }; + } + + @Override + public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) { + return String.format(dialect.getByteArrayFormat(), ByteArrayUtils.getByteArrayAsHex(value)); + } + + @Override + public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) { + final int columnSize = Integer.parseInt(getSourceColumnSize(schema).orElse("0")); + if (columnSize > 0) { + return dialect.getTypeName(Types.VARBINARY, Size.length(columnSize)); + } + else if (key) { + return dialect.getTypeName(Types.VARBINARY, Size.length(dialect.getMaxVarbinaryLength())); + } + return dialect.getTypeName(Types.VARBINARY); + } + + @Override + public int bind(Query query, int index, Schema schema, Object value) { + if (value instanceof ByteBuffer) { + value = ((ByteBuffer) value).array(); + } + query.setParameter(index, value); + + return 1; + } +} diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java index 5b1fe879..1c09be7e 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java @@ -154,6 +154,7 @@ protected void registerTypes() { registerType(IntervalType.INSTANCE); registerType(SerialType.INSTANCE); registerType(BitType.INSTANCE); + registerType(BytesType.INSTANCE); registerType(JsonType.INSTANCE); registerType(UuidType.INSTANCE); registerType(EnumType.INSTANCE); diff --git a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java index 0e02cdc2..9012dd4c 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java @@ -5,6 +5,9 @@ */ package io.debezium.connector.jdbc.integration.postgres; +import static org.fest.assertions.Assertions.assertThat; + +import java.nio.ByteBuffer; import java.util.Map; import org.apache.kafka.connect.data.Schema; @@ -96,4 +99,44 @@ private void shouldCoerceStringTypeToColumnType(SinkRecordFactory factory, Strin getSink().assertColumn(destinationTable, "data", columnType); } + + @ParameterizedTest + @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class) + @FixFor("DBZ-6967") + public void testShouldCoerceNioByteBufferTypeToByteArrayColumnType(SinkRecordFactory factory) throws Exception { + final Map properties = getDefaultSinkConfig(); + properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue()); + properties.put(JdbcSinkConnectorConfig.INSERT_MODE, JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue()); + startSinkConnector(properties); + assertSinkConnectorIsRunning(); + + final String tableName = randomTableName(); + final String topicName = topicName("server2", "schema", tableName); + + ByteBuffer buffer = ByteBuffer.allocate(3); + buffer.put((byte) 1); + buffer.put((byte) 2); + buffer.put((byte) 3); + + final SinkRecord createRecord = factory.createRecordWithSchemaValue( + topicName, + (byte) 1, + "data", + Schema.OPTIONAL_BYTES_SCHEMA, + buffer); + + final String destinationTable = destinationTableName(createRecord); + final String sql = "CREATE TABLE %s (id int not null, data bytea, primary key(id))"; + getSink().execute(String.format(sql, destinationTable)); + + consume(createRecord); + + getSink().assertRows(destinationTable, rs -> { + assertThat(rs.getInt(1)).isEqualTo(1); + assertThat(rs.getBytes(2)).isEqualTo(new byte[]{ 1, 2, 3 }); + return null; + }); + } + }