diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java index 8d2772d42ebca..9f73c67df3b18 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java @@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) { public void close() {} }; + public MetricPublisher getDeleteObjectsMetricPublisher() { + return deleteObjectsMetricPublisher; + } + public MetricPublisher getObjectMetricPublisher = new MetricPublisher() { @Override public void publish(MetricCollection metricCollection) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java index 5ce2c242bdefb..03b4023293db7 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java @@ -64,16 +64,12 @@ private static CompletableFuture executeDeleteBatches( return allDeletesFuture; } - private static CompletableFuture executeSingleDeleteBatch( - S3AsyncClient s3AsyncClient, - S3BlobStore blobStore, - List batch - ) { + static CompletableFuture executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List batch) { DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore); return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse); } - private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) { + static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) { if (!deleteObjectsResponse.errors().isEmpty()) { logger.warn( () -> new ParameterizedMessage( @@ -97,7 +93,7 @@ private static DeleteObjectsRequest bulkDelete(String bucket, List blobs .quiet(true) .build() ) - .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher)) + .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher())) .build(); } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelperTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelperTests.java new file mode 100644 index 0000000000000..3c536daea3d55 --- /dev/null +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelperTests.java @@ -0,0 +1,149 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import software.amazon.awssdk.metrics.MetricPublisher; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Error; + +import org.opensearch.repositories.s3.S3BlobStore; +import org.opensearch.repositories.s3.StatsMetricPublisher; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class S3AsyncDeleteHelperTests extends OpenSearchTestCase { + + @Mock + private S3AsyncClient s3AsyncClient; + + @Mock + private S3BlobStore blobStore; + + @Override + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.openMocks(this); + } + + public void testExecuteDeleteChain() { + List objectsToDelete = Arrays.asList("key1", "key2", "key3"); + CompletableFuture currentChain = CompletableFuture.completedFuture(null); + + // Mock the deleteObjects method of S3AsyncClient + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build()) + ); + + // Mock the getBulkDeletesSize method of S3BlobStore + when(blobStore.getBulkDeletesSize()).thenReturn(2); + + // Mock the getStatsMetricPublisher method of S3BlobStore to return a non-null value + StatsMetricPublisher mockMetricPublisher = mock(StatsMetricPublisher.class); + MetricPublisher mockDeleteObjectsMetricPublisher = mock(MetricPublisher.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(mockMetricPublisher); + when(mockMetricPublisher.getDeleteObjectsMetricPublisher()).thenReturn(mockDeleteObjectsMetricPublisher); + + CompletableFuture newChain = S3AsyncDeleteHelper.executeDeleteChain( + s3AsyncClient, + blobStore, + objectsToDelete, + currentChain, + null + ); + + // Verify that the newChain is completed without any exceptions + assertNotNull(newChain); + assertTrue(newChain.isDone()); + assertFalse(newChain.isCompletedExceptionally()); + + // Verify that the deleteObjects method of S3AsyncClient was called with the expected request + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DeleteObjectsRequest.class); + verify(s3AsyncClient, times(2)).deleteObjects(requestCaptor.capture()); + + List capturedRequests = requestCaptor.getAllValues(); + assertEquals(2, capturedRequests.size()); + + // Verify that the requests have the expected metric publisher added + for (DeleteObjectsRequest request : capturedRequests) { + assertNotNull(request.overrideConfiguration()); + assertTrue(request.overrideConfiguration().get().metricPublishers().contains(mockDeleteObjectsMetricPublisher)); + } + } + + public void testCreateDeleteBatches() { + List keys = Arrays.asList("key1", "key2", "key3", "key4", "key5", "key6"); + int bulkDeleteSize = 3; + + List> batches = S3AsyncDeleteHelper.createDeleteBatches(keys, bulkDeleteSize); + + assertEquals(2, batches.size()); + assertEquals(Arrays.asList("key1", "key2", "key3"), batches.get(0)); + assertEquals(Arrays.asList("key4", "key5", "key6"), batches.get(1)); + } + + public void testExecuteSingleDeleteBatch() throws Exception { + List batch = Arrays.asList("key1", "key2"); + DeleteObjectsResponse expectedResponse = DeleteObjectsResponse.builder().build(); + + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(CompletableFuture.completedFuture(expectedResponse)); + + // Mock the getStatsMetricPublisher method of S3BlobStore to return a non-null value + StatsMetricPublisher mockMetricPublisher = mock(StatsMetricPublisher.class); + MetricPublisher mockDeleteObjectsMetricPublisher = mock(MetricPublisher.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(mockMetricPublisher); + when(mockMetricPublisher.getDeleteObjectsMetricPublisher()).thenReturn(mockDeleteObjectsMetricPublisher); + + CompletableFuture future = S3AsyncDeleteHelper.executeSingleDeleteBatch(s3AsyncClient, blobStore, batch); + + assertNotNull(future); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + future.join(); // Wait for the CompletableFuture to complete + + // Verify that the deleteObjects method of S3AsyncClient was called with the expected request + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DeleteObjectsRequest.class); + verify(s3AsyncClient).deleteObjects(requestCaptor.capture()); + + DeleteObjectsRequest capturedRequest = requestCaptor.getValue(); + assertEquals(blobStore.bucket(), capturedRequest.bucket()); + assertEquals(batch.size(), capturedRequest.delete().objects().size()); + assertTrue(capturedRequest.delete().objects().stream().map(ObjectIdentifier::key).collect(Collectors.toList()).containsAll(batch)); + } + + public void testProcessDeleteResponse() { + DeleteObjectsResponse response = DeleteObjectsResponse.builder() + .errors( + Arrays.asList( + S3Error.builder().key("key1").code("Code1").message("Message1").build(), + S3Error.builder().key("key2").code("Code2").message("Message2").build() + ) + ) + .build(); + + // Call the processDeleteResponse method + S3AsyncDeleteHelper.processDeleteResponse(response); + } +}