Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move list object API to java storage. #1083

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageExceptions.createFileNotFoundException;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.decodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.encodeMetadata;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo.createInferredDirectory;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.emptyToNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.Math.toIntExact;

Expand Down Expand Up @@ -54,6 +56,7 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BucketField;
Expand All @@ -77,9 +80,13 @@
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.ExecutorService;
Expand All @@ -95,7 +102,6 @@
*/
@VisibleForTesting
public class GoogleCloudStorageClientImpl extends ForwardingGoogleCloudStorage {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

// Maximum number of times to retry deletes in the case of precondition failures.
Expand Down Expand Up @@ -442,6 +448,235 @@ public void deleteBuckets(List<String> bucketNames) throws IOException {
}
}

/**
* @see GoogleCloudStorage#listObjectInfo(String, String, ListObjectOptions)
*/
@Override
public List<GoogleCloudStorageItemInfo> listObjectInfo(
String bucketName, String objectNamePrefix, ListObjectOptions listOptions)
throws IOException {
logger.atFiner().log("listObjectInfo(%s, %s, %s)", bucketName, objectNamePrefix, listOptions);
List<GoogleCloudStorageItemInfo> objectInfos = new ArrayList<>();

String pageToken = null;
do {
pageToken =
listObjectInfoPageInternal(
bucketName, objectNamePrefix, objectInfos, listOptions, pageToken);
} while (pageToken != null
&& getMaxRemainingResults(listOptions.getMaxResults(), objectInfos) > 0);

objectInfos.sort(Comparator.comparing(GoogleCloudStorageItemInfo::getObjectName));

return objectInfos;
}

/**
* @see GoogleCloudStorage#listObjectInfoPage(String, String, ListObjectOptions, String)
*/
@Override
public ListPage<GoogleCloudStorageItemInfo> listObjectInfoPage(
String bucketName, String objectNamePrefix, ListObjectOptions listOptions, String pageToken)
throws IOException {
logger.atFiner().log(
"listObjectInfoPage(%s, %s, %s, %s)", bucketName, objectNamePrefix, listOptions, pageToken);

checkArgument(
listOptions.getMaxResults() == MAX_RESULTS_UNLIMITED,
"maxResults should be unlimited for 'listObjectInfoPage' call, but was %s",
listOptions.getMaxResults());

List<GoogleCloudStorageItemInfo> objectInfos = new ArrayList<>();

String nextPageToken =
listObjectInfoPageInternal(
bucketName, objectNamePrefix, objectInfos, listOptions, pageToken);
return new ListPage<>(objectInfos, nextPageToken);
}

private String listObjectInfoPageInternal(
String bucketName,
String objectNamePrefix,
List<GoogleCloudStorageItemInfo> objectInfos,
ListObjectOptions listOptions,
String pageToken)
throws IOException {
// TODO: Check if we need to set maxResults + 1 in case of listing prefixes.
logger.atFiner().log("listObjectInfoPage(%s, %s)", objectInfos, listOptions);

// Although GCS does not implement a file system, it treats objects that end
// in delimiter as different from other objects when listing objects.
//
// If caller sends foo/ as the prefix, foo/ is returned as an object name.
// That is inconsistent with listing items in a directory.
// Not sure if that is a bug in GCS or the intended behavior.
//
// In this case, we do not want foo/ in the returned list because we want to
// keep the behavior more like a file system without calling it as such.
// Therefore, we filter out such entry.
// Determine if the caller sent a directory name as a prefix.
boolean objectPrefixEndsWithDelimiter =
!isNullOrEmpty(objectNamePrefix) && objectNamePrefix.endsWith(PATH_DELIMITER);

boolean objectPrefixIncluded = false;

Iterator<Blob> blobIterator;
String nextPageToken = null;
try {
Page<Blob> blobListPage =
storage.list(
bucketName,
prepareBlobListOptions(listOptions, objectNamePrefix, pageToken)
.toArray(new BlobListOption[0]));
nextPageToken = emptyToNull(blobListPage.getNextPageToken());
blobIterator = blobListPage.getValues().iterator();
} catch (StorageException e) {
String resource = StringPaths.fromComponents(bucketName, objectNamePrefix);
if (errorExtractor.getErrorType(e) == ErrorType.NOT_FOUND) {
logger.atFiner().withCause(e).log(
"listStorageObjectsInternal(%s, %s): item not found", resource, listOptions);
return null;
}
throw new IOException("Error listing " + resource, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we include the e.message as well to the IOException message. Sometime some of the tools just log the message and skip the inner exception details.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Veneer would have retried, correct?

}

while (blobIterator.hasNext()
&& getMaxRemainingResults(listOptions.getMaxResults(), objectInfos) > 0) {
Blob blob = blobIterator.next();
if (blob.isDirectory() && listOptions.isIncludePrefix()) {
// Handle prefixes.
objectInfos.add(createInferredDirectory(new StorageResourceId(bucketName, blob.getName())));
} else if (!objectPrefixEndsWithDelimiter || !blob.getName().equals(objectNamePrefix)) {
// Handle objects.
objectInfos.add(createItemInfoForBlob(blob));
} else if (blob.getName().equals(objectNamePrefix) && listOptions.isIncludePrefix()) {
// Handle object prefixes. Object prefixes show up as non-directory.
objectInfos.add(createItemInfoForBlob(blob));
objectPrefixIncluded = true;
}
}

// Create inferred directory for the prefix object if necessary.
if (listOptions.isIncludePrefix()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code duplication. Is there a way to avoid this?

// Only add an inferred directory for non-null prefix name
&& objectNamePrefix != null
// Only add an inferred directory if listing in directory mode (non-flat listing)
&& listOptions.getDelimiter() != null
// Only add an inferred directory if listed any prefixes or objects, i.e. prefix "exists"
&& !objectInfos.isEmpty()
// Only add an inferred directory if prefix object is not listed already
&& !objectPrefixIncluded) {
objectInfos.add(createInferredDirectory(new StorageResourceId(bucketName, objectNamePrefix)));
}
return nextPageToken;
}

private static long getMaxRemainingResults(
long maxResults, List<GoogleCloudStorageItemInfo> blobs) {
if (maxResults <= 0) {
return Long.MAX_VALUE;
}
return maxResults - blobs.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is different from Apiary implementation. Why?

}

private static BlobField getBlobField(String field) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this getting used?

Optional<BlobField> resultField =
Arrays.stream(BlobField.values())
.filter(blobField -> blobField.getApiaryName().equals(field))
.findFirst();
return resultField.orElse(null);
}

private List<BlobListOption> prepareBlobListOptions(
ListObjectOptions listOptions,
@Nullable String objectNamePrefix,
@Nullable String pageToken) {
long maxResults = listOptions.getMaxResults();

List<BlobListOption> blobListOptions = new ArrayList<>();

blobListOptions.add(
BlobListOption.pageSize(
maxResults <= 0 || maxResults >= storageOptions.getMaxListItemsPerCall()
? storageOptions.getMaxListItemsPerCall()
: maxResults));

if (listOptions.getDelimiter() != null) {
blobListOptions.add(BlobListOption.delimiter(listOptions.getDelimiter()));
}
if (objectNamePrefix != null) {
blobListOptions.add(BlobListOption.prefix(objectNamePrefix));
}
if (listOptions.getFields() != null) {
blobListOptions.add(
BlobListOption.fields(
Arrays.stream(listOptions.getFields().split(","))
.map(GoogleCloudStorageClientImpl::getBlobField)
.toArray(BlobField[]::new)));
}
if (pageToken != null) {
blobListOptions.add(BlobListOption.pageToken(pageToken));
}
return blobListOptions;
}

private static GoogleCloudStorageItemInfo createItemInfoForBlob(Blob blob) {
checkNotNull(blob, "object must not be null");
checkArgument(!isNullOrEmpty(blob.getBucket()), "object must have a bucket: %s", blob);
checkArgument(!isNullOrEmpty(blob.getName()), "object must have a name: %s", blob);
return createItemInfoForBlob(new StorageResourceId(blob.getBucket(), blob.getName()), blob);
}

private static GoogleCloudStorageItemInfo createItemInfoForBlob(
StorageResourceId resourceId, Blob blob) {
checkArgument(resourceId != null, "resourceId must not be null");
checkArgument(blob != null, "object must not be null");
checkArgument(
resourceId.isStorageObject(),
"resourceId must be a StorageObject. resourceId: %s",
resourceId);
checkArgument(
resourceId.getBucketName().equals(blob.getBucket()),
"resourceId.getBucketName() must equal object.getBucket(): '%s' vs '%s'",
resourceId.getBucketName(),
blob.getBucket());
checkArgument(
resourceId.getObjectName().equals(blob.getName()),
"resourceId.getObjectName() must equal object.getName(): '%s' vs '%s'",
resourceId.getObjectName(),
blob.getName());

Map<String, byte[]> decodedMetadata =
blob.getMetadata() == null ? null : decodeMetadata(blob.getMetadata());

byte[] md5Hash = null;
byte[] crc32c = null;

if (!isNullOrEmpty(blob.getCrc32c())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try to avoid code duplication.

crc32c = BaseEncoding.base64().decode(blob.getCrc32c());
}

if (!isNullOrEmpty(blob.getMd5())) {
md5Hash = BaseEncoding.base64().decode(blob.getMd5());
}

return GoogleCloudStorageItemInfo.createObject(
resourceId,
blob.getCreateTimeOffsetDateTime() == null
? 0
: blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getUpdateTimeOffsetDateTime() == null
? 0
: blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getSize() == null ? 0 : blob.getSize(),
blob.getContentType(),
blob.getContentEncoding(),
decodedMetadata,
blob.getGeneration() == null ? 0 : blob.getGeneration(),
blob.getMetageneration() == null ? 0 : blob.getMetageneration(),
new VerificationAttributes(md5Hash, crc32c));
}

@Override
public SeekableByteChannel open(
StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) throws IOException {
Expand Down Expand Up @@ -661,57 +896,6 @@ private static ExecutorSupplier getPCUExecutorSupplier(ExecutorService pCUExecut
: ExecutorSupplier.useExecutor(pCUExecutorService);
}

/** Helper for converting a StorageResourceId + Blob into a GoogleCloudStorageItemInfo. */
private static GoogleCloudStorageItemInfo createItemInfoForBlob(
StorageResourceId resourceId, Blob blob) {
checkArgument(resourceId != null, "resourceId must not be null");
checkArgument(blob != null, "object must not be null");
checkArgument(
resourceId.isStorageObject(),
"resourceId must be a StorageObject. resourceId: %s",
resourceId);
checkArgument(
resourceId.getBucketName().equals(blob.getBucket()),
"resourceId.getBucketName() must equal object.getBucket(): '%s' vs '%s'",
resourceId.getBucketName(),
blob.getBucket());
checkArgument(
resourceId.getObjectName().equals(blob.getName()),
"resourceId.getObjectName() must equal object.getName(): '%s' vs '%s'",
resourceId.getObjectName(),
blob.getName());

Map<String, byte[]> decodedMetadata =
blob.getMetadata() == null ? null : decodeMetadata(blob.getMetadata());

byte[] md5Hash = null;
byte[] crc32c = null;

if (!isNullOrEmpty(blob.getCrc32c())) {
crc32c = BaseEncoding.base64().decode(blob.getCrc32c());
}

if (!isNullOrEmpty(blob.getMd5())) {
md5Hash = BaseEncoding.base64().decode(blob.getMd5());
}

return GoogleCloudStorageItemInfo.createObject(
resourceId,
blob.getCreateTimeOffsetDateTime() == null
? 0
: blob.getCreateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getUpdateTimeOffsetDateTime() == null
? 0
: blob.getUpdateTimeOffsetDateTime().toInstant().toEpochMilli(),
blob.getSize() == null ? 0 : blob.getSize(),
blob.getContentType(),
blob.getContentEncoding(),
decodedMetadata,
blob.getGeneration() == null ? 0 : blob.getGeneration(),
blob.getMetageneration() == null ? 0 : blob.getMetageneration(),
new VerificationAttributes(md5Hash, crc32c));
}

public static Builder builder() {
return new AutoBuilder_GoogleCloudStorageClientImpl_Builder();
}
Expand Down
Loading