feat: Added native support to read/write parquet files from GCS URIs (#6007)

Closes #5999
malhotrashivam authored Sep 6, 2024
1 parent 483a72f commit 7480812
Showing 6 changed files with 213 additions and 7 deletions.
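
For orientation, a minimal sketch of the usage this commit enables, assembled from the test code in this diff. GcsReadExample is a hypothetical class name and the import paths are assumed from the Deephaven packages referenced below; the bucket is the public cloud-samples-data sample the tests use.

import java.time.Duration;

import io.deephaven.engine.table.Table;
import io.deephaven.extensions.s3.Credentials;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;

public class GcsReadExample {
    public static void main(final String[] args) {
        // Anonymous credentials are enough for this public sample bucket.
        final ParquetInstructions instructions = new ParquetInstructions.Builder()
                .setSpecialInstructions(S3Instructions.builder()
                        .readTimeout(Duration.ofSeconds(60))
                        .credentials(Credentials.anonymous())
                        .regionName("us-east-1")
                        .build())
                .build();
        // The gs:// scheme is handled by GCSSeekableChannelProviderPlugin, which defaults
        // the endpoint to https://storage.googleapis.com when no override is given.
        final Table states = ParquetTools.readTable(
                "gs://cloud-samples-data/bigquery/us-states/us-states.parquet", instructions);
        System.out.println("Rows: " + states.size()); // the sample file holds 50 rows
    }
}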
@@ -90,7 +90,8 @@ protected final void findKeys(@NotNull final Stream<URI> uriStream,
buildLocationKeys(locationTable, targetURIs, locationKeyObserver);
}

private void getPartitions(@NotNull final URI relativePath,
private void getPartitions(
@NotNull final URI relativePath,
@NotNull final Set<String> partitionKeys,
@NotNull final Collection<String> partitionValues,
@NotNull final TIntObjectMap<ColumnNameInfo> partitionColInfo,
Expand Up @@ -22,6 +22,7 @@
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static io.deephaven.parquet.table.ParquetTools.readTable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* These tests verify the behavior of the Parquet implementation when reading from remote S3 servers.
@@ -100,6 +101,59 @@ public void readSampleParquetFilesFromPublicS3Part3() {
readTable("s3://redshift-downloads/redset/serverless/full.parquet", readInstructions).head(10).select();
}

@Test
public void readSampleParquetFromPublicGCS() {
Assume.assumeTrue("Skipping test because S3 testing is disabled.", ENABLE_REMOTE_S3_TESTING);
final Table tableWithEndpointOverride;
{
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(S3Instructions.builder()
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.regionName("us-east-1")
.endpointOverride("https://storage.googleapis.com")
.build())
.build();
tableWithEndpointOverride = ParquetTools.readTable(
"s3://cloud-samples-data/bigquery/us-states/us-states.parquet", readInstructions).select();
assertEquals(2, tableWithEndpointOverride.numColumns());
assertEquals(50, tableWithEndpointOverride.size());
}

final Table tableWithoutEndpointOverride;
{
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(S3Instructions.builder()
.readTimeout(Duration.ofSeconds(60))
.regionName("us-east-1")
.credentials(Credentials.anonymous())
.build())
.build();
tableWithoutEndpointOverride = ParquetTools.readTable(
"gs://cloud-samples-data/bigquery/us-states/us-states.parquet", readInstructions).select();
assertEquals(2, tableWithoutEndpointOverride.numColumns());
assertEquals(50, tableWithoutEndpointOverride.size());
}
assertTableEquals(tableWithEndpointOverride, tableWithoutEndpointOverride);
}

@Test
public void testReadFromGCSFailure() {
final ParquetInstructions readInstructions = new ParquetInstructions.Builder()
.setSpecialInstructions(S3Instructions.builder()
.readTimeout(Duration.ofSeconds(60))
.credentials(Credentials.anonymous())
.endpointOverride("https://storage.com")
.build())
.build();
try {
ParquetTools.readTable(
"gs://cloud-samples-data/bigquery/us-states/us-states.parquet", readInstructions).select();
throw new AssertionError("Expected IllegalArgumentException due to unsupported endpoint override");
} catch (final IllegalArgumentException e) {
assertTrue(e.toString().contains("endpoint override"));
}
}

@Test
public void readKeyValuePartitionedParquetFromPublicS3() {
Assume.assumeTrue("Skipping test because S3 testing is disabled.", ENABLE_REMOTE_S3_TESTING);
@@ -120,6 +174,9 @@ public void readKeyValuePartitionedParquetFromPublicS3() {
assertEquals(2, table.numColumns());
}

/**
* The following test reads from Deephaven's S3 bucket and thus requires credentials to be set up.
*/
@Test
public void readMetadataPartitionedParquetFromS3() {
Assume.assumeTrue("Skipping test because S3 testing is disabled.", ENABLE_REMOTE_S3_TESTING);
@@ -0,0 +1,72 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.s3;

import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.channel.CompletableOutputStream;
import io.deephaven.util.channel.SeekableChannelContext;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel;
import java.util.stream.Stream;

import static io.deephaven.extensions.s3.GCSSeekableChannelProviderPlugin.GCS_URI_SCHEME;
import static io.deephaven.extensions.s3.S3SeekableChannelProviderPlugin.S3_URI_SCHEME;

final class GCSSeekableChannelProvider extends S3SeekableChannelProvider {

private static final Logger log = LoggerFactory.getLogger(GCSSeekableChannelProvider.class);

GCSSeekableChannelProvider(@NotNull final S3Instructions s3Instructions) {
super(s3Instructions);
}

@Override
public boolean exists(@NotNull final URI uri) {
return super.exists(gcsToS3Uri(uri));
}

@Override
public SeekableByteChannel getReadChannel(
@NotNull final SeekableChannelContext channelContext,
@NotNull final URI uri) {
return super.getReadChannel(channelContext, gcsToS3Uri(uri));
}

@Override
public CompletableOutputStream getOutputStream(@NotNull final URI uri, final int bufferSizeHint) {
return super.getOutputStream(gcsToS3Uri(uri), bufferSizeHint);
}

@Override
public Stream<URI> list(@NotNull final URI directory) {
if (log.isDebugEnabled()) {
log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl();
}
return createStream(gcsToS3Uri(directory), false, GCS_URI_SCHEME);
}

@Override
public Stream<URI> walk(@NotNull final URI directory) {
if (log.isDebugEnabled()) {
log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl();
}
return createStream(gcsToS3Uri(directory), true, GCS_URI_SCHEME);
}

private static URI gcsToS3Uri(@NotNull final URI uri) {
try {
if (S3_URI_SCHEME.equals(uri.getScheme())) {
return uri;
}
return new URI(S3_URI_SCHEME, uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
uri.getQuery(), uri.getFragment());
} catch (final URISyntaxException e) {
throw new IllegalArgumentException("Failed to convert GCS URI " + uri + " to s3 URI", e);
}
}
}
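
To make the conversion concrete: gcsToS3Uri above swaps only the scheme, leaving user info, host (the bucket), port, path (the key), query, and fragment untouched. A standalone sketch (SchemeSwapDemo is a hypothetical name, not part of the commit):

import java.net.URI;

public class SchemeSwapDemo {
    public static void main(final String[] args) throws Exception {
        final URI gcs = URI.create("gs://cloud-samples-data/bigquery/us-states/us-states.parquet");
        // Rebuild the URI with the s3 scheme; every other component carries over unchanged.
        final URI s3 = new URI("s3", gcs.getUserInfo(), gcs.getHost(), gcs.getPort(),
                gcs.getPath(), gcs.getQuery(), gcs.getFragment());
        System.out.println(s3); // s3://cloud-samples-data/bigquery/us-states/us-states.parquet
    }
}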
@@ -0,0 +1,63 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.s3;

import com.google.auto.service.AutoService;
import io.deephaven.util.channel.SeekableChannelsProvider;
import io.deephaven.util.channel.SeekableChannelsProviderPlugin;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;

/**
* {@link SeekableChannelsProviderPlugin} implementation used for reading files from Google Cloud Storage.
*/
@AutoService(SeekableChannelsProviderPlugin.class)
public final class GCSSeekableChannelProviderPlugin implements SeekableChannelsProviderPlugin {

static final String GCS_URI_SCHEME = "gs";

private static final String ENDPOINT_OVERRIDE_SUFFIX = ".googleapis.com";
private static final URI DEFAULT_ENDPOINT_OVERRIDE = URI.create("https://storage.googleapis.com");
private static final S3Instructions DEFAULT_INSTRUCTIONS =
S3Instructions.builder().endpointOverride(DEFAULT_ENDPOINT_OVERRIDE).build();

@Override
public boolean isCompatible(@NotNull final URI uri, @Nullable final Object config) {
return GCS_URI_SCHEME.equals(uri.getScheme());
}

@Override
public SeekableChannelsProvider createProvider(@NotNull final URI uri, @Nullable final Object config) {
if (!isCompatible(uri, config)) {
throw new IllegalArgumentException("Arguments not compatible, provided uri " + uri);
}
return new GCSSeekableChannelProvider(s3Instructions(config));
}

/**
* Get the S3Instructions from the config object, or use the default if the config is null.
*/
private static S3Instructions s3Instructions(@Nullable final Object config) {
if (config == null) {
return DEFAULT_INSTRUCTIONS;
}
if (!(config instanceof S3Instructions)) {
throw new IllegalArgumentException("Only S3Instructions are valid when reading GCS URIs, " +
"provided config instance of class " + config.getClass().getName());
}
final S3Instructions s3Instructions = (S3Instructions) config;
if (s3Instructions.endpointOverride().isEmpty()) {
return s3Instructions.withEndpointOverride(DEFAULT_ENDPOINT_OVERRIDE);
}
if (!(s3Instructions.endpointOverride().get()).toString().endsWith(ENDPOINT_OVERRIDE_SUFFIX)) {
throw new IllegalArgumentException("Provided endpoint override=(" +
s3Instructions.endpointOverride().get() + " not supported when reading GCS URIs, must end with " +
ENDPOINT_OVERRIDE_SUFFIX);
}
return s3Instructions;
}
}
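
The endpoint handling above has three outcomes, sketched below. EndpointRulesDemo is hypothetical, and since createProvider also builds an S3 client under the hood, treat this as illustrative rather than a ready-to-run test.

import java.net.URI;

import io.deephaven.extensions.s3.GCSSeekableChannelProviderPlugin;
import io.deephaven.extensions.s3.S3Instructions;

public class EndpointRulesDemo {
    public static void main(final String[] args) {
        final URI uri = URI.create("gs://some-bucket/some-key.parquet");
        final GCSSeekableChannelProviderPlugin plugin = new GCSSeekableChannelProviderPlugin();

        // 1. No config at all: DEFAULT_INSTRUCTIONS with https://storage.googleapis.com is used.
        plugin.createProvider(uri, null);

        // 2. S3Instructions without an endpoint override (or with one ending in
        //    ".googleapis.com"): accepted; the default endpoint is filled in when absent.
        plugin.createProvider(uri, S3Instructions.builder().build());

        // 3. Any other endpoint override is rejected.
        try {
            plugin.createProvider(uri, S3Instructions.builder()
                    .endpointOverride(URI.create("https://storage.com"))
                    .build());
        } catch (final IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}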

@@ -145,6 +145,8 @@ public LogOutput append(final LogOutput logOutput) {
*/
public abstract Optional<URI> endpointOverride();

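/**
* Return a copy of these instructions with the given endpoint override.
*/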
public abstract S3Instructions withEndpointOverride(final URI endpointOverride);

public interface Builder {
Builder regionName(String regionName);

@@ -51,7 +51,7 @@
/**
* {@link SeekableChannelsProvider} implementation that is used to fetch objects from an S3-compatible API.
*/
final class S3SeekableChannelProvider implements SeekableChannelsProvider {
class S3SeekableChannelProvider implements SeekableChannelsProvider {

private static final int MAX_KEYS_PER_BATCH = 1000;
private static final int UNKNOWN_SIZE = -1;
@@ -97,7 +97,8 @@ public boolean exists(@NotNull final URI uri) {
}

@Override
public SeekableByteChannel getReadChannel(@NotNull final SeekableChannelContext channelContext,
public SeekableByteChannel getReadChannel(
@NotNull final SeekableChannelContext channelContext,
@NotNull final URI uri) {
final S3Uri s3Uri = s3AsyncClient.utilities().parseUri(uri);
// context is unused here, will be set before reading from the channel
@@ -140,18 +141,28 @@ public Stream<URI> list(@NotNull final URI directory) {
if (log.isDebugEnabled()) {
log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl();
}
return createStream(directory, false);
return createStream(directory, false, S3_URI_SCHEME);
}

@Override
public Stream<URI> walk(@NotNull final URI directory) {
if (log.isDebugEnabled()) {
log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl();
}
return createStream(directory, true);
return createStream(directory, true, S3_URI_SCHEME);
}

private Stream<URI> createStream(@NotNull final URI directory, final boolean isRecursive) {
/**
* Create a stream of URIs, the elements of which are the entries in the directory.
*
* @param directory The parent directory to list.
* @param isRecursive Whether to list the entries recursively.
* @param childScheme The scheme to apply to the child URIs in the returned stream.
*/
Stream<URI> createStream(
@NotNull final URI directory,
final boolean isRecursive,
@NotNull final String childScheme) {
// The following iterator fetches URIs from S3 in batches and creates a stream
final Iterator<URI> iterator = new Iterator<>() {
private final String bucketName;
@@ -222,7 +233,7 @@ private void fetchNextBatch() throws IOException {
}
final URI uri;
try {
uri = new URI(S3_URI_SCHEME, directory.getUserInfo(), directory.getHost(),
uri = new URI(childScheme, directory.getUserInfo(), directory.getHost(),
directory.getPort(), path, null, null);
} catch (final URISyntaxException e) {
throw new UncheckedDeephavenException("Failed to create URI for S3 object with key: "