-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bug fixes and improved support for Parquet TIMESTAMP #4801
Changes from 4 commits
9b73db5
a48ea8d
9932f33
257da90
69b4bfd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -236,23 +236,13 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle | |
} | ||
|
||
if (schemaElement.isSetLogicalType()) { | ||
LogicalType logicalType = schemaElement.logicalType; | ||
if (logicalType.isSetTIMESTAMP()) { | ||
TimestampType timestamp = logicalType.getTIMESTAMP(); | ||
if (!timestamp.isAdjustedToUTC) { | ||
// TODO(deephaven-core#976): Unable to read non UTC adjusted timestamps | ||
throw new ParquetFileReaderException(String.format( | ||
"Only UTC timestamp is supported, found time column `%s` with isAdjustedToUTC=false", | ||
schemaElement.getName())); | ||
} | ||
} | ||
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(logicalType)); | ||
((Types.Builder) childBuilder).as(getLogicalTypeAnnotation(schemaElement.logicalType)); | ||
} | ||
|
||
if (schemaElement.isSetConverted_type()) { | ||
LogicalTypeAnnotation originalType = | ||
getLogicalTypeAnnotation(schemaElement.converted_type, schemaElement); | ||
LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType() | ||
final LogicalTypeAnnotation originalType = getLogicalTypeAnnotation( | ||
schemaElement.converted_type, schemaElement.logicalType, schemaElement); | ||
final LogicalTypeAnnotation newOriginalType = schemaElement.isSetLogicalType() | ||
&& getLogicalTypeAnnotation(schemaElement.logicalType) != null | ||
? getLogicalTypeAnnotation(schemaElement.logicalType) | ||
: null; | ||
|
@@ -299,20 +289,20 @@ static LogicalTypeAnnotation getLogicalTypeAnnotation(LogicalType type) throws P | |
case LIST: | ||
return LogicalTypeAnnotation.listType(); | ||
case TIME: | ||
TimeType time = type.getTIME(); | ||
final TimeType time = type.getTIME(); | ||
return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit)); | ||
case STRING: | ||
return LogicalTypeAnnotation.stringType(); | ||
case DECIMAL: | ||
DecimalType decimal = type.getDECIMAL(); | ||
final DecimalType decimal = type.getDECIMAL(); | ||
return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision); | ||
case INTEGER: | ||
IntType integer = type.getINTEGER(); | ||
final IntType integer = type.getINTEGER(); | ||
return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned); | ||
case UNKNOWN: | ||
return null; | ||
case TIMESTAMP: | ||
TimestampType timestamp = type.getTIMESTAMP(); | ||
final TimestampType timestamp = type.getTIMESTAMP(); | ||
return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit)); | ||
default: | ||
throw new ParquetFileReaderException("Unknown logical type " + type); | ||
|
@@ -354,9 +344,9 @@ private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(Colu | |
return org.apache.parquet.schema.ColumnOrder.undefined(); | ||
} | ||
|
||
private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type, SchemaElement schemaElement) | ||
throws ParquetFileReaderException { | ||
switch (type) { | ||
private static LogicalTypeAnnotation getLogicalTypeAnnotation(final ConvertedType convertedType, | ||
final LogicalType logicalType, final SchemaElement schemaElement) throws ParquetFileReaderException { | ||
switch (convertedType) { | ||
case UTF8: | ||
return LogicalTypeAnnotation.stringType(); | ||
case MAP: | ||
|
@@ -368,23 +358,23 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type | |
case ENUM: | ||
return LogicalTypeAnnotation.enumType(); | ||
case DECIMAL: | ||
int scale = schemaElement == null ? 0 : schemaElement.scale; | ||
int precision = schemaElement == null ? 0 : schemaElement.precision; | ||
final int scale = schemaElement == null ? 0 : schemaElement.scale; | ||
final int precision = schemaElement == null ? 0 : schemaElement.precision; | ||
return LogicalTypeAnnotation.decimalType(scale, precision); | ||
case DATE: | ||
return LogicalTypeAnnotation.dateType(); | ||
case TIME_MILLIS: | ||
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC | ||
// isAdjustedToUTC parameter is ignored while reading Parquet TIME type, so disregard it here | ||
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS); | ||
case TIME_MICROS: | ||
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC | ||
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS); | ||
case TIMESTAMP_MILLIS: | ||
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC | ||
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS); | ||
// Converted type doesn't have isAdjustedToUTC parameter, so use the information from logical type | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an ugly hack that I had to do because of the code inside
Mainly we are trying to deduce the type of the column from The problem is that Ideally, I would have liked to change the above code to prefer the type deduced from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would be the side effect of switching to use the logical type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added a test commit here and started the nightly jobs. Let me see how this works. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The nightly jobs run fine. |
||
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType), | ||
LogicalTypeAnnotation.TimeUnit.MILLIS); | ||
case TIMESTAMP_MICROS: | ||
// TODO(deephaven-core#976) Assuming that time is adjusted to UTC | ||
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS); | ||
return LogicalTypeAnnotation.timestampType(isAdjustedToUTC(logicalType), | ||
LogicalTypeAnnotation.TimeUnit.MICROS); | ||
case INTERVAL: | ||
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); | ||
case INT_8: | ||
|
@@ -409,8 +399,21 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(ConvertedType type | |
return LogicalTypeAnnotation.bsonType(); | ||
default: | ||
throw new ParquetFileReaderException( | ||
"Can't convert converted type to logical type, unknown converted type " + type); | ||
"Can't convert converted type to logical type, unknown converted type " + convertedType); | ||
} | ||
} | ||
|
||
/** | ||
* Helper method to determine if a logical type is adjusted to UTC. | ||
* | ||
* @param logicalType the logical type to check | ||
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise | ||
*/ | ||
private static boolean isAdjustedToUTC(final LogicalType logicalType) { | ||
if (logicalType.getSetField() == LogicalType._Fields.TIMESTAMP) { | ||
return logicalType.getTIMESTAMP().isAdjustedToUTC; | ||
} | ||
return false; | ||
} | ||
|
||
public MessageType getSchema() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case my comments seem strong, this library is very exposed to users, so it needs to be ultra curated. Functions should only get added when there is a compelling reason.
LocalDateTime
,LocalDateTime
signatures should be added to all relevant methods.