Skip to content

Commit

Permalink
Minor improvements after rebasing
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Mar 18, 2024
1 parent f9df8f1 commit 2512562
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.engine.table.impl.locations.local;

import io.deephaven.engine.table.Table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public static Table readTable(
* @return The URI
*/
private static URI convertParquetSourceToURI(@NotNull final String source) {
if (source.endsWith(".parquet")) {
if (source.endsWith(PARQUET_FILE_EXTENSION)) {
return convertToURI(source, false);
}
return convertToURI(source, true);
Expand Down Expand Up @@ -697,6 +697,10 @@ private static Table readTableInternal(
@NotNull final URI source,
@NotNull final ParquetInstructions instructions) {
if (!FILE_URI_SCHEME.equals(source.getScheme())) {
if (!source.getRawPath().endsWith(PARQUET_FILE_EXTENSION)) {
throw new IllegalArgumentException("This API currently does not support reading partitioned parquet " +
"data from non-local sources. Please use the appropriate API for reading partitioned parquet.");
}
return readSingleFileTable(source, instructions);
}
return readTableInternal(new File(source), instructions);
Expand Down Expand Up @@ -917,7 +921,7 @@ public static Table readKeyValuePartitionedTable(
public static Table readKeyValuePartitionedTable(
@NotNull final String directory,
@NotNull final ParquetInstructions readInstructions) {
return readPartitionedTable(new ParquetKeyValuePartitionedLayout(convertToURI(directory),
return readPartitionedTable(new ParquetKeyValuePartitionedLayout(convertToURI(directory, true),
MAX_PARTITIONING_LEVELS_INFERENCE, readInstructions), readInstructions);
}

Expand Down Expand Up @@ -963,7 +967,7 @@ public static Table readKeyValuePartitionedTable(
if (tableDefinition.getColumnStream().noneMatch(ColumnDefinition::isPartitioning)) {
throw new IllegalArgumentException("No partitioning columns");
}
return readPartitionedTable(new ParquetKeyValuePartitionedLayout(convertToURI(directory), tableDefinition,
return readPartitionedTable(new ParquetKeyValuePartitionedLayout(convertToURI(directory, true), tableDefinition,
readInstructions), readInstructions, tableDefinition);
}

Expand Down Expand Up @@ -1004,7 +1008,7 @@ public static Table readFlatPartitionedTable(
public static Table readFlatPartitionedTable(
@NotNull final String directory,
@NotNull final ParquetInstructions readInstructions) {
return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory), readInstructions),
return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory, true), readInstructions),
readInstructions);
}

Expand Down Expand Up @@ -1042,7 +1046,7 @@ public static Table readFlatPartitionedTable(
@NotNull final String directory,
@NotNull final ParquetInstructions readInstructions,
@NotNull final TableDefinition tableDefinition) {
return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory), readInstructions),
return readPartitionedTable(new ParquetFlatPartitionedLayout(convertToURI(directory, true), readInstructions),
readInstructions, tableDefinition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public final class ParquetTableReadWriteTest {
// TODO(deephaven-core#5064): Add support for local S3 testing
// The following tests are disabled by default, as they require an AWS access key and secret key to be set
private static final boolean ENABLE_S3_TESTING =
Configuration.getInstance().getBooleanWithDefault("ParquetTest.enableS3Testing", true);
Configuration.getInstance().getBooleanWithDefault("ParquetTest.enableS3Testing", false);

private static File rootFile;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ final class S3ChannelContext implements SeekableChannelContext {
*/
private long numFragments;

private GetObjectRequest.Builder getObjectRequestBuilder;

S3ChannelContext(S3AsyncClient client, S3Instructions instructions, BufferPool bufferPool) {
this.client = Objects.requireNonNull(client);
this.instructions = Objects.requireNonNull(instructions);
Expand All @@ -73,6 +75,7 @@ final class S3ChannelContext implements SeekableChannelContext {
uri = null;
size = UNINITIALIZED_SIZE;
numFragments = UNINITIALIZED_NUM_FRAGMENTS;
getObjectRequestBuilder = null;
if (log.isDebugEnabled()) {
log.debug("creating context: {}", ctxStr());
}
Expand Down Expand Up @@ -339,7 +342,10 @@ private int requestLength() {
}

private GetObjectRequest getObjectRequest() {
return GetObjectRequest.builder()
if (getObjectRequestBuilder == null) {
getObjectRequestBuilder = GetObjectRequest.builder();
}
return getObjectRequestBuilder
.bucket(uri.bucket().orElseThrow())
.key(uri.key().orElseThrow())
.range("bytes=" + from + "-" + to)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public void setContext(@Nullable final SeekableChannelContext channelContext) {
}
this.context = (S3ChannelContext) channelContext;
if (this.context != null) {
if (this.context.getUri() != uri) {
if (this.context.getUri() == null) {
this.context.verifyOrSetUri(uri);
} else if (!this.context.getUri().equals(uri)) {
this.context.close();
this.context.verifyOrSetUri(uri);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryMode;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
Expand Down Expand Up @@ -41,6 +43,8 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {
*/
private static final BufferPool BUFFER_POOL = new BufferPool(S3Instructions.MAX_FRAGMENT_SIZE);

private static final Logger log = LoggerFactory.getLogger(S3SeekableChannelProvider.class);

private final S3AsyncClient s3AsyncClient;
private final S3Instructions s3Instructions;

Expand All @@ -51,6 +55,10 @@ final class S3SeekableChannelProvider implements SeekableChannelsProvider {
this.s3Instructions = s3Instructions;
}

S3Instructions getS3Instructions() {
return s3Instructions;
}

private static S3AsyncClient buildClient(@NotNull S3Instructions s3Instructions) {
final S3AsyncClientBuilder builder = S3AsyncClient.builder()
.httpClient(AwsCrtAsyncHttpClient.builder()
Expand All @@ -70,6 +78,9 @@ private static S3AsyncClient buildClient(@NotNull S3Instructions s3Instructions)
.region(Region.of(s3Instructions.regionName()))
.credentialsProvider(s3Instructions.awsV2CredentialsProvider());
s3Instructions.endpointOverride().ifPresent(builder::endpointOverride);
if (log.isDebugEnabled()) {
log.debug("building client with instructions: {}", s3Instructions);
}
return builder.build();
}

Expand Down Expand Up @@ -110,12 +121,18 @@ public SeekableByteChannel getWriteChannel(@NotNull final Path path, final boole
@Override
public void applyToChildURIs(@NotNull final URI directoryURI, @NotNull final Consumer<URI> processor)
throws IOException {
if (log.isDebugEnabled()) {
log.debug("requesting list of child URIs for directory: {}", directoryURI);
}
applyToChildURIsHelper(directoryURI, processor, false);
}

@Override
public void applyToChildURIsRecursively(@NotNull final URI directoryURI, @NotNull final Consumer<URI> processor)
throws IOException {
if (log.isDebugEnabled()) {
log.debug("requesting recursive list of child URIs for directory: {}", directoryURI);
}
applyToChildURIsHelper(directoryURI, processor, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public final class S3SeekableChannelProviderPlugin implements SeekableChannelsPr

static final String S3_URI_SCHEME = "s3";

private static volatile S3SeekableChannelProvider instance;

@Override
public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) {
return S3_URI_SCHEME.equals(uri.getScheme());
Expand All @@ -33,6 +35,13 @@ public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable
throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri);
}
final S3Instructions s3Instructions = (S3Instructions) config;
return new S3SeekableChannelProvider(s3Instructions);
if (instance == null || s3Instructions != instance.getS3Instructions()) {
synchronized (S3SeekableChannelProvider.class) {
if (instance == null || s3Instructions != instance.getS3Instructions()) {
instance = new S3SeekableChannelProvider(s3Instructions);
}
}
}
return instance;
}
}

0 comments on commit 2512562

Please sign in to comment.