Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

DBZ-7920 Add support for 'infinity' values for PostgreSQL #85

Merged
merged 7 commits into from
Jul 9, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ default String getTimeQueryBinding() {
*/
String getTypeName(int jdbcType, Size size);

/**
* +Infinity value for a timestamp.
*
* @return the +infinity representation for timestamp.
*/
String getTimestampPositiveInfinityValue();

/**
* -Infinity value for a timestamp.
*
* @return the -infinity representation for timestamp.
*/
String getTimestampNegativeInfinityValue();

/**
* Bind the specified value to the query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
Expand Down Expand Up @@ -67,6 +69,7 @@
import io.debezium.connector.jdbc.type.connect.ConnectTimeType;
import io.debezium.connector.jdbc.type.connect.ConnectTimestampType;
import io.debezium.connector.jdbc.type.debezium.DateType;
import io.debezium.connector.jdbc.type.debezium.DebeziumZonedTimestampType;
import io.debezium.connector.jdbc.type.debezium.MicroTimeType;
import io.debezium.connector.jdbc.type.debezium.MicroTimestampType;
import io.debezium.connector.jdbc.type.debezium.NanoTimeType;
Expand All @@ -75,7 +78,6 @@
import io.debezium.connector.jdbc.type.debezium.TimestampType;
import io.debezium.connector.jdbc.type.debezium.VariableScaleDecimalType;
import io.debezium.connector.jdbc.type.debezium.ZonedTimeType;
import io.debezium.connector.jdbc.type.debezium.ZonedTimestampType;
import io.debezium.util.Strings;

/**
Expand Down Expand Up @@ -526,6 +528,16 @@ public String getTypeName(int jdbcType, Size size) {
return ddlTypeRegistry.getTypeName(jdbcType, size);
}

@Override
public String getTimestampPositiveInfinityValue() {
return Timestamp.from(Instant.MAX).toString();
}

@Override
public String getTimestampNegativeInfinityValue() {
return Timestamp.from(Instant.MIN).toString();
}

@Override
public String getByteArrayFormat() {
return "x'%s'";
Expand Down Expand Up @@ -624,7 +636,7 @@ protected void registerTypes() {
registerType(NanoTimeType.INSTANCE);
registerType(NanoTimestampType.INSTANCE);
registerType(ZonedTimeType.INSTANCE);
registerType(ZonedTimestampType.INSTANCE);
registerType(DebeziumZonedTimestampType.INSTANCE);
registerType(VariableScaleDecimalType.INSTANCE);

// Supported connect data types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ public String getAlterTableColumnDelimiter() {
return " ";
}

@Override
public String getTimestampPositiveInfinityValue() {
return "9999-12-31T23:59:59+00:00";
}

@Override
public String getTimestampNegativeInfinityValue() {
return "0001-01-01T00:00:00+00:00";
}

@Override
public String getUpsertStatement(TableDescriptor table, SinkRecordDescriptor record) {
final SqlStatementBuilder builder = new SqlStatementBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,48 +9,38 @@
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.type.AbstractTimestampType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.type.debezium.DebeziumZonedTimestampType;
import io.debezium.time.ZonedTimestamp;

/**
* An implementation of {@link Type} for {@link ZonedTimestamp} values.
*
* @author Chris Cranford
*/
public class ZonedTimestampType extends AbstractTimestampType {
public class ZonedTimestampType extends DebeziumZonedTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public String[] getRegistrationKeys() {
return new String[]{ ZonedTimestamp.SCHEMA_NAME };
}
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
final ZonedDateTime zdt;

@Override
public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) {
return dialect.getFormattedTimestampWithTimeZone((String) value);
if (POSITIVE_INFINITY.equals(value)) {
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER);
}
else {
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER);
}

return List.of(new ValueBindDescriptor(index, Timestamp.valueOf((zdt.toLocalDateTime()))));
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {
protected List<ValueBindDescriptor> normalTimestampValue(int index, Object value) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, Timestamp.from(zdt.toInstant())));
}
final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
return List.of(new ValueBindDescriptor(index, Timestamp.from(zdt.toInstant())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ public String getFormattedTimestampWithTimeZone(String value) {
return String.format("'%s'", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(zonedDateTime));
}

@Override
public String getTimestampPositiveInfinityValue() {
return "2038-01-19T03:14:07+00:00";
}

@Override
public String getTimestampNegativeInfinityValue() {
return "1970-01-01T00:00:01+00:00";
}

@Override
public String getAlterTablePrefix() {
return "ADD COLUMN (";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,22 @@
package io.debezium.connector.jdbc.dialect.mysql;

import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.type.debezium.DebeziumZonedTimestampType;
import io.debezium.time.ZonedTimestamp;

/**
* An implementation of {@link Type} for {@link ZonedTimestamp} values.
*
* @author Chris Cranford
*/
public class ZonedTimestampType extends io.debezium.connector.jdbc.type.debezium.ZonedTimestampType {
public class ZonedTimestampType extends DebeziumZonedTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
}
if (value instanceof String) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt.toOffsetDateTime(), Types.TIMESTAMP)); // TIMESTAMP_WITH_TIMEZONE not supported
}

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
protected int getJdbcBindType() {
return Types.TIMESTAMP; // TIMESTAMP_WITH_TIMEZONE not supported
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ public String getFormattedTimestampWithTimeZone(String value) {
return String.format(TO_TIMESTAMP_FF9_TZ, value);
}

@Override
public String getTimestampPositiveInfinityValue() {
return "9999-12-31T23:59:59+00:00";
}

@Override
public String getTimestampNegativeInfinityValue() {
return "-4712-01-01T00:00:00+00:00";
}

@Override
protected String resolveColumnNameFromField(String fieldName) {
String columnName = super.resolveColumnNameFromField(fieldName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,41 @@
*/
package io.debezium.connector.jdbc.dialect.oracle;

import java.sql.Types;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.type.AbstractTimestampType;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.type.debezium.DebeziumZonedTimestampType;
import io.debezium.time.ZonedTimestamp;

/**
* An implementation of {@link Type} for {@link ZonedTimestamp} values.
*
* @author Chris Cranford
*/
public class ZonedTimestampType extends AbstractTimestampType {
public class ZonedTimestampType extends DebeziumZonedTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public String[] getRegistrationKeys() {
return new String[]{ ZonedTimestamp.SCHEMA_NAME };
}
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {
final ZonedDateTime zdt;

@Override
public String getDefaultValueBinding(DatabaseDialect dialect, Schema schema, Object value) {
return dialect.getFormattedTimestampWithTimeZone((String) value);
}

@Override
public List<ValueBindDescriptor> bind(int index, Schema schema, Object value) {

if (value == null) {
return List.of(new ValueBindDescriptor(index, null));
if (POSITIVE_INFINITY.equals(value)) {
zdt = ZonedDateTime.parse(getDialect().getTimestampPositiveInfinityValue(), ZonedTimestamp.FORMATTER);
}
if (value instanceof String) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt, getJdbcType()));
else {
zdt = ZonedDateTime.parse(getDialect().getTimestampNegativeInfinityValue(), ZonedTimestamp.FORMATTER);
}

throw new ConnectException(String.format("Unexpected %s value '%s' with type '%s'", getClass().getSimpleName(),
value, value.getClass().getName()));
return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType()));
}

@Override
protected int getJdbcType() {
return Types.TIMESTAMP_WITH_TIMEZONE;
}
protected List<ValueBindDescriptor> normalTimestampValue(int index, Object value) {

final ZonedDateTime zdt = ZonedDateTime.parse((String) value, ZonedTimestamp.FORMATTER).withZoneSameInstant(getDatabaseTimeZone().toZoneId());

return List.of(new ValueBindDescriptor(index, zdt, getJdbcBindType()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package io.debezium.connector.jdbc.dialect.postgres;

import static io.debezium.connector.jdbc.type.debezium.DebeziumZonedTimestampType.NEGATIVE_INFINITY;
import static io.debezium.connector.jdbc.type.debezium.DebeziumZonedTimestampType.POSITIVE_INFINITY;

import java.sql.Connection;
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;
Expand Down Expand Up @@ -165,6 +168,7 @@ protected void registerTypes() {
super.registerTypes();

registerType(TimeWithTimezoneType.INSTANCE);
registerType(ZonedTimestampType.INSTANCE);
registerType(IntervalType.INSTANCE);
registerType(SerialType.INSTANCE);
registerType(BitType.INSTANCE);
Expand Down Expand Up @@ -211,4 +215,14 @@ protected String resolveColumnNameFromField(String fieldName) {
}
return columnName;
}

@Override
public String getTimestampPositiveInfinityValue() {
return POSITIVE_INFINITY;
}

@Override
public String getTimestampNegativeInfinityValue() {
return NEGATIVE_INFINITY;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.jdbc.dialect.postgres;

import java.sql.Types;
import java.util.List;

import org.apache.kafka.connect.data.Schema;

import io.debezium.connector.jdbc.ValueBindDescriptor;
import io.debezium.connector.jdbc.relational.ColumnDescriptor;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.type.debezium.DebeziumZonedTimestampType;
import io.debezium.time.ZonedTimestamp;

/**
* An implementation of {@link Type} for {@link ZonedTimestamp} values specific to PostgreSQL.
*
* @author Mario Fiore Vitale
*/
public class ZonedTimestampType extends DebeziumZonedTimestampType {

public static final ZonedTimestampType INSTANCE = new ZonedTimestampType();

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {

if (POSITIVE_INFINITY.equals(value) || NEGATIVE_INFINITY.equals(value)) {
return "cast(? as timestamptz)";
}

return super.getQueryBinding(column, schema, value);
}

@Override
protected List<ValueBindDescriptor> infinityTimestampValue(int index, Object value) {

if (POSITIVE_INFINITY.equals(value)) {
return List.of(new ValueBindDescriptor(index, POSITIVE_INFINITY, Types.VARCHAR));
}
else {
return List.of(new ValueBindDescriptor(index, NEGATIVE_INFINITY, Types.VARCHAR));
}
}
}
Loading