Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7938: support for multiple array columns of different types
Browse files Browse the repository at this point in the history
  • Loading branch information
michal-k-gl authored and Naros committed Jun 11, 2024
1 parent f0eee80 commit 67a1714
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,26 @@ public class ArrayType extends AbstractType {

public static final ArrayType INSTANCE = new ArrayType();

private String typeName;

@Override
public String[] getRegistrationKeys() {
return new String[]{ "ARRAY" };
}

@Override
public String getTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
return getElementTypeName(dialect, schema, key) + "[]";
}

private String getElementTypeName(DatabaseDialect dialect, Schema schema, boolean key) {
Type elementType = dialect.getSchemaType(schema.valueSchema());
typeName = elementType.getTypeName(dialect, schema.valueSchema(), key);
return typeName + "[]";
return elementType.getTypeName(dialect, schema.valueSchema(), key);
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
if (value == null) {
return Arrays.asList(new ValueBindDescriptor(index, null));
}
return List.of(new ValueBindDescriptor(index, value, java.sql.Types.ARRAY, typeName));
return List.of(new ValueBindDescriptor(index, value, java.sql.Types.ARRAY, getElementTypeName(this.getDialect(), schema, false)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand All @@ -25,6 +28,7 @@
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.data.Uuid;
import io.debezium.doc.FixFor;

/**
Expand Down Expand Up @@ -414,4 +418,40 @@ public void testShouldWorkWithBoolArray(SinkRecordFactory factory) throws Except
});
}

@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
@FixFor("DBZ-7938")
public void testShouldWorkWithMultipleArraysWithDifferentTypes(SinkRecordFactory factory) throws Exception {
final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, JdbcSinkConnectorConfig.PrimaryKeyMode.RECORD_KEY.getValue());
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, JdbcSinkConnectorConfig.InsertMode.UPSERT.getValue());
startSinkConnector(properties);
assertSinkConnectorIsRunning();

final String tableName = randomTableName();
final String topicName = topicName("server2", "schema", tableName);
final List<UUID> uuids = List.of(UUID.randomUUID(), UUID.randomUUID());

final SinkRecord createRecord = factory.createRecordWithSchemaValue(
topicName,
(byte) 1,
List.of("text_data", "uuid_data"),
List.of(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(), SchemaBuilder.array(Uuid.schema()).optional().build()),
Arrays.asList(List.of("a", "b"), uuids.stream().map(UUID::toString).collect(Collectors.toList())));

final String destinationTable = destinationTableName(createRecord);
final String sql = "CREATE TABLE %s (id int not null, text_data text[], uuid_data uuid[], primary key(id))";
getSink().execute(String.format(sql, destinationTable));

consume(createRecord);

getSink().assertRows(destinationTable, rs -> {
assertThat(rs.getInt(1)).isEqualTo(1);
assertThat(rs.getArray(2).getArray()).isEqualTo(new String[]{ "a", "b" });
assertThat(rs.getArray(3).getArray()).isEqualTo(uuids.toArray());
return null;
});
}

}

0 comments on commit 67a1714

Please sign in to comment.