Bug fixes and improved support for Parquet TIMESTAMP #4801

Merged: 5 commits, Nov 10, 2023

Changes from all commits

@@ -20,6 +20,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.LocalDateTime;

/**
* Utility class to concentrate {@link ObjectCodec} lookups.
@@ -76,6 +77,7 @@ private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
dataType == Instant.class ||
dataType == LocalDate.class ||
dataType == LocalTime.class ||
dataType == LocalDateTime.class ||
dataType == String.class ||
// A BigDecimal column maps to a logical type of decimal, with
// appropriate precision and scale calculated from column data,
53 changes: 53 additions & 0 deletions engine/time/src/main/java/io/deephaven/time/DateTimeUtils.java
@@ -21,6 +21,7 @@
import java.time.zone.ZoneRulesException;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -981,6 +982,21 @@ public static long epochNanos(@Nullable final ZonedDateTime dateTime) {
return safeComputeNanos(dateTime.toEpochSecond(), dateTime.getNano());
}

/**

Member:

In case my comments seem strong, this library is very exposed to users, so it needs to be ultra curated. Functions should only be added when there is a compelling reason.

  1. I am not a fan of having hard-coded methods for any specific timezone.
  2. The methods should accept a time zone as an input (see the sketch after this list).
  3. If methods are added for LocalDateTime, LocalDateTime signatures should be added to all relevant methods.
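
For concreteness, a minimal sketch of the kind of time-zone-aware overload point 2 asks for; the signature is hypothetical, not part of this PR, and simply delegates to the existing epochNanos(ZonedDateTime) path:

    // Hypothetical overload, sketched for discussion only (not in this PR):
    // resolve the wall-clock time in the supplied zone, then reuse epochNanos(ZonedDateTime).
    @ScriptApi
    public static long epochNanos(@Nullable final LocalDateTime localDateTime, @Nullable final ZoneId timeZone) {
        if (localDateTime == null || timeZone == null) {
            return NULL_LONG;
        }
        return epochNanos(localDateTime.atZone(timeZone));
    }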

* Returns nanoseconds from the Epoch for a {@link LocalDateTime} value in UTC timezone.
*
* @param localDateTime the local date time to compute the Epoch offset for
* @return nanoseconds since Epoch, or a NULL_LONG value if the local date time is null
*/
@ScriptApi
public static long epochNanosUTC(@Nullable final LocalDateTime localDateTime) {
if (localDateTime == null) {
return NULL_LONG;
}
return TimeUnit.SECONDS.toNanos(localDateTime.toEpochSecond(ZoneOffset.UTC))
+ localDateTime.toLocalTime().getNano();
}

/**
* Returns microseconds from the Epoch for an {@link Instant} value.
*
@@ -1399,6 +1415,43 @@ public static ZonedDateTime excelToZonedDateTime(final double excel, @Nullable f
return epochMillisToZonedDateTime(excelTimeToEpochMillis(excel, timeZone), timeZone);
}

/**
* Converts nanoseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
*
* @param nanos nanoseconds since Epoch
* @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input nanoseconds from the
* Epoch converted to a {@link LocalDateTime} in UTC timezone
*/
public static @Nullable LocalDateTime epochNanosToLocalDateTimeUTC(final long nanos) {
return nanos == NULL_LONG ? null
: LocalDateTime.ofEpochSecond(nanos / 1_000_000_000L, (int) (nanos % 1_000_000_000L), ZoneOffset.UTC);
}

/**
* Converts microseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
*
* @param micros microseconds since Epoch
* @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input microseconds from the
* Epoch converted to a {@link LocalDateTime} in UTC timezone
*/
public static @Nullable LocalDateTime epochMicrosToLocalDateTimeUTC(final long micros) {
return micros == NULL_LONG ? null
: LocalDateTime.ofEpochSecond(micros / 1_000_000L, (int) ((micros % 1_000_000L) * MICRO),
ZoneOffset.UTC);
}

/**
* Converts milliseconds from the Epoch to a {@link LocalDateTime} in UTC timezone.
*
* @param millis milliseconds since Epoch
* @return {@code null} if the input is {@link QueryConstants#NULL_LONG}; otherwise the input milliseconds from the
* Epoch converted to a {@link LocalDateTime} in UTC timezone
*/
public static @Nullable LocalDateTime epochMillisToLocalDateTimeUTC(final long millis) {
return millis == NULL_LONG ? null
: LocalDateTime.ofEpochSecond(millis / 1_000L, (int) ((millis % 1_000L) * MILLI), ZoneOffset.UTC);
}

// endregion

// region Arithmetic
16 changes: 16 additions & 0 deletions engine/time/src/test/java/io/deephaven/time/TestDateTimeUtils.java
@@ -1391,6 +1391,10 @@ public void testEpochNanos() {

TestCase.assertEquals(nanos, DateTimeUtils.epochNanos(dt3));
TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanos((ZonedDateTime) null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(nanos, DateTimeUtils.epochNanosUTC(ldt));
TestCase.assertEquals(NULL_LONG, DateTimeUtils.epochNanosUTC(null));
}

public void testEpochMicros() {
@@ -1456,6 +1460,10 @@ public void testEpochNanosTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochNanosToZonedDateTime(nanos, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochNanosToZonedDateTime(nanos, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochNanosToLocalDateTimeUTC(nanos));
TestCase.assertNull(DateTimeUtils.epochNanosToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochMicrosTo() {
@@ -1471,6 +1479,10 @@ public void testEpochMicrosTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochMicrosToZonedDateTime(micros, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMicrosToZonedDateTime(micros, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochMicrosToLocalDateTimeUTC(micros));
TestCase.assertNull(DateTimeUtils.epochMicrosToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochMillisTo() {
@@ -1486,6 +1498,10 @@ public void testEpochMillisTo() {
TestCase.assertEquals(dt3, DateTimeUtils.epochMillisToZonedDateTime(millis, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(NULL_LONG, TZ_JP));
TestCase.assertNull(DateTimeUtils.epochMillisToZonedDateTime(millis, null));

final LocalDateTime ldt = LocalDateTime.ofInstant(dt2, ZoneId.of("UTC"));
TestCase.assertEquals(ldt, DateTimeUtils.epochMillisToLocalDateTimeUTC(millis));
TestCase.assertNull(DateTimeUtils.epochMillisToLocalDateTimeUTC(NULL_LONG));
}

public void testEpochSecondsTo() {
ParquetFileReader.java
@@ -236,23 +236,13 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle
}

if (schemaElement.isSetLogicalType()) {
LogicalType logicalType = schemaElement.logicalType;
if (logicalType.isSetTIMESTAMP()) {
TimestampType timestamp = logicalType.getTIMESTAMP();
if (!timestamp.isAdjustedToUTC) {
// TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps
throw new ParquetFileReaderException(String.format(
"Only UTC timestamp is supported, found time column `%s` with isAdjustedToUTC=false",
schemaElement.getName()));
}
}
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(logicalType));
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(schemaElement.logicalType));
}

if (schemaElement.isSetConverted_type()) {
LogicalTypeAnnotation originalType =
getLogicalTypeAnnotation(schemaElement.converted_type, schemaElement);
LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
final LogicalTypeAnnotation originalType = getLogicalTypeAnnotation(
schemaElement.converted_type, schemaElement.logicalType, schemaElement);
final LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
&& getLogicalTypeAnnotation(schemaElement.logicalType) != null
? getLogicalTypeAnnotation(schemaElement.logicalType)
: null;
@@ -299,20 +289,20 @@ static LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) throws P
case LIST:
return LogicalTypeAnnotation.listType();
case TIME:
TimeType time = type.getTIME();
final TimeType time = type.getTIME();
return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit));
case STRING:
return LogicalTypeAnnotation.stringType();
case DECIMAL:
DecimalType decimal = type.getDECIMAL();
final DecimalType decimal = type.getDECIMAL();
return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision);
case INTEGER:
IntType integer = type.getINTEGER();
final IntType integer = type.getINTEGER();
return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned);
case UNKNOWN:
return null;
case TIMESTAMP:
TimestampType timestamp = type.getTIMESTAMP();
final TimestampType timestamp = type.getTIMESTAMP();
return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit));
default:
throw new ParquetFileReaderException("Unknown logical type " + type);
@@ -354,9 +344,9 @@ private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(Colu
return org.apache.parquet.schema.ColumnOrder.undefined();
}

private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type, SchemaElement schemaElement)
throws ParquetFileReaderException {
switch (type) {
private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedType convertedType,
final LogicalType logicalType, final SchemaElement schemaElement) throws ParquetFileReaderException {
switch (convertedType) {
case UTF8:
return LogicalTypeAnnotation.stringType();
case MAP:
@@ -368,23 +358,23 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
case ENUM:
return LogicalTypeAnnotation.enumType();
case DECIMAL:
int scale = schemaElement == null ? 0 : schemaElement.scale;
int precision = schemaElement == null ? 0 : schemaElement.precision;
final int scale = schemaElement == null ? 0 : schemaElement.scale;
final int precision = schemaElement == null ? 0 : schemaElement.precision;
return LogicalTypeAnnotation.decimalType(scale, precision);
case DATE:
return LogicalTypeAnnotation.dateType();
case TIME_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
// isAdjustedToUTC parameter is ignored while reading Parquet TIME type, so disregard it here
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIME_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
// Converted type doesn't have isAdjustedToUTC parameter, so use the information from logical type

Contributor Author:

This is an ugly hack that I had to do because of the code inside ParquetFileReader::buildChildren which looks like this:

if (schemaElement.isSetConverted_type()) {
    LogicalTypeAnnotation originalType =
            getLogicalTypeAnnotation(schemaElement.converted_type, schemaElement);
    LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType()
            && getLogicalTypeAnnotation(schemaElement.logicalType) != null
                    ? getLogicalTypeAnnotation(schemaElement.logicalType)
                    : null;
    if (!originalType.equals(newOriginalType)) {
        ((Types.Builder) childBuilder).as(originalType);
    }
}

Mainly, we are trying to deduce the type of the column from both schemaElement.converted_type and schemaElement.logicalType, and in case of a mismatch we go with the type deduced from schemaElement.converted_type.

The problem is that schemaElement.converted_type doesn't carry any information about adjustment to UTC. So to deduce the type correctly, I had to read isAdjustedToUTC from the logical type.

Ideally, I would have liked to change the above code to prefer the type deduced from schemaElement.logicalType over schemaElement.converted_type, because converted_type is deprecated and superseded by logicalType (based on the Javadoc for converted_type). But that would be a much bigger change and would impact all Parquet types, so I added this hack, which only affects the TIMESTAMP type.
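
For reference, a rough sketch of the alternative described above, preferring logicalType and only falling back to converted_type when no usable logical type is present; this is illustrative only, since as noted it would touch every Parquet type and would need broader validation:

if (schemaElement.isSetLogicalType() && getLogicalTypeAnnotation(schemaElement.logicalType) != null) {
    // Prefer the non-deprecated logicalType when it maps to a known annotation.
    ((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(schemaElement.logicalType));
} else if (schemaElement.isSetConverted_type()) {
    // Fall back to the deprecated converted_type, passing the logical type along so its
    // isAdjustedToUTC information is not lost.
    ((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(
            schemaElement.converted_type, schemaElement.logicalType, schemaElement));
}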

Member:

What would be the side effect of switching to use the logical type?

Contributor Author:

I have added a test commit here and started the nightly jobs. Let me see how this works.

Contributor Author:

The nightly jobs run fine.

return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType),
LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
case INT_8:
@@ -409,8 +399,21 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type
return LogicalTypeAnnotation.bsonType();
default:
throw new ParquetFileReaderException(
"Can't convert converted type to logical type, unknown converted type " + type);
"Can't convert converted type to logical type, unknown converted type " + convertedType);
}
}

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
private static boolean isAdjustedToUTC(final LogicalType logicalType) {
if (logicalType.getSetField() == LogicalType._Fields.TIMESTAMP) {
return logicalType.getTIMESTAMP().isAdjustedToUTC;
}
return false;
}

public MessageType getSchema() {
@@ -26,6 +26,7 @@
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.*;
import java.util.function.BiFunction;
@@ -350,15 +351,13 @@ public Optional<Class<?>> visit(final LogicalTypeAnnotation.TimeLogicalTypeAnnot
@Override
public Optional<Class<?>> visit(
final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
// TODO(deephaven-core#976): Unable to read parquet TimestampLogicalTypeAnnotation that is not adjusted
// to UTC
if (timestampLogicalType.isAdjustedToUTC()) {
switch (timestampLogicalType.getUnit()) {
case MILLIS:
case MICROS:
case NANOS:
return Optional.of(Instant.class);
}
switch (timestampLogicalType.getUnit()) {
case MILLIS:
case MICROS:
case NANOS:
// TIMESTAMP fields if adjusted to UTC are read as Instants, else as LocalDatetimes.
return timestampLogicalType.isAdjustedToUTC() ? Optional.of(Instant.class)
: Optional.of(LocalDateTime.class);
}
errorString.setValue("TimestampLogicalType, isAdjustedToUTC=" + timestampLogicalType.isAdjustedToUTC()
+ ", unit=" + timestampLogicalType.getUnit());