diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java index 066956fc..4592571d 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/DatabaseDialect.java @@ -360,6 +360,20 @@ default String getTimeQueryBinding() { */ String getTypeName(int jdbcType, Size size); + /** + * +Infinity value for a timestamp. + * + * @return the +infinity representation for timestamp. + */ + String getTimestampPositiveInfinityValue(); + + /** + * -Infinity value for a timestamp. + * + * @return the -infinity representation for timestamp. + */ + String getTimestampNegativeInfinityValue(); + /** * Bind the specified value to the query. * diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java index be845f53..a029e57e 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/GeneralDatabaseDialect.java @@ -10,6 +10,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; +import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; @@ -511,6 +513,16 @@ public String getTypeName(int jdbcType, Size size) { return ddlTypeRegistry.getTypeName(jdbcType, size); } + @Override + public String getTimestampPositiveInfinityValue() { + return Timestamp.from(Instant.MAX).toString(); + } + + @Override + public String getTimestampNegativeInfinityValue() { + return Timestamp.from(Instant.MIN).toString(); + } + @Override public String getByteArrayFormat() { return "x'%s'"; diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java index 2c4f5633..15c1400d 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/MySqlDatabaseDialect.java @@ -141,6 +141,16 @@ public String getFormattedTimestampWithTimeZone(String value) { return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime)); } + @Override + public String getTimestampPositiveInfinityValue() { + return "2038-01-19T03:14:07+00:00"; + } + + @Override + public String getTimestampNegativeInfinityValue() { + return "1970-01-01T00:00:01+00:00"; + } + @Override public String getAlterTablePrefix() { return "ADD COLUMN ("; diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/ZonedTimestampType.java b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/ZonedTimestampType.java index 094149e6..a40a0304 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/mysql/ZonedTimestampType.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/mysql/ZonedTimestampType.java @@ -6,13 +6,10 @@ package io.debezium.connector.jdbc.dialect.mysql; import java.sql.Types; -import java.time.ZonedDateTime; -import java.util.List; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; -import io.debezium.connector.jdbc.ValueBindDescriptor; +import io.debezium.connector.jdbc.relational.ColumnDescriptor; import io.debezium.connector.jdbc.type.Type; import io.debezium.time.ZonedTimestamp; @@ -26,19 +23,13 @@ public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium public static final ZonedTimestampType INSTANCE = new ZonedTimestampType(); @Override - public List bind(int index, Schema schema, Object value) { + public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) { - if (value == null) { - return List.of(new ValueBindDescriptor(index, null)); - } - if (value instanceof String) { - - final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); - - return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), Types.TIMESTAMP)); // TIMESTAMP_WITH_TIMEZONE not supported - } + return getDialect().getQueryBindingWithValueCast(column, schema, this); + } - throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), - value, value.getClass().getName())); + @Override + protected int getJdbcBindType() { + return Types.TIMESTAMP; // TIMESTAMP_WITH_TIMEZONE not supported } } diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java index 6fad1359..32e08346 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/PostgresDatabaseDialect.java @@ -5,6 +5,9 @@ */ package io.debezium.connector.jdbc.dialect.postgres; +import static io.debezium.connector.jdbc.type.debezium.ZonedTimestampType.NEGATIVE_INFINITY; +import static io.debezium.connector.jdbc.type.debezium.ZonedTimestampType.POSITIVE_INFINITY; + import java.sql.Connection; import java.sql.SQLException; import java.time.format.DateTimeFormatter; @@ -211,4 +214,14 @@ protected String resolveColumnNameFromField(String fieldName) { } return columnName; } + + @Override + public String getTimestampPositiveInfinityValue() { + return POSITIVE_INFINITY; + } + + @Override + public String getTimestampNegativeInfinityValue() { + return NEGATIVE_INFINITY; + } } diff --git a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ZonedTimestampType.java b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ZonedTimestampType.java index 63d8f76d..6385588d 100644 --- a/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ZonedTimestampType.java +++ b/src/main/java/io/debezium/connector/jdbc/dialect/postgres/ZonedTimestampType.java @@ -6,14 +6,9 @@ package io.debezium.connector.jdbc.dialect.postgres; import java.sql.Types; -import java.time.ZonedDateTime; import java.util.List; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.ConnectException; - import io.debezium.connector.jdbc.ValueBindDescriptor; -import io.debezium.connector.jdbc.relational.ColumnDescriptor; import io.debezium.connector.jdbc.type.Type; import io.debezium.time.ZonedTimestamp; @@ -25,43 +20,15 @@ public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType { public static final ZonedTimestampType INSTANCE = new ZonedTimestampType(); - public static final List POSITIVE_INFINITY = List.of("infinity", "+infinity"); - public static final String NEGATIVE_INFINITY = "-infinity"; @Override - public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) { + protected List infinityTimestampValue(int index, Object value) { - if (POSITIVE_INFINITY.contains(value) || NEGATIVE_INFINITY.equals(value)) { - return "cast(? as timestamptz)"; + if (POSITIVE_INFINITY.equals(value)) { + return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY, Types.VARCHAR)); } - - return super.getQueryBinding(column, schema, value); - } - - @Override - public List bind(int index, Schema schema, Object value) { - - if (value == null) { - return List.of(new ValueBindDescriptor(index, null)); + else { + return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR)); } - if (value instanceof String) { - - final ZonedDateTime zdt; - - if (POSITIVE_INFINITY.contains(value)) { - return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY.get(0), Types.VARCHAR)); - } - - if (NEGATIVE_INFINITY.equals(value)) { - return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR)); - } - - zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); - - return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType())); - } - - throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), - value, value.getClass().getName())); } } diff --git a/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java b/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java index 0061e737..9759d231 100644 --- a/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java +++ b/src/main/java/io/debezium/connector/jdbc/type/debezium/ZonedTimestampType.java @@ -14,6 +14,7 @@ import io.debezium.connector.jdbc.ValueBindDescriptor; import io.debezium.connector.jdbc.dialect.DatabaseDialect; +import io.debezium.connector.jdbc.relational.ColumnDescriptor; import io.debezium.connector.jdbc.type.AbstractTimestampType; import io.debezium.connector.jdbc.type.Type; import io.debezium.time.ZonedTimestamp; @@ -26,6 +27,8 @@ public class ZonedTimestampType extends AbstractTimestampType { public static final ZonedTimestampType INSTANCE = new ZonedTimestampType(); + public static final String POSITIVE_INFINITY = "infinity"; + public static final String NEGATIVE_INFINITY = "-infinity"; @Override public String[] getRegistrationKeys() { @@ -37,6 +40,16 @@ public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Obj return dialect.getFormattedTimestampWithTimeZone((String) value); } + @Override + public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) { + + if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) { + return "cast(? as timestamptz)"; + } + + return super.getQueryBinding(column, schema, value); + } + @Override public List bind(int index, Schema schema, Object value) { @@ -45,15 +58,44 @@ public List bind(int index, Schema schema, Object value) { } if (value instanceof String) { - final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) { + return infinityTimestampValue(index, value); + } - return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType())); + return normalTimestampValue(index, value); } throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(), value, value.getClass().getName())); } + protected List infinityTimestampValue(int index, Object value) { + final ZonedDateTime zdt; + + if (POSITIVE_INFINITY.equals(value)) { + zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER) + .withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + } + else { + zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER) + .withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + } + + return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcBindType())); + } + + protected List normalTimestampValue(int index, Object value) { + + final ZonedDateTime zdt; + zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId()); + + return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcBindType())); + } + + protected int getJdbcBindType() { + return getJdbcType(); + } + @Override protected int getJdbcType() { return Types.TIMESTAMP_WITH_TIMEZONE; diff --git a/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java b/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java index 6421588b..c5004780 100644 --- a/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java +++ b/src/test/java/io/debezium/connector/jdbc/e2e/AbstractJdbcSinkPipelineIT.java @@ -2582,6 +2582,35 @@ else if (sink.getType().is(SinkType.MYSQL)) { ResultSet::getString); } + @TestTemplate + @ForSource(value = { SourceType.POSTGRES }, reason = "The infinity value is valid only for PostgreSQL") + @WithTemporalPrecisionMode + public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Sink sink) throws Exception { + + final List values = List.of("'-infinity'", "'infinity'"); + + List expectedValues = List.of(); + if (sink.getType().is(SinkType.POSTGRES)) { + + // expectedValues = values; + } + else if (sink.getType().is(SinkType.MYSQL)) { + expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC), + ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC)); + } + + assertDataTypesNonKeyOnly(source, + sink, + List.of("timestamptz", "timestamptz"), + values, + expectedValues, + (record) -> { + assertColumn(sink, record, "data0", getTimestampWithTimezoneType(source, false, 6)); + assertColumn(sink, record, "data1", getTimestampWithTimezoneType(source, false, 6)); + }, + (rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC)); + } + // todo: remaining data types need tests and/or type system mapping support // GEOMETRY (MySql/PostgreSQL) // LINESTRING (MySQL) diff --git a/src/test/java/io/debezium/connector/jdbc/e2e/JdbcSinkPipelineToPostgresIT.java b/src/test/java/io/debezium/connector/jdbc/e2e/JdbcSinkPipelineToPostgresIT.java index d28db2f5..afe945eb 100644 --- a/src/test/java/io/debezium/connector/jdbc/e2e/JdbcSinkPipelineToPostgresIT.java +++ b/src/test/java/io/debezium/connector/jdbc/e2e/JdbcSinkPipelineToPostgresIT.java @@ -5,13 +5,24 @@ */ package io.debezium.connector.jdbc.e2e; +import io.debezium.connector.jdbc.junit.jupiter.Sink; +import io.debezium.connector.jdbc.junit.jupiter.SinkType; +import io.debezium.connector.jdbc.junit.jupiter.e2e.ForSource; +import io.debezium.connector.jdbc.junit.jupiter.e2e.WithTemporalPrecisionMode; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; import io.debezium.connector.jdbc.junit.jupiter.PostgresSinkDatabaseContextProvider; import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source; import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceType; +import java.sql.ResultSet; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.stream.Collectors; + /** * Implementation of the JDBC sink connector multi-source pipeline that writes to PostgreSQL. * @@ -184,4 +195,28 @@ protected String getTimestampWithTimezoneType(Source source, boolean key, int pr protected String getIntervalType(Source source, boolean numeric) { return "INTERVAL"; } + + @TestTemplate + @ForSource(value = { SourceType.POSTGRES }, reason = "The infinity value is valid only for PostgreSQL") + @WithTemporalPrecisionMode + @Override + public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Sink sink) throws Exception { + + final List values = List.of("'-infinity'", "'infinity'"); + + List expectedValues = values.stream() + .map(s -> s.replace("'", "")) + .collect(Collectors.toList()); + + assertDataTypesNonKeyOnly(source, + sink, + List.of("timestamptz", "timestamptz"), + values, + expectedValues, + (record) -> { + assertColumn(sink, record, "data0", getTimestampWithTimezoneType(source, false, 6)); + assertColumn(sink, record, "data1", getTimestampWithTimezoneType(source, false, 6)); + }, + ResultSet::getString); + } } diff --git a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java index 3c6b4700..fbe45ef0 100644 --- a/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java +++ b/src/test/java/io/debezium/connector/jdbc/integration/postgres/JdbcSinkInsertModeIT.java @@ -220,7 +220,7 @@ public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws final SinkRecord createInfinityRecord = factory.createRecordWithSchemaValue(topicName, (byte) 1, List.of("timestamp_infinity-", "timestamp_infinity+", "range_with_infinity"), List.of(zonedTimestampSchema, zonedTimestampSchema, rangeSchema), - Arrays.asList(new Object[]{ "-infinity", "+infinity", "[2010-01-01 14:30, +infinity)" })); + Arrays.asList(new Object[]{ "-infinity", "infinity", "[2010-01-01 14:30, infinity)" })); consume(createInfinityRecord); final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createInfinityRecord)); @@ -230,7 +230,7 @@ public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws getSink().assertColumnType(tableAssert, "timestamp_infinity-", Timestamp.class, new Timestamp(PGStatement.DATE_NEGATIVE_INFINITY)); getSink().assertColumnType(tableAssert, "timestamp_infinity+", Timestamp.class, new Timestamp(PGStatement.DATE_POSITIVE_INFINITY)); - getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, +infinity)"); + getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, infinity)"); }