Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types #32688

Merged
merged 15 commits into from
Oct 10, 2024
Merged
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -64,7 +65,10 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.thrift.TException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
Expand Down Expand Up @@ -100,6 +104,10 @@ public class IcebergHiveCatalogIT {
.addArrayField("arr_long", Schema.FieldType.INT64)
.addRowField("row", NESTED_ROW_SCHEMA)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
.addDateTimeField("datetime_tz")
.addLogicalTypeField("datetime", SqlTypes.DATETIME)
.addLogicalTypeField("date", SqlTypes.DATE)
.addLogicalTypeField("time", SqlTypes.TIME)
.build();

private static final SimpleFunction<Long, Row> ROW_FUNC =
Expand Down Expand Up @@ -127,6 +135,10 @@ public Row apply(Long num) {
.addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
.addValue(nestedRow)
.addValue(num % 2 == 0 ? null : nestedRow)
.addValue(new DateTime(num).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25)))
.addValue(DateTimeUtil.timestampFromMicros(num))
.addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum)))
.addValue(DateTimeUtil.timeFromMicros(num))
.build();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,16 @@
* <td> DOUBLE </td> <td> DOUBLE </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> STRING </td>
* <td> SqlTypes.DATETIME </td> <td> TIMESTAMP </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> TIMESTAMPTZ </td>
* </tr>
* <tr>
* <td> SqlTypes.DATE </td> <td> DATE </td>
* </tr>
* <tr>
* <td> SqlTypes.TIME </td> <td> TIME </td>
* </tr>
* <tr>
* <td> ITERABLE </td> <td> LIST </td>
Expand All @@ -166,6 +175,29 @@
* </tr>
* </table>
*
* <p><b>Note:</b> {@code SqlTypes} are Beam logical types.
*
* <h3>Note on timestamps</h3>
*
* <p>For an existing table, the following Beam types are supported for both {@code timestamp} and
* {@code timestamptz}:
*
* <ul>
* <li>{@code SqlTypes.DATETIME} --> Using a {@link java.time.LocalDateTime} object
* <li>{@code DATETIME} --> Using a {@link org.joda.time.DateTime} object
* <li>{@code INT64} --> Using a {@link Long} representing micros since EPOCH
* <li>{@code STRING} --> Using a timestamp {@link String} representation (e.g. {@code
* "2024-10-08T13:18:20.053+03:27"})
* </ul>
*
* <p><b>Note</b>: If you expect Beam to create the Iceberg table at runtime, please provide {@code
* SqlTypes.DATETIME} for a {@code timestamp} column and {@code DATETIME} for a {@code timestamptz}
* column. If the table does not exist, Beam will treat {@code STRING} and {@code INT64} at
* face-value and create equivalent column types.
*
* <p>For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for
* Iceberg's {@code timestamp} and {@code DATETIME} types for {@code timestamptz}.
*
* <h3>Dynamic Destinations</h3>
*
* <p>Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -34,15 +40,13 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;

/** Utilities for converting between Beam and Iceberg types. */
/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
public class IcebergUtils {
// This is made public for users convenience, as many may have more experience working with
// Iceberg types.

private IcebergUtils() {}

private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
Expand All @@ -54,6 +58,14 @@ private IcebergUtils() {}
.put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
.put(Schema.TypeName.STRING, Types.StringType.get())
.put(Schema.TypeName.BYTES, Types.BinaryType.get())
.put(Schema.TypeName.DATETIME, Types.TimestampType.withZone())
.build();

private static final Map<String, Type> BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES =
ImmutableMap.<String, Type>builder()
.put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
.build();

private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
Expand All @@ -69,9 +81,15 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
case DOUBLE:
return Schema.FieldType.DOUBLE;
case DATE:
return Schema.FieldType.logicalType(SqlTypes.DATE);
case TIME:
case TIMESTAMP: // TODO: Logical types?
return Schema.FieldType.DATETIME;
return Schema.FieldType.logicalType(SqlTypes.TIME);
case TIMESTAMP:
Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType();
if (ts.shouldAdjustToUTC()) {
return Schema.FieldType.DATETIME;
}
return Schema.FieldType.logicalType(SqlTypes.DATETIME);
case STRING:
return Schema.FieldType.STRING;
case UUID:
Expand Down Expand Up @@ -151,6 +169,14 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
// other types.
return new TypeAndMaxId(
--nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
} else if (beamType.getTypeName().isLogicalType()) {
String logicalTypeIdentifier =
checkArgumentNotNull(beamType.getLogicalType()).getIdentifier();
@Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
if (type == null) {
throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier);
}
return new TypeAndMaxId(--nestedFieldId, type);
} else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
Schema.FieldType beamCollectionType =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
Expand Down Expand Up @@ -227,8 +253,6 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
*
* <p>The following unsupported Beam types will be defaulted to {@link Types.StringType}:
* <li>{@link Schema.TypeName.DECIMAL}
* <li>{@link Schema.TypeName.DATETIME}
* <li>{@link Schema.TypeName.LOGICAL_TYPE}
*/
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
List<Types.NestedField> fields = new ArrayList<>(schema.getFieldCount());
Expand Down Expand Up @@ -282,12 +306,20 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
break;
case DATE:
throw new UnsupportedOperationException("Date fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIME:
throw new UnsupportedOperationException("Time fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIMESTAMP:
Optional.ofNullable(value.getDateTime(name))
.ifPresent(v -> rec.setField(name, v.getMillis()));
Object val = value.getValue(name);
if (val == null) {
break;
}
Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType();
rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC()));
break;
case STRING:
Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v));
Expand Down Expand Up @@ -322,6 +354,55 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
}
}

/**
* Returns the appropriate value for an Iceberg timestamp field
*
* <p>If `timestamp`, we resolve incoming values to a {@link LocalDateTime}.
*
* <p>If `timestamptz`, we resolve to a UTC {@link OffsetDateTime}. Iceberg already resolves all
* incoming timestamps to UTC, so there is no harm in doing it from our side.
*
* <p>Valid types are:
*
* <ul>
* <li>{@link SqlTypes.DATETIME} --> {@link LocalDateTime}
* <li>{@link Schema.FieldType.DATETIME} --> {@link Instant}
* <li>{@link Schema.FieldType.INT64} --> {@link Long}
* <li>{@link Schema.FieldType.STRING} --> {@link String}
* </ul>
*/
private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) {
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
// timestamptz
if (shouldAdjustToUtc) {
if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME
return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC);
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L);
} else if (beamValue instanceof Long) { // FieldType.INT64
return DateTimeUtil.timestamptzFromMicros((Long) beamValue);
} else if (beamValue instanceof String) { // FieldType.STRING
return OffsetDateTime.parse((String) beamValue).withOffsetSameInstant(ZoneOffset.UTC);
} else {
throw new UnsupportedOperationException(
"Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass());
}
}

// timestamp
if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME
return beamValue;
} else if (beamValue instanceof Instant) { // FieldType.DATETIME
return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L);
} else if (beamValue instanceof Long) { // FieldType.INT64
return DateTimeUtil.timestampFromMicros((Long) beamValue);
} else if (beamValue instanceof String) { // FieldType.STRING
return LocalDateTime.parse((String) beamValue);
} else {
throw new UnsupportedOperationException(
"Unsupported Beam type for Iceberg timestamp with timezone: " + beamValue.getClass());
}
}

/** Converts an Iceberg {@link Record} to a Beam {@link Row}. */
public static Row icebergRecordToBeamRow(Schema schema, Record record) {
Row.Builder rowBuilder = Row.withSchema(schema);
Expand All @@ -345,16 +426,17 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
case FLOAT: // Iceberg and Beam both use float
case DOUBLE: // Iceberg and Beam both use double
case STRING: // Iceberg and Beam both use String
case BOOLEAN: // Iceberg and Beam both use String
case BOOLEAN: // Iceberg and Beam both use boolean
case ARRAY:
case ITERABLE:
case MAP:
rowBuilder.addValue(icebergValue);
break;
case DATETIME:
// Iceberg uses a long for millis; Beam uses joda time DateTime
long millis = (long) icebergValue;
rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
// Iceberg uses a long for micros.
// Beam DATETIME uses joda's DateTime, which only supports millis,
// so we do lose some precision here
rowBuilder.addValue(getBeamDateTimeValue(icebergValue));
break;
case BYTES:
// Iceberg uses ByteBuffer; Beam uses byte[]
Expand All @@ -369,13 +451,59 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord));
break;
case LOGICAL_TYPE:
throw new UnsupportedOperationException(
"Cannot convert iceberg field to Beam logical type");
rowBuilder.addValue(getLogicalTypeValue(icebergValue, field.getType()));
break;
default:
throw new UnsupportedOperationException(
"Unsupported Beam type: " + field.getType().getTypeName());
}
}
return rowBuilder.build();
}

private static DateTime getBeamDateTimeValue(Object icebergValue) {
long micros;
if (icebergValue instanceof OffsetDateTime) {
micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue);
} else if (icebergValue instanceof LocalDateTime) {
micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue);
} else if (icebergValue instanceof Long) {
micros = (long) icebergValue;
} else if (icebergValue instanceof String) {
return DateTime.parse((String) icebergValue);
} else {
throw new UnsupportedOperationException(
"Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass());
}
return new DateTime(micros / 1000L);
}

private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) {
if (icebergValue instanceof String) {
String strValue = (String) icebergValue;
if (type.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(strValue);
} else if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) {
return LocalTime.parse(strValue);
} else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return LocalDateTime.parse(strValue);
}
} else if (icebergValue instanceof Long) {
if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) {
return DateTimeUtil.timeFromMicros((Long) icebergValue);
} else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return DateTimeUtil.timestampFromMicros((Long) icebergValue);
}
} else if (icebergValue instanceof Integer
&& type.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return DateTimeUtil.dateFromDays((Integer) icebergValue);
} else if (icebergValue instanceof OffsetDateTime
&& type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return ((OffsetDateTime) icebergValue)
.withOffsetSameInstant(ZoneOffset.UTC)
.toLocalDateTime();
}
// LocalDateTime, LocalDate, LocalTime
return icebergValue;
}
}
Loading
Loading