Added support to read partitioned parquet files from S3 #5206

Merged · Apr 12, 2024 · 41 commits
The diff shown below covers changes from 2 of the 41 commits.

Commits
7734255
Initial commit
malhotrashivam Feb 28, 2024
c16ac2f
Working state, can be optimized further
malhotrashivam Mar 1, 2024
e8c4cda
Changed interface of flat partitioned reader
malhotrashivam Mar 4, 2024
d15dc6a
Added key value partitioned parquet reader
malhotrashivam Mar 11, 2024
7df60e2
Fixed some issues with partitioned reading
malhotrashivam Mar 13, 2024
d232f39
Fixed reading partitioned data with partitioned column in data
malhotrashivam Mar 14, 2024
f9df8f1
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 18, 2024
3dd9fb9
Minor improvements after rebasing
malhotrashivam Mar 18, 2024
cde1feb
WIP commit
malhotrashivam Mar 19, 2024
2838495
Separated URI list processing into a separate class
malhotrashivam Mar 22, 2024
9247f2e
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 22, 2024
baad429
Review with Devin part 1
malhotrashivam Mar 26, 2024
4227fd0
Review with Devin contd.
malhotrashivam Mar 27, 2024
5351fb9
Refined some comments
malhotrashivam Mar 27, 2024
814b2e0
Splitting instead of iterating
malhotrashivam Mar 27, 2024
3457070
Reverting some old changes
malhotrashivam Mar 27, 2024
8290e88
More review comments
malhotrashivam Mar 27, 2024
423892e
Added more tests
malhotrashivam Mar 27, 2024
641a439
Added tests for batching
malhotrashivam Mar 28, 2024
953bd02
Refactored KeyValuePartitionedLayout to extract file processing logic
malhotrashivam Mar 28, 2024
5995648
Review comments
malhotrashivam Mar 28, 2024
922ab0a
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 2, 2024
e89cfb7
Review with Devin contd.
malhotrashivam Apr 2, 2024
db809b2
Pending comments
malhotrashivam Apr 2, 2024
649526b
Rephrased comment
malhotrashivam Apr 2, 2024
086c9f4
Minor improvements
malhotrashivam Apr 3, 2024
2939d28
More tweaks for parquet file filtering
malhotrashivam Apr 3, 2024
71a9bdc
Reverting some old changes
malhotrashivam Apr 3, 2024
b412679
Review comments
malhotrashivam Apr 3, 2024
b6876cd
Review with Ryan Part 1
malhotrashivam Apr 8, 2024
11d0b68
Review with Ryan part 2
malhotrashivam Apr 8, 2024
9ec7e3e
Review with Ryan part 3
malhotrashivam Apr 9, 2024
05a294d
Review contd.
malhotrashivam Apr 9, 2024
b9a1b87
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 9, 2024
b0b3022
More review comments resolved
malhotrashivam Apr 9, 2024
1383850
Resolved more comments
malhotrashivam Apr 10, 2024
8c68079
Deprecated all File overloads from ParquetTools
malhotrashivam Apr 10, 2024
78dd6ac
Resolved some javadoc issues
malhotrashivam Apr 10, 2024
6fa3629
Revert "Resolved some javadoc issues"
malhotrashivam Apr 12, 2024
4ab6a31
Revert "Deprecated all File overloads from ParquetTools"
malhotrashivam Apr 12, 2024
79bd133
Tagged the new methods as Deprecated
malhotrashivam Apr 12, 2024
@@ -19,6 +19,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Predicate;

/**
* {@link SeekableChannelsProvider Channel provider} that will cache a bounded number of unused channels.
@@ -109,6 +110,12 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boolean append)
// end no matter what.
}

@Override
public List<URI> getChildURIListFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
throws IOException {
return wrappedProvider.getChildURIListFromDirectory(directoryURI, uriFilter);
}

@Nullable
private synchronized CachedChannel tryGetPooledChannel(@NotNull final String pathKey,
@NotNull final KeyedObjectHashMap<String, PerPathPool> channelPool) {
@@ -14,6 +14,8 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.function.Predicate;

public interface SeekableChannelsProvider extends SafeCloseable {

@@ -106,4 +108,17 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final boolean append)
}

SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException;

/**
 * Get a list of URIs for the files contained inside the given directory. Only the direct children of the
 * directory are included; files inside subdirectories are not listed recursively.
 *
 * @param directoryURI The URI of the directory to list
 * @param uriFilter A filter to apply to the URIs in the directory
 * @return The URIs of the direct children that pass the filter
 */
default List<URI> getChildURIListFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
throws IOException {
throw new UnsupportedOperationException("TODO Add this support for all providers");
}

}
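The default implementation above intentionally throws until each provider adds support. As a rough illustration of the contract only (a minimal sketch, not code from this PR; the class name is made up), a local-filesystem provider could satisfy it along these lines:

import java.io.IOException;
import java.net.URI;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class LocalChildUriLister {
    // List only the direct children of the directory; subdirectories are not traversed
    static List<URI> getChildURIListFromDirectory(final URI directoryURI, final Predicate<URI> uriFilter)
            throws IOException {
        final List<URI> childURIs = new ArrayList<>();
        try (final DirectoryStream<Path> stream = Files.newDirectoryStream(Path.of(directoryURI))) {
            for (final Path child : stream) {
                final URI childURI = child.toUri();
                if (uriFilter.test(childURI)) {
                    childURIs.add(childURI);
                }
            }
        }
        return childURIs;
    }
}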
@@ -914,7 +914,7 @@ public static Table readKeyValuePartitionedTable(
* Callers wishing to be more explicit and skip the inference step may prefer to call
* {@link #readFlatPartitionedTable(File, ParquetInstructions, TableDefinition)}.
*
* @param directory the source of {@link ParquetTableLocationKey location keys} to include
* @param directory the directory to search for .parquet files
* @param readInstructions the instructions for customizations while reading
* @return the table
* @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions)
@@ -926,11 +926,32 @@ public static Table readFlatPartitionedTable(
return readPartitionedTable(new ParquetFlatPartitionedLayout(directory, readInstructions), readInstructions);
}

/**
* Creates a partitioned table via the flat parquet files from the root {@code directory}, inferring the table
* definition from those files.
*
* <p>
* Callers wishing to be more explicit and skip the inference step may prefer to call
* {@link #readFlatPartitionedTable(File, ParquetInstructions, TableDefinition)}.
*
* @param directory the path or URI for the directory to search for .parquet files
* @param readInstructions the instructions for customizations while reading
* @return the table
* @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions)
* @see ParquetFlatPartitionedLayout#ParquetFlatPartitionedLayout(File, ParquetInstructions)
*/
public static Table readFlatPartitionedTable(
@NotNull final String directory,
@NotNull final ParquetInstructions readInstructions) {
return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory), readInstructions),
readInstructions);
}
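A usage sketch for the new String overload follows; the bucket name and local path are placeholders, and the import packages are assumed to be the usual io.deephaven locations rather than taken from this diff:

import io.deephaven.engine.table.Table;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;

final S3Instructions s3Instructions = S3Instructions.builder()
        .regionName("us-east-1")
        .credentials(Credentials.defaultCredentials())
        .build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
        .setSpecialInstructions(s3Instructions)
        .build();
// The directory argument may be a plain path or a URI; convertToURI handles both
final Table s3Table = ParquetTools.readFlatPartitionedTable("s3://my-bucket/flatPartitionedParquet/", readInstructions);
final Table localTable = ParquetTools.readFlatPartitionedTable("/data/parquet/flatDir", ParquetInstructions.EMPTY);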

/**
* Creates a partitioned table via the flat parquet files from the root {@code directory} using the provided
* {@code tableDefinition}.
*
* @param directory the source of {@link ParquetTableLocationKey location keys} to include
* @param directory the directory to search for .parquet files
* @param readInstructions the instructions for customizations while reading
* @param tableDefinition the table definition
* @return the table
@@ -945,6 +966,25 @@ public static Table readFlatPartitionedTable(
tableDefinition);
}

/**
* Creates a partitioned table via the flat parquet files from the root {@code directory} using the provided
* {@code tableDefinition}.
*
* @param directory the path or URI for the directory to search for .parquet files
* @param readInstructions the instructions for customizations while reading
* @param tableDefinition the table definition
* @return the table
* @see #readPartitionedTable(TableLocationKeyFinder, ParquetInstructions, TableDefinition)
* @see ParquetFlatPartitionedLayout#ParquetFlatPartitionedLayout(File, ParquetInstructions)
*/
public static Table readFlatPartitionedTable(
@NotNull final String directory,
@NotNull final ParquetInstructions readInstructions,
@NotNull final TableDefinition tableDefinition) {
return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory), readInstructions),
readInstructions, tableDefinition);
}
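And a sketch of the definition-bearing overload, which skips schema inference; the column names, types, and directory path are purely illustrative, and ParquetInstructions.EMPTY is assumed to be available as usual:

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;

final TableDefinition tableDefinition = TableDefinition.of(
        ColumnDefinition.ofLong("id"),
        ColumnDefinition.ofString("symbol"),
        ColumnDefinition.ofDouble("price"));
final Table table = ParquetTools.readFlatPartitionedTable("/data/parquet/flatDir",
        ParquetInstructions.EMPTY, tableDefinition);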

/**
* Creates a single table via the parquet {@code file} using the table definition derived from that {@code file}.
*
@@ -2,6 +2,7 @@

import io.deephaven.parquet.table.ParquetTableWriter;

import java.net.URI;
import java.nio.file.Path;

final class ParquetFileHelper {
@@ -12,4 +13,11 @@ static boolean fileNameMatches(final Path path) {
final String fileName = path.getFileName().toString();
return fileName.endsWith(ParquetTableWriter.PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
}

static boolean fileNameMatches(final URI uri) {
// TODO Test this for file paths
final String path = uri.getPath();
final String fileName = path.substring(path.lastIndexOf('/') + 1);
return fileName.endsWith(ParquetTableWriter.PARQUET_FILE_EXTENSION) && fileName.charAt(0) != '.';
}
}
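For reference, assuming PARQUET_FILE_EXTENSION is ".parquet", the URI variant of the filter behaves like this (the example URIs are made up):

// true: regular parquet file
ParquetFileHelper.fileNameMatches(URI.create("s3://bucket/dir/part-0.parquet"));
// false: hidden file (leading '.')
ParquetFileHelper.fileNameMatches(URI.create("s3://bucket/dir/.hidden.parquet"));
// false: wrong extension
ParquetFileHelper.fileNameMatches(URI.create("s3://bucket/dir/data.csv"));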
@@ -7,14 +7,15 @@
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderLoader;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

@@ -23,21 +24,30 @@
*/
public final class ParquetFlatPartitionedLayout implements TableLocationKeyFinder<ParquetTableLocationKey> {

private static ParquetTableLocationKey locationKey(Path path, @NotNull final ParquetInstructions readInstructions) {
return new ParquetTableLocationKey(path.toFile(), 0, null, readInstructions);
private static ParquetTableLocationKey locationKey(URI uri, @NotNull final ParquetInstructions readInstructions) {
return new ParquetTableLocationKey(uri, 0, null, readInstructions);
}

private final File tableRootDirectory;
private final Map<Path, ParquetTableLocationKey> cache;
private final URI tableRootDirectory;
private final Map<URI, ParquetTableLocationKey> cache;
private final ParquetInstructions readInstructions;

/**
* @param tableRootDirectory The directory to search for .parquet files.
* @param tableRootDirectoryFile The directory to search for .parquet files.
* @param readInstructions the instructions for customizations while reading
*/
public ParquetFlatPartitionedLayout(@NotNull final File tableRootDirectory,
public ParquetFlatPartitionedLayout(@NotNull final File tableRootDirectoryFile,
@NotNull final ParquetInstructions readInstructions) {
this.tableRootDirectory = tableRootDirectory;
this(tableRootDirectoryFile.toURI(), readInstructions);
}

/**
* @param tableRootDirectoryURI The directory URI to search for .parquet files.
* @param readInstructions the instructions for customizations while reading
*/
public ParquetFlatPartitionedLayout(@NotNull final URI tableRootDirectoryURI,
@NotNull final ParquetInstructions readInstructions) {
this.tableRootDirectory = tableRootDirectoryURI;
this.cache = new HashMap<>();
this.readInstructions = readInstructions;
}
@@ -47,19 +57,25 @@ public String toString() {
}

@Override
public synchronized void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKeyObserver) {
try (final DirectoryStream<Path> parquetFileStream =
Files.newDirectoryStream(tableRootDirectory.toPath(), ParquetFileHelper::fileNameMatches)) {
for (final Path parquetFilePath : parquetFileStream) {
ParquetTableLocationKey locationKey = cache.get(parquetFilePath);
if (locationKey == null) {
locationKey = locationKey(parquetFilePath, readInstructions);
if (!locationKey.verifyFileReader()) {
continue;
public void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKeyObserver) {
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
tableRootDirectory, readInstructions.getSpecialInstructions());
try {
final List<URI> parquetURIs = provider.getChildURIListFromDirectory(tableRootDirectory,
ParquetFileHelper::fileNameMatches);
synchronized (this) {
// Iterate over the parquet file URIs and add the corresponding location keys to the cache
parquetURIs.forEach(parquetFileURI -> {
ParquetTableLocationKey locationKey = cache.get(parquetFileURI);
if (locationKey == null) {
locationKey = locationKey(parquetFileURI, readInstructions);
if (!locationKey.verifyFileReader()) {
return;
}
cache.put(parquetFileURI, locationKey);
}
cache.put(parquetFilePath, locationKey);
}
locationKeyObserver.accept(locationKey);
locationKeyObserver.accept(locationKey);
});
}
} catch (final IOException e) {
throw new TableDataException("Error finding parquet locations under " + tableRootDirectory, e);
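The S3 provider's listing (not shown in this hunk) must implement the same direct-children contract. A minimal sketch of how that can be done with the AWS SDK v2 that this module already depends on, using a "/" delimiter so nested prefixes are not descended into; the class and method names here are hypothetical, not the code added by this PR:

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

final class S3ChildUriLister {
    static List<URI> getChildURIListFromDirectory(final S3Client s3, final URI directoryURI,
            final Predicate<URI> uriFilter) {
        final String bucket = directoryURI.getHost();
        final String prefix = directoryURI.getPath().substring(1); // drop the leading '/'
        final List<URI> childURIs = new ArrayList<>();
        String continuationToken = null;
        do {
            final ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder()
                    .bucket(bucket)
                    .prefix(prefix)
                    .delimiter("/") // direct children only; nested "directories" come back as common prefixes
                    .continuationToken(continuationToken)
                    .build());
            for (final S3Object object : response.contents()) {
                final URI childURI = URI.create("s3://" + bucket + "/" + object.key());
                if (uriFilter.test(childURI)) {
                    childURIs.add(childURI);
                }
            }
            continuationToken = response.nextContinuationToken();
        } while (continuationToken != null);
        return childURIs;
    }
}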
@@ -15,25 +15,25 @@
* Parquet {@link TableLocationKeyFinder location finder} that will discover a single file.
*/
public final class ParquetSingleFileLayout implements TableLocationKeyFinder<ParquetTableLocationKey> {
private final URI parquetFileUri;
private final URI parquetFileURI;
private final ParquetInstructions readInstructions;

/**
* @param parquetFileUri URI of single parquet file to find
* @param parquetFileURI URI of single parquet file to find
* @param readInstructions the instructions for customizations while reading
*/
public ParquetSingleFileLayout(@NotNull final URI parquetFileUri,
public ParquetSingleFileLayout(@NotNull final URI parquetFileURI,
@NotNull final ParquetInstructions readInstructions) {
this.parquetFileUri = parquetFileUri;
this.parquetFileURI = parquetFileURI;
this.readInstructions = readInstructions;
}

public String toString() {
return ParquetSingleFileLayout.class.getSimpleName() + '[' + parquetFileUri + ']';
return ParquetSingleFileLayout.class.getSimpleName() + '[' + parquetFileURI + ']';
}

@Override
public void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKeyObserver) {
locationKeyObserver.accept(new ParquetTableLocationKey(parquetFileUri, 0, null, readInstructions));
locationKeyObserver.accept(new ParquetTableLocationKey(parquetFileURI, 0, null, readInstructions));
}
}
@@ -658,6 +658,25 @@ public void readSampleParquetFilesFromS3Test2() {
readInstructions, tableDefinition).select();
}

@Test
public void readFlatPartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because s3 testing disabled.", ENABLE_S3_TESTING);
final S3Instructions s3Instructions = S3Instructions.builder()
.regionName("us-east-1")
.readAheadCount(1)
.fragmentSize(5 * 1024 * 1024)
.maxConcurrentRequests(50)
.maxCacheSize(32)
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.defaultCredentials())
.build();
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(s3Instructions)
.build();
ParquetTools.readFlatPartitionedTable("s3://dh-s3-parquet-test1/flatPartitionedParquet/",
readInstructions).select();
}

@Test
public void stringDictionaryTest() {
final int nullPos = -5;
extensions/s3/build.gradle: 1 change (0 additions, 1 deletion)
@@ -15,7 +15,6 @@ dependencies {
implementation platform('software.amazon.awssdk:bom:2.23.19')
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:aws-crt-client'
//implementation 'software.amazon.awssdk:netty-nio-client'

compileOnly depAnnotations
