Skip to content

Commit

Permalink
fix: Use parquet converted type only if logical type not set (#5997)
Browse files Browse the repository at this point in the history
If a column in a parquet file has both logical type and converted type set, we should prioritize logical type over converted type.
  • Loading branch information
malhotrashivam committed Aug 30, 2024
1 parent d7034dd commit 9383c3e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 62 deletions.
24 changes: 9 additions & 15 deletions extensions/iceberg/s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ dependencies {
runtimeOnly libs.awssdk.sts
runtimeOnly libs.awssdk.glue

testImplementation libs.junit4

testImplementation project(':engine-test-utils')

testImplementation libs.testcontainers
testImplementation libs.testcontainers.junit.jupiter
testImplementation libs.testcontainers.localstack
Expand All @@ -39,20 +43,10 @@ dependencies {
testRuntimeOnly libs.slf4j.simple
}

test {
useJUnitPlatform {
excludeTags("testcontainers")
}
}
TestTools.addEngineOutOfBandTest(project)

tasks.register('testOutOfBand', Test) {
useJUnitPlatform {
includeTags("testcontainers")
}
testOutOfBand.dependsOn Docker.registryTask(project, 'localstack')
testOutOfBand.systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')

dependsOn Docker.registryTask(project, 'localstack')
systemProperty 'testcontainers.localstack.image', Docker.localImageName('localstack')

dependsOn Docker.registryTask(project, 'minio')
systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
}
testOutOfBand.dependsOn Docker.registryTask(project, 'minio')
testOutOfBand.systemProperty 'testcontainers.minio.image', Docker.localImageName('minio')
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,21 @@
//
package io.deephaven.iceberg.util;


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class IcebergLocalStackTest extends IcebergToolsTest {

@BeforeAll
static void initContainer() {
@BeforeClass
public static void initContainer() {
// ensure container is started so container startup time isn't associated with a specific test
SingletonContainers.LocalStack.init();
}

@Override
public Builder s3Instructions(Builder builder) {
public Builder s3Instructions(final Builder builder) {
return SingletonContainers.LocalStack.s3Instructions(builder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,25 @@
//
package io.deephaven.iceberg.util;


import io.deephaven.extensions.s3.S3Instructions.Builder;
import io.deephaven.extensions.s3.testlib.SingletonContainers;
import io.deephaven.stats.util.OSUtil;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.BeforeClass;
import software.amazon.awssdk.services.s3.S3AsyncClient;

@Tag("testcontainers")
public class IcebergMinIOTest extends IcebergToolsTest {

@BeforeAll
static void initContainer() {
@BeforeClass
public static void initContainer() {
// TODO(deephaven-core#5116): MinIO testcontainers does not work on OS X
Assumptions.assumeFalse(OSUtil.runningMacOS(), "OSUtil.runningMacOS()");
// ensure container is started so container startup time isn't associated with a specific test
SingletonContainers.MinIO.init();
}

@Override
public Builder s3Instructions(Builder builder) {
public Builder s3Instructions(final Builder builder) {
return SingletonContainers.MinIO.s3Instructions(builder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.TestCatalog.IcebergTestCatalog;
import io.deephaven.iceberg.TestCatalog.IcebergTestFileIO;
import io.deephaven.test.types.OutOfBandTest;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
Expand Down Expand Up @@ -47,6 +51,7 @@
import static io.deephaven.iceberg.util.IcebergCatalogAdapter.SNAPSHOT_DEFINITION;
import static io.deephaven.iceberg.util.IcebergCatalogAdapter.TABLES_DEFINITION;

@Category(OutOfBandTest.class)
public abstract class IcebergToolsTest {

private static final TableDefinition SALES_SINGLE_DEFINITION = TableDefinition.of(
Expand Down Expand Up @@ -110,8 +115,11 @@ public abstract class IcebergToolsTest {
private Catalog resourceCatalog;
private FileIO resourceFileIO;

@BeforeEach
void setUp() throws ExecutionException, InterruptedException {
@Rule
public final EngineCleanup framework = new EngineCleanup();

@Before
public void setUp() throws ExecutionException, InterruptedException {
bucket = "warehouse";
asyncClient = s3AsyncClient();
asyncClient.createBucket(CreateBucketRequest.builder().bucket(bucket).build()).get();
Expand All @@ -129,6 +137,16 @@ void setUp() throws ExecutionException, InterruptedException {
.build();
}

// Post-test cleanup (JUnit 4 @After — runs after every test method):
// deletes every object uploaded during the test, then the bucket, then closes the client.
@After
public void tearDown() throws ExecutionException, InterruptedException {
// `keys` presumably tracks the object keys uploaded by the test's upload helpers — confirm against uploadParquetFiles.
// .get() blocks until each async delete completes, so failures surface as test errors.
for (String key : keys) {
asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get();
}
keys.clear();
// Objects are removed first because S3 rejects deletion of a non-empty bucket.
asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get();
asyncClient.close();
}

private void uploadParquetFiles(final File root, final String prefixToRemove)
throws ExecutionException, InterruptedException, TimeoutException {
for (final File file : root.listFiles()) {
Expand Down Expand Up @@ -175,16 +193,6 @@ private void uploadSalesRenamed() throws ExecutionException, InterruptedExceptio
warehousePath);
}

// Post-test cleanup (JUnit 5 @AfterEach — runs after every test method):
// deletes every object uploaded during the test, then the bucket, then closes the client.
@AfterEach
public void tearDown() throws ExecutionException, InterruptedException {
// `keys` presumably tracks the object keys uploaded by the test's upload helpers — confirm against uploadParquetFiles.
// .get() blocks until each async delete completes, so failures surface as test errors.
for (String key : keys) {
asyncClient.deleteObject(DeleteObjectRequest.builder().bucket(bucket).key(key).build()).get();
}
keys.clear();
// Objects are removed first because S3 rejects deletion of a non-empty bucket.
asyncClient.deleteBucket(DeleteBucketRequest.builder().bucket(bucket).build()).get();
asyncClient.close();
}

@Test
public void testListNamespaces() {
final IcebergCatalogAdapter adapter = IcebergTools.createAdapter(resourceCatalog, resourceFileIO);
Expand Down Expand Up @@ -805,7 +813,7 @@ public void testOpenAllTypesTable() throws ExecutionException, InterruptedExcept
final TableIdentifier tableId = TableIdentifier.of(ns, "all_types");

// Verify we retrieved all the rows.
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions);
final io.deephaven.engine.table.Table table = adapter.readTable(tableId, instructions).select();
Assert.eq(table.size(), "table.size()", 10, "10 rows in the table");
Assert.equals(table.getDefinition(), "table.getDefinition()", ALL_TYPES_DEF);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,20 +297,13 @@ private static void buildChildren(Types.GroupBuilder builder, Iterator<SchemaEle
columnOrders, columnCount);
}

final LogicalTypeAnnotation logicalType;
if (schemaElement.isSetLogicalType()) {
logicalType = getLogicalTypeAnnotation(schemaElement.logicalType);
final LogicalTypeAnnotation logicalType = getLogicalTypeAnnotation(schemaElement.logicalType);
((Types.Builder) childBuilder).as(logicalType);
} else {
logicalType = null;
}

if (schemaElement.isSetConverted_type()) {
final LogicalTypeAnnotation originalType = getLogicalTypeAnnotation(
} else if (schemaElement.isSetConverted_type()) {
final LogicalTypeAnnotation logicalType = getLogicalTypeFromConvertedType(
schemaElement.converted_type, schemaElement);
if (!originalType.equals(logicalType)) {
((Types.Builder) childBuilder).as(originalType);
}
((Types.Builder) childBuilder).as(logicalType);
}

if (schemaElement.isSetField_id()) {
Expand Down Expand Up @@ -408,7 +401,13 @@ private static org.apache.parquet.schema.ColumnOrder fromParquetColumnOrder(Colu
return org.apache.parquet.schema.ColumnOrder.undefined();
}

private static LogicalTypeAnnotation getLogicalTypeAnnotation(
/**
* This method will convert the {@link ConvertedType} to a {@link LogicalTypeAnnotation} and should only be called
* if the logical type is not set in the schema element.
*
* @see <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md">Reference for conversions</a>
*/
private static LogicalTypeAnnotation getLogicalTypeFromConvertedType(
final ConvertedType convertedType,
final SchemaElement schemaElement) throws ParquetFileReaderException {
switch (convertedType) {
Expand All @@ -429,17 +428,12 @@ private static LogicalTypeAnnotation getLogicalTypeAnnotation(
case DATE:
return LogicalTypeAnnotation.dateType();
case TIME_MILLIS:
// isAdjustedToUTC parameter is ignored while reading Parquet TIME type, so disregard it here
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIME_MICROS:
return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case TIMESTAMP_MILLIS:
// TIMESTAMP_MILLIS is always adjusted to UTC
// ref: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
case TIMESTAMP_MICROS:
// TIMESTAMP_MICROS is always adjusted to UTC
// ref: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
case INTERVAL:
return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
Expand Down

0 comments on commit 9383c3e

Please sign in to comment.