Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7920 Add support for timestamp infinity values for MySQL
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Jun 26, 2024
1 parent a5ea60c commit 43947ec
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ default String getTimeQueryBinding() {
*/
String getTypeName(int jdbcType, Size size);

/**
* +Infinity value for a timestamp.
*
* @return the +infinity representation for timestamp.
*/
String getTimestampPositiveInfinityValue();

/**
* -Infinity value for a timestamp.
*
* @return the -infinity representation for timestamp.
*/
String getTimestampNegativeInfinityValue();

/**
* Bind the specified value to the query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.sql.Types;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -526,6 +528,16 @@ public String getTypeName(int jdbcType, Size size) {
return ddlTypeRegistry.getTypeName(jdbcType, size);
}

@Override
public String getTimestampPositiveInfinityValue() {
return Timestamp.from(Instant.MAX).toString();
}

@Override
public String getTimestampNegativeInfinityValue() {
return Timestamp.from(Instant.MIN).toString();
}

@Override
public String getByteArrayFormat() {
return "x'%s'";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ public String getFormattedTimestampWithTimeZone(String value) {
return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime));
}

@Override
public String getTimestampPositiveInfinityValue() {
return "2038-01-19T03:14:07+00:00";
}

@Override
public String getTimestampNegativeInfinityValue() {
return "1970-01-01T00:00:01+00:00";
}

@Override
public String getAlterTablePrefix() {
return "ADD COLUMN (";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
package io.debezium.connector.jdbc.dialect.mysql;

import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;

Expand All @@ -26,19 +23,13 @@ public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium
public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {

if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), Types.TIMESTAMP)); // TIMESTAMP_WITH_TIMEZONE not supported
}
return getDialect().getQueryBindingWithValueCast(column, schema, this);
}

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
@Override
protected int getJdbcBindType() {
return Types.TIMESTAMP; // TIMESTAMP_WITH_TIMEZONE not supported
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package io.debezium.connector.jdbc.dialect.postgres;

import static io.debezium.connector.jdbc.type.debezium.ZonedTimestampType.NEGATIVE_INFINITY;
import static io.debezium.connector.jdbc.type.debezium.ZonedTimestampType.POSITIVE_INFINITY;

import java.sql.Connection;
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -212,4 +215,14 @@ protected String resolveColumnNameFromField(String fieldName) {
}
return columnName;
}

@Override
public String getTimestampPositiveInfinityValue() {
return POSITIVE_INFINITY;
}

@Override
public String getTimestampNegativeInfinityValue() {
return NEGATIVE_INFINITY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@
package io.debezium.connector.jdbc.dialect.postgres;

import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;

Expand All @@ -25,43 +20,15 @@
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
public static final List<String> POSITIVE_INFINITY = List.of("infinity", "+infinity");
public static final String NEGATIVE_INFINITY = "-infinity";

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {

if (POSITIVE_INFINITY.contains(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
if (POSITIVE_INFINITY.equals(value)) {
return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY, Types.VARCHAR));
}

return super.getQueryBinding(column, schema, value);
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
else {
return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR));
}
if (value instanceof String) {

final ZonedDateTime zdt;

if (POSITIVE_INFINITY.contains(value)) {
return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY.get(0), Types.VARCHAR));
}

if (NEGATIVE_INFINITY.equals(value)) {
return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR));
}

zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType()));
}

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractTimestampType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;
Expand All @@ -26,6 +27,8 @@
public class ZonedTimestampType extends AbstractTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();
public static final String POSITIVE_INFINITY = "infinity";
public static final String NEGATIVE_INFINITY = "-infinity";

@Override
public String[] getRegistrationKeys() {
Expand All @@ -37,6 +40,16 @@ public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Obj
return dialect.getFormattedTimestampWithTimeZone((String) value);
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {

if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
}

return super.getQueryBinding(column, schema, value);
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

Expand All @@ -45,15 +58,44 @@ public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {
}
if (value instanceof String) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());
if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) {
return infinityTimestampValue(index, value);
}

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcType()));
return normalTimestampValue(index, value);
}

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
}

protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
final ZonedDateTime zdt;

if (POSITIVE_INFINITY.equals(value)) {
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER)
.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
}
else {
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER)
.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
}

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcBindType()));
}

protected List<ValueBindDescriptor> normalTimestampValue(int index, Object value) {

final ZonedDateTime zdt;
zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcBindType()));
}

protected int getJdbcBindType() {
return getJdbcType();
}

@Override
protected int getJdbcType() {
return Types.TIMESTAMP_WITH_TIMEZONE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2568,6 +2568,35 @@ else if (sink.getType().is(SinkType.MYSQL)) {
ResultSet::getString);
}

@TestTemplate
@ForSource(value = { SourceType.POSTGRES }, reason = "The infinity value is valid only for PostgreSQL")
@WithTemporalPrecisionMode
public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Sink sink) throws Exception {

final List<String> values = List.of("'-infinity'", "'infinity'");

List<ZonedDateTime> expectedValues = List.of();
if (sink.getType().is(SinkType.POSTGRES)) {

// expectedValues = values;
}
else if (sink.getType().is(SinkType.MYSQL)) {
expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC),
ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC));
}

assertDataTypesNonKeyOnly(source,
sink,
List.of("timestamptz", "timestamptz"),
values,
expectedValues,
(record) -> {
assertColumn(sink, record, "data0", getTimestampWithTimezoneType(source, false, 6));
assertColumn(sink, record, "data1", getTimestampWithTimezoneType(source, false, 6));
},
(rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC));
}

// todo: remaining data types need tests and/or type system mapping support
// GEOMETRY (MySql/PostgreSQL)
// LINESTRING (MySQL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@
*/
package io.debezium.connector.jdbc.e2e;

import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkType;
import io.debezium.connector.jdbc.junit.jupiter.e2e.ForSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.WithTemporalPrecisionMode;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import io.debezium.connector.jdbc.junit.jupiter.PostgresSinkDatabaseContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceType;

import java.sql.ResultSet;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;

/**
* Implementation of the JDBC sink connector multi-source pipeline that writes to PostgreSQL.
*
Expand Down Expand Up @@ -184,4 +195,28 @@ protected String getTimestampWithTimezoneType(Source source, boolean key, int pr
protected String getIntervalType(Source source, boolean numeric) {
return "INTERVAL";
}

@TestTemplate
@ForSource(value = { SourceType.POSTGRES }, reason = "The infinity value is valid only for PostgreSQL")
@WithTemporalPrecisionMode
@Override
public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Sink sink) throws Exception {

final List<String> values = List.of("'-infinity'", "'infinity'");

List<String> expectedValues = values.stream()
.map(s -> s.replace("'", ""))
.collect(Collectors.toList());

assertDataTypesNonKeyOnly(source,
sink,
List.of("timestamptz", "timestamptz"),
values,
expectedValues,
(record) -> {
assertColumn(sink, record, "data0", getTimestampWithTimezoneType(source, false, 6));
assertColumn(sink, record, "data1", getTimestampWithTimezoneType(source, false, 6));
},
ResultSet::getString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws
final SinkRecord createInfinityRecord = factory.createRecordWithSchemaValue(topicName, (byte) 1,
List.of("timestamp_infinity-", "timestamp_infinity+", "range_with_infinity"),
List.of(zonedTimestampSchema, zonedTimestampSchema, rangeSchema),
Arrays.asList(new Object[]{ "-infinity", "+infinity", "[2010-01-01 14:30, +infinity)" }));
Arrays.asList(new Object[]{ "-infinity", "infinity", "[2010-01-01 14:30, infinity)" }));
consume(createInfinityRecord);

final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(createInfinityRecord));
Expand All @@ -230,7 +230,7 @@ public void testInsertModeInsertInfinityValues(SinkRecordFactory factory) throws

getSink().assertColumnType(tableAssert, "timestamp_infinity-", Timestamp.class, new Timestamp(PGStatement.DATE_NEGATIVE_INFINITY));
getSink().assertColumnType(tableAssert, "timestamp_infinity+", Timestamp.class, new Timestamp(PGStatement.DATE_POSITIVE_INFINITY));
getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, +infinity)");
getSink().assertColumnType(tableAssert, "range_with_infinity", String.class, "[2010-01-01 14:30, infinity)");

}

Expand Down

0 comments on commit 43947ec

Please sign in to comment.