diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ArrayType.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ArrayType.java index ba785d62..e27be008 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ArrayType.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ArrayType.java @@ -25,8 +25,6 @@ public class ArrayType extends AbstractType { public static final ArrayType INSTANCE = new ArrayType(); - private String typeName; - @Override public String[] getRegistrationKeys() { return new String[]{ "ARRAY" }; @@ -34,9 +32,12 @@ public String[] getRegistrationKeys() { @Override public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) { + return getElementTypeName(dialect, schema, key) + "[]"; + } + + private String getElementTypeName(DatabaseDialect dialect, Schema schema, boolean key) { Type elementType = dialect.getSchemaType(schema.valueSchema()); - typeName = elementType.getTypeName(dialect, schema.valueSchema(), key); - return typeName + "[]"; + return elementType.getTypeName(dialect, schema.valueSchema(), key); } @Override @@ -44,6 +45,6 @@ public List bind(int index, Schema schema, Object value) { if (value == null) { return Arrays.asList(new ValueBindDescriptor(index, null)); } - return List.of(new ValueBindDescriptor(index, value, java.sql.Types.ARRAY, typeName)); + return List.of(new ValueBindDescriptor(index, value, java.sql.Types.ARRAY, getElementTypeName(this.getDialect(), schema, false))); } } diff --git a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java index a17a8a2c..8e980196 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkColumnTypeMappingIT.java @@ -9,7 +9,10 @@ import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -25,6 +28,7 @@ import io.debezium.connector.jdbc.junit.jupiter.Sink; import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider; import io.debezium.connector.jdbc.util.SinkRecordFactory; +import io.debezium.data.Uuid; import io.debezium.doc.FixFor; /** @@ -414,4 +418,40 @@ public void testShouldWorkWithBoolArray(SinkRecordFactory factory) throws Except }); } + @ParameterizedTest + @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class) + @FixFor("DBZ-7938") + public void testShouldWorkWithMultipleArraysWithDifferentTypes(SinkRecordFactory factory) throws Exception { + final Map properties = getDefaultSinkConfig(); + properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue()); + properties.put(JdbcSinkConnectorConfig.INSERT_MODE, JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue()); + startSinkConnector(properties); + assertSinkConnectorIsRunning(); + + final String tableName = randomTableName(); + final String topicName = topicName("server2", "schema", tableName); + final List uuids = List.of(UUID.randomUUID(), UUID.randomUUID()); + + final SinkRecord createRecord = factory.createRecordWithSchemaValue( + topicName, + (byte) 1, + List.of("text_data", "uuid_data"), + List.of(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), SchemaBuilder.array(Uuid.schema()).optional().build()), + Arrays.asList(List.of("a", "b"), uuids.stream().map(UUID::toString).collect(Collectors.toList()))); + + final String destinationTable = destinationTableName(createRecord); + final String sql = "CREATE TABLE %s (id int not null, text_data text[], uuid_data uuid[], primary key(id))"; + getSink().execute(String.format(sql, destinationTable)); + + consume(createRecord); + + getSink().assertRows(destinationTable, rs -> { + assertThat(rs.getInt(1)).isEqualTo(1); + assertThat(rs.getArray(2).getArray()).isEqualTo(new String[]{ "a", "b" }); + assertThat(rs.getArray(3).getArray()).isEqualTo(uuids.toArray()); + return null; + }); + } + }