Skip to content

Commit

Permalink
Added support to read partitioned parquet files from S3 (#5206)
Browse files Browse the repository at this point in the history
Breaking Change: Renamed KeyValuePartitionLayout to FileKeyValuePartitionLayout.
  • Loading branch information
malhotrashivam authored Apr 12, 2024
1 parent 2043030 commit bf6fcdb
Show file tree
Hide file tree
Showing 24 changed files with 1,065 additions and 254 deletions.
15 changes: 13 additions & 2 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.regex.Pattern;

public class FileUtils {
private final static FileFilter DIRECTORY_FILE_FILTER = new FileFilter() {
Expand All @@ -29,6 +30,8 @@ public boolean accept(File dir, String name) {
};
private final static String[] EMPTY_STRING_ARRAY = new String[0];

public static final Pattern DUPLICATE_SLASH_PATTERN = Pattern.compile("//+");

/**
* Cleans the specified path. All files and subdirectories in the path will be deleted. (ie you'll be left with an
* empty directory).
Expand Down Expand Up @@ -254,7 +257,8 @@ public boolean accept(File pathname) {
}

/**
* Take the file source path or URI string and convert it to a URI object.
* Take the file source path or URI string and convert it to a URI object. Any unnecessary path separators will be
* removed.
*
* @param source The file source path or URI
* @param isDirectory Whether the source is a directory
Expand All @@ -264,9 +268,16 @@ public static URI convertToURI(final String source, final boolean isDirectory) {
if (source.isEmpty()) {
throw new IllegalArgumentException("Cannot convert empty source to URI");
}
final URI uri;
URI uri;
try {
uri = new URI(source);
// Replace two or more consecutive slashes in the path with a single slash
final String path = uri.getPath();
if (path.contains("//")) {
final String canonicalizedPath = DUPLICATE_SLASH_PATTERN.matcher(path).replaceAll("/");
uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), canonicalizedPath,
uri.getQuery(), uri.getFragment());
}
} catch (final URISyntaxException e) {
// If the URI is invalid, assume it's a file path
return convertToURI(new File(source), isDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Stream;

/**
* {@link SeekableChannelsProvider Channel provider} that will cache a bounded number of unused channels.
Expand Down Expand Up @@ -109,6 +110,16 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole
// end no matter what.
}

@Override
public Stream<URI> list(@NotNull final URI directory) throws IOException {
return wrappedProvider.list(directory);
}

@Override
public Stream<URI> walk(@NotNull final URI directory) throws IOException {
return wrappedProvider.walk(directory);
}

@Nullable
private synchronized CachedChannel tryGetPooledChannel(@NotNull final String pathKey,
@NotNull final KeyedObjectHashMap<String, PerPathPool> channelPool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.util.channel;

import io.deephaven.base.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -12,8 +13,10 @@
import java.net.URI;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.stream.Stream;

public class LocalFSChannelProvider implements SeekableChannelsProvider {
@Override
Expand Down Expand Up @@ -56,6 +59,20 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b
return result;
}

@Override
public final Stream<URI> list(@NotNull final URI directory) throws IOException {
// Assuming that the URI is a file, not a directory. The caller should manage file vs. directory handling in
// the processor.
return Files.list(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false));
}

@Override
public final Stream<URI> walk(@NotNull final URI directory) throws IOException {
// Assuming that the URI is a file, not a directory. The caller should manage file vs. directory handling in
// the processor.
return Files.walk(Path.of(directory)).map(path -> FileUtils.convertToURI(path, false));
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import static io.deephaven.base.FileUtils.convertToURI;

Expand Down Expand Up @@ -85,4 +86,28 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo
}

SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) throws IOException;

/**
* Returns a stream of URIs, the elements of which are the entries in the directory. The listing is non-recursive.
* The URIs supplied by the stream will not have any unnecessary slashes or path separators. Also, the URIs will be
* file URIs (not ending with "/") irrespective of whether the URI corresponds to a file or a directory. The caller
* should manage file vs. directory handling in the processor. The caller is also responsible for closing the
* stream, preferably using a try-with-resources block.
*
* @param directory the URI of the directory to list
* @return The {@link Stream} of {@link URI}s
*/
Stream<URI> list(@NotNull URI directory) throws IOException;

/**
* Returns a stream of URIs, the elements of which are all the files in the file tree rooted at the given starting
* directory. The URIs supplied by the stream will not have any unnecessary slashes or path separators. Also, the
* URIs will be file URIs (not ending with "/") irrespective of whether the URI corresponds to a file or a
* directory. The caller should manage file vs. directory handling in the processor. The caller is also responsible
* for closing the stream, preferably using a try-with-resources block.
*
* @param directory the URI of the directory to walk
* @return The {@link Stream} of {@link URI}s
*/
Stream<URI> walk(@NotNull URI directory) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.util.channel;

import io.deephaven.base.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
Expand All @@ -12,10 +13,13 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -215,6 +219,16 @@ public SeekableByteChannel getWriteChannel(@NotNull Path path, boolean append) {
return new TestMockChannel(count.getAndIncrement(), path.toString());
}

@Override
public final Stream<URI> list(@NotNull final URI directory) {
throw new UnsupportedOperationException("list");
}

@Override
public final Stream<URI> walk(@NotNull final URI directory) {
throw new UnsupportedOperationException("walk");
}

@Override
public void close() {}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.locations.local;

import gnu.trove.map.TIntObjectMap;
import gnu.trove.map.hash.TIntObjectHashMap;
import io.deephaven.base.verify.Require;
import io.deephaven.engine.table.Table;
import io.deephaven.api.util.NameValidator;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.TableLocationKey;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;


/**
* {@link TableLocationKeyFinder Location finder} that will take a directory file, traverse the directory hierarchy and
* infer partitions from key-value pairs in the directory names, for example:
*
* <pre>
* tableRootDirectory/Country=France/City=Paris/parisData.parquet
* </pre>
*
* Traversal is depth-first, and assumes that target files will only be found at a single depth. This class is
* specialized for handling of files. For handling of URIs, see {@link URIStreamKeyValuePartitionLayout}.
*
* @implNote Column names will be legalized via {@link NameValidator#legalizeColumnName(String, Set)}.
*/
public class FileKeyValuePartitionLayout<TLK extends TableLocationKey>
extends KeyValuePartitionLayout<TLK, Path>
implements TableLocationKeyFinder<TLK> {

private final File tableRootDirectory;
private final Predicate<Path> pathFilter;
private final Supplier<LocationTableBuilder> locationTableBuilderFactory;
private final int maxPartitioningLevels;

/**
* @param tableRootDirectory The directory to traverse from
* @param pathFilter Filter to determine whether a regular file should be used to create a key
* @param locationTableBuilderFactory Factory for {@link LocationTableBuilder builders} used to organize partition
* information; as builders are typically stateful, a new builder is created each time this
* {@link KeyValuePartitionLayout} is used to {@link #findKeys(Consumer) find keys}
* @param keyFactory Factory function used to generate table location keys from target files and partition values
* @param maxPartitioningLevels Maximum partitioning levels to traverse. Must be {@code >= 0}. {@code 0} means only
* look at files in {@code tableRootDirectory} and find no partitions.
*/
public FileKeyValuePartitionLayout(
@NotNull final File tableRootDirectory,
@NotNull final Predicate<Path> pathFilter,
@NotNull final Supplier<LocationTableBuilder> locationTableBuilderFactory,
@NotNull final BiFunction<Path, Map<String, Comparable<?>>, TLK> keyFactory,
final int maxPartitioningLevels) {
super(keyFactory);
this.tableRootDirectory = tableRootDirectory;
this.pathFilter = pathFilter;
this.locationTableBuilderFactory = locationTableBuilderFactory;
this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels");
}

@Override
public String toString() {
return FileKeyValuePartitionLayout.class.getSimpleName() + '[' + tableRootDirectory + ']';
}

@Override
public void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
final Queue<Path> targetFiles = new ArrayDeque<>();
final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get();
try {
Files.walkFileTree(tableRootDirectory.toPath(), EnumSet.of(FileVisitOption.FOLLOW_LINKS),
maxPartitioningLevels + 1, new SimpleFileVisitor<>() {
final Set<String> partitionKeys = new LinkedHashSet<>(); // Preserve order of insertion
final List<String> partitionValues = new ArrayList<>();
final TIntObjectMap<ColumnNameInfo> partitionColInfo = new TIntObjectHashMap<>();
boolean registered;
int columnCount = -1;

@Override
public FileVisitResult preVisitDirectory(
@NotNull final Path dir,
@NotNull final BasicFileAttributes attrs) {
final String dirName = dir.getFileName().toString();
// Skip dot directories
if (!dirName.isEmpty() && dirName.charAt(0) == '.') {
return FileVisitResult.SKIP_SUBTREE;
}
if (++columnCount > 0) {
// We're descending and past the root
final int columnIndex = columnCount - 1;
processSubdirectoryInternal(dirName, dir.toString(), columnIndex, partitionKeys,
partitionValues, partitionColInfo);
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(
@NotNull final Path file,
@NotNull final BasicFileAttributes attrs) {
if (attrs.isRegularFile() && pathFilter.test(file)) {
if (!registered) {
locationTableBuilder.registerPartitionKeys(partitionKeys);
registered = true;
}
locationTableBuilder.acceptLocation(partitionValues);
targetFiles.add(file);
}
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(
@NotNull final Path dir,
@Nullable final IOException exc) throws IOException {
if (--columnCount >= 0) {
partitionValues.remove(columnCount);
}
return super.postVisitDirectory(dir, exc);
}
});
} catch (IOException e) {
throw new TableDataException("Error finding locations for under " + tableRootDirectory, e);
}

final Table locationTable = locationTableBuilder.build();
buildLocationKeys(locationTable, targetFiles, locationKeyObserver);
}
}
Loading

0 comments on commit bf6fcdb

Please sign in to comment.