diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
index ad6bc9ffa9725..d79e2c7f627f1 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -1914,9 +1914,10 @@ public void finalizeSnapshot(
                     stateTransformer,
                     ActionListener.wrap(newRepoData -> {
                         if (writeShardGens) {
-                            cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
+                            cleanupOldShardGens(existingRepositoryData, updatedRepositoryData, newRepoData, listener);
+                        } else {
+                            listener.onResponse(newRepoData);
                         }
-                        listener.onResponse(newRepoData);
                     }, onUpdateFailure)
                 );
             }, onUpdateFailure), 2 + indices.size());
@@ -1969,7 +1970,12 @@ public void finalizeSnapshot(
     }
 
     // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
-    private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) {
+    private void cleanupOldShardGens(
+        RepositoryData existingRepositoryData,
+        RepositoryData updatedRepositoryData,
+        RepositoryData newRepositoryData,
+        ActionListener<RepositoryData> listener
+    ) {
         final List<String> toDelete = new ArrayList<>();
         updatedRepositoryData.shardGenerations()
             .obsoleteShardGenerations(existingRepositoryData.shardGenerations())
@@ -1978,11 +1984,57 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
                     (shardId, oldGen) -> toDelete.add(shardContainer(indexId, shardId).path().buildAsString() + INDEX_FILE_PREFIX + oldGen)
                 )
             );
+        if (toDelete.isEmpty()) {
+            listener.onResponse(newRepositoryData);
+            return;
+        }
         try {
-            logger.info("{} shards generations to be deleted as part of cleanupOldShardGens", toDelete);
-            deleteFromContainer(rootBlobContainer(), toDelete);
+            AtomicInteger counter = new AtomicInteger();
+            Collection<List<String>> subList = toDelete.stream()
+                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
+                .values();
+            final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);
+            logger.info("cleanupOldShardGens toDeleteSize={} groupSize={}", toDelete.size(), staleFilesToDeleteInBatch.size());
+            final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.wrap(r -> {
+                logger.info("completed cleanupOldShardGens");
+                listener.onResponse(newRepositoryData);
+            }, ex -> {
+                logger.error("exception in cleanupOldShardGens", ex);
+                listener.onResponse(newRepositoryData);
+            }), staleFilesToDeleteInBatch.size());
+
+            // Start as many workers as fit into the snapshot pool at once at the most
+            final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size());
+            for (int i = 0; i < workers; ++i) {
+                cleanupOldFiles(staleFilesToDeleteInBatch, groupedListener);
+            }
         } catch (Exception e) {
             logger.warn("Failed to clean up old shard generation blobs", e);
+            listener.onResponse(newRepositoryData);
+        }
+    }
+
+    private void cleanupOldFiles(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
+        throws InterruptedException {
+        List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
+        if (filesToDelete != null) {
+            threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> {
+                try {
+                    deleteFromContainer(rootBlobContainer(), filesToDelete);
+                    l.onResponse(null);
+                } catch (Exception e) {
+                    logger.warn(
+                        () -> new ParameterizedMessage(
+                            "[{}] Failed to delete following blobs during cleanupOldFiles : {}",
+                            metadata.name(),
+                            filesToDelete
+                        ),
+                        e
+                    );
+                    l.onFailure(e);
+                }
+                cleanupOldFiles(staleFilesToDeleteInBatch, listener);
+            }));
         }
     }