Added support to read partitioned parquet files from S3 #5206

Merged
merged 41 commits into main from sm-pq-s3-flat on Apr 12, 2024
Changes from 3 commits

Commits (41)
7734255
Initial commit
malhotrashivam Feb 28, 2024
c16ac2f
Working state, can be optimized further
malhotrashivam Mar 1, 2024
e8c4cda
Changed interface of flat partitioned reader
malhotrashivam Mar 4, 2024
d15dc6a
Added key value partitioned parquet reader
malhotrashivam Mar 11, 2024
7df60e2
Fixed some issues with partitioned reading
malhotrashivam Mar 13, 2024
d232f39
Fixed reading partitioned data with partitioned column in data
malhotrashivam Mar 14, 2024
f9df8f1
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 18, 2024
3dd9fb9
Minor improvements after rebasing
malhotrashivam Mar 18, 2024
cde1feb
WIP commit
malhotrashivam Mar 19, 2024
2838495
Separated URI list processing to a separate class
malhotrashivam Mar 22, 2024
9247f2e
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 22, 2024
baad429
Review with Devin part 1
malhotrashivam Mar 26, 2024
4227fd0
Review with Devin contd.
malhotrashivam Mar 27, 2024
5351fb9
Refined some comments
malhotrashivam Mar 27, 2024
814b2e0
Splitting instead of iterating
malhotrashivam Mar 27, 2024
3457070
Reverting some old changes
malhotrashivam Mar 27, 2024
8290e88
More review comments
malhotrashivam Mar 27, 2024
423892e
Added more tests
malhotrashivam Mar 27, 2024
641a439
Added tests for batching
malhotrashivam Mar 28, 2024
953bd02
Refactored KeyValuePartitionedLayout to extract file processing logic
malhotrashivam Mar 28, 2024
5995648
Review comments
malhotrashivam Mar 28, 2024
922ab0a
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 2, 2024
e89cfb7
Review with Devin contd.
malhotrashivam Apr 2, 2024
db809b2
Pending comments
malhotrashivam Apr 2, 2024
649526b
Rephrased comment
malhotrashivam Apr 2, 2024
086c9f4
Minor improvements
malhotrashivam Apr 3, 2024
2939d28
More tweaks for parquet file filtering
malhotrashivam Apr 3, 2024
71a9bdc
Reverting some old changes
malhotrashivam Apr 3, 2024
b412679
Review comments
malhotrashivam Apr 3, 2024
b6876cd
Review with Ryan Part 1
malhotrashivam Apr 8, 2024
11d0b68
Review with Ryan part 2
malhotrashivam Apr 8, 2024
9ec7e3e
Review with Ryan part 3
malhotrashivam Apr 9, 2024
05a294d
Review contd.
malhotrashivam Apr 9, 2024
b9a1b87
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 9, 2024
b0b3022
More review comments resolved
malhotrashivam Apr 9, 2024
1383850
Resolved more comments
malhotrashivam Apr 10, 2024
8c68079
Deprecated all File overloads from ParquetTools
malhotrashivam Apr 10, 2024
78dd6ac
Resolved some javadoc issues
malhotrashivam Apr 10, 2024
6fa3629
Revert "Resolved some javadoc issues"
malhotrashivam Apr 12, 2024
4ab6a31
Revert "Deprecated all File overloads from ParquetTools"
malhotrashivam Apr 12, 2024
79bd133
Tagged the new methods as Deprecated
malhotrashivam Apr 12, 2024
9 changes: 8 additions & 1 deletion Base/src/main/java/io/deephaven/base/FileUtils.java
@@ -264,9 +264,16 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
if (source.isEmpty()) {
throw new IllegalArgumentException("Cannot convert empty source to URI");
}
final URI uri;
URI uri;
try {
uri = new URI(source);
// Replace two or more consecutive slashes in the path with a single slash
final String path = uri.getPath();
if (path.contains("//")) {
final String canonicalizedPath = uri.getPath().replaceAll("//+", "/");
uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath,
uri.getQuery(), uri.getFragment());
}
} catch (final URISyntaxException e) {
// If the URI is invalid, assume it's a file path
return convertToURI(new File(source), isDirectory);
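The hunk above collapses runs of consecutive slashes in the path component before the URI is rebuilt, so equivalent spellings of the same location compare equal. A minimal sketch of the expected behavior, assuming `FileUtils.convertToURI(String, boolean)` exactly as shown in the diff; the paths are illustrative and the printed output is not verified:

```java
import io.deephaven.base.FileUtils;

import java.net.URI;

public class ConvertToUriSketch {
    public static void main(final String[] args) {
        // Repeated slashes in the path should collapse to a single slash before the URI is built.
        final URI messy = FileUtils.convertToURI("file:/data//deephaven///table.parquet", false);
        final URI clean = FileUtils.convertToURI("file:/data/deephaven/table.parquet", false);
        System.out.println(messy);               // expected to match the canonical form
        System.out.println(messy.equals(clean)); // expected: true
    }
}
```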
@@ -91,9 +91,8 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo
* Returns a stream of URIs, the elements of which are the entries in the directory. The listing is non-recursive.
* Note that the URIs supplied by the stream will be file URIs (not ending with "/") irrespective of whether the URI
* corresponds to a file or a directory. The caller should manage file vs. directory handling in the processor.
* Also, the caller is responsible for closing the stream, preferably using a try-with-resources block.
*
* @apiNote This method must be used within a try-with-resources statement or similar control structure to ensure
* that the stream's open resources are closed promptly after the stream's operations have completed.
*
* @param directory the URI of the directory to list
* @return The {@link Stream} of {@link URI}s
@@ -104,10 +103,7 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo
* Returns a stream of URIs, the elements of which are all the files in the file tree rooted at the given starting
* directory. Note that the URIs supplied by the stream will be file URIs (not ending with "/") irrespective of
* whether the URI corresponds to a file or a directory. The caller should manage file vs. directory handling in the
* processor.
*
* @apiNote This method must be used within a try-with-resources statement or similar control structure to ensure
* that the stream's open resources are closed promptly after the stream's operations have completed.
* processor. Also, the caller is responsible for closing the stream, preferably using a try-with-resources block.
*
* @param directory the URI of the directory to walk
* @return The {@link Stream} of {@link URI}s
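The reworded javadoc keeps the same contract but states it in the method description: the caller owns the stream returned by `list` (and `walk`) and should close it, preferably with try-with-resources. A usage sketch under that contract; the helper itself and its `throws IOException` clause are assumptions for illustration, not part of the interface:

```java
import io.deephaven.util.channel.SeekableChannelsProvider;

import java.io.IOException;
import java.net.URI;
import java.util.stream.Stream;

final class ListingSketch {
    // Illustrative helper: the provider and directory are supplied by the caller.
    static void printParquetEntries(final SeekableChannelsProvider provider, final URI directory)
            throws IOException {
        // Close the stream promptly, per the updated javadoc.
        try (final Stream<URI> entries = provider.list(directory)) {
            entries.filter(uri -> uri.getPath().endsWith(".parquet"))
                    .forEach(System.out::println);
        }
    }
}
```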
@@ -11,7 +11,6 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -35,7 +34,7 @@ public abstract class URIStreamKeyValuePartitionLayout<TLK extends TableLocation
private static final String URI_SEPARATOR = "/";

protected final URI tableRootDirectory;
private final Supplier<KeyValuePartitionLayout.LocationTableBuilder> locationTableBuilderFactory;
private final Supplier<LocationTableBuilder> locationTableBuilderFactory;
private final int maxPartitioningLevels;

/**
@@ -47,7 +46,7 @@ public abstract class URIStreamKeyValuePartitionLayout<TLK extends TableLocation
* @param maxPartitioningLevels Maximum partitioning levels to traverse. Must be {@code >= 0}. {@code 0} means only
* look at files in {@code tableRootDirectory} and find no partitions.
*/
public URIStreamKeyValuePartitionLayout(
protected URIStreamKeyValuePartitionLayout(
@NotNull final URI tableRootDirectory,
@NotNull final Supplier<LocationTableBuilder> locationTableBuilderFactory,
@NotNull final BiFunction<URI, Map<String, Comparable<?>>, TLK> keyFactory,
@@ -78,10 +77,6 @@ protected final void findKeys(@NotNull final Stream<URI> uriStream,
getPartitions(relativePath, partitionKeys, partitionValues, registered.booleanValue());
if (registered.isFalse()) {
// Use the first path to find the partition keys and use the same for the rest
if (partitionKeys.size() > maxPartitioningLevels) {
throw new TableDataException("Too many partitioning levels at " + uri + ", count = " +
partitionKeys.size() + ", maximum expected are " + maxPartitioningLevels);
}
locationTableBuilder.registerPartitionKeys(partitionKeys);
registered.setTrue();
}
@@ -99,34 +94,33 @@ private void getPartitions(@NotNull final URI relativePath,
final boolean registered) {
final Set<String> takenNames = new HashSet<>();
final String relativePathString = relativePath.getPath();
int partitioningColumnIndex = 0;
// Split the path to get the subdirectory names
final String[] subDirs = relativePathString.split(URI_SEPARATOR);
for (int sdi = 0; sdi < subDirs.length - 1; sdi++) {
final String dirName = subDirs[sdi];
if (dirName.isEmpty()) {
// Ignore empty directory names
continue;
final int numPartitioningCol = subDirs.length - 1;
if (!registered) {
if (numPartitioningCol > maxPartitioningLevels) {
throw new TableDataException("Too many partitioning levels at " + relativePathString + ", count = " +
numPartitioningCol + ", maximum expected are " + maxPartitioningLevels);
}
} else {
if (numPartitioningCol > partitionKeys.size()) {
throw new TableDataException("Too many partitioning levels at " + relativePathString + " (expected "
+ partitionKeys.size() + ") based on earlier leaf nodes in the tree.");
}
}
for (int partitioningColIndex = 0; partitioningColIndex < numPartitioningCol; partitioningColIndex++) {
final String dirName = subDirs[partitioningColIndex];
final String[] components = dirName.split("=", 2);
if (components.length != 2) {
throw new TableDataException("Unexpected directory name format (not key=value) at "
+ new File(tableRootDirectory.getPath(), relativePathString));
+ relativePathString);
}
// We use an empty set to allow duplicate partition keys across files
final String columnKey = NameValidator.legalizeColumnName(components[0], takenNames);
takenNames.add(columnKey);
if (registered) {
// We have already seen another leaf node in the tree, so compare the partitioning levels against the
// previous ones
if (partitioningColumnIndex >= partitionKeys.size()) {
throw new TableDataException("Too many partitioning levels at " + relativePathString + " (expected "
+ partitionKeys.size() + ") based on earlier leaf nodes in the tree.");
}
if (!partitionKeys.get(partitioningColumnIndex).equals(columnKey)) {
if (!partitionKeys.get(partitioningColIndex).equals(columnKey)) {
throw new TableDataException(String.format(
"Column name mismatch at column index %d: expected %s found %s at %s",
partitioningColumnIndex, partitionKeys.get(partitioningColumnIndex), columnKey,
partitioningColIndex, partitionKeys.get(partitioningColIndex), columnKey,
relativePathString));
}
} else {
Expand All @@ -135,7 +129,6 @@ private void getPartitions(@NotNull final URI relativePath,
}
final String columnValue = components[1];
partitionValues.add(columnValue);
partitioningColumnIndex++;
}
}
}
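For reference, the directory convention `getPartitions` consumes above is the common `key=value` (Hive-style) layout, with one directory level per partitioning column. A standalone sketch of the split logic, using an illustrative path rather than one taken from the PR:

```java
public class PartitionParseSketch {
    public static void main(final String[] args) {
        // e.g. a file at <table root>/year=2024/month=04/data.parquet
        final String relativePath = "year=2024/month=04/data.parquet";
        final String[] subDirs = relativePath.split("/");
        // The last element is the file itself; everything before it is a partitioning level.
        for (int i = 0; i < subDirs.length - 1; i++) {
            final String[] components = subDirs[i].split("=", 2); // "year=2024" -> ["year", "2024"]
            if (components.length != 2) {
                throw new IllegalStateException("Unexpected directory name format (not key=value): " + subDirs[i]);
            }
            System.out.println("key=" + components[0] + ", value=" + components[1]);
        }
        // Prints: key=year, value=2024 then key=month, value=04
    }
}
```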
@@ -1244,7 +1244,7 @@ private static Table readTableInternal(
}
if (source.endsWith(METADATA_FILE_NAME) || source.endsWith(COMMON_METADATA_FILE_NAME)) {
throw new UncheckedDeephavenException("We currently do not support reading parquet metadata files " +
"from non local file systems");
"from non local storage");
}
// Both flat partitioned and key-value partitioned data can be read under key-value partitioned layout
return readPartitionedTable(new ParquetKeyValuePartitionedLayout(sourceURI, MAX_PARTITIONING_LEVELS_INFERENCE,
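Since both flat and key-value partitioned data are now routed through the key-value partitioned layout, reading a partitioned directory from S3 becomes a single `readTable` call with S3 credentials supplied as special instructions. A rough sketch; the bucket, region, and builder method names (`S3Instructions.builder()`, `regionName`, `setSpecialInstructions`) are assumptions for illustration, not verified against the API:

```java
import io.deephaven.engine.table.Table;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class S3PartitionedReadSketch {
    public static void main(final String[] args) {
        // Hypothetical bucket and region; S3Instructions is passed as the "special instructions".
        final S3Instructions s3 = S3Instructions.builder()
                .regionName("us-east-1")
                .build();
        final ParquetInstructions instructions = ParquetInstructions.builder()
                .setSpecialInstructions(s3)
                .build();
        // Both flat and key-value partitioned directories are handled by the key-value layout.
        final Table table = ParquetTools.readTable("s3://my-bucket/partitioned-table/", instructions);
        System.out.println(table.size());
    }
}
```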
@@ -64,19 +64,18 @@ public String toString() {

@Override
public void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKeyObserver) {
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
final Predicate<URI> uriFilter;
if (FILE_URI_SCHEME.equals(tableRootDirectory.getScheme())) {
uriFilter = uri -> {
final String filename = new File(uri).getName();
return filename.endsWith(ParquetUtils.PARQUET_FILE_EXTENSION) && filename.charAt(0) != '.';
};
} else {
uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION);
}
try (final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
tableRootDirectory, readInstructions.getSpecialInstructions());
final boolean isFileURI = FILE_URI_SCHEME.equals(tableRootDirectory.getScheme());
try (final Stream<URI> stream = provider.list(tableRootDirectory)) {
final Predicate<URI> uriFilter;
if (isFileURI) {
uriFilter = uri -> {
final String filename = new File(uri).getName();
return filename.endsWith(ParquetUtils.PARQUET_FILE_EXTENSION) && filename.charAt(0) != '.';
};
} else {
uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION);
}
final Stream<URI> stream = provider.list(tableRootDirectory)) {
stream.filter(uriFilter).forEach(uri -> {
synchronized (ParquetFlatPartitionedLayout.this) {
ParquetTableLocationKey locationKey = cache.get(uri);
@@ -11,6 +11,7 @@
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.engine.table.impl.locations.local.LocationTableBuilderDefinition;
import io.deephaven.engine.table.impl.locations.local.URIStreamKeyValuePartitionLayout;
import io.deephaven.engine.table.impl.locations.local.KeyValuePartitionLayout;
import io.deephaven.parquet.base.ParquetUtils;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
@@ -32,11 +33,11 @@
import static io.deephaven.parquet.base.ParquetUtils.isVisibleParquetFile;

/**
* Key-Value partitioned layout for Parquet data.
* {@link KeyValuePartitionLayout} for Parquet data.
*
* @implNote
* <ul>
* <li>Unless table definition is provided, type inference for partitioning column uses
* <li>Unless a {@link TableDefinition} is provided, type inference for partitioning column uses
* {@link CsvTools#readCsv(java.io.InputStream) CsvTools.readCsv} as a conversion tool, and hence follows the
* same rules.</li>
* <li>Column names will be legalized via {@link NameValidator#legalizeColumnName(String, Set)
@@ -87,17 +88,16 @@ public ParquetKeyValuePartitionedLayout(

@Override
public final void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKeyObserver) {
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
final Predicate<URI> uriFilter;
if (FILE_URI_SCHEME.equals(tableRootDirectory.getScheme())) {
final Path rootDir = Path.of(tableRootDirectory);
uriFilter = uri -> isVisibleParquetFile(rootDir, Path.of(uri));
} else {
uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION);
}
try (final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
tableRootDirectory, readInstructions.getSpecialInstructions());
final boolean isFileURI = FILE_URI_SCHEME.equals(tableRootDirectory.getScheme());
try (final Stream<URI> uriStream = provider.walk(tableRootDirectory)) {
final Predicate<URI> uriFilter;
if (isFileURI) {
final Path rootDir = Path.of(tableRootDirectory);
uriFilter = uri -> isVisibleParquetFile(rootDir, Path.of(uri));
} else {
uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION);
}
final Stream<URI> uriStream = provider.walk(tableRootDirectory)) {
final Stream<URI> filteredStream = uriStream.filter(uriFilter);
findKeys(filteredStream, locationKeyObserver);
} catch (final IOException e) {
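In both layouts the local-vs-remote distinction comes down to the choice of URI filter: local `file:` URIs can also exclude hidden (dot-prefixed) files, while remote schemes such as S3 only check the extension. A condensed sketch of that predicate selection, using the `.parquet` literal in place of the `ParquetUtils` constant:

```java
import java.io.File;
import java.net.URI;
import java.util.function.Predicate;

final class UriFilterSketch {
    static Predicate<URI> parquetFilterFor(final URI tableRootDirectory) {
        if ("file".equals(tableRootDirectory.getScheme())) {
            // Local filesystem: require the .parquet extension and skip hidden files.
            return uri -> {
                final String filename = new File(uri).getName();
                return filename.endsWith(".parquet") && filename.charAt(0) != '.';
            };
        }
        // Remote storage (e.g. S3): only the extension is checked.
        return uri -> uri.getPath().endsWith(".parquet");
    }
}
```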
@@ -1803,6 +1803,16 @@ public void basicWriteAndReadFromFileURITests() {
} catch (final RuntimeException e) {
assertTrue(e instanceof UnsupportedOperationException);
}

// Read from absolute path with additional "/" in the path
final String additionalSlashPath = rootFile.getAbsolutePath() + "/////" + filename;
final Table fromDisk5 = ParquetTools.readTable(additionalSlashPath);
assertTableEquals(tableToSave, fromDisk5);

// Read from URI with additional "/" in the path
final String additionalSlashURI = "file:////" + additionalSlashPath;
final Table fromDisk6 = ParquetTools.readTable(additionalSlashURI);
assertTableEquals(tableToSave, fromDisk6);
}

/**
@@ -4,7 +4,6 @@
package io.deephaven.extensions.s3;

import com.google.auto.service.AutoService;
import io.deephaven.base.verify.Require;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderPlugin;
import org.jetbrains.annotations.NotNull;
@@ -28,12 +27,11 @@ public boolean isCompatible(@NotNull final URI uri, @Nullable final Object confi
@Override
public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) {
if (!isCompatible(uri, config)) {
if (!(config instanceof S3Instructions)) {
throw new IllegalArgumentException("Must provide S3Instructions to read files from S3");
}
throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri);
}
final S3Instructions s3Instructions = (S3Instructions) Require.neqNull(config, "config");
return new S3SeekableChannelProvider(s3Instructions);
if (!(config instanceof S3Instructions)) {
throw new IllegalArgumentException("Must provide S3Instructions to read files from S3");
}
return new S3SeekableChannelProvider((S3Instructions) config);
}
}
@@ -28,11 +28,11 @@ public boolean isCompatible(@NotNull final URI uri, @Nullable final Object objec
@Override
public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object object) {
if (!isCompatible(uri, object)) {
if (object != null) {
throw new IllegalArgumentException("Arguments not compatible, provided non null object");
}
throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri);
}
if (object != null) {
throw new IllegalArgumentException("Arguments not compatible, provided non null object");
}
return new TrackedSeekableChannelsProvider(TrackedFileHandleFactory.getInstance());
}
}