Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7920 Add support for 'infinity' values for PostgreSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Jun 26, 2024
1 parent 60c18c0 commit f70caf1
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ protected void registerTypes() {
super.registerTypes();

registerType(TimeWithTimezoneType.INSTANCE);
registerType(ZonedTimestampType.INSTANCE);
registerType(IntervalType.INSTANCE);
registerType(SerialType.INSTANCE);
registerType(BitType.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.dialect.postgres;

import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;

/**
* An implementation of {@link Type} for {@link ZonedTimestamp} values specific to PostgreSQL.
*
* @author Mario Fiore Vitale
*/
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
public static final List<String> POSITIVE_INFINITY = List.of("infinity", "+infinity");
public static final String NEGATIVE_INFINITY = "-infinity";

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {

if (POSITIVE_INFINITY.contains(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
}

return super.getQueryBinding(column, schema, value);
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {

final ZonedDateTime zdt;

if (POSITIVE_INFINITY.contains(value)) {
return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY.get(0), Types.VARCHAR));
}

if (NEGATIVE_INFINITY.equals(value)) {
return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR));
}

zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType()));
}

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package io.debezium.connector.jdbc.integration.postgres;

import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
Expand Down Expand Up @@ -192,6 +194,46 @@ public void testInsertModeInsertWithPrimaryKeyModeUpperCaseColumnNameWithoutQuot
getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$");
}

@ParameterizedTest
@ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
@FixFor("DBZ-7920")
public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws SQLException {

final Map<String, String> properties = getDefaultSinkConfig();
properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_VALUE.getValue());
properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS, "id");
properties.put(JdbcSinkConnectorConfig.INSERT_MODE, InsertMode.INSERT.getValue());

startSinkConnector(properties);
assertSinkConnectorIsRunning();

final String tableName = randomTableName();
final String topicName = topicName("server1", "schema", tableName);

Schema zonedTimestampSchema = SchemaBuilder.string()
.name("io.debezium.time.ZonedTimestamp")
.build();

Schema rangeSchema = SchemaBuilder.string().build();

final SinkRecord createInfinityRecord = factory.createRecordWithSchemaValue(topicName, (byte) 1,
List.of("timestamp_infinity-", "timestamp_infinity+", "range_with_infinity"),
List.of(zonedTimestampSchema, zonedTimestampSchema, rangeSchema),
Arrays.asList(new Object[]{ "-infinity", "+infinity", "[2010-01-01 14:30, +infinity)" }));
consume(createInfinityRecord);

final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createInfinityRecord));
tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(4);

getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1);

getSink().assertColumnType(tableAssert, "timestamp_infinity-", Timestamp.class, Timestamp.valueOf(LocalDateTime.of(292269055, 12, 3, 0, 0, 0)));
getSink().assertColumnType(tableAssert, "timestamp_infinity+", Timestamp.class, Timestamp.valueOf(LocalDateTime.of(292278994, 8, 17, 0, 0, 0)));
getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, +infinity)");

}

private static Schema buildGeoTypeSchema(String type) {

SchemaBuilder schemaBuilder = SchemaBuilder.struct()
Expand Down

0 comments on commit f70caf1

Please sign in to comment.