Skip to content

Commit

Permalink
Working state, can be optimized further
Browse files · Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Mar 1, 2024
1 parent 35dbc6a commit 71f917e
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole
}

@Override
public List<URI> getURIStreamFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
public List<URI> getChildURIListFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
throws IOException {
return wrappedProvider.getURIStreamFromDirectory(directoryURI, uriFilter);
return wrappedProvider.getChildURIListFromDirectory(directoryURI, uriFilter);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ default SeekableByteChannel getWriteChannel(@NotNull final String path, final bo
* @param directoryURI The URI of the directory to list
* @param uriFilter A filter to apply to the URIs in the directory
*/
default List<URI> getURIStreamFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
default List<URI> getChildURIListFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
throws IOException {
throw new UnsupportedOperationException("TODO Add this support for all providers");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,8 +982,7 @@ public static Table readFlatPartitionedTable(
@NotNull final ParquetInstructions readInstructions,
@NotNull final TableDefinition tableDefinition) {
return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory), readInstructions),
readInstructions,
tableDefinition);
readInstructions, tableDefinition);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void findKeys(@NotNull final Consumer<ParquetTableLocationKey> locationKe
final SeekableChannelsProvider provider = SeekableChannelsProviderLoader.getInstance().fromServiceLoader(
tableRootDirectory, readInstructions.getSpecialInstructions());
try {
final List<URI> parquetURIs = provider.getURIStreamFromDirectory(tableRootDirectory,
final List<URI> parquetURIs = provider.getChildURIListFromDirectory(tableRootDirectory,
ParquetFileHelper::fileNameMatches);
synchronized (this) {
// Iterate over the URI list and add the location keys to the cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.deephaven.base.reference.PooledObjectReference;
import io.deephaven.util.channel.SeekableChannelContext;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
Expand All @@ -28,20 +29,26 @@
import java.util.function.BiConsumer;

/**
* Context object used to store read-ahead buffers for efficiently reading from S3.
* Context object used to store read-ahead buffers for efficiently reading from S3. A single context object can only be
* associated with a single URI at a time.
*/
final class S3ChannelContext implements SeekableChannelContext {
private static final Logger log = LoggerFactory.getLogger(S3ChannelContext.class);
private static final long UNINITIALIZED_SIZE = -1;
static final long UNINITIALIZED_SIZE = -1;
private static final long UNINITIALIZED_NUM_FRAGMENTS = -1;

private final S3AsyncClient client;
final S3Instructions instructions;
private final S3Instructions instructions;
private final BufferPool bufferPool;

/**
* The URI associated with this context. A single context object can only be associated with a single URI at a time.
* But it can be re-associated with a different URI after closing.
*/
private S3Uri uri;

/**
* Used to cache recently fetched fragments for faster lookup
* Used to cache recently fetched fragments from the {@link #uri} for faster lookup.
*/
private final Request[] requests;

Expand All @@ -60,13 +67,19 @@ final class S3ChannelContext implements SeekableChannelContext {
this.instructions = Objects.requireNonNull(instructions);
this.bufferPool = Objects.requireNonNull(bufferPool);
requests = new Request[instructions.maxCacheSize()];
uri = null;
size = UNINITIALIZED_SIZE;
numFragments = UNINITIALIZED_NUM_FRAGMENTS;
if (log.isDebugEnabled()) {
log.debug("creating context: {}", ctxStr());
}
}

void verifyOrSetUri(S3Uri uri) {
S3Uri getUri() {
return uri;
}

void verifyOrSetUri(@Nullable final S3Uri uri) {
if (this.uri == null) {
this.uri = Objects.requireNonNull(uri);
} else if (!this.uri.equals(uri)) {
Expand All @@ -84,7 +97,7 @@ void verifyOrSetSize(long size) {
}
}

public long size() throws IOException {
long size() throws IOException {
ensureSize();
return size;
}
Expand Down Expand Up @@ -137,6 +150,10 @@ public void close() {
requests[i] = null;
}
}
// Reset the internal state
uri = null;
size = UNINITIALIZED_SIZE;
numFragments = UNINITIALIZED_NUM_FRAGMENTS;
}

// --------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
import java.nio.channels.SeekableByteChannel;
import java.util.Objects;

import static io.deephaven.extensions.s3.S3ChannelContext.UNINITIALIZED_SIZE;


/**
* {@link SeekableByteChannel} class used to fetch objects from S3 buckets using an async client with the ability to
* read ahead and cache fragments of the object.
*/
final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelProvider.ContextHolder {

private static final long UNINITIALIZED_SIZE = -1;
private static final long CLOSED_SENTINEL = -1;
private static final int INIT_POSITION = 0;

private final S3Uri uri;

Expand All @@ -41,6 +43,7 @@ final class S3SeekableByteChannel implements SeekableByteChannel, CachedChannelP
S3SeekableByteChannel(S3Uri uri) {
this.uri = Objects.requireNonNull(uri);
this.size = UNINITIALIZED_SIZE;
this.position = INIT_POSITION;
}

/**
Expand All @@ -56,7 +59,10 @@ public void setContext(@Nullable final SeekableChannelContext channelContext) {
}
this.context = (S3ChannelContext) channelContext;
if (this.context != null) {
this.context.verifyOrSetUri(uri);
if (this.context.getUri() != uri) {
this.context.close();
this.context.verifyOrSetUri(uri);
}
if (size != UNINITIALIZED_SIZE) {
context.verifyOrSetSize(size);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole
}

@Override
public List<URI> getURIStreamFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
public List<URI> getChildURIListFromDirectory(@NotNull URI directoryURI, @NotNull Predicate<URI> uriFilter)
throws IOException {
final S3Uri s3DirectoryURI = s3AsyncClient.utilities().parseUri(directoryURI);
final String bucketName = s3DirectoryURI.bucket().orElseThrow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path filePath, final b
}

@Override
public List<URI> getURIStreamFromDirectory(@NotNull final URI directoryURI,
public List<URI> getChildURIListFromDirectory(@NotNull final URI directoryURI,
@NotNull final Predicate<URI> uriFilter) throws IOException {
try (final Stream<Path> childFileStream = Files.list(Path.of(directoryURI))) {
return childFileStream
Expand Down
4 changes: 2 additions & 2 deletions py/server/deephaven/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def read(
if file_layout == ParquetFileLayout.SINGLE_FILE:
j_table = _JParquetTools.readSingleFileTable(path, read_instructions, j_table_definition)
elif file_layout == ParquetFileLayout.FLAT_PARTITIONED:
j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions, j_table_definition)
j_table = _JParquetTools.readFlatPartitionedTable(path, read_instructions, j_table_definition)
elif file_layout == ParquetFileLayout.KV_PARTITIONED:
j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions, j_table_definition)
elif file_layout == ParquetFileLayout.METADATA_PARTITIONED:
Expand All @@ -194,7 +194,7 @@ def read(
elif file_layout == ParquetFileLayout.SINGLE_FILE:
j_table = _JParquetTools.readSingleFileTable(path, read_instructions)
elif file_layout == ParquetFileLayout.FLAT_PARTITIONED:
j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions)
j_table = _JParquetTools.readFlatPartitionedTable(path, read_instructions)
elif file_layout == ParquetFileLayout.KV_PARTITIONED:
j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions)
elif file_layout == ParquetFileLayout.METADATA_PARTITIONED:
Expand Down

0 comments on commit 71f917e

Please sign in to comment.