Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-7920 Add support for timestamp infinity values for Oracle
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Jun 27, 2024
1 parent d1aa252 commit ece5b40
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.sql.Types;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ public String getFormattedTimestampWithTimeZone(String value) {
return String.format(TO_TIMESTAMP_FF9_TZ, value);
}

@Override
public String getTimestampPositiveInfinityValue() {
return "9999-12-31T23:59:59+00:00";
}

@Override
public String getTimestampNegativeInfinityValue() {
return "-4712-01-01T00:00:00+00:00";
}

@Override
protected String resolveColumnNameFromField(String fieldName) {
String columnName = super.resolveColumnNameFromField(fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@
*/
package io.debezium.connector.jdbc.dialect.oracle;

import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.type.AbstractTimestampType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.time.ZonedTimestamp;

Expand All @@ -23,40 +17,28 @@
*
* @author Chris Cranford
*/
public class ZonedTimestampType extends AbstractTimestampType {
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public String[] getRegistrationKeys() {
return new String[]{ ZonedTimestamp.SCHEMA_NAME };
}
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
final ZonedDateTime zdt;

@Override
public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) {
return dialect.getFormattedTimestampWithTimeZone((String) value);
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
if (POSITIVE_INFINITY.equals(value)) {
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER);
}
if (value instanceof String) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt, getJdbcType()));
else {
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER);
}

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType()));
}

@Override
protected int getJdbcType() {
return Types.TIMESTAMP_WITH_TIMEZONE;
}
protected List<ValueBindDescriptor> normalTimestampValue(int index, Object value) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -2575,16 +2576,7 @@ public void testTimestampWithTimeZoneDataTypeWithInfinityValue(Source source, Si

final List<String> values = List.of("'-infinity'", "'infinity'");

List<ZonedDateTime> expectedValues = List.of();
if (sink.getType().is(SinkType.SQLSERVER)) {

expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
}
else if (sink.getType().is(SinkType.MYSQL)) {
expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC),
ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC));
}
List<ZonedDateTime> expectedValues = getExpectedZonedDateTimes(sink);

assertDataTypesNonKeyOnly(source,
sink,
Expand All @@ -2598,6 +2590,30 @@ else if (sink.getType().is(SinkType.MYSQL)) {
(rs, index) -> rs.getTimestamp(index).toInstant().atZone(ZoneOffset.UTC));
}

private static @NotNull List<ZonedDateTime> getExpectedZonedDateTimes(Sink sink) {

List<ZonedDateTime> expectedValues = List.of();
if (sink.getType().is(SinkType.SQLSERVER)) {

expectedValues = List.of(ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
}
else if (sink.getType().is(SinkType.MYSQL)) {

expectedValues = List.of(ZonedDateTime.of(1970, 1, 1, 0, 0, 1, 0, ZoneOffset.UTC),
ZonedDateTime.of(2038, 1, 19, 3, 14, 7, 0, ZoneOffset.UTC));
}
else if (sink.getType().is(SinkType.ORACLE)) {

// The value read by the rs.getTimestamp() is correct but then the
// rs.getTimestamp().toInstant() will return -4712-11-24. I suspect a bug somewhere in the time library.
// The value on the DB is correct since select to_char(A , 'AD YYYY-MM-DD HH24:MI:SS') will return BC 4712-01-01 00:00:00
expectedValues = List.of(ZonedDateTime.of(-4712, 11, 24, 0, 0, 0, 0, ZoneOffset.UTC),
ZonedDateTime.of(9999, 12, 31, 23, 59, 59, 0, ZoneOffset.UTC));
}
return expectedValues;
}

// todo: remaining data types need tests and/or type system mapping support
// GEOMETRY (MySql/PostgreSQL)
// LINESTRING (MySQL)
Expand Down

0 comments on commit ece5b40

Please sign in to comment.