Skip to content

Commit

Permalink
Add UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Sep 10, 2024
1 parent ffc81c0 commit 4d61852
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) {
public void close() {}
};

public MetricPublisher getDeleteObjectsMetricPublisher() {
return deleteObjectsMetricPublisher;
}

public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteS
return batches;
}

private static CompletableFuture<Void> executeDeleteBatches(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<List<String>> batches
) {
static CompletableFuture<Void> executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<List<String>> batches) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
Expand All @@ -64,16 +60,12 @@ private static CompletableFuture<Void> executeDeleteBatches(
return allDeletesFuture;
}

private static CompletableFuture<Void> executeSingleDeleteBatch(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> batch
) {
static CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List<String> batch) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
}

private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
if (!deleteObjectsResponse.errors().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
Expand All @@ -88,7 +80,7 @@ private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsRes
return null;
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Expand All @@ -97,7 +89,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;

import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -101,16 +102,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -1275,6 +1279,106 @@ public void testTransformResponseToInputStreamContainer() throws Exception {
assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available());
}

public void testDeleteAsync() throws Exception {
for (int i = 0; i < 100; i++) {
testDeleteAsync(i + 1);
}
}

private void testDeleteAsync(int bulkDeleteSize) throws InterruptedException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();

final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize);

final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class);
final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class);
when(blobStore.asyncClientReference()).thenReturn(asyncClientReference);
AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create(
s3AsyncClient,
s3AsyncClient,
s3AsyncClient,
null
);
when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials);

final List<S3Object> s3Objects = new ArrayList<>();
int numObjects = randomIntBetween(20, 100);
long totalSize = 0;
for (int i = 0; i < numObjects; i++) {
long size = randomIntBetween(1, 100);
s3Objects.add(S3Object.builder().key(randomAlphaOfLength(10)).size(size).build());
totalSize += size;
}

final List<ListObjectsV2Response> responseList = new ArrayList<>();
int size = 0;
while (size < numObjects) {
int toAdd = randomIntBetween(10, 20);
int endIndex = (int) Math.min(numObjects, size + toAdd);
responseList.add(ListObjectsV2Response.builder().contents(s3Objects.subList(size, endIndex)).build());
size = endIndex;
}
int expectedDeletedObjectsCall = numObjects / bulkDeleteSize + (numObjects % bulkDeleteSize > 0 ? 1 : 0);

final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class);
AtomicInteger counter = new AtomicInteger();
doAnswer(invocation -> {
Subscriber<? super ListObjectsV2Response> subscriber = invocation.getArgument(0);
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
int currentCounter = counter.getAndIncrement();
if (currentCounter < responseList.size()) {
subscriber.onNext(responseList.get(currentCounter));
}
if (currentCounter == responseList.size()) {
subscriber.onComplete();
}
}

@Override
public void cancel() {}
});
return null;
}).when(listPublisher).subscribe(ArgumentMatchers.<Subscriber<ListObjectsV2Response>>any());
when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher);

when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(
CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build())
);

final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DeleteResult> deleteResultRef = new AtomicReference<>();
blobContainer.deleteAsync(new ActionListener<>() {
@Override
public void onResponse(DeleteResult deleteResult) {
deleteResultRef.set(deleteResult);
latch.countDown();
}

@Override
public void onFailure(Exception e) {
logger.error("exception during deleteAsync", e);
fail("Unexpected failure: " + e.getMessage());
}
});

latch.await();

DeleteResult deleteResult = deleteResultRef.get();
assertEquals(numObjects, deleteResult.blobsDeleted());
assertEquals(totalSize, deleteResult.bytesDeleted());

verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class));
verify(s3AsyncClient, times(expectedDeletedObjectsCall)).deleteObjects(any(DeleteObjectsRequest.class));
}

private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) {

final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize));
Expand Down
Loading

0 comments on commit 4d61852

Please sign in to comment.