Added support to read partitioned parquet files from S3 #5206

Merged
merged 41 commits into from
Apr 12, 2024
7734255
Initial commit
malhotrashivam Feb 28, 2024
c16ac2f
Working state, can be optimized further
malhotrashivam Mar 1, 2024
e8c4cda
Changed interface of flat partitioned reader
malhotrashivam Mar 4, 2024
d15dc6a
Added key value partitioned parquet reader
malhotrashivam Mar 11, 2024
7df60e2
Fixed some issues with partitioned reading
malhotrashivam Mar 13, 2024
d232f39
Fixed reading partitioned data with partitioned column in data
malhotrashivam Mar 14, 2024
f9df8f1
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 18, 2024
3dd9fb9
Minor improvements after rebasing
malhotrashivam Mar 18, 2024
cde1feb
WIP commit
malhotrashivam Mar 19, 2024
2838495
Separated URI List processing to a separate class
malhotrashivam Mar 22, 2024
9247f2e
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Mar 22, 2024
baad429
Review with Devin part 1
malhotrashivam Mar 26, 2024
4227fd0
Review with Devin contd.
malhotrashivam Mar 27, 2024
5351fb9
Refined some comments
malhotrashivam Mar 27, 2024
814b2e0
Splitting instead of iterating
malhotrashivam Mar 27, 2024
3457070
Reverting some old changes
malhotrashivam Mar 27, 2024
8290e88
More review comments
malhotrashivam Mar 27, 2024
423892e
Added more tests
malhotrashivam Mar 27, 2024
641a439
Added tests for batching
malhotrashivam Mar 28, 2024
953bd02
Refactored KeyValuePartitionedLayout to extract file processing logic
malhotrashivam Mar 28, 2024
5995648
Review comments
malhotrashivam Mar 28, 2024
922ab0a
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 2, 2024
e89cfb7
Review with Devin contd.
malhotrashivam Apr 2, 2024
db809b2
Pending comments
malhotrashivam Apr 2, 2024
649526b
Rephrased comment
malhotrashivam Apr 2, 2024
086c9f4
Minor improvements
malhotrashivam Apr 3, 2024
2939d28
More tweaks for parquet file filtering
malhotrashivam Apr 3, 2024
71a9bdc
Reverting some old changes
malhotrashivam Apr 3, 2024
b412679
Review comments
malhotrashivam Apr 3, 2024
b6876cd
Review with Ryan Part 1
malhotrashivam Apr 8, 2024
11d0b68
Review with Ryan part 2
malhotrashivam Apr 8, 2024
9ec7e3e
Review with Ryan part 3
malhotrashivam Apr 9, 2024
05a294d
Review contd.
malhotrashivam Apr 9, 2024
b9a1b87
Merge branch 'main' into sm-pq-s3-flat
malhotrashivam Apr 9, 2024
b0b3022
More review comments resolved
malhotrashivam Apr 9, 2024
1383850
Resolved more comments
malhotrashivam Apr 10, 2024
8c68079
Deprecated all File overloads from ParquetTools
malhotrashivam Apr 10, 2024
78dd6ac
Resolved some javadoc issues
malhotrashivam Apr 10, 2024
6fa3629
Revert "Resolved some javadoc issues"
malhotrashivam Apr 12, 2024
4ab6a31
Revert "Deprecated all File overloads from ParquetTools"
malhotrashivam Apr 12, 2024
79bd133
Tagged the new methods as Deprecated
malhotrashivam Apr 12, 2024
@@ -19,6 +19,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Stream;

/**
* {@link SeekableChannelsProvider Channel provider} that will cache a bounded number of unused channels.
@@ -109,6 +110,16 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole
// end no matter what.
}

@Override
public Stream<URI> list(@NotNull final URI directory) throws IOException {
return wrappedProvider.list(directory);
}

@Override
public Stream<URI> walk(@NotNull final URI directory) throws IOException {
return wrappedProvider.walk(directory);
}

@Nullable
private synchronized CachedChannel tryGetPooledChannel(@NotNull final String pathKey,
@NotNull final KeyedObjectHashMap<String, PerPathPool> channelPool) {
@@ -3,6 +3,7 @@
//
package io.deephaven.util.channel;

import io.deephaven.base.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -12,8 +13,10 @@
import java.net.URI;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.stream.Stream;

public class LocalFSChannelProvider implements SeekableChannelsProvider {
@Override
@@ -56,6 +59,20 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b
return result;
}

@Override
public final Stream<URI> list(@NotNull final URI directory) throws IOException {
// Each entry is converted to a file URI (not ending with "/"), even when the entry is a directory. The caller
// should manage file vs. directory handling in the processor.
return Files.list(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false));
}

@Override
public final Stream<URI> walk(@NotNull final URI directory) throws IOException {
// Each entry is converted to a file URI (not ending with "/"), even when the entry is a directory. The caller
// should manage file vs. directory handling in the processor.
return Files.walk(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false));
}

@Override
public void close() {}
}
@@ -12,6 +12,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import static io.deephaven.base.FileUtils.convertToURI;

@@ -85,4 +86,25 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo
}

SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException;

/**
* Returns a stream of URIs, the elements of which are the entries in the directory. The listing is non-recursive.
* Note that the URIs supplied by the stream will be file URIs (not ending with "/") irrespective of whether the URI
* corresponds to a file or a directory. The caller should manage file vs. directory handling in the processor.
*
* @param directory the URI of the directory to list
* @return The {@link Stream} of {@link URI}s
*/
Stream<URI> list(@NotNull URI directory) throws IOException;

/**
* Returns a stream of URIs, the elements of which are all the files in the file tree rooted at the given starting
* directory. Note that the URIs supplied by the stream will be file URIs (not ending with "/") irrespective of
* whether the URI corresponds to a file or a directory. The caller should manage file vs. directory handling in the
* processor.
*
* @param directory the URI of the directory to walk
* @return The {@link Stream} of {@link URI}s
*/
Stream<URI> walk(@NotNull URI directory) throws IOException;
}
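The difference between the non-recursive `list` and the recursive `walk` contracts documented above can be sketched with plain `java.nio.file` calls. This is only an illustration under stated assumptions, not the provider code from this PR: the class name `ListVsWalkSketch` and the helpers `listOnce` and `walkAll` are hypothetical, the sketch filters `walk` down to regular files, and it uses `Path.toUri()` rather than `FileUtils.convertToURI`, so directory entries may end with "/" here, unlike the URIs described in the Javadoc.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, for illustration only.
final class ListVsWalkSketch {

    /** Non-recursive: returns only the direct children of {@code directory}. */
    static List<URI> listOnce(final Path directory) throws IOException {
        // Files.list holds an open directory handle, so close the stream when done.
        try (Stream<Path> entries = Files.list(directory)) {
            return entries.map(Path::toUri).sorted().collect(Collectors.toList());
        }
    }

    /** Recursive: returns every regular file in the tree rooted at {@code directory}. */
    static List<URI> walkAll(final Path directory) throws IOException {
        try (Stream<Path> entries = Files.walk(directory)) {
            return entries.filter(Files::isRegularFile)
                    .map(Path::toUri)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(final String[] args) throws IOException {
        // Build a small key=value partitioned layout in a temporary directory.
        final Path root = Files.createTempDirectory("tableRootDirectory");
        final Path leaf = Files.createDirectories(root.resolve("Country=France").resolve("City=Paris"));
        Files.createFile(leaf.resolve("parisData.parquet"));
        Files.createFile(root.resolve("topLevel.parquet"));

        System.out.println("list: " + listOnce(root));
        System.out.println("walk: " + walkAll(root));
    }
}
```

Both helpers collect into a list inside try-with-resources because the streams returned by `Files.list` and `Files.walk` keep directory handles open until closed; a caller that receives such a stream directly would need to close it itself.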
@@ -3,6 +3,7 @@
//
package io.deephaven.util.channel;

import io.deephaven.base.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
@@ -12,10 +13,13 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -215,6 +219,16 @@ public SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) {
return new TestMockChannel(count.getAndIncrement(), path.toString());
}

@Override
public final Stream<URI> list(@NotNull final URI directory) {
throw new UnsupportedOperationException("list");
}

@Override
public final Stream<URI> walk(@NotNull final URI directory) {
throw new UnsupportedOperationException("walk");
}

@Override
public void close() {}
}
@@ -0,0 +1,154 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.locations.local;

import io.deephaven.base.verify.Require;
import io.deephaven.engine.table.Table;
import io.deephaven.api.util.NameValidator;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;


/**
* {@link TableLocationKeyFinder Location finder} that will take a directory file, traverse the directory hierarchy and
* infer partitions from key-value pairs in the directory names, for example:
*
* <pre>
* tableRootDirectory/Country=France/City=Paris/parisData.parquet
* </pre>
*
* Traversal is depth-first, and assumes that target files will only be found at a single depth. This class is
* specialized for handling of files. For handling of URIs, see {@link URIStreamKeyValuePartitionLayout}.
*
* @implNote Column names will be legalized via {@link NameValidator#legalizeColumnName(String, Set)}.
*/
public class FileKeyValuePartitionLayout<TLK extends TableLocationKey>
extends KeyValuePartitionLayout<TLK, Path>
implements TableLocationKeyFinder<TLK> {

private final File tableRootDirectory;
private final Predicate<Path> pathFilter;
private final Supplier<LocationTableBuilder> locationTableBuilderFactory;
private final int maxPartitioningLevels;

/**
* @param tableRootDirectory The directory to traverse from
* @param pathFilter Filter to determine whether a regular file should be used to create a key
* @param locationTableBuilderFactory Factory for {@link LocationTableBuilder builders} used to organize partition
* information; as builders are typically stateful, a new builder is created each time this
* {@link KeyValuePartitionLayout} is used to {@link #findKeys(Consumer) find keys}
* @param keyFactory Factory function used to generate table location keys from target files and partition values
* @param maxPartitioningLevels Maximum partitioning levels to traverse. Must be {@code >= 0}. {@code 0} means only
* look at files in {@code tableRootDirectory} and find no partitions.
*/
public FileKeyValuePartitionLayout(
@NotNull final File tableRootDirectory,
@NotNull final Predicate<Path> pathFilter,
@NotNull final Supplier<LocationTableBuilder> locationTableBuilderFactory,
@NotNull final BiFunction<Path, Map<String, Comparable<?>>, TLK> keyFactory,
final int maxPartitioningLevels) {
super(keyFactory);
this.tableRootDirectory = tableRootDirectory;
this.pathFilter = pathFilter;
this.locationTableBuilderFactory = locationTableBuilderFactory;
this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels");
}

@Override
public String toString() {
return FileKeyValuePartitionLayout.class.getSimpleName() + '[' + tableRootDirectory + ']';
}

@Override
public void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
final Queue<Path> targetFiles = new ArrayDeque<>();
final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get();
try {
Files.walkFileTree(tableRootDirectory.toPath(), EnumSet.of(FileVisitOption.FOLLOW_LINKS),
maxPartitioningLevels + 1, new SimpleFileVisitor<>() {
final Set<String> takenNames = new HashSet<>();
final List<String> partitionKeys = new ArrayList<>();
final List<String> partitionValues = new ArrayList<>();
boolean registered;
int columnCount = -1;

@Override
public FileVisitResult preVisitDirectory(
@NotNull final Path dir,
@NotNull final BasicFileAttributes attrs) {
final String dirName = dir.getFileName().toString();
// Skip dot directories
if (!dirName.isEmpty() && dirName.charAt(0) == '.') {
return FileVisitResult.SKIP_SUBTREE;
}
if (++columnCount > 0) {
// We're descending and past the root
final String[] components = dirName.split("=", 2);
if (components.length != 2) {
throw new TableDataException(
"Unexpected directory name format (not key=value) at " + dir);
}
final String columnKey = NameValidator.legalizeColumnName(components[0], takenNames);
takenNames.add(columnKey);
final int columnIndex = columnCount - 1;
if (columnCount > partitionKeys.size()) {
partitionKeys.add(columnKey);
} else if (!partitionKeys.get(columnIndex).equals(columnKey)) {
throw new TableDataException(String.format(
"Column name mismatch at column index %d: expected %s found %s at %s",
columnIndex, partitionKeys.get(columnIndex), columnKey, dir));
}
final String columnValue = components[1];
partitionValues.add(columnValue);
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(
@NotNull final Path file,
@NotNull final BasicFileAttributes attrs) {
if (attrs.isRegularFile() && pathFilter.test(file)) {
if (!registered) {
locationTableBuilder.registerPartitionKeys(partitionKeys);
registered = true;
}
locationTableBuilder.acceptLocation(partitionValues);
targetFiles.add(file);
}
takenNames.clear();
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(
@NotNull final Path dir,
@Nullable final IOException exc) throws IOException {
if (--columnCount >= 0) {
partitionValues.remove(columnCount);
}
return super.postVisitDirectory(dir, exc);
}
});
} catch (IOException e) {
throw new TableDataException("Error finding locations under " + tableRootDirectory, e);
}

final Table locationTable = locationTableBuilder.build();
buildLocationKeys(locationTable, targetFiles, locationKeyObserver);
}
}
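The key=value inference performed in `preVisitDirectory` above can be sketched in isolation as follows. This is a simplified illustration, not the layout class itself: `PartitionPathSketch` and `parsePartitions` are hypothetical names, the sketch works on a single already-known file path instead of a file-tree walk, and it skips column-name legalization and the cross-directory key consistency check. It keeps the same `split("=", 2)` rule, so a value may itself contain "=".

```java
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class, for illustration only.
final class PartitionPathSketch {

    /**
     * Extracts partition key/value pairs, in directory order, from the directories between
     * {@code root} and {@code file}. {@code file} is assumed to be located under {@code root}.
     */
    static Map<String, String> parsePartitions(final Path root, final Path file) {
        final Path relative = root.relativize(file);
        final Map<String, String> partitions = new LinkedHashMap<>();
        // Every name element except the last one (the file name) is a partition directory.
        for (int i = 0; i < relative.getNameCount() - 1; i++) {
            final String dirName = relative.getName(i).toString();
            // Split on the first '=' only, so "key=a=b" yields key "key" and value "a=b".
            final String[] components = dirName.split("=", 2);
            if (components.length != 2) {
                throw new IllegalArgumentException(
                        "Unexpected directory name format (not key=value) at " + dirName);
            }
            partitions.put(components[0], components[1]);
        }
        return partitions;
    }

    public static void main(final String[] args) {
        final Path root = Path.of("tableRootDirectory");
        final Path file = root.resolve("Country=France").resolve("City=Paris").resolve("parisData.parquet");
        System.out.println(parsePartitions(root, file)); // prints {Country=France, City=Paris}
    }
}
```

A `LinkedHashMap` is used so the partition columns come back in directory order, which mirrors how the visitor above appends to `partitionKeys` and `partitionValues` as it descends.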