diff --git a/gcs/CHANGES.md b/gcs/CHANGES.md index bfc9bf287c..0534d85027 100644 --- a/gcs/CHANGES.md +++ b/gcs/CHANGES.md @@ -122,7 +122,7 @@ 1. Upgrade Hadoop to 3.3.5. -1. Upgrade java-storage to 2.27.1 +1. Upgrade java-storage to 2.28.0 1. Add support for `WORKLOAD_IDENTITY_FEDERATION_CREDENTIAL_CONFIG_FILE` authentication type that retrieves a refresh token using workload identity federation configuraiton defined in: `fs.gs.auth.workload.identity.federation.credential.config.file` diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java index 71f9429f78..7815646c84 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java @@ -35,6 +35,7 @@ import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; import com.google.cloud.hadoop.gcsio.PerformanceCachingGoogleCloudStorageOptions; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PipeType; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType; import com.google.cloud.hadoop.util.RedactedString; @@ -470,7 +471,7 @@ public class GoogleHadoopFileSystemConfiguration { * effective only if fs.gs.client.type is set to STORAGE_CLIENT. */ public static final HadoopConfigurationProperty GCS_CLIENT_UPLOAD_TYPE = - new HadoopConfigurationProperty<>("fs.gs.client.upload.type", UploadType.DEFAULT); + new HadoopConfigurationProperty<>("fs.gs.client.upload.type", UploadType.CHUNK_UPLOAD); /** * Configuration key to configure the Path where uploads will be parked on disk. If not set then @@ -481,6 +482,49 @@ public class GoogleHadoopFileSystemConfiguration { GCS_WRITE_TEMPORARY_FILES_PATH = new HadoopConfigurationProperty<>("fs.gs.write.temporary.dirs", ImmutableSet.of()); + /** + * Configuration key to configure the Buffers for UploadType.PARALLEL_COMPOSITE_UPLOAD. It is in + * alignment with configuration of java-storage client + * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy#com_google_cloud_storage_ParallelCompositeUploadBlobWriteSessionConfig_BufferAllocationStrategy_fixedPool_int_int_ + */ + public static final HadoopConfigurationProperty GCS_PCU_BUFFER_COUNT = + new HadoopConfigurationProperty<>( + "fs.gs.write.parallel.composite.upload.buffer.count", + AsyncWriteChannelOptions.DEFAULT.getPCUBufferCount()); + + /** + * Configuration key to configure the buffer capacity for UploadType.PARALLEL_COMPOSITE_UPLOAD. It + * is in alignment with configuration of java-storage client + * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy#com_google_cloud_storage_ParallelCompositeUploadBlobWriteSessionConfig_BufferAllocationStrategy_fixedPool_int_int_ + */ + public static final HadoopConfigurationProperty GCS_PCU_BUFFER_CAPACITY = + new HadoopConfigurationProperty<>( + "fs.gs.write.parallel.composite.upload.buffer.capacity", + (long) AsyncWriteChannelOptions.DEFAULT.getPCUBufferCapacity()); + + /** + * Configuration key to clean up strategy of part files created via + * UploadType.PARALLEL_COMPOSITE_UPLOAD. It is in alignment with configuration of java-storage + * client + * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy + */ + public static final HadoopConfigurationProperty + GCS_PCU_PART_FILE_CLEANUP_TYPE = + new HadoopConfigurationProperty<>( + "fs.gs.write.parallel.composite.upload.part.file.cleanup.type", + AsyncWriteChannelOptions.DEFAULT.getPartFileCleanupType()); + + /** + * Configuration key to set up the naming strategy of part files created via + * UploadType.PARALLEL_COMPOSITE_UPLOAD. It is in alignment with configuration of java-storage + * client + * https://cloud.google.com/java/docs/reference/google-cloud-storage/latest/com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy + */ + public static final HadoopConfigurationProperty GCS_PCU_PART_FILE_NAME_PREFIX = + new HadoopConfigurationProperty<>( + "fs.gs.write.parallel.composite.upload.part.file.name.prefix", + AsyncWriteChannelOptions.DEFAULT.getPartFileNamePrefix()); + static GoogleCloudStorageFileSystemOptions.Builder getGcsFsOptionsBuilder(Configuration config) { return GoogleCloudStorageFileSystemOptions.builder() .setBucketDeleteEnabled(GCE_BUCKET_DELETE_ENABLE.get(config, config::getBoolean)) @@ -588,6 +632,10 @@ private static AsyncWriteChannelOptions getWriteChannelOptions(Configuration con .setUploadType(GCS_CLIENT_UPLOAD_TYPE.get(config, config::getEnum)) .setTemporaryPaths( ImmutableSet.copyOf(GCS_WRITE_TEMPORARY_FILES_PATH.getStringCollection(config))) + .setPCUBufferCount(GCS_PCU_BUFFER_COUNT.get(config, config::getInt)) + .setPCUBufferCapacity(toIntExact(GCS_PCU_BUFFER_CAPACITY.get(config, config::getLongBytes))) + .setPartFileCleanupType(GCS_PCU_PART_FILE_CLEANUP_TYPE.get(config, config::getEnum)) + .setPartFileNamePrefix(GCS_PCU_PART_FILE_NAME_PREFIX.get(config, config::get)) .build(); } diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java index 0cb1eb903b..dfbf0f5024 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java @@ -32,6 +32,7 @@ import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions.MetricsSink; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; import com.google.cloud.hadoop.gcsio.PerformanceCachingGoogleCloudStorageOptions; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PipeType; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType; import com.google.cloud.hadoop.util.RequesterPaysOptions.RequesterPaysMode; @@ -113,8 +114,14 @@ public class GoogleHadoopFileSystemConfigurationTest { put("fs.gs.storage.service.path", "storage/v1/"); put("fs.gs.tracelog.enable", false); put("fs.gs.working.dir", "/"); - put("fs.gs.client.upload.type", UploadType.DEFAULT); + put("fs.gs.client.upload.type", UploadType.CHUNK_UPLOAD); put("fs.gs.write.temporary.dirs", ImmutableSet.of()); + put("fs.gs.write.parallel.composite.upload.buffer.count", 1); + put("fs.gs.write.parallel.composite.upload.buffer.capacity", 32 * 1024 * 1024L); + put( + "fs.gs.write.parallel.composite.upload.part.file.cleanup.type", + PartFileCleanupType.ALWAYS); + put("fs.gs.write.parallel.composite.upload.part.file.name.prefix", ""); } }; diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java index c9ccf5f68a..653259511b 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java @@ -27,14 +27,20 @@ import com.google.cloud.NoCredentials; import com.google.cloud.hadoop.util.AccessBoundary; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType; import com.google.cloud.hadoop.util.ErrorTypeExtractor; import com.google.cloud.hadoop.util.GcsClientStatisticInterface; import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor; import com.google.cloud.storage.BlobWriteSessionConfig; import com.google.cloud.storage.BlobWriteSessionConfigs; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy; +import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.flogger.GoogleLogger; @@ -87,6 +93,7 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage { @Nullable HttpRequestInitializer httpRequestInitializer, @Nullable ImmutableList gRPCInterceptors, @Nullable Function, String> downscopedAccessTokenFn, + @Nullable ExecutorService pCUExecutorService, @Nullable GcsClientStatisticInterface gcsClientStatisticInterface) throws IOException { super( @@ -102,7 +109,7 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage { this.storageOptions = options; this.storage = clientLibraryStorage == null - ? createStorage(credentials, options, gRPCInterceptors) + ? createStorage(credentials, options, gRPCInterceptors, pCUExecutorService) : clientLibraryStorage; } @@ -209,7 +216,8 @@ private long getWriteGeneration(StorageResourceId resourceId, boolean overwrite) private static Storage createStorage( Credentials credentials, GoogleCloudStorageOptions storageOptions, - List interceptors) + List interceptors, + ExecutorService pCUExecutorService) throws IOException { return StorageOptions.grpc() .setAttemptDirectPath(storageOptions.isDirectPathPreferred()) @@ -227,15 +235,20 @@ private static Storage createStorage( return ImmutableList.copyOf(list); }) .setCredentials(credentials != null ? credentials : NoCredentials.getInstance()) - .setBlobWriteSessionConfig(getSessionConfig(storageOptions.getWriteChannelOptions())) + .setBlobWriteSessionConfig( + getSessionConfig(storageOptions.getWriteChannelOptions(), pCUExecutorService)) .build() .getService(); } - private static BlobWriteSessionConfig getSessionConfig(AsyncWriteChannelOptions writeOptions) + private static BlobWriteSessionConfig getSessionConfig( + AsyncWriteChannelOptions writeOptions, ExecutorService pCUExecutorService) throws IOException { logger.atFiner().log("Upload strategy in use: %s", writeOptions.getUploadType()); switch (writeOptions.getUploadType()) { + case CHUNK_UPLOAD: + return BlobWriteSessionConfigs.getDefault() + .withChunkSize(writeOptions.getUploadChunkSize()); case WRITE_TO_DISK_THEN_UPLOAD: if (writeOptions.getTemporaryPaths() == null || writeOptions.getTemporaryPaths().isEmpty()) { @@ -255,12 +268,47 @@ private static BlobWriteSessionConfig getSessionConfig(AsyncWriteChannelOptions writeOptions.getTemporaryPaths().stream() .map(x -> Paths.get(x)) .collect(ImmutableSet.toImmutableSet())); + case PARALLEL_COMPOSITE_UPLOAD: + return BlobWriteSessionConfigs.parallelCompositeUpload() + .withBufferAllocationStrategy( + BufferAllocationStrategy.fixedPool( + writeOptions.getPCUBufferCount(), writeOptions.getPCUBufferCapacity())) + .withPartCleanupStrategy(getPartCleanupStrategy(writeOptions.getPartFileCleanupType())) + .withExecutorSupplier(getPCUExecutorSupplier(pCUExecutorService)) + .withPartNamingStrategy(getPartNamingStrategy(writeOptions.getPartFileNamePrefix())); default: - return BlobWriteSessionConfigs.getDefault() - .withChunkSize(writeOptions.getUploadChunkSize()); + throw new IllegalArgumentException( + String.format("Upload type:%s is not supported.", writeOptions.getUploadType())); + } + } + + private static PartCleanupStrategy getPartCleanupStrategy(PartFileCleanupType cleanupType) { + switch (cleanupType) { + case NEVER: + return PartCleanupStrategy.never(); + case ON_SUCCESS: + return PartCleanupStrategy.onlyOnSuccess(); + case ALWAYS: + return PartCleanupStrategy.always(); + default: + throw new IllegalArgumentException( + String.format("Cleanup type:%s is not handled.", cleanupType)); } } + private static PartNamingStrategy getPartNamingStrategy(String partFilePrefix) { + if (Strings.isNullOrEmpty(partFilePrefix)) { + return PartNamingStrategy.noPrefix(); + } + return PartNamingStrategy.prefix(partFilePrefix); + } + + private static ExecutorSupplier getPCUExecutorSupplier(ExecutorService pCUExecutorService) { + return pCUExecutorService == null + ? ExecutorSupplier.cachedPool() + : ExecutorSupplier.useExecutor(pCUExecutorService); + } + public static Builder builder() { return new AutoBuilder_GoogleCloudStorageClientImpl_Builder(); } @@ -290,6 +338,9 @@ public abstract Builder setGcsClientStatisticInterface( @VisibleForTesting public abstract Builder setClientLibraryStorage(@Nullable Storage clientLibraryStorage); + @VisibleForTesting + public abstract Builder setPCUExecutorService(@Nullable ExecutorService pCUExecutorService); + public abstract GoogleCloudStorageClientImpl build() throws IOException; } } diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java index fee447c066..b6200c149e 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/integration/GoogleCloudStorageClientImplIntegrationTest.java @@ -14,19 +14,28 @@ package com.google.cloud.hadoop.gcsio.integration; +import static com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper.assertObjectContent; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import com.google.api.client.http.HttpStatusCodes; import com.google.auth.Credentials; +import com.google.cloud.hadoop.gcsio.CreateObjectOptions; import com.google.cloud.hadoop.gcsio.GoogleCloudStorage; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageClientImpl; +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageOptions; +import com.google.cloud.hadoop.gcsio.ListObjectOptions; import com.google.cloud.hadoop.gcsio.StorageResourceId; import com.google.cloud.hadoop.gcsio.integration.GoogleCloudStorageTestHelper.TestBucketHelper; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions; +import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.PartFileCleanupType; import com.google.cloud.hadoop.util.AsyncWriteChannelOptions.UploadType; +import com.google.cloud.storage.StorageException; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.io.Files; +import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -34,6 +43,8 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -59,9 +70,20 @@ public class GoogleCloudStorageClientImplIntegrationTest { private static final String GCS_WRITE_TMP_DIR_1 = String.format("%s/%s", TEMP_DIR_PATH, "gcs-write-dir-1"); + private static final int ONE_MiB = 1024 * 1024; + private static GoogleCloudStorage helperGcs; - private GoogleCloudStorage gcs; + private static final int partFileCount = 2; + private static final int bufferCapacity = partFileCount * ONE_MiB; + + private static final AsyncWriteChannelOptions pcuDefaultOptions = + AsyncWriteChannelOptions.builder() + .setUploadType(UploadType.PARALLEL_COMPOSITE_UPLOAD) + .setPartFileCleanupType(PartFileCleanupType.ALWAYS) + .setPCUBufferCount(partFileCount) + .setPCUBufferCapacity(bufferCapacity) + .build(); private static ImmutableSet tempDirs = ImmutableSet.of(GCS_WRITE_TMP_DIR_1, GCS_WRITE_TMP_DIR); @@ -70,6 +92,8 @@ public class GoogleCloudStorageClientImplIntegrationTest { @Rule public TestName name = new TestName(); + private GoogleCloudStorage gcs; + @BeforeClass public static void before() throws IOException { helperGcs = GoogleCloudStorageTestHelper.createGcsClientImpl(); @@ -116,14 +140,14 @@ public void writeToDiskDisabled() throws IOException { GoogleCloudStorageOptions storageOptions = GoogleCloudStorageTestHelper.getStandardOptionBuilder() .setWriteChannelOptions( - AsyncWriteChannelOptions.builder().setUploadType(UploadType.DEFAULT).build()) + AsyncWriteChannelOptions.builder().setUploadType(UploadType.CHUNK_UPLOAD).build()) .build(); gcs = getGCSImpl(storageOptions); StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); // validate that there were no temporaryFiles created files - writeAndVerifyTemporaryFiles(resourceId, 0); + writeAndVerifyTemporaryFiles(resourceId, /* expectedPartFileCountAfterCleanup */ 0); } @Test @@ -139,7 +163,7 @@ public void writeToDefaultPathThenUploadEnabled() throws IOException { gcs = getGCSImpl(storageOptions); StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); - writeAndVerifyTemporaryFiles(resourceId, 1); + writeAndVerifyTemporaryFiles(resourceId, /* expectedPartFileCountAfterCleanup */ 1); } @Test @@ -157,7 +181,7 @@ public void writeToPathThenUploadEnabled() throws IOException { gcs = getGCSImpl(storageOptions); StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); - writeAndVerifyTemporaryFiles(resourceId, 1); + writeAndVerifyTemporaryFiles(resourceId, /* expectedPartFileCountAfterCleanup */ 1); } @Test @@ -189,6 +213,210 @@ public void uploadViaJournaling() throws IOException { writeAndVerifyTemporaryFiles(resourceId, 1); } + @Test + public void uploadViaPCUVerifyPartFileCleanup() throws IOException, InterruptedException { + String partFilePrefix = name.getMethodName(); + GoogleCloudStorageOptions storageOptions = + GoogleCloudStorageTestHelper.getStandardOptionBuilder() + .setWriteChannelOptions( + pcuDefaultOptions.toBuilder().setPartFileNamePrefix(partFilePrefix).build()) + .build(); + + gcs = getGCSImpl(storageOptions); + StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); + + writeAndVerifyPartFiles( + bufferCapacity, resourceId, /* expectedPartFileCountAfterCleanup */ 0, partFilePrefix); + } + + @Test + public void uploadViaPCUVerifyPartFileNotCleanedUp() throws IOException { + String partFilePrefix = name.getMethodName(); + GoogleCloudStorageOptions storageOptions = + GoogleCloudStorageTestHelper.getStandardOptionBuilder() + .setWriteChannelOptions( + pcuDefaultOptions.toBuilder() + .setPartFileNamePrefix(partFilePrefix) + .setPartFileCleanupType(PartFileCleanupType.NEVER) + .build()) + .build(); + + gcs = getGCSImpl(storageOptions); + StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); + // part file not cleaned up because PartFileCleanupType.NEVER is used. + writeAndVerifyPartFiles(bufferCapacity, resourceId, partFileCount, partFilePrefix); + } + + @Test + public void uploadViaPCUComposeFileMissingFailure() throws IOException { + String partFilePrefix = name.getMethodName(); + GoogleCloudStorageOptions storageOptions = + GoogleCloudStorageTestHelper.getStandardOptionBuilder() + .setWriteChannelOptions( + pcuDefaultOptions.toBuilder().setPartFileNamePrefix(partFilePrefix).build()) + .build(); + + gcs = getGCSImpl(storageOptions); + StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); + + byte[] bytesToWrite = new byte[partFileCount * bufferCapacity]; + GoogleCloudStorageTestHelper.fillBytes(bytesToWrite); + WritableByteChannel writeChannel = gcs.create(resourceId); + writeChannel.write(ByteBuffer.wrap(bytesToWrite)); + + List partFiles = getPartFiles(partFilePrefix); + + // delete one part file + StorageResourceId partFileToBeDeleted = partFiles.get(0).getResourceId(); + gcs.deleteObjects(ImmutableList.of(partFileToBeDeleted)); + + Exception e = assertThrows(IOException.class, writeChannel::close); + verifyPartFileNotFound(e, partFileToBeDeleted.getObjectName()); + + partFiles = getPartFiles(partFilePrefix); + // part files were cleaned up even after failure + assertThat(partFiles.size()).isEqualTo(0); + } + + @Test + public void uploadViaPCUComposeMissingObjectVersion() throws IOException { + String partFilePrefix = name.getMethodName(); + GoogleCloudStorageOptions storageOptions = + GoogleCloudStorageTestHelper.getStandardOptionBuilder() + .setWriteChannelOptions( + pcuDefaultOptions.toBuilder() + .setPartFileNamePrefix(partFilePrefix) + .setPartFileCleanupType(PartFileCleanupType.ON_SUCCESS) + .build()) + .build(); + + gcs = getGCSImpl(storageOptions); + StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); + + byte[] bytesToWrite = new byte[partFileCount * bufferCapacity]; + GoogleCloudStorageTestHelper.fillBytes(bytesToWrite); + WritableByteChannel writeChannel = gcs.create(resourceId); + writeChannel.write(ByteBuffer.wrap(bytesToWrite)); + + List partFiles = getPartFiles(partFilePrefix); + // get one part file and override its content + GoogleCloudStorageItemInfo itemInfoBeforeModification = partFiles.get(0); + gcs.create(itemInfoBeforeModification.getResourceId(), CreateObjectOptions.DEFAULT_OVERWRITE) + .close(); + + GoogleCloudStorageItemInfo itemInfoAfterModification = + gcs.getItemInfo(itemInfoBeforeModification.getResourceId()); + List updatedFiles = getPartFiles(partFilePrefix); + // object with same name is present but generationId is different + assertThat( + updatedFiles.stream() + .anyMatch( + itemInfo -> + (itemInfo.getObjectName().equals(itemInfoAfterModification.getObjectName()) + && itemInfo.getContentGeneration() + != itemInfoBeforeModification.getContentGeneration()) + ? true + : false)) + .isTrue(); + + Exception e = assertThrows(IOException.class, writeChannel::close); + verifyPartFileNotFound(e, itemInfoBeforeModification.getObjectName()); + partFiles = getPartFiles(partFilePrefix); + // part files weren't cleaned up on failure as PartFileCleanupType.ON_SUCCESS was used + assertThat(partFiles.size()).isEqualTo(partFileCount); + } + + @Test + public void uploadViaPCUInvalidPartFileNamePrefix() throws IOException { + // invalid object name https://cloud.google.com/storage/docs/objects#naming + String partFilePrefix = "\n"; + GoogleCloudStorageOptions storageOptions = + GoogleCloudStorageTestHelper.getStandardOptionBuilder() + .setWriteChannelOptions( + pcuDefaultOptions.toBuilder() + .setPartFileNamePrefix(partFilePrefix) + .setPartFileCleanupType(PartFileCleanupType.NEVER) + .build()) + .build(); + gcs = getGCSImpl(storageOptions); + StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); + byte[] bytesToWrite = new byte[partFileCount * bufferCapacity]; + GoogleCloudStorageTestHelper.fillBytes(bytesToWrite); + WritableByteChannel writeChannel = gcs.create(resourceId); + writeChannel.write(ByteBuffer.wrap(bytesToWrite)); + Exception e = assertThrows(IOException.class, writeChannel::close); + verifyPartFileInvalidArgument(e); + } + + @Test + public void uploadViaPCUPartFileCleanupOnSuccess() throws IOException, InterruptedException { + String partFilePrefix = name.getMethodName(); + GoogleCloudStorageOptions storageOptions = + GoogleCloudStorageTestHelper.getStandardOptionBuilder() + .setWriteChannelOptions( + pcuDefaultOptions.toBuilder() + .setPartFileNamePrefix(partFilePrefix) + .setPartFileCleanupType(PartFileCleanupType.ON_SUCCESS) + .build()) + .build(); + gcs = getGCSImpl(storageOptions); + StorageResourceId resourceId = new StorageResourceId(TEST_BUCKET, name.getMethodName()); + writeAndVerifyPartFiles( + bufferCapacity, resourceId, /* expectedPartFileCountAfterCleanup */ 0, partFilePrefix); + } + + private void verifyPartFileNotFound(Throwable throwable, String partFileName) { + StorageException exception = getStorageException(throwable); + assertThat(exception.getMessage()).contains(partFileName); + assertThat(exception.getCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_NOT_FOUND); + } + + private void verifyPartFileInvalidArgument(Throwable throwable) { + StorageException exception = getStorageException(throwable); + assertThat(exception.getMessage()).contains("INVALID_ARGUMENT"); + assertThat(exception.getCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_BAD_REQUEST); + } + + private StorageException getStorageException(Throwable throwable) { + Throwable cause = throwable; + while (cause != null) { + if (cause instanceof StorageException) { + return (StorageException) cause; + } + cause = cause.getCause(); + } + return null; + } + + private List getPartFiles(String prefix) throws IOException { + // list all object + List itemInfos = + gcs.listObjectInfo( + TEST_BUCKET, prefix, ListObjectOptions.builder().setDelimiter(null).build()); + return itemInfos.stream() + .filter(x -> x.getObjectName().endsWith(".part")) + .collect(Collectors.toList()); + } + + private void writeAndVerifyPartFiles( + int bufferCapacity, + StorageResourceId resourceId, + int expectedPartFileCountAfterCleanup, + String partFilePrefix) + throws IOException { + byte[] bytesToWrite = new byte[partFileCount * bufferCapacity]; + GoogleCloudStorageTestHelper.fillBytes(bytesToWrite); + WritableByteChannel writeChannel = gcs.create(resourceId); + writeChannel.write(ByteBuffer.wrap(bytesToWrite)); + + writeChannel.close(); + List partFiles = getPartFiles(partFilePrefix); + // part files are deleted once upload is finished. + assertThat(partFiles.stream().count()).isEqualTo(expectedPartFileCountAfterCleanup); + // verify file content + verifyFileContent(resourceId, bytesToWrite); + } + private void writeAndVerifyTemporaryFiles( StorageResourceId resourceId, int expectedTemporaryFileCount) throws IOException { byte[] bytesToWrite = new byte[1024 * 1024 * 3]; @@ -212,6 +440,7 @@ private GoogleCloudStorage getGCSImpl(GoogleCloudStorageOptions storageOptions) return GoogleCloudStorageClientImpl.builder() .setOptions(storageOptions) .setCredentials(credentials) + .setPCUExecutorService(MoreExecutors.newDirectExecutorService()) .build(); } @@ -226,6 +455,14 @@ private void verifyTemporaryFileCount(ImmutableSet paths, int expectedCoun assertThat(fileCount).isEqualTo(expectedCount); } + private void verifyFileContent(StorageResourceId resourceId, byte[] bytesWritten) + throws IOException { + GoogleCloudStorageItemInfo fileInfo = gcs.getItemInfo(resourceId); + assertThat(fileInfo.exists()).isTrue(); + + assertObjectContent(gcs, resourceId, bytesWritten); + } + private int getFileCount(File file) { File[] files = file.listFiles(); if (files == null) { diff --git a/pom.xml b/pom.xml index 6e3a2c5647..9ce672b667 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 1.14.0 1.10.1 2.9.0 - 2.27.1 + 2.28.0 2.16 0.7.4 2.35.0 diff --git a/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java b/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java index 74a8625c59..336346c2a1 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/AsyncWriteChannelOptions.java @@ -36,6 +36,13 @@ public enum PipeType { NIO_CHANNEL_PIPE, } + /** Part file cleanup strategy for parallel composite upload. */ + public enum PartFileCleanupType { + ALWAYS, + NEVER, + ON_SUCCESS + } + /** * UploadType are in parity with various upload configuration offered by google-java-storage * client ref: @@ -43,16 +50,24 @@ public enum PipeType { */ public enum UploadType { /* Upload chunks to gcs and waits for acknowledgement before uploading another chunk*/ - DEFAULT, + CHUNK_UPLOAD, /* Write whole file to disk and then upload.*/ WRITE_TO_DISK_THEN_UPLOAD, /* Write chunks to file along with uploading to gcs, and failure will be retried from data on disk.*/ - JOURNALING + JOURNALING, + /* Write are performed using parallel composite upload strategy. */ + PARALLEL_COMPOSITE_UPLOAD } + // TODO: update these config with better default values. + private static final int PARALLEL_COMPOSITE_UPLOAD_BUFFER_COUNT = 1; + private static final int PARALLEL_COMPOSITE_UPLOAD_BUFFER_CAPACITY = 32 * 1024 * 1024; + /** Upload chunk size granularity */ private static final int UPLOAD_CHUNK_SIZE_GRANULARITY = 8 * 1024 * 1024; + private static final String PART_FILE_PREFIX = ""; + /** Default upload chunk size. */ private static final int DEFAULT_UPLOAD_CHUNK_SIZE = Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024 @@ -73,8 +88,12 @@ public static Builder builder() { .setPipeType(PipeType.IO_STREAM_PIPE) .setUploadCacheSize(0) .setUploadChunkSize(DEFAULT_UPLOAD_CHUNK_SIZE) - .setUploadType(UploadType.DEFAULT) - .setTemporaryPaths(ImmutableSet.of()); + .setUploadType(UploadType.CHUNK_UPLOAD) + .setTemporaryPaths(ImmutableSet.of()) + .setPCUBufferCount(PARALLEL_COMPOSITE_UPLOAD_BUFFER_COUNT) + .setPCUBufferCapacity(PARALLEL_COMPOSITE_UPLOAD_BUFFER_CAPACITY) + .setPartFileCleanupType(PartFileCleanupType.ALWAYS) + .setPartFileNamePrefix(PART_FILE_PREFIX); } public abstract Builder toBuilder(); @@ -101,8 +120,16 @@ public static Builder builder() { public abstract UploadType getUploadType(); + public abstract PartFileCleanupType getPartFileCleanupType(); + public abstract ImmutableSet getTemporaryPaths(); + public abstract int getPCUBufferCount(); + + public abstract int getPCUBufferCapacity(); + + public abstract String getPartFileNamePrefix(); + /** Mutable builder for the GoogleCloudStorageWriteChannelOptions class. */ @AutoValue.Builder public abstract static class Builder { @@ -133,8 +160,16 @@ public abstract static class Builder { public abstract Builder setUploadType(UploadType uploadType); + public abstract Builder setPartFileCleanupType(PartFileCleanupType partFileCleanupType); + public abstract Builder setTemporaryPaths(ImmutableSet temporaryPaths); + public abstract Builder setPCUBufferCount(int bufferCount); + + public abstract Builder setPCUBufferCapacity(int bufferCapacity); + + public abstract Builder setPartFileNamePrefix(String prefix); + abstract AsyncWriteChannelOptions autoBuild(); public AsyncWriteChannelOptions build() {