diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
index 9ffdba5eaae3a..9777bd974d56c 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -113,13 +113,6 @@ class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamB
private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);
- /**
- * Maximum number of deletes in a {@link DeleteObjectsRequest}.
- *
- * @see S3 Documentation.
- */
- private static final int MAX_BULK_DELETES = 1000;
-
private final S3BlobStore blobStore;
private final String keyPath;
@@ -339,12 +332,12 @@ private void doDeleteBlobs(List blobNames, boolean relative) throws IOEx
outstanding = new HashSet<>(blobNames);
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
- // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
+ // S3 API allows 1k blobs per delete so we split up the given blobs into requests of bulk size deletes
final List deleteRequests = new ArrayList<>();
final List partition = new ArrayList<>();
for (String key : outstanding) {
partition.add(key);
- if (partition.size() == MAX_BULK_DELETES) {
+ if (partition.size() == blobStore.getBulkDeletesSize()) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
index 3dd373b5b9f32..80005d92344a4 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
@@ -52,6 +52,7 @@
import static org.opensearch.repositories.s3.S3Repository.BUCKET_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
+import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
@@ -74,6 +75,8 @@ class S3BlobStore implements BlobStore {
private volatile StorageClass storageClass;
+ private volatile int bulkDeletesSize;
+
private volatile RepositoryMetadata repositoryMetadata;
private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();
@@ -92,6 +95,7 @@ class S3BlobStore implements BlobStore {
ByteSizeValue bufferSize,
String cannedACL,
String storageClass,
+ int bulkDeletesSize,
RepositoryMetadata repositoryMetadata,
AsyncTransferManager asyncTransferManager,
AsyncExecutorContainer priorityExecutorBuilder,
@@ -105,6 +109,7 @@ class S3BlobStore implements BlobStore {
this.bufferSize = bufferSize;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
+ this.bulkDeletesSize = bulkDeletesSize;
this.repositoryMetadata = repositoryMetadata;
this.asyncTransferManager = asyncTransferManager;
this.normalExecutorBuilder = normalExecutorBuilder;
@@ -119,6 +124,7 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.bufferSize = BUFFER_SIZE_SETTING.get(repositoryMetadata.settings());
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
+ this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
}
@Override
@@ -150,6 +156,10 @@ public long bufferSizeInBytes() {
return bufferSize.getBytes();
}
+ public int getBulkDeletesSize() {
+ return bulkDeletesSize;
+ }
+
@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
index b944bd8bf6260..fd8952cb5abd1 100644
--- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
+++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java
@@ -196,6 +196,13 @@ class S3Repository extends MeteredBlobStoreRepository {
new ByteSizeValue(5, ByteSizeUnit.TB)
);
+ /**
+ * Maximum number of deletes in a DeleteObjectsRequest.
+ *
+ * @see S3 Documentation.
+ */
+ static final Setting BULK_DELETE_SIZE = Setting.intSetting("bulk_delete_size", 1000, 1, 1000);
+
/**
* Sets the S3 storage class type for the backup files. Values may be standard, reduced_redundancy,
* standard_ia, onezone_ia and intelligent_tiering. Defaults to standard.
@@ -262,6 +269,8 @@ class S3Repository extends MeteredBlobStoreRepository {
private final AsyncExecutorContainer normalExecutorBuilder;
private final Path pluginConfigPath;
+ private volatile int bulkDeletesSize;
+
// Used by test classes
S3Repository(
final RepositoryMetadata metadata,
@@ -426,6 +435,7 @@ protected S3BlobStore createBlobStore() {
bufferSize,
cannedACL,
storageClass,
+ bulkDeletesSize,
metadata,
asyncUploadUtils,
priorityExecutorBuilder,
@@ -485,6 +495,7 @@ private void readRepositoryMetadata() {
this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings());
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
this.cannedACL = CANNED_ACL_SETTING.get(metadata.settings());
+ this.bulkDeletesSize = BULK_DELETE_SIZE.get(metadata.settings());
if (S3ClientSettings.checkDeprecatedCredentials(metadata.settings())) {
// provided repository settings
deprecationLogger.deprecate(
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java
index 8c8524212e08e..6eb8faa746d34 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java
@@ -64,6 +64,7 @@
import org.mockito.invocation.InvocationOnMock;
+import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -387,6 +388,7 @@ private S3BlobStore createBlobStore() {
S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY),
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
+ BULK_DELETE_SIZE.get(Settings.EMPTY),
repositoryMetadata,
new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java
index ecad68474b601..a2214f5218991 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java
@@ -95,6 +95,7 @@
import static org.opensearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING;
import static org.opensearch.repositories.s3.S3ClientSettings.READ_TIMEOUT_SETTING;
import static org.opensearch.repositories.s3.S3ClientSettings.REGION;
+import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -215,6 +216,7 @@ protected AsyncMultiStreamBlobContainer createBlobContainer(
bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
+ BULK_DELETE_SIZE.get(Settings.EMPTY),
repositoryMetadata,
new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java
index e266bba372d80..2701cae6a733b 100644
--- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java
+++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java
@@ -276,10 +276,12 @@ public void testDelete() throws IOException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = new BlobPath();
+ int bulkDeleteSize = 5;
final S3BlobStore blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher());
+ when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize);
final S3Client client = mock(S3Client.class);
doAnswer(invocation -> new AmazonS3Reference(client)).when(blobStore).clientReference();
@@ -297,8 +299,11 @@ public void testDelete() throws IOException {
when(client.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Iterable);
final List keysDeleted = new ArrayList<>();
+ AtomicInteger deleteCount = new AtomicInteger();
doAnswer(invocation -> {
DeleteObjectsRequest deleteObjectsRequest = invocation.getArgument(0);
+ deleteCount.getAndIncrement();
+ logger.info("Object sizes are{}", deleteObjectsRequest.delete().objects().size());
keysDeleted.addAll(deleteObjectsRequest.delete().objects().stream().map(ObjectIdentifier::key).collect(Collectors.toList()));
return DeleteObjectsResponse.builder().build();
}).when(client).deleteObjects(any(DeleteObjectsRequest.class));
@@ -311,6 +316,8 @@ public void testDelete() throws IOException {
// keysDeleted will have blobPath also
assertEquals(listObjectsV2ResponseIterator.getKeysListed().size(), keysDeleted.size() - 1);
assertTrue(keysDeleted.contains(blobPath.buildAsString()));
+ // keysDeleted will have blobPath also
+ assertEquals((int) Math.ceil(((double) keysDeleted.size() + 1) / bulkDeleteSize), deleteCount.get());
keysDeleted.remove(blobPath.buildAsString());
assertEquals(new HashSet<>(listObjectsV2ResponseIterator.getKeysListed()), new HashSet<>(keysDeleted));
}