Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-6967: Handle bytea target field with Postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
bpaquet authored and mfvitale committed Oct 4, 2023
1 parent ea21647 commit 75bddb0
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.dialect.postgres;

import java.nio.ByteBuffer;
import java.sql.Types;

import org.apache.kafka.connect.data.Schema;
import org.hibernate.engine.jdbc.Size;
import org.hibernate.query.Query;

import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.type.AbstractType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.util.ByteArrayUtils;

/**
* An implementation of {@link Type} for {@code BYTES} column types.
*
* @author Bertrand Paquet
*/
class BytesType extends AbstractType {

public static final BytesType INSTANCE = new BytesType();

@Override
public String[] getRegistrationKeys() {
return new String[]{ "BYTES" };
}

@Override
public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) {
return String.format(dialect.getByteArrayFormat(), ByteArrayUtils.getByteArrayAsHex(value));
}

@Override
public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
final int columnSize = Integer.parseInt(getSourceColumnSize(schema).orElse("0"));
if (columnSize > 0) {
return dialect.getTypeName(Types.VARBINARY, Size.length(columnSize));
}
else if (key) {
return dialect.getTypeName(Types.VARBINARY, Size.length(dialect.getMaxVarbinaryLength()));
}
return dialect.getTypeName(Types.VARBINARY);
}

@Override
public int bind(Query<?> query, int index, Schema schema, Object value) {
if (value instanceof ByteBuffer) {
value = ((ByteBuffer) value).array();
}
query.setParameter(index, value);

return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ protected void registerTypes() {
registerType(IntervalType.INSTANCE);
registerType(SerialType.INSTANCE);
registerType(BitType.INSTANCE);
registerType(BytesType.INSTANCE);
registerType(JsonType.INSTANCE);
registerType(UuidType.INSTANCE);
registerType(EnumType.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package io.debezium.connector.jdbc.integration.postgres;

import static org.fest.assertions.Assertions.assertThat;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -96,4 +99,44 @@ private void shouldCoerceStringTypeToColumnType(SinkRecordFactory factory, Strin

getSink().assertColumn(destinationTable, "data", columnType);
}

@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
@FixFor("DBZ-6967")
public void testShouldCoerceNioByteBufferTypeToByteArrayColumnType(SinkRecordFactory factory) throws Exception {
final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
startSinkConnector(properties);
assertSinkConnectorIsRunning();

final String tableName = randomTableName();
final String topicName = topicName("server2", "schema", tableName);

ByteBuffer buffer = ByteBuffer.allocate(3);
buffer.put((byte) 1);
buffer.put((byte) 2);
buffer.put((byte) 3);

final SinkRecord createRecord = factory.createRecordWithSchemaValue(
topicName,
(byte) 1,
"data",
Schema.OPTIONAL_BYTES_SCHEMA,
buffer);

final String destinationTable = destinationTableName(createRecord);
final String sql = "CREATE TABLE %s (id int not null, data bytea, primary key(id))";
getSink().execute(String.format(sql, destinationTable));

consume(createRecord);

getSink().assertRows(destinationTable, rs -> {
assertThat(rs.getInt(1)).isEqualTo(1);
assertThat(rs.getBytes(2)).isEqualTo(new byte[]{ 1, 2, 3 });
return null;
});
}

}

0 comments on commit 75bddb0

Please sign in to comment.