Skip to content

Commit

Permalink
Async deletion with S3
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Aug 12, 2024
1 parent 0e406ea commit 72707c1
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.repositories.s3;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.exception.SdkException;
Expand Down Expand Up @@ -62,6 +64,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -875,4 +878,143 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
final AtomicLong deletedBlobs = new AtomicLong();
final AtomicLong deletedBytes = new AtomicLong();

try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
.bucket(blobStore.bucket())
.prefix(keyPath)
.build();

ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

CompletableFuture<Void> listingFuture = new CompletableFuture<>();

listPublisher.subscribe(new Subscriber<>() {
private Subscription subscription;
private final List<ObjectIdentifier> objectsToDelete = new ArrayList<>();
private final List<CompletableFuture<DeleteObjectsResponse>> deleteFutures = new ArrayList<>();

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}

@Override
public void onNext(ListObjectsV2Response response) {
response.contents().forEach(s3Object -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3Object.size());
objectsToDelete.add(ObjectIdentifier.builder().key(s3Object.key()).build());
});

if (objectsToDelete.size() > blobStore.getBulkDeletesSize()) {
int bulkDeleteSize = blobStore.getBulkDeletesSize();
while (objectsToDelete.size() >= bulkDeleteSize) {
List<ObjectIdentifier> batch = new ArrayList<>(objectsToDelete.subList(0, bulkDeleteSize));
deleteFutures.add(deleteObjects(batch));
objectsToDelete.subList(0, bulkDeleteSize).clear();
}
}

subscription.request(1);
}

@Override
public void onError(Throwable t) {
listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
}

@Override
public void onComplete() {
if (!objectsToDelete.isEmpty()) {
deleteFutures.add(deleteObjects(objectsToDelete));
}

CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture<?>[0]))
.whenComplete((v, throwable) -> {
if (throwable != null) {
listingFuture.completeExceptionally(new IOException("Failed to delete some objects", throwable));
} else {
listingFuture.complete(null);
}
});
}

private CompletableFuture<DeleteObjectsResponse> deleteObjects(List<ObjectIdentifier> objects) {
DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder()
.bucket(blobStore.bucket())
.delete(Delete.builder().objects(objects).quiet(true).build())
.build();

return s3AsyncClient.deleteObjects(deleteRequest);
}
});

listingFuture.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(throwable instanceof Exception ? (Exception) throwable
: new IOException("Unexpected error during async deletion", throwable));
} else {
completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
}
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
}
}



@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
if (blobNames.isEmpty()) {
completionListener.onResponse(null);
return;
}

try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

List<ObjectIdentifier> objectsToDelete = blobNames.stream()
.map(name -> ObjectIdentifier.builder().key(buildKey(name)).build())
.collect(Collectors.toList());

// Use blobStore.getBulkDeletesSize() instead of hardcoded 1000
int bulkDeleteSize = blobStore.getBulkDeletesSize();
List<List<ObjectIdentifier>> batches = new ArrayList<>();
for (int i = 0; i < objectsToDelete.size(); i += bulkDeleteSize) {
batches.add(objectsToDelete.subList(i, Math.min(objectsToDelete.size(), i + bulkDeleteSize)));
}

List<CompletableFuture<DeleteObjectsResponse>> deleteFutures = new ArrayList<>();

for (List<ObjectIdentifier> batch : batches) {
DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder()
.bucket(blobStore.bucket())
.delete(Delete.builder().objects(batch).quiet(true).build())
.build();
deleteFutures.add(s3AsyncClient.deleteObjects(deleteRequest));
}

CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture<?>[0]))
.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(throwable instanceof Exception ? (Exception) throwable
: new IOException("Failed to delete some blobs", throwable));
} else {
completionListener.onResponse(null);
}
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.util.List;

/**
* An extension of {@link BlobContainer} that adds {@link AsyncMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand Down Expand Up @@ -48,4 +49,8 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer {
* by underlying blobContainer. In this case, caller doesn't need to ensure integrity of data.
*/
boolean remoteIntegrityCheckSupported();

void deleteAsync(ActionListener<DeleteResult> completionListener);

void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,14 @@ private InputStreamContainer decryptInputStreamContainer(InputStreamContainer in
return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
}
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
blobContainer.deleteAsync(completionListener);
}

@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, completionListener);
}
}

0 comments on commit 72707c1

Please sign in to comment.