Skip to content

Commit

Permalink
Move list object API to java storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
shilpi23pandey committed Dec 19, 2023
1 parent 0acb27a commit e99f850
Showing 1 changed file with 55 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@

package com.google.cloud.hadoop.gcsio;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo.createInferredDirectory;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.decodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.encodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo.createInferredDirectory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.Math.toIntExact;

import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.util.Data;
import com.google.api.gax.paging.Page;
import com.google.api.gax.paging.Page;
import com.google.auth.Credentials;
import com.google.auto.value.AutoBuilder;
Expand All @@ -43,7 +41,6 @@
import com.google.cloud.hadoop.util.GcsClientStatisticInterface;
import com.google.cloud.hadoop.util.GrpcErrorTypeExtractor;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobWriteSessionConfig;
Expand All @@ -58,9 +55,8 @@
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BucketField;
Expand All @@ -73,10 +69,8 @@
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.flogger.GoogleLogger;
import com.google.common.io.BaseEncoding;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ClientInterceptor;
Expand All @@ -88,11 +82,11 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
Expand All @@ -108,7 +102,6 @@
*/
@VisibleForTesting
public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

// Maximum number of times to retry deletes in the case of precondition failures.
Expand All @@ -128,17 +121,6 @@ public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {
.setDaemon(true)
.build());

/**
 * Decodes a single base64-encoded metadata attribute value.
 *
 * <p>Decoding is best effort: when the value is rejected with an {@link
 * IllegalArgumentException}, the failure is logged at severe level and {@code null} is returned
 * instead of propagating the exception.
 *
 * @param value base64 text to decode
 * @return the decoded bytes, or {@code null} if {@code value} could not be decoded
 */
@Nullable
private static byte[] decodeMetadataValues(String value) {
  byte[] decoded = null;
  try {
    decoded = BaseEncoding.base64().decode(value);
  } catch (IllegalArgumentException decodeFailure) {
    // Leave the result as null so the caller can carry on with the remaining attributes.
    logger.atSevere().withCause(decodeFailure).log(
        "Failed to parse base64 encoded attribute value %s - %s", value, decodeFailure);
  }
  return decoded;
}

/**
* Having an instance of gscImpl to redirect calls to Json client while new client implementation
* is in WIP.
Expand Down Expand Up @@ -478,11 +460,14 @@ public List<GoogleCloudStorageItemInfo> listObjectInfo(

String pageToken = null;
do {
pageToken = listObjectInfoPageInternal(bucketName, objectNamePrefix, objectInfos, listOptions,
pageToken);
pageToken =
listObjectInfoPageInternal(
bucketName, objectNamePrefix, objectInfos, listOptions, pageToken);
} while (pageToken != null
&& getMaxRemainingResults(listOptions.getMaxResults(), objectInfos) > 0);

objectInfos.sort(Comparator.comparing(GoogleCloudStorageItemInfo::getObjectName));

return objectInfos;
}

Expand All @@ -503,17 +488,19 @@ public ListPage<GoogleCloudStorageItemInfo> listObjectInfoPage(

List<GoogleCloudStorageItemInfo> objectInfos = new ArrayList<>();

String nextPageToken = listObjectInfoPageInternal(bucketName, objectNamePrefix, objectInfos,
listOptions, pageToken);
String nextPageToken =
listObjectInfoPageInternal(
bucketName, objectNamePrefix, objectInfos, listOptions, pageToken);
return new ListPage<>(objectInfos, nextPageToken);

}

private String listObjectInfoPageInternal(String bucketName,
private String listObjectInfoPageInternal(
String bucketName,
String objectNamePrefix,
List<GoogleCloudStorageItemInfo> objectInfos,
ListObjectOptions listOptions,
String pageToken) throws IOException {
String pageToken)
throws IOException {
// TODO: Check if we need to set maxResults + 1 in case of listing prefixes.
logger.atFiner().log("listObjectInfoPage(%s, %s)", objectInfos, listOptions);

Expand All @@ -536,9 +523,11 @@ private String listObjectInfoPageInternal(String bucketName,
Iterator<Blob> blobIterator;
String nextPageToken = null;
try {
Page<Blob> blobListPage = storage.list(bucketName,
prepareBlobListOptions(listOptions, objectNamePrefix, pageToken).toArray(
new BlobListOption[0]));
Page<Blob> blobListPage =
storage.list(
bucketName,
prepareBlobListOptions(listOptions, objectNamePrefix, pageToken)
.toArray(new BlobListOption[0]));
nextPageToken = emptyToNull(blobListPage.getNextPageToken());
blobIterator = blobListPage.getValues().iterator();
} catch (StorageException e) {
Expand All @@ -556,15 +545,13 @@ && getMaxRemainingResults(listOptions.getMaxResults(), objectInfos) > 0) {
Blob blob = blobIterator.next();
if (blob.isDirectory() && listOptions.isIncludePrefix()) {
// Handle prefixes.
objectInfos.add(
createInferredDirectory(new StorageResourceId(bucketName, blob.getName())));
objectInfos.add(createInferredDirectory(new StorageResourceId(bucketName, blob.getName())));
} else if (!objectPrefixEndsWithDelimiter || !blob.getName().equals(objectNamePrefix)) {
// Handle objects.
objectInfos.add(createItemInfoForBlob(blob));
} else if (blob.getName().equals(objectNamePrefix) && listOptions.isIncludePrefix()) {
// Handle object prefixes. Object prefixes show up as non-directory.
objectInfos.add(
createInferredDirectory(new StorageResourceId(bucketName, blob.getName())));
objectInfos.add(createItemInfoForBlob(blob));
objectPrefixIncluded = true;
}
}
Expand All @@ -578,37 +565,41 @@ && getMaxRemainingResults(listOptions.getMaxResults(), objectInfos) > 0) {
// Only add an inferred directory if listed any prefixes or objects, i.e. prefix "exists"
&& !objectInfos.isEmpty()
// Only add an inferred directory if prefix object is not listed already
&& !objectPrefixIncluded
) {
objectInfos.add(
createInferredDirectory(new StorageResourceId(bucketName, objectNamePrefix)));
&& !objectPrefixIncluded) {
objectInfos.add(createInferredDirectory(new StorageResourceId(bucketName, objectNamePrefix)));
}
return nextPageToken;
}

private static long getMaxRemainingResults(long maxResults,
List<GoogleCloudStorageItemInfo> blobs) {
private static long getMaxRemainingResults(
long maxResults, List<GoogleCloudStorageItemInfo> blobs) {
if (maxResults <= 0) {
return Long.MAX_VALUE;
}
return maxResults - blobs.size();
}

/**
 * Looks up the {@link BlobField} whose apiary name equals {@code field}.
 *
 * @param field apiary field name to look up
 * @return the first matching {@link BlobField}, or {@code null} when no constant matches; callers
 *     must be prepared for the {@code null} result
 */
private static BlobField getBlobField(String field) {
  Optional<BlobField> resultField =
      Arrays.stream(BlobField.values())
          .filter(blobField -> blobField.getApiaryName().equals(field))
          .findFirst();
  return resultField.orElse(null);
}

private List<BlobListOption> prepareBlobListOptions(ListObjectOptions listOptions,
@Nullable String objectNamePrefix, @Nullable String pageToken) {
private List<BlobListOption> prepareBlobListOptions(
ListObjectOptions listOptions,
@Nullable String objectNamePrefix,
@Nullable String pageToken) {
long maxResults = listOptions.getMaxResults();

List<BlobListOption> blobListOptions = new ArrayList<>();

blobListOptions.add(BlobListOption.pageSize(
maxResults <= 0 || maxResults >= storageOptions.getMaxListItemsPerCall()
? storageOptions.getMaxListItemsPerCall() : maxResults));
blobListOptions.add(
BlobListOption.pageSize(
maxResults <= 0 || maxResults >= storageOptions.getMaxListItemsPerCall()
? storageOptions.getMaxListItemsPerCall()
: maxResults));

if (listOptions.getDelimiter() != null) {
blobListOptions.add(BlobListOption.delimiter(listOptions.getDelimiter()));
Expand All @@ -617,8 +608,11 @@ private List<BlobListOption> prepareBlobListOptions(ListObjectOptions listOption
blobListOptions.add(BlobListOption.prefix(objectNamePrefix));
}
if (listOptions.getFields() != null) {
blobListOptions.add(BlobListOption.fields(Arrays.stream(listOptions.getFields().split(","))
.map(GoogleCloudStorageClientImpl::getBlobField).toArray(BlobField[]::new)));
blobListOptions.add(
BlobListOption.fields(
Arrays.stream(listOptions.getFields().split(","))
.map(GoogleCloudStorageClientImpl::getBlobField)
.toArray(BlobField[]::new)));
}
if (pageToken != null) {
blobListOptions.add(BlobListOption.pageToken(pageToken));
Expand All @@ -630,11 +624,10 @@ private static GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) {
checkNotNull(blob, "object must not be null");
checkArgument(!isNullOrEmpty(blob.getBucket()), "object must have a bucket: %s", blob);
checkArgument(!isNullOrEmpty(blob.getName()), "object must have a name: %s", blob);
return createItemInfoForStorageBlob(new StorageResourceId(blob.getBucket(), blob.getName()),
blob);
return createItemInfoForBlob(new StorageResourceId(blob.getBucket(), blob.getName()), blob);
}

private static GoogleCloudStorageItemInfo createItemInfoForStorageBlob(
private static GoogleCloudStorageItemInfo createItemInfoForBlob(
StorageResourceId resourceId, Blob blob) {
checkArgument(resourceId != null, "resourceId must not be null");
checkArgument(blob != null, "object must not be null");
Expand Down Expand Up @@ -667,12 +660,15 @@ private static GoogleCloudStorageItemInfo createItemInfoForStorageBlob(
md5Hash = BaseEncoding.base64().decode(blob.getMd5());
}

// TODO : Replace the deprecated time fields.
return GoogleCloudStorageItemInfo.createObject(
resourceId,
blob.getCreateTime() == null ? 0 : blob.getCreateTime(),
blob.getUpdateTime() == null ? 0 : blob.getUpdateTime(),
blob.getSize() == null ? 0 : blob.getSize().longValue(),
blob.getCreateTimeOffsetDateTime() == null
? 0
: blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getUpdateTimeOffsetDateTime() == null
? 0
: blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getSize() == null ? 0 : blob.getSize(),
blob.getContentType(),
blob.getContentEncoding(),
decodedMetadata,
Expand All @@ -681,11 +677,6 @@ private static GoogleCloudStorageItemInfo createItemInfoForStorageBlob(
new VerificationAttributes(md5Hash, crc32c));
}

/**
 * Converts a map of base64-encoded metadata values into a map of decoded byte arrays.
 *
 * <p>The result comes from Guava's {@code Maps.transformValues}, which returns a view of {@code
 * metadata}: values are decoded when they are read, and a value that fails to decode is exposed
 * as {@code null}.
 *
 * @param metadata metadata attributes keyed by name, with base64-encoded values
 * @return a map with the same keys whose values are the decoded bytes
 */
@VisibleForTesting
static Map<String, byte[]> decodeMetadata(Map<String, String> metadata) {
  Map<String, byte[]> decodedView =
      Maps.transformValues(metadata, encodedValue -> decodeMetadataValues(encodedValue));
  return decodedView;
}

@Override
public SeekableByteChannel open(
StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) throws IOException {
Expand Down Expand Up @@ -786,8 +777,7 @@ public GoogleCloudStorageItemInfo composeObjects(
/**
* Gets the object generation for a write operation
*
* <p>Makes a getItemInfo call even if overwrite is disabled, so that the operation fails fast when
* the file already exists.
*
* @param resourceId object for which generation info is requested
* @param overwrite whether existing object should be overwritten
Expand Down Expand Up @@ -906,57 +896,6 @@ private static ExecutorSupplier getPCUExecutorSupplier(ExecutorService pCUExecut
: ExecutorSupplier.useExecutor(pCUExecutorService);
}

/**
 * Builds a {@link GoogleCloudStorageItemInfo} for {@code blob} after checking that the blob
 * really is the object identified by {@code resourceId}.
 *
 * <p>Missing create/update times, size, generation and metageneration are reported as 0. Missing
 * metadata is passed on as {@code null}, and a null or empty CRC32C / MD5 string yields a {@code
 * null} hash in the resulting verification attributes.
 *
 * @param resourceId identifier the blob is expected to match; must denote a storage object
 * @param blob blob to convert
 * @return item info describing {@code blob}
 * @throws IllegalArgumentException if an argument is null, {@code resourceId} is not a storage
 *     object, or its bucket or object name differs from the blob's
 */
private static GoogleCloudStorageItemInfo createItemInfoForBlob(
    StorageResourceId resourceId, Blob blob) {
  // Argument validation: presence first, then identity of bucket and object name.
  checkArgument(resourceId != null, "resourceId must not be null");
  checkArgument(blob != null, "object must not be null");
  checkArgument(
      resourceId.isStorageObject(),
      "resourceId must be a StorageObject. resourceId: %s",
      resourceId);

  String expectedBucket = resourceId.getBucketName();
  checkArgument(
      expectedBucket.equals(blob.getBucket()),
      "resourceId.getBucketName() must equal object.getBucket(): '%s' vs '%s'",
      expectedBucket,
      blob.getBucket());

  String expectedName = resourceId.getObjectName();
  checkArgument(
      expectedName.equals(blob.getName()),
      "resourceId.getObjectName() must equal object.getName(): '%s' vs '%s'",
      expectedName,
      blob.getName());

  // User metadata is only decoded when the blob carries any.
  Map<String, byte[]> userMetadata = null;
  if (blob.getMetadata() != null) {
    userMetadata = decodeMetadata(blob.getMetadata());
  }

  // Checksums arrive base64 encoded; an absent or empty string means "no checksum".
  byte[] crc32cBytes =
      isNullOrEmpty(blob.getCrc32c()) ? null : BaseEncoding.base64().decode(blob.getCrc32c());
  byte[] md5Bytes =
      isNullOrEmpty(blob.getMd5()) ? null : BaseEncoding.base64().decode(blob.getMd5());

  // Absent numeric attributes default to 0.
  long creationMillis =
      blob.getCreateTimeOffsetDateTime() == null
          ? 0
          : blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli();
  long modificationMillis =
      blob.getUpdateTimeOffsetDateTime() == null
          ? 0
          : blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli();
  long sizeBytes = blob.getSize() == null ? 0 : blob.getSize();
  String contentType = blob.getContentType();
  String contentEncoding = blob.getContentEncoding();
  long generation = blob.getGeneration() == null ? 0 : blob.getGeneration();
  long metaGeneration = blob.getMetageneration() == null ? 0 : blob.getMetageneration();

  return GoogleCloudStorageItemInfo.createObject(
      resourceId,
      creationMillis,
      modificationMillis,
      sizeBytes,
      contentType,
      contentEncoding,
      userMetadata,
      generation,
      metaGeneration,
      new VerificationAttributes(md5Bytes, crc32cBytes));
}

/**
 * Creates a new {@link Builder} for this class.
 *
 * @return a fresh instance of the generated {@code AutoBuilder} implementation
 */
public static Builder builder() {
  Builder freshBuilder = new AutoBuilder_GoogleCloudStorageClientImpl_Builder();
  return freshBuilder;
}
Expand Down

0 comments on commit e99f850

Please sign in to comment.