Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types #32688

Merged
merged 15 commits into from
Oct 10, 2024
Merged
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.io.iceberg.hive.testutils.HiveMetastoreExtension;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -100,6 +102,9 @@ public class IcebergHiveCatalogIT {
.addArrayField("arr_long", Schema.FieldType.INT64)
.addRowField("row", NESTED_ROW_SCHEMA)
.addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
.addLogicalTypeField("datetime", SqlTypes.DATETIME)
.addLogicalTypeField("date", SqlTypes.DATE)
.addLogicalTypeField("time", SqlTypes.TIME)
.build();

private static final SimpleFunction<Long, Row> ROW_FUNC =
Expand Down Expand Up @@ -127,6 +132,9 @@ public Row apply(Long num) {
.addValue(LongStream.range(1, num % 10).boxed().collect(Collectors.toList()))
.addValue(nestedRow)
.addValue(num % 2 == 0 ? null : nestedRow)
.addValue(DateTimeUtil.timestampFromMicros(num))
.addValue(DateTimeUtil.dateFromDays(Integer.parseInt(strNum)))
.addValue(DateTimeUtil.timeFromMicros(num))
.build();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,16 @@
* <td> DOUBLE </td> <td> DOUBLE </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> STRING </td>
* <td> SqlTypes.DATETIME </td> <td> TIMESTAMP </td>
* </tr>
* <tr>
* <td> STRING </td> <td> TIMESTAMPTZ </td>
* </tr>
* <tr>
* <td> SqlTypes.DATE </td> <td> DATE </td>
* </tr>
* <tr>
* <td> SqlTypes.TIME </td> <td> TIME </td>
* </tr>
* <tr>
* <td> ITERABLE </td> <td> LIST </td>
Expand All @@ -166,6 +175,32 @@
* </tr>
* </table>
*
* <p><b>Note:</b> {@code SqlTypes} are Beam logical types.
*
* <h3>Note on timestamps</h3>
*
* <p>Iceberg has two timestamp types: {@code timestamp} and {@code timestamptz} (the latter cares
 * about timezones). Beam's native schema offers two types for timestamps: {@code DATETIME} and
* {@code SqlTypes.DATETIME} -- both are supported for writing to the former {@code timestamp} type.
* They are not supported for {@code timestamptz} however because when a Beam Row stores timestamp
* values, it resolves them to UTC and drops the timezone information.
*
* <p>If your use-case requires timestamps <b>with timezone</b>, you can provide {@code STRING}
* timestamp representations, which will be parsed with {@link
* java.time.OffsetDateTime#parse(CharSequence)} and written to Iceberg.
*
* <p>Otherwise, you may write timestamps using any of {@code DATETIME}, {@code SqlTypes.DATETIME},
* or {@code INT64}.
*
* <p><b>Note</b>: Beam does not support creating a table with a timestamptz field. If the table
* does not exist, Beam will treat {@code STRING} and {@code INT64} at face-value. If you expect
* Beam to create a table with Iceberg timestamps (without tz), please use either {@code DATETIME}
* or {@code SqlTypes.DATETIME}.
*
* <p>For Iceberg reads, the connector will produce Beam {@code SqlTypes.DATETIME} types for both of
* Iceberg's {@code timestamp} and {@code timestamptz}. This is sufficient because Iceberg does not
* retain timezone information and only produces UTC timestamps.
*
* <h3>Dynamic Destinations</h3>
*
* <p>Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -34,15 +40,13 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;

/** Utilities for converting between Beam and Iceberg types. */
/** Utilities for converting between Beam and Iceberg types, made public for users' convenience. */
public class IcebergUtils {
// This is made public for users convenience, as many may have more experience working with
// Iceberg types.

private IcebergUtils() {}

private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
Expand All @@ -54,6 +58,14 @@ private IcebergUtils() {}
.put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
.put(Schema.TypeName.STRING, Types.StringType.get())
.put(Schema.TypeName.BYTES, Types.BinaryType.get())
.put(Schema.TypeName.DATETIME, Types.TimestampType.withoutZone())
.build();

private static final Map<String, Type> BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES =
ImmutableMap.<String, Type>builder()
.put(SqlTypes.DATE.getIdentifier(), Types.DateType.get())
.put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get())
.put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone())
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
.build();

private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
Expand All @@ -69,9 +81,15 @@ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
case DOUBLE:
return Schema.FieldType.DOUBLE;
case DATE:
return Schema.FieldType.logicalType(SqlTypes.DATE);
case TIME:
case TIMESTAMP: // TODO: Logical types?
return Schema.FieldType.DATETIME;
return Schema.FieldType.logicalType(SqlTypes.TIME);
case TIMESTAMP:
// Beam has two 'DATETIME' types: SqlTypes.DATETIME (uses java.time) and
// Schema.FieldType.DATETIME (uses org.joda.time).
// We choose SqlTypes.DATETIME because Iceberg API leans towards `java.time`.
// Also, Iceberg stores timestamps as UTC and does not retain timezone
return Schema.FieldType.logicalType(SqlTypes.DATETIME);
case STRING:
return Schema.FieldType.STRING;
case UUID:
Expand Down Expand Up @@ -151,6 +169,14 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
// other types.
return new TypeAndMaxId(
--nestedFieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
} else if (beamType.getTypeName().isLogicalType()) {
String logicalTypeIdentifier =
checkArgumentNotNull(beamType.getLogicalType()).getIdentifier();
@Nullable Type type = BEAM_LOGICAL_TYPES_TO_ICEBERG_TYPES.get(logicalTypeIdentifier);
if (type == null) {
throw new RuntimeException("Unsupported Beam logical type " + logicalTypeIdentifier);
}
return new TypeAndMaxId(--nestedFieldId, type);
} else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
Schema.FieldType beamCollectionType =
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
Expand Down Expand Up @@ -227,8 +253,6 @@ static TypeAndMaxId beamFieldTypeToIcebergFieldType(
*
* <p>The following unsupported Beam types will be defaulted to {@link Types.StringType}:
* <li>{@link Schema.TypeName.DECIMAL}
* <li>{@link Schema.TypeName.DATETIME}
* <li>{@link Schema.TypeName.LOGICAL_TYPE}
*/
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
List<Types.NestedField> fields = new ArrayList<>(schema.getFieldCount());
Expand Down Expand Up @@ -282,12 +306,50 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
break;
case DATE:
throw new UnsupportedOperationException("Date fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalDate.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIME:
throw new UnsupportedOperationException("Time fields not yet supported");
Optional.ofNullable(value.getLogicalTypeValue(name, LocalTime.class))
.ifPresent(v -> rec.setField(name, v));
break;
case TIMESTAMP:
Optional.ofNullable(value.getDateTime(name))
.ifPresent(v -> rec.setField(name, v.getMillis()));
Object val = value.getValue(name);
if (val == null) {
break;
}
Types.TimestampType ts = (Types.TimestampType) field.type().asPrimitiveType();
// timestamp with timezone
if (ts.shouldAdjustToUTC()) {
// currently only string is supported because other types
// do not maintain timezone information
if (val instanceof String) {
// e.g. 2007-12-03T10:15:30+01:00
rec.setField(name, OffsetDateTime.parse((String) val));
break;
} else {
throw new UnsupportedOperationException(
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
"Unsupported Beam type for Iceberg timestamp with timezone: " + val.getClass());
}
}

// timestamp
// SqlTypes.DATETIME
if (val instanceof LocalDateTime) {
rec.setField(name, val);
break;
}

long micros;
if (val instanceof Instant) { // Schema.FieldType.DATETIME
micros = ((Instant) val).getMillis() * 1000L;
} else if (val instanceof Long) { // Schema.FieldType.INT64
micros = (long) val;
} else {
throw new UnsupportedOperationException(
"Unsupported Beam type for Iceberg timestamp: " + val.getClass());
}
rec.setField(name, DateTimeUtil.timestampFromMicros(micros));
break;
case STRING:
Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v));
Expand Down Expand Up @@ -340,21 +402,43 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
case BYTE:
case INT16:
case INT32:
case INT64:
case DECIMAL: // Iceberg and Beam both use BigDecimal
case FLOAT: // Iceberg and Beam both use float
case DOUBLE: // Iceberg and Beam both use double
case STRING: // Iceberg and Beam both use String
case BOOLEAN: // Iceberg and Beam both use String
case BOOLEAN: // Iceberg and Beam both use boolean
case ARRAY:
case ITERABLE:
case MAP:
rowBuilder.addValue(icebergValue);
break;
case INT64:
Object value = icebergValue;
if (icebergValue instanceof OffsetDateTime) {
value = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue);
} else if (icebergValue instanceof LocalDateTime) {
value = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue);
} else if (icebergValue instanceof LocalTime) {
value = DateTimeUtil.microsFromTime((LocalTime) icebergValue);
}
rowBuilder.addValue(value);
break;
case DATETIME:
// Iceberg uses a long for millis; Beam uses joda time DateTime
long millis = (long) icebergValue;
rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
long micros;
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
if (icebergValue instanceof OffsetDateTime) {
micros = DateTimeUtil.microsFromTimestamptz((OffsetDateTime) icebergValue);
} else if (icebergValue instanceof LocalDateTime) {
micros = DateTimeUtil.microsFromTimestamp((LocalDateTime) icebergValue);
} else if (icebergValue instanceof Long) {
micros = (long) icebergValue;
} else {
throw new UnsupportedOperationException(
"Unsupported Iceberg type for Beam type DATETIME: " + icebergValue.getClass());
}
// Iceberg uses a long for micros
// Beam DATETIME uses joda's DateTime, which only supports millis,
// so we do lose some precision here
rowBuilder.addValue(new DateTime(micros / 1000L));
break;
case BYTES:
// Iceberg uses ByteBuffer; Beam uses byte[]
Expand All @@ -369,13 +453,42 @@ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, nestedRecord));
break;
case LOGICAL_TYPE:
throw new UnsupportedOperationException(
"Cannot convert iceberg field to Beam logical type");
rowBuilder.addValue(getLogicalTypeValue(icebergValue, field.getType()));
break;
default:
throw new UnsupportedOperationException(
"Unsupported Beam type: " + field.getType().getTypeName());
}
}
return rowBuilder.build();
}

private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType type) {
if (icebergValue instanceof String) {
String strValue = (String) icebergValue;
if (type.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return LocalDate.parse(strValue);
} else if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) {
return LocalTime.parse(strValue);
} else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return LocalDateTime.parse(strValue);
}
} else if (icebergValue instanceof Long) {
if (type.isLogicalType(SqlTypes.TIME.getIdentifier())) {
return DateTimeUtil.timeFromMicros((Long) icebergValue);
} else if (type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return DateTimeUtil.timestampFromMicros((Long) icebergValue);
}
} else if (icebergValue instanceof Integer
&& type.isLogicalType(SqlTypes.DATE.getIdentifier())) {
return DateTimeUtil.dateFromDays((Integer) icebergValue);
} else if (icebergValue instanceof OffsetDateTime
&& type.isLogicalType(SqlTypes.DATETIME.getIdentifier())) {
return ((OffsetDateTime) icebergValue)
.withOffsetSameInstant(ZoneOffset.UTC)
.toLocalDateTime();
}
// LocalDateTime, LocalDate, LocalTime
return icebergValue;
}
}
Loading
Loading