Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7920 Add support for timestamp infinity values for SQLServer
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Jun 26, 2024
1 parent 43947ec commit d1aa252
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@

import java.sql.Types;

import org.apache.kafka.connect.data.Schema;

import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;

Expand All @@ -22,12 +19,6 @@ public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {

return getDialect().getQueryBindingWithValueCast(column, schema, this);
}

@Override
protected int getJdbcBindType() {
return Types.TIMESTAMP; // TIMESTAMP_WITH_TIMEZONE not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
import java.sql.Types;
import java.util.List;

import org.apache.kafka.connect.data.Schema;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;

Expand All @@ -21,6 +24,16 @@ public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {

if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
}

return super.getQueryBinding(column, schema, value);
}

@Override
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ public int getMaxTimestampPrecision() {
return 7;
}

@Override
public String getTimestampPositiveInfinityValue() {
return "9999-12-31T23:59:59+00:00";
}

@Override
public String getTimestampNegativeInfinityValue() {
return "0001-01-01T00:00:00+00:00";
}

@Override
public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
final SqlStatementBuilder builder = new SqlStatementBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.AbstractTimestampType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;
Expand All @@ -40,16 +39,6 @@ public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Obj
return dialect.getFormattedTimestampWithTimeZone((String) value);
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {

if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
}

return super.getQueryBinding(column, schema, value);
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

Expand All @@ -73,12 +62,10 @@ protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object val
final ZonedDateTime zdt;

if (POSITIVE_INFINITY.equals(value)) {
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER)
.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER);
}
else {
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER)
.withZoneSameInstant(getDatabaseTimeZone().toZoneId());
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER);
}

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), getJdbcBindType()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2576,9 +2576,10 @@ public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Si
final List<String> values = List.of("'-infinity'", "'infinity'");

List<ZonedDateTime> expectedValues = List.of();
if (sink.getType().is(SinkType.POSTGRES)) {
if (sink.getType().is(SinkType.SQLSERVER)) {

// expectedValues = values;
expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
}
else if (sink.getType().is(SinkType.MYSQL)) {
expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@
*/
package io.debezium.connector.jdbc.e2e;

import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkType;
import io.debezium.connector.jdbc.junit.jupiter.e2e.ForSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.WithTemporalPrecisionMode;
import java.sql.ResultSet;
import java.util.List;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

import io.debezium.connector.jdbc.junit.jupiter.PostgresSinkDatabaseContextProvider;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.e2e.ForSource;
import io.debezium.connector.jdbc.junit.jupiter.e2e.WithTemporalPrecisionMode;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.Source;
import io.debezium.connector.jdbc.junit.jupiter.e2e.source.SourceType;

import java.sql.ResultSet;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.stream.Collectors;

/**
* Implementation of the JDBC sink connector multi-source pipeline that writes to PostgreSQL.
*
Expand Down

0 comments on commit d1aa252

Please sign in to comment.