Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support to read partitioned parquet files from S3 #5206

Merged
merged 41 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
7734255
Initial commit
malhotrashivam Feb 28, 2024
c16ac2f
Working state, can be optimized further
malhotrashivam Mar 1, 2024
e8c4cda
Changed interface of flat partitioned reader
malhotrashivam Mar 4, 2024
d15dc6a
Added key value partitioned parquet reader
malhotrashivam Mar 11, 2024
7df60e2
Fixed some issues with partitioned reading
malhotrashivam Mar 13, 2024
d232f39
Fixed reading partitioned data with partitioned column in data
malhotrashivam Mar 14, 2024
f9df8f1
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 18, 2024
3dd9fb9
Minor improvements after rebasing
malhotrashivam Mar 18, 2024
cde1feb
WIP commit
malhotrashivam Mar 19, 2024
2838495
Seperated URI List processing to a separete class
malhotrashivam Mar 22, 2024
9247f2e
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 22, 2024
baad429
Review with Devin part 1
malhotrashivam Mar 26, 2024
4227fd0
Review with Devin contd.
malhotrashivam Mar 27, 2024
5351fb9
Refined some comments
malhotrashivam Mar 27, 2024
814b2e0
Splitting instead of iterating
malhotrashivam Mar 27, 2024
3457070
Reverting some old changes
malhotrashivam Mar 27, 2024
8290e88
More review comments
malhotrashivam Mar 27, 2024
423892e
Added more tests
malhotrashivam Mar 27, 2024
641a439
Added tests for batching
malhotrashivam Mar 28, 2024
953bd02
Refactored KeyValuePartitionedLayout to extract file processing logic
malhotrashivam Mar 28, 2024
5995648
Review comments
malhotrashivam Mar 28, 2024
922ab0a
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 2, 2024
e89cfb7
Review with Devin contd.
malhotrashivam Apr 2, 2024
db809b2
Pending comments
malhotrashivam Apr 2, 2024
649526b
Rephrased comment
malhotrashivam Apr 2, 2024
086c9f4
Minor improvements
malhotrashivam Apr 3, 2024
2939d28
More tweaks for parquet file filtering
malhotrashivam Apr 3, 2024
71a9bdc
Reverting some old changes
malhotrashivam Apr 3, 2024
b412679
Review comments
malhotrashivam Apr 3, 2024
b6876cd
Review with Ryan Part 1
malhotrashivam Apr 8, 2024
11d0b68
Review with Ryan part 2
malhotrashivam Apr 8, 2024
9ec7e3e
Review with Ryan part 3
malhotrashivam Apr 9, 2024
05a294d
Review contd.
malhotrashivam Apr 9, 2024
b9a1b87
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 9, 2024
b0b3022
More review comments resolved
malhotrashivam Apr 9, 2024
1383850
Resolved more comments
malhotrashivam Apr 10, 2024
8c68079
Deprecated all File overloads from ParquetTools
malhotrashivam Apr 10, 2024
78dd6ac
Resolved some javadoc issues
malhotrashivam Apr 10, 2024
6fa3629
Revert "Resolved some javadoc issues"
malhotrashivam Apr 12, 2024
4ab6a31
Revert "Deprecated all File overloads from ParquetTools"
malhotrashivam Apr 12, 2024
79bd133
Tagged the new methods as Deprecated
malhotrashivam Apr 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class LocalFSChannelProvider implements SeekableChannelsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo
* Note that the URIs supplied by the stream will be file URIs (not ending with "/") irrespective of whether the URI
* corresponds to a file or a directory. The caller should manage file vs. directory handling in the processor.
*
* @apiNote This method must be used within a try-with-resources statement or similar control structure to ensure
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
* that the stream's open resources are closed promptly after the stream's operations have completed.
*
* @param directory the URI of the directory to list
* @return The {@link Stream} of {@link URI}s
*/
Expand All @@ -103,6 +106,9 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo
* whether the URI corresponds to a file or a directory. The caller should manage file vs. directory handling in the
* processor.
*
* @apiNote This method must be used within a try-with-resources statement or similar control structure to ensure
* that the stream's open resources are closed promptly after the stream's operations have completed.
*
* @param directory the URI of the directory to walk
* @return The {@link Stream} of {@link URI}s
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,12 @@ public SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) {

@Override
public final Stream<URI> list(@NotNull final URI directory) {
return Stream.empty();
throw new UnsupportedOperationException("list");
}

@Override
public final Stream<URI> walk(@NotNull final URI directory) {
return Stream.empty();
throw new UnsupportedOperationException("walk");
}

@Override
Expand Down
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,19 @@ public FileKeyValuePartitionLayout(
this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels");
}

@Override
public String toString() {
return FileKeyValuePartitionLayout.class.getSimpleName() + '[' + tableRootDirectory + ']';
}

@Override
public void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
final Deque<Path> targetFiles = new ArrayDeque<>();
final Queue<Path> targetFiles = new ArrayDeque<>();
final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get();
try {
Files.walkFileTree(tableRootDirectory.toPath(), EnumSet.of(FileVisitOption.FOLLOW_LINKS),
maxPartitioningLevels + 1, new SimpleFileVisitor<>() {
final Set<String> takenNames = new HashSet<>();
final List<String> partitionKeys = new ArrayList<>();
final List<String> partitionValues = new ArrayList<>();
boolean registered;
Expand All @@ -100,9 +102,8 @@ public FileVisitResult preVisitDirectory(
throw new TableDataException(
"Unexpected directory name format (not key=value) at " + dir);
}
// We use an empty set to allow duplicate partition keys across files
final String columnKey =
NameValidator.legalizeColumnName(components[0], Collections.emptySet());
final String columnKey = NameValidator.legalizeColumnName(components[0], takenNames);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
takenNames.add(columnKey);
final int columnIndex = columnCount - 1;
if (columnCount > partitionKeys.size()) {
partitionKeys.add(columnKey);
Expand All @@ -129,6 +130,7 @@ public FileVisitResult visitFile(
locationTableBuilder.acceptLocation(partitionValues);
targetFiles.add(file);
}
takenNames.clear();
return FileVisitResult.CONTINUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import java.util.function.Consumer;

/**
* Base class for {@link TableLocationKeyFinder location finders} that perform file system traversal to infer
* partitions.
* Base class for {@link TableLocationKeyFinder location finders} that traverse file hierarchy to infer partitions.
*
* @param <TLK> The type of {@link TableLocationKey} to be generated
* @param <TARGET_FILE_TYPE> The type of files used to generate location keys, like a {@link URI} or a {@link Path}
Expand Down Expand Up @@ -71,8 +70,9 @@ public KeyValuePartitionLayout(
this.keyFactory = keyFactory;
}

@Override
public String toString() {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
return KeyValuePartitionLayout.class.getSimpleName();
return getClass().getSimpleName();
}

/**
Expand All @@ -84,7 +84,7 @@ public String toString() {
*/
final void buildLocationKeys(
@NotNull final Table locationTable,
@NotNull final Deque<TARGET_FILE_TYPE> targetFiles,
@NotNull final Queue<TARGET_FILE_TYPE> targetFiles,
@NotNull final Consumer<TLK> locationKeyObserver) {
final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();
// Note that we allow the location table to define partition priority order.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -32,10 +32,11 @@
public abstract class URIStreamKeyValuePartitionLayout<TLK extends TableLocationKey>
extends KeyValuePartitionLayout<TLK, URI> {

private static final String URI_SEPARATOR = "/";

protected final URI tableRootDirectory;
private final Supplier<KeyValuePartitionLayout.LocationTableBuilder> locationTableBuilderFactory;
private final int maxPartitioningLevels;
private final String fileSeparator;

/**
* @param tableRootDirectory The directory to traverse from
Expand All @@ -48,15 +49,13 @@ public abstract class URIStreamKeyValuePartitionLayout<TLK extends TableLocation
*/
public URIStreamKeyValuePartitionLayout(
@NotNull final URI tableRootDirectory,
@NotNull final Supplier<KeyValuePartitionLayout.LocationTableBuilder> locationTableBuilderFactory,
@NotNull final Supplier<LocationTableBuilder> locationTableBuilderFactory,
@NotNull final BiFunction<URI, Map<String, Comparable<?>>, TLK> keyFactory,
final int maxPartitioningLevels,
@NotNull final String fileSeparator) {
final int maxPartitioningLevels) {
super(keyFactory);
this.tableRootDirectory = tableRootDirectory;
this.locationTableBuilderFactory = locationTableBuilderFactory;
this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels");
this.fileSeparator = fileSeparator;
}

@Override
Expand All @@ -69,19 +68,22 @@ public String toString() {
*/
protected final void findKeys(@NotNull final Stream<URI> uriStream,
@NotNull final Consumer<TLK> locationKeyObserver) {
final KeyValuePartitionLayout.LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get();
final Deque<URI> targetURIs = new ArrayDeque<>();
final Set<String> takenNames = new HashSet<>();
final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get();
final Queue<URI> targetURIs = new ArrayDeque<>();
final List<String> partitionKeys = new ArrayList<>();
final boolean[] registered = {false}; // Hack to make the variable final
final MutableBoolean registered = new MutableBoolean(false);
uriStream.forEachOrdered(uri -> {
final Collection<String> partitionValues = new ArrayList<>();
final URI relativePath = tableRootDirectory.relativize(uri);
getPartitions(relativePath, partitionKeys, partitionValues, takenNames, registered[0]);
if (!registered[0]) {
// Use the first path to find the partition keys and then use the same partition keys for the rest
getPartitions(relativePath, partitionKeys, partitionValues, registered.booleanValue());
if (registered.isFalse()) {
// Use the first path to find the partition keys and use the same for the rest
if (partitionKeys.size() > maxPartitioningLevels) {
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
throw new TableDataException("Too many partitioning levels at " + uri + ", count = " +
partitionKeys.size() + ", maximum expected are " + maxPartitioningLevels);
}
locationTableBuilder.registerPartitionKeys(partitionKeys);
registered[0] = true;
registered.setTrue();
}
// Use the partition values from each path to build the location table
locationTableBuilder.acceptLocation(partitionValues);
Expand All @@ -94,14 +96,14 @@ protected final void findKeys(@NotNull final Stream<URI> uriStream,
private void getPartitions(@NotNull final URI relativePath,
@NotNull final List<String> partitionKeys,
@NotNull final Collection<String> partitionValues,
@NotNull final Set<String> takenNames,
final boolean registered) {
final Set<String> takenNames = new HashSet<>();
final String relativePathString = relativePath.getPath();
int partitioningColumnIndex = 0;
// Split the path to get the subdirectory names
final String[] subDirs = relativePathString.split(fileSeparator);
for (int i = 0; i < subDirs.length - 1; i++) {
final String dirName = subDirs[i];
final String[] subDirs = relativePathString.split(URI_SEPARATOR);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
for (int sdi = 0; sdi < subDirs.length - 1; sdi++) {
final String dirName = subDirs[sdi];
if (dirName.isEmpty()) {
// Ignore empty directory names
continue;
Expand All @@ -111,18 +113,15 @@ private void getPartitions(@NotNull final URI relativePath,
throw new TableDataException("Unexpected directory name format (not key=value) at "
+ new File(tableRootDirectory.getPath(), relativePathString));
}
if (partitioningColumnIndex == maxPartitioningLevels) {
throw new TableDataException("Too many partitioning levels at " + relativePathString + ", maximum " +
"expected partitioning levels are " + maxPartitioningLevels);
}
// We use an empty set to allow duplicate partition keys across files
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
final String columnKey = NameValidator.legalizeColumnName(components[0], Collections.emptySet());
final String columnKey = NameValidator.legalizeColumnName(components[0], takenNames);
takenNames.add(columnKey);
if (registered) {
// We have already seen another parquet file in the tree, so compare the
// partitioning levels against the previous ones
// We have already seen another leaf node in the tree, so compare the partitioning levels against the
// previous ones
if (partitioningColumnIndex >= partitionKeys.size()) {
throw new TableDataException("Too many partitioning levels at " + relativePathString + " (expected "
+ partitionKeys.size() + ") based on earlier parquet files in the tree.");
+ partitionKeys.size() + ") based on earlier leaf nodes in the tree.");
}
if (!partitionKeys.get(partitioningColumnIndex).equals(columnKey)) {
throw new TableDataException(String.format(
Expand All @@ -131,7 +130,7 @@ private void getPartitions(@NotNull final URI relativePath,
relativePathString));
}
} else {
// This is the first parquet file in the tree, so accumulate the partitioning levels
// This is the first leaf node in the tree, so accumulate the partitioning levels
partitionKeys.add(columnKey);
}
final String columnValue = components[1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -64,8 +63,7 @@ public ParquetKeyValuePartitionedLayout(
super(tableRootDirectory,
() -> new LocationTableBuilderDefinition(tableDefinition),
(uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions),
Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count()),
FILE_URI_SCHEME.equals(tableRootDirectory.getScheme()) ? File.separator : "/");
Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count()));
this.readInstructions = readInstructions;
}

Expand All @@ -83,8 +81,7 @@ public ParquetKeyValuePartitionedLayout(
super(tableRootDirectory,
() -> new LocationTableBuilderCsv(tableRootDirectory),
(uri, partitions) -> new ParquetTableLocationKey(uri, 0, partitions, readInstructions),
maxPartitioningLevels,
FILE_URI_SCHEME.equals(tableRootDirectory.getScheme()) ? File.separator : "/");
maxPartitioningLevels);
this.readInstructions = readInstructions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public Stream<URI> list(@NotNull final URI directory) {
@Override
public Stream<URI> walk(@NotNull final URI directory) {
if (log.isDebugEnabled()) {
log.debug("requesting list of child URIs for directory: {}", directory);
log.debug("requesting walking the tree rooted at directory: {}", directory);
}
return createStream(directory, true);
}
Expand Down
Loading