diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java index d7aa6903..c442986f 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java @@ -165,6 +165,7 @@ protected void registerTypes() { super.registerTypes(); registerType(TimeWithTimezoneType.INSTANCE); + registerType(ZonedTimestampType.INSTANCE); registerType(IntervalType.INSTANCE); registerType(SerialType.INSTANCE); registerType(BitType.INSTANCE); diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ZonedTimestampType.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ZonedTimestampType.java new file mode 100644 index 00000000..63d8f76d --- /dev/null +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ZonedTimestampType.java @@ -0,0 +1,67 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.jdbc.dialect.postgres; + +import java.sql.Types; +import java.time.ZonedDateTime; +import java.util.List; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.ConnectException; + +import io.debezium.connector.jdbc.ValueBindDescriptor; +import io.debezium.connector.jdbc.relational.ColumnDescriptor; +import io.debezium.connector.jdbc.type.Type; +import io.debezium.time.ZonedTimestamp; + +/** + * An implementation of {@link Type} for {@link ZonedTimestamp} values specific to PostgreSQL. + * + * @author Mario Fiore Vitale + */ +public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType { + + public static final ZonedTimestampType INSTANCE = new ZonedTimestampType(); + public static final List POSITIVE_INFINITY = List.of("infinity", "+infinity"); + public static final String NEGATIVE_INFINITY = "-infinity"; + + @Override + public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) { + + if (POSITIVE_INFINITY.contains(value) || NEGATIVE_INFINITY.equals(value)) { + return "cast(? as timestamptz)"; + } + + return super.getQueryBinding(column, schema, value); + } + + @Override + public List bind(int index, Schema schema, Object value) { + + if (value == null) { + return List.of(new ValueBindDescriptor(index, null)); + } + if (value instanceof String) { + + final ZonedDateTime zdt; + + if (POSITIVE_INFINITY.contains(value)) { + return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY.get(0), Types.VARCHAR)); + } + + if (NEGATIVE_INFINITY.equals(value)) { + return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR)); + } + + zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + + return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType())); + } + + throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), + value, value.getClass().getName())); + } +} diff --git a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java index 21ddbc52..ebac16ac 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java @@ -6,6 +6,8 @@ package io.debezium.connector.jdbc.integration.postgres; import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.LocalDateTime; import java.util.Arrays; import java.util.Base64; import java.util.List; @@ -192,6 +194,46 @@ public void testInsertModeInsertWithPrimaryKeyModeUpperCaseColumnNameWithoutQuot getSink().assertColumnType(tableAssert, "nick_name$", ValueType.TEXT, "John Doe$", "John Doe$"); } + @ParameterizedTest + @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class) + @FixFor("DBZ-7920") + public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws SQLException { + + final Map properties = getDefaultSinkConfig(); + properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_VALUE.getValue()); + properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS, "id"); + properties.put(JdbcSinkConnectorConfig.INSERT_MODE, InsertMode.INSERT.getValue()); + + startSinkConnector(properties); + assertSinkConnectorIsRunning(); + + final String tableName = randomTableName(); + final String topicName = topicName("server1", "schema", tableName); + + Schema zonedTimestampSchema = SchemaBuilder.string() + .name("io.debezium.time.ZonedTimestamp") + .build(); + + Schema rangeSchema = SchemaBuilder.string().build(); + + final SinkRecord createInfinityRecord = factory.createRecordWithSchemaValue(topicName, (byte) 1, + List.of("timestamp_infinity-", "timestamp_infinity+", "range_with_infinity"), + List.of(zonedTimestampSchema, zonedTimestampSchema, rangeSchema), + Arrays.asList(new Object[]{ "-infinity", "+infinity", "[2010-01-01 14:30, +infinity)" })); + consume(createInfinityRecord); + + final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createInfinityRecord)); + tableAssert.exists().hasNumberOfRows(1).hasNumberOfColumns(4); + + getSink().assertColumnType(tableAssert, "id", ValueType.NUMBER, (byte) 1); + + getSink().assertColumnType(tableAssert, "timestamp_infinity-", Timestamp.class, Timestamp.valueOf(LocalDateTime.of(292269055, 12, 3, 0, 0, 0))); + getSink().assertColumnType(tableAssert, "timestamp_infinity+", Timestamp.class, Timestamp.valueOf(LocalDateTime.of(292278994, 8, 17, 0, 0, 0))); + getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, +infinity)"); + + } + private static Schema buildGeoTypeSchema(String type) { SchemaBuilder schemaBuilder = SchemaBuilder.struct()