Fix the deletion of shard level segment files
ashking94 committed Aug 4, 2024
1 parent 27fcb40 commit 2f5d61b
Showing 1 changed file with 61 additions and 6 deletions.
@@ -1106,6 +1106,7 @@ private void doDeleteShardSnapshots(
foundIndices,
rootBlobs,
updatedRepoData,
repositoryData,
remoteStoreLockManagerFactory,
afterCleanupsListener
);
@@ -1131,6 +1132,7 @@ private void doDeleteShardSnapshots(
foundIndices,
rootBlobs,
newRepoData,
repositoryData,
remoteStoreLockManagerFactory,
afterCleanupsListener
);
@@ -1161,6 +1163,7 @@ private void cleanupUnlinkedRootAndIndicesBlobs(
Map<String, BlobContainer> foundIndices,
Map<String, BlobMetadata> rootBlobs,
RepositoryData updatedRepoData,
RepositoryData oldRepoData,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<Void> listener
) {
@@ -1169,6 +1172,7 @@ private void cleanupUnlinkedRootAndIndicesBlobs(
foundIndices,
rootBlobs,
updatedRepoData,
oldRepoData,
remoteStoreLockManagerFactory,
ActionListener.map(listener, ignored -> null)
);
@@ -1532,6 +1536,7 @@ private void cleanupStaleBlobs(
Map<String, BlobContainer> foundIndices,
Map<String, BlobMetadata> rootBlobs,
RepositoryData newRepoData,
RepositoryData oldRepoData,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<DeleteResult> listener
) {
@@ -1558,7 +1563,14 @@ private void cleanupStaleBlobs(
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
cleanupStaleIndices(foundIndices, survivingIndexIds, remoteStoreLockManagerFactory, groupedListener);
cleanupStaleIndices(
foundIndices,
survivingIndexIds,
remoteStoreLockManagerFactory,
groupedListener,
oldRepoData,
deletedSnapshots
);
}
}

@@ -1612,6 +1624,7 @@ public void cleanup(
foundIndices,
rootBlobs,
repositoryData,
repositoryData,
remoteStoreLockManagerFactory,
ActionListener.map(listener, RepositoryCleanupResult::new)
),
@@ -1705,7 +1718,9 @@ private void cleanupStaleIndices(
Map<String, BlobContainer> foundIndices,
Set<String> survivingIndexIds,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
GroupedActionListener<DeleteResult> listener
GroupedActionListener<DeleteResult> listener,
RepositoryData oldRepoData,
Collection<SnapshotId> deletedSnapshots
) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
@@ -1729,7 +1744,13 @@ private void cleanupStaleIndices(
foundIndices.size() - survivingIndexIds.size()
);
for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, groupedListener);
executeOneStaleIndexDelete(
staleIndicesToDelete,
remoteStoreLockManagerFactory,
groupedListener,
oldRepoData,
deletedSnapshots
);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
@@ -1752,7 +1773,9 @@ private static boolean isIndexPresent(ClusterService clusterService, String inde
private void executeOneStaleIndexDelete(
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
GroupedActionListener<DeleteResult> listener
GroupedActionListener<DeleteResult> listener,
RepositoryData oldRepoData,
Collection<SnapshotId> deletedSnapshots
) throws InterruptedException {
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry != null) {
@@ -1776,9 +1799,41 @@ private void executeOneStaleIndexDelete(
}
}
}
} else {
String indexToBeDeleted = indexEntry.getKey();
// The indices map within RepositoryData should have just 1 IndexId which has same id as the one
// being deleted. Hence we are adding an assertion here. We still let the deletion happen as
// usual considering there can be more than 1 matching IndexIds.
List<IndexId> indexIds = oldRepoData.getIndices()
.values()
.stream()
.filter(idx -> idx.getId().equals(indexToBeDeleted))
.collect(Collectors.toList());
if (indexIds.size() > 1) {
logger.warn("There are more than 1 matching index ids [{}]", indexIds);
}
assert indexIds.size() == 1 : "There should be exact 1 match of IndexId";
for (SnapshotId snId : deletedSnapshots) {
for (IndexId idx : indexIds) {
String indexMetaGeneration = oldRepoData.indexMetaDataGenerations().indexMetaBlobId(snId, idx);
final BlobContainer indexContainer = indexContainer(idx);
IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.read(
indexContainer,
indexMetaGeneration,
namedXContentRegistry
);
int numOfShards = indexMetadata.getNumberOfShards();
for (int i = 0; i < numOfShards; i++) {
deleteResult.add(shardContainer(idx, i).delete());
}
}
}
}
// TODO - We need to do a metadata lookup and delete the shard level folders.
// TODO - Shallow snapshot only has shallow snap file. Need to check if there will be more changes
// to handle shallow snapshot deletion.
// Deleting the index folder
deleteResult = indexEntry.getValue().delete();
deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
} catch (IOException e) {
logger.warn(
@@ -1795,7 +1850,7 @@ private void executeOneStaleIndexDelete(
logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e);
}

executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener);
executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener, oldRepoData, deletedSnapshots);
return deleteResult;
}));
}
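
For context, a minimal, self-contained Java sketch of the cleanup order this change establishes: the stale index's shard-level folders, resolved from the pre-deletion repository metadata, are deleted explicitly before the index folder itself, and every delete result is accumulated rather than overwritten. The types here (BlobContainer with a plain long return, StaleIndex, StaleIndexCleanupSketch) are hypothetical stand-ins for illustration only, not the actual OpenSearch BlobContainer, IndexId, RepositoryData, or DeleteResult classes used in the diff above.

import java.util.List;

// Hypothetical stand-in: a blob folder whose delete() reports how many blobs it removed.
interface BlobContainer {
    long delete();
}

// Hypothetical stand-in: a stale index folder together with its shard-level folders.
record StaleIndex(List<BlobContainer> shardFolders, BlobContainer indexFolder) {}

public final class StaleIndexCleanupSketch {

    // Simplified view of the order this commit establishes: shard folders first,
    // then the index folder, accumulating (not overwriting) the delete results.
    static long cleanup(StaleIndex staleIndex) {
        long deleted = 0;
        for (BlobContainer shardFolder : staleIndex.shardFolders()) {
            deleted += shardFolder.delete();          // shard-level segment files
        }
        deleted += staleIndex.indexFolder().delete(); // finally, the index folder itself
        return deleted;
    }

    public static void main(String[] args) {
        // Two shard folders holding 3 and 2 blobs, plus 1 blob in the index folder.
        StaleIndex stale = new StaleIndex(List.of(() -> 3L, () -> 2L), () -> 1L);
        System.out.println(cleanup(stale)); // prints 6
    }
}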
