Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to read partitioned parquet files from S3 #5206

Merged
merged 41 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
7734255
Initial commit
malhotrashivam Feb 28, 2024
c16ac2f
Working state, can be optimized further
malhotrashivam Mar 1, 2024
e8c4cda
Changed interface of flat partitioned reader
malhotrashivam Mar 4, 2024
d15dc6a
Added key value partitioned parquet reader
malhotrashivam Mar 11, 2024
7df60e2
Fixed some issues with partitioned reading
malhotrashivam Mar 13, 2024
d232f39
Fixed reading partitioned data with partitioned column in data
malhotrashivam Mar 14, 2024
f9df8f1
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 18, 2024
3dd9fb9
Minor improvements after rebasing
malhotrashivam Mar 18, 2024
cde1feb
WIP commit
malhotrashivam Mar 19, 2024
2838495
Seperated URI List processing to a separete class
malhotrashivam Mar 22, 2024
9247f2e
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 22, 2024
baad429
Review with Devin part 1
malhotrashivam Mar 26, 2024
4227fd0
Review with Devin contd.
malhotrashivam Mar 27, 2024
5351fb9
Refined some comments
malhotrashivam Mar 27, 2024
814b2e0
Splitting instead of iterating
malhotrashivam Mar 27, 2024
3457070
Reverting some old changes
malhotrashivam Mar 27, 2024
8290e88
More review comments
malhotrashivam Mar 27, 2024
423892e
Added more tests
malhotrashivam Mar 27, 2024
641a439
Added tests for batching
malhotrashivam Mar 28, 2024
953bd02
Refactored KeyValuePartitionedLayout to extract file processing logic
malhotrashivam Mar 28, 2024
5995648
Review comments
malhotrashivam Mar 28, 2024
922ab0a
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 2, 2024
e89cfb7
Review with Devin contd.
malhotrashivam Apr 2, 2024
db809b2
Pending comments
malhotrashivam Apr 2, 2024
649526b
Rephrased comment
malhotrashivam Apr 2, 2024
086c9f4
Minor improvements
malhotrashivam Apr 3, 2024
2939d28
More tweaks for parquet file filtering
malhotrashivam Apr 3, 2024
71a9bdc
Reverting some old changes
malhotrashivam Apr 3, 2024
b412679
Review comments
malhotrashivam Apr 3, 2024
b6876cd
Review with Ryan Part 1
malhotrashivam Apr 8, 2024
11d0b68
Review with Ryan part 2
malhotrashivam Apr 8, 2024
9ec7e3e
Review with Ryan part 3
malhotrashivam Apr 9, 2024
05a294d
Review contd.
malhotrashivam Apr 9, 2024
b9a1b87
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 9, 2024
b0b3022
More review comments resolved
malhotrashivam Apr 9, 2024
1383850
Resolved more comments
malhotrashivam Apr 10, 2024
8c68079
Deprecated all File overloads from ParquetTools
malhotrashivam Apr 10, 2024
78dd6ac
Resolved some javadoc issues
malhotrashivam Apr 10, 2024
6fa3629
Revert "Resolved some javadoc issues"
malhotrashivam Apr 12, 2024
4ab6a31
Revert "Deprecated all File overloads from ParquetTools"
malhotrashivam Apr 12, 2024
79bd133
Tagged the new methods as Deprecated
malhotrashivam Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.ColumnToCodecMappings;
import io.deephaven.hash.KeyedObjectHashMap;
import io.deephaven.hash.KeyedObjectKey;
Expand Down Expand Up @@ -116,6 +117,24 @@ public static int getDefaultTargetPageSize() {
return defaultTargetPageSize;
}

public enum ParquetFileLayout {
// A single parquet file.
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
SINGLE_FILE,

// A single directory of parquet files.
FLAT_PARTITIONED,

// A key-value directory partitioning of parquet files.
KV_PARTITIONED,

// A directory containing a _metadata parquet file and an optional _common_metadata parquet file.
METADATA_PARTITIONED;
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}

private static final ParquetFileLayout DEFAULT_FILE_LAYOUT = null;

private static final TableDefinition DEFAULT_TABLE_DEFINITION = null;

private static final boolean DEFAULT_GENERATE_METADATA_FILES = false;

static final String UUID_TOKEN = "{uuid}";
Expand Down Expand Up @@ -178,6 +197,9 @@ public final String getColumnNameFromParquetColumnNameOrDefault(final String par
*/
public abstract boolean generateMetadataFiles();

public abstract ParquetFileLayout getFileLayout();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved

public abstract TableDefinition getTableDefinition();

/**
* @return the base name for partitioned parquet data. Check
Expand Down Expand Up @@ -270,6 +292,16 @@ public boolean generateMetadataFiles() {
public String baseNameForPartitionedParquetData() {
return DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA;
}

@Override
public ParquetFileLayout getFileLayout() {
return DEFAULT_FILE_LAYOUT;
}

@Override
public TableDefinition getTableDefinition() {
return DEFAULT_TABLE_DEFINITION;
}
};

private static class ColumnInstructions {
Expand Down Expand Up @@ -340,6 +372,8 @@ private static final class ReadOnly extends ParquetInstructions {
private final Object specialInstructions;
private final boolean generateMetadataFiles;
private final String baseNameForPartitionedParquetData;
private final ParquetFileLayout fileLayout;
private final TableDefinition tableDefinition;

private ReadOnly(
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructions,
Expand All @@ -352,7 +386,9 @@ private ReadOnly(
final boolean isRefreshing,
final Object specialInstructions,
final boolean generateMetadataFiles,
final String baseNameForPartitionedParquetData) {
final String baseNameForPartitionedParquetData,
final ParquetFileLayout fileLayout,
final TableDefinition tableDefinition) {
this.columnNameToInstructions = columnNameToInstructions;
this.parquetColumnNameToInstructions = parquetColumnNameToColumnName;
this.compressionCodecName = compressionCodecName;
Expand All @@ -364,6 +400,8 @@ private ReadOnly(
this.specialInstructions = specialInstructions;
this.generateMetadataFiles = generateMetadataFiles;
this.baseNameForPartitionedParquetData = baseNameForPartitionedParquetData;
this.fileLayout = fileLayout;
this.tableDefinition = tableDefinition;
}

private String getOrDefault(final String columnName, final String defaultValue,
Expand Down Expand Up @@ -467,6 +505,16 @@ public String baseNameForPartitionedParquetData() {
return baseNameForPartitionedParquetData;
}

@Override
public ParquetFileLayout getFileLayout() {
return fileLayout;
}

@Override
public TableDefinition getTableDefinition() {
return tableDefinition;
}

KeyedObjectHashMap<String, ColumnInstructions> copyColumnNameToInstructions() {
// noinspection unchecked
return (columnNameToInstructions == null)
Expand Down Expand Up @@ -520,6 +568,8 @@ public static class Builder {
private Object specialInstructions;
private boolean generateMetadataFiles = DEFAULT_GENERATE_METADATA_FILES;
private String baseNameForPartitionedParquetData = DEFAULT_BASE_NAME_FOR_PARTITIONED_PARQUET_DATA;
private ParquetFileLayout fileLayout = DEFAULT_FILE_LAYOUT;
private TableDefinition tableDefinition = DEFAULT_TABLE_DEFINITION;

public Builder() {}

Expand Down Expand Up @@ -737,6 +787,31 @@ public Builder setBaseNameForPartitionedParquetData(final String baseNameForPart
return this;
}

/**
* Set the expected file layout when reading a parquet file or a directory. This info can be used to skip some
* computations to deduce the file layout from the source directory structure.
*/
public Builder setFileLayout(final ParquetFileLayout fileLayout) {
this.fileLayout = fileLayout;
return this;
}

/**
* <ul>
* <li>When reading a parquet file, this corresponds to the table definition to use instead of the one implied
* by the parquet file being read. Providing a definition can help save additional computations to deduce the
* table definition from the parquet files as well as from the directory layouts when reading partitioned
* data.</li>
* <li>When writing a parquet file, this corresponds to the table definition to use instead of the one implied
* by the table being written</li>
* </ul>
* This definition can be used to skip some columns or add additional columns with {@code null} values.
*/
public Builder setTableDefinition(final TableDefinition tableDefinition) {
this.tableDefinition = tableDefinition;
return this;
}

public ParquetInstructions build() {
final KeyedObjectHashMap<String, ColumnInstructions> columnNameToInstructionsOut = columnNameToInstructions;
columnNameToInstructions = null;
Expand All @@ -745,7 +820,8 @@ public ParquetInstructions build() {
parquetColumnNameToInstructions = null;
return new ReadOnly(columnNameToInstructionsOut, parquetColumnNameToColumnNameOut, compressionCodecName,
maximumDictionaryKeys, maximumDictionarySize, isLegacyParquet, targetPageSize, isRefreshing,
specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData);
specialInstructions, generateMetadataFiles, baseNameForPartitionedParquetData, fileLayout,
tableDefinition);
}
}

Expand Down
Loading
Loading