From dd6568ef897c7e96c6cf4a130655e6b72131a8ba Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 7 Aug 2024 16:24:34 +0530 Subject: [PATCH] Hashed prefix for snapshots Signed-off-by: Ashish Singh --- .../index/remote/RemoteStorePathStrategy.java | 73 ++++++++++ .../blobstore/BlobStoreRepository.java | 134 ++++++++++++++---- .../snapshots/SnapshotsService.java | 1 + .../org/opensearch/threadpool/ThreadPool.java | 3 + 4 files changed, 185 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java index 05357aaf6ec72..04eec68905198 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java @@ -259,4 +259,77 @@ public PathInput build() { } } + /** + * Wrapper class for the snapshot aware input required to generate path for optimised snapshot paths. This input is + * composed of the parent inputs, shard id, and static indices. + * + * @opensearch.internal + */ + @PublicApi(since = "2.14.0") + @ExperimentalApi + public static class SnapshotShardPathInput extends BasePathInput { + private final String shardId; + + public SnapshotShardPathInput(BlobPath basePath, String indexUUID, String shardId) { + super(basePath, indexUUID); + this.shardId = shardId; + } + + public SnapshotShardPathInput(Builder builder) { + super(builder); + this.shardId = Objects.requireNonNull(builder.shardId); + } + + String shardId() { + return shardId; + } + + @Override + BlobPath fixedSubPath() { + return BlobPath.cleanPath().add("indices").add(shardId).add(super.fixedSubPath()); + } + + /** + * Returns a new builder for {@link PathInput}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder for {@link PathInput}. 
+ * + * @opensearch.internal + */ + @PublicApi(since = "2.14.0") + @ExperimentalApi + public static class Builder extends BasePathInput.Builder { + private String shardId; + + public Builder basePath(BlobPath basePath) { + super.basePath = basePath; + return this; + } + + public Builder indexUUID(String indexUUID) { + super.indexUUID = indexUUID; + return this; + } + + public Builder shardId(String shardId) { + this.shardId = shardId; + return this; + } + + @Override + protected Builder self() { + return this; + } + + public SnapshotShardPathInput build() { + return new SnapshotShardPathInput(this); + } + } + } + } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index ab70b152a8e55..ad6bc9ffa9725 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -110,6 +110,7 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.remote.RemoteStorePathStrategy; import org.opensearch.index.remote.RemoteStorePathStrategy.BasePathInput; +import org.opensearch.index.remote.RemoteStorePathStrategy.SnapshotShardPathInput; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.snapshots.IndexShardSnapshotStatus; import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot; @@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final SetOnce blobContainer = new SetOnce<>(); + private final SetOnce rootBlobContainer = new SetOnce<>(); + private final SetOnce blobStore = new SetOnce<>(); protected final ClusterService clusterService; @@ -813,6 +816,22 @@ protected BlobContainer blobContainer() { return blobContainer; } + private BlobContainer rootBlobContainer() { + assertSnapshotOrGenericThread(); + + BlobContainer rootBlobContainer = this.rootBlobContainer.get(); + if (rootBlobContainer == null) { + synchronized (lock) { + rootBlobContainer = this.rootBlobContainer.get(); + if (rootBlobContainer == null) { + rootBlobContainer = blobStore().blobContainer(BlobPath.cleanPath()); + this.rootBlobContainer.set(rootBlobContainer); + } + } + } + return rootBlobContainer; + } + /** * Maintains single lazy instance of {@link BlobStore}. * Public for testing. 
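Editor's note: for orientation, a minimal sketch of how the new SnapshotShardPathInput above is consumed when resolving a shard's blob container under the hashed-prefix layout. It mirrors the shardContainer(String, String) helper added further down in this patch; HASHED_PREFIX and FNV_1A_COMPOSITE_1 are assumed to be the RemoteStoreEnums.PathType and RemoteStoreEnums.PathHashAlgorithm constants already used by the remote store path strategy.

    // Build the snapshot-aware path input: repository base path, index UUID and shard id.
    SnapshotShardPathInput pathInput = SnapshotShardPathInput.builder()
        .basePath(basePath())
        .indexUUID(indexId)
        .shardId(shardId)
        .build();
    // fixedSubPath() contributes "indices/<shardId>" ahead of the parent input's fixed sub-path,
    // and the HASHED_PREFIX path type derives the hashed prefix from the combined input.
    BlobPath shardPath = HASHED_PREFIX.path(pathInput, FNV_1A_COMPOSITE_1);
    BlobContainer shardContainer = blobStore().blobContainer(shardPath);
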
@@ -1087,6 +1106,7 @@ private void doDeleteShardSnapshots( foundIndices, rootBlobs, updatedRepoData, + repositoryData, remoteStoreLockManagerFactory, afterCleanupsListener ); @@ -1112,6 +1132,7 @@ private void doDeleteShardSnapshots( foundIndices, rootBlobs, newRepoData, + repositoryData, remoteStoreLockManagerFactory, afterCleanupsListener ); @@ -1142,6 +1163,7 @@ private void cleanupUnlinkedRootAndIndicesBlobs( Map foundIndices, Map rootBlobs, RepositoryData updatedRepoData, + RepositoryData oldRepoData, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { @@ -1150,6 +1172,7 @@ private void cleanupUnlinkedRootAndIndicesBlobs( foundIndices, rootBlobs, updatedRepoData, + oldRepoData, remoteStoreLockManagerFactory, ActionListener.map(listener, ignored -> null) ); @@ -1181,7 +1204,7 @@ private void asyncCleanupUnlinkedShardLevelBlobs( ); // Start as many workers as fit into the snapshot pool at once at the most - final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size()); + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), staleFilesToDeleteInBatch.size()); for (int i = 0; i < workers; ++i) { executeStaleShardDelete(staleFilesToDeleteInBatch, remoteStoreLockManagerFactory, groupedListener); } @@ -1280,21 +1303,24 @@ private void executeStaleShardDelete( RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, GroupedActionListener listener ) throws InterruptedException { + final String basePath = basePath().buildAsString(); + final int basePathLen = basePath.length(); List filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS); if (filesToDelete != null) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { + threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.wrap(listener, l -> { try { // filtering files for which remote store lock release and cleanup succeeded, // remaining files for which it failed will be retried in next snapshot delete run. 
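// The paths in filesToDelete are now absolute from the repository root: resolveFilesToDelete
// no longer strips basePath and the delete below goes through rootBlobContainer(). basePath is
// stripped here only to recover the indexId, shardId and blob name by position; this assumes
// the shallow-snapshot entries still resolve under basePath.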
List eligibleFilesToDelete = new ArrayList<>(); for (String fileToDelete : filesToDelete) { if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) { - String[] fileToDeletePath = fileToDelete.split("/"); + String relativeFileToDeletePath = fileToDelete.substring(basePathLen); + String[] fileToDeletePath = relativeFileToDeletePath.split("/"); String indexId = fileToDeletePath[1]; String shardId = fileToDeletePath[2]; String shallowSnapBlob = fileToDeletePath[3]; String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow(); - BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId)); + BlobContainer shardContainer = shardContainer(indexId, shardId); try { releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory); eligibleFilesToDelete.add(fileToDelete); @@ -1311,7 +1337,7 @@ private void executeStaleShardDelete( } } // Deleting the shard blobs - deleteFromContainer(blobContainer(), eligibleFilesToDelete); + deleteFromContainer(rootBlobContainer(), eligibleFilesToDelete); l.onResponse(null); } catch (Exception e) { logger.warn( @@ -1338,7 +1364,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes( ActionListener> onAllShardsCompleted ) { - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION); final List indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds); if (indices.isEmpty()) { @@ -1474,7 +1500,7 @@ private List resolveFilesToDelete( Collection deleteResults ) { final String basePath = basePath().buildAsString(); - final int basePathLen = basePath.length(); + // final int basePathLen = basePath.length(); final Map> indexMetaGenerations = oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots( snapshotIds ); @@ -1484,10 +1510,11 @@ private List resolveFilesToDelete( }), indexMetaGenerations.entrySet().stream().flatMap(entry -> { final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString(); return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id)); - })).map(absolutePath -> { - assert absolutePath.startsWith(basePath); - return absolutePath.substring(basePathLen); - }).collect(Collectors.toList()); + })).collect(Collectors.toList()); + // .map(absolutePath -> { + // assert absolutePath.startsWith(basePath); + // return absolutePath.substring(basePathLen); + // }).collect(Collectors.toList()); } /** @@ -1509,6 +1536,7 @@ private void cleanupStaleBlobs( Map foundIndices, Map rootBlobs, RepositoryData newRepoData, + RepositoryData oldRepoData, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, ActionListener listener ) { @@ -1520,7 +1548,7 @@ private void cleanupStaleBlobs( listener.onResponse(deleteResult); }, listener::onFailure), 2); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION); final List staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet()); if (staleRootBlobs.isEmpty()) { groupedListener.onResponse(DeleteResult.ZERO); @@ -1535,7 +1563,14 @@ private void cleanupStaleBlobs( if (foundIndices.keySet().equals(survivingIndexIds)) { groupedListener.onResponse(DeleteResult.ZERO); } else { - cleanupStaleIndices(foundIndices, survivingIndexIds, remoteStoreLockManagerFactory, groupedListener); + cleanupStaleIndices( + foundIndices, + survivingIndexIds, + 
remoteStoreLockManagerFactory, + groupedListener, + oldRepoData, + deletedSnapshots + ); } } @@ -1589,6 +1624,7 @@ public void cleanup( foundIndices, rootBlobs, repositoryData, + repositoryData, remoteStoreLockManagerFactory, ActionListener.map(listener, RepositoryCleanupResult::new) ), @@ -1682,7 +1718,9 @@ private void cleanupStaleIndices( Map foundIndices, Set survivingIndexIds, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, - GroupedActionListener listener + GroupedActionListener listener, + RepositoryData oldRepoData, + Collection deletedSnapshots ) { final GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> { DeleteResult deleteResult = DeleteResult.ZERO; @@ -1702,11 +1740,17 @@ private void cleanupStaleIndices( // Start as many workers as fit into the snapshot pool at once at the most final int workers = Math.min( - threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), + threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), foundIndices.size() - survivingIndexIds.size() ); for (int i = 0; i < workers; ++i) { - executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, groupedListener); + executeOneStaleIndexDelete( + staleIndicesToDelete, + remoteStoreLockManagerFactory, + groupedListener, + oldRepoData, + deletedSnapshots + ); } } catch (Exception e) { // TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream. @@ -1729,12 +1773,14 @@ private static boolean isIndexPresent(ClusterService clusterService, String inde private void executeOneStaleIndexDelete( BlockingQueue> staleIndicesToDelete, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, - GroupedActionListener listener + GroupedActionListener listener, + RepositoryData oldRepoData, + Collection deletedSnapshots ) throws InterruptedException { Map.Entry indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS); if (indexEntry != null) { final String indexSnId = indexEntry.getKey(); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { + threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION).execute(ActionRunnable.supply(listener, () -> { DeleteResult deleteResult = DeleteResult.ZERO; try { logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); @@ -1753,9 +1799,41 @@ private void executeOneStaleIndexDelete( } } } + } else { + String indexToBeDeleted = indexEntry.getKey(); + // The indices map within RepositoryData should have just 1 IndexId which has same id as the one + // being deleted. Hence we are adding an assertion here. We still let the deletion happen as + // usual considering there can be more than 1 matching IndexIds. 
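// The block below resolves the stale index id back to its IndexId, reads each deleted
// snapshot's index metadata to learn the shard count, and then deletes every shard's
// container explicitly, since with hashed-prefix paths the shard data may no longer sit
// under the indices/<indexId>/ folder that is deleted just below.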
+ List indexIds = oldRepoData.getIndices() + .values() + .stream() + .filter(idx -> idx.getId().equals(indexToBeDeleted)) + .collect(Collectors.toList()); + if (indexIds.size() > 1) { + logger.warn("There are more than 1 matching index ids [{}]", indexIds); + } + assert indexIds.size() == 1 : "There should be exact 1 match of IndexId"; + for (SnapshotId snId : deletedSnapshots) { + for (IndexId idx : indexIds) { + String indexMetaGeneration = oldRepoData.indexMetaDataGenerations().indexMetaBlobId(snId, idx); + final BlobContainer indexContainer = indexContainer(idx); + IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.read( + indexContainer, + indexMetaGeneration, + namedXContentRegistry + ); + int numOfShards = indexMetadata.getNumberOfShards(); + for (int i = 0; i < numOfShards; i++) { + deleteResult.add(shardContainer(idx, i).delete()); + } + } + } } + // TODO - We need to do a metadata lookup and delete the shard level folders. + // TODO - Shallow snapshot only has shallow snap file. Need to check if there will be more changes + // to handle shallow snapshot deletion. // Deleting the index folder - deleteResult = indexEntry.getValue().delete(); + deleteResult.add(indexEntry.getValue().delete()); logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); } catch (IOException e) { logger.warn( @@ -1772,7 +1850,7 @@ private void executeOneStaleIndexDelete( logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e); } - executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener); + executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener, oldRepoData, deletedSnapshots); return deleteResult; })); } @@ -1893,18 +1971,16 @@ public void finalizeSnapshot( // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) { final List toDelete = new ArrayList<>(); - final int prefixPathLen = basePath().buildAsString().length(); updatedRepositoryData.shardGenerations() .obsoleteShardGenerations(existingRepositoryData.shardGenerations()) .forEach( (indexId, gens) -> gens.forEach( - (shardId, oldGen) -> toDelete.add( - shardContainer(indexId, shardId).path().buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen - ) + (shardId, oldGen) -> toDelete.add(shardContainer(indexId, shardId).path().buildAsString() + INDEX_FILE_PREFIX + oldGen) ) ); try { - deleteFromContainer(blobContainer(), toDelete); + logger.info("{} shards generations to be deleted as part of cleanupOldShardGens", toDelete); + deleteFromContainer(rootBlobContainer(), toDelete); } catch (Exception e) { logger.warn("Failed to clean up old shard generation blobs", e); } @@ -1963,7 +2039,13 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) { } public BlobContainer shardContainer(IndexId indexId, int shardId) { - return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId))); + return shardContainer(indexId.getId(), String.valueOf(shardId)); + } + + private BlobContainer shardContainer(String indexId, String shardId) { + BasePathInput pathInput = SnapshotShardPathInput.builder().basePath(basePath()).indexUUID(indexId).shardId(shardId).build(); + BlobPath shardPath = HASHED_PREFIX.path(pathInput, FNV_1A_COMPOSITE_1); + return blobStore().blobContainer(shardPath); } /** diff 
--git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 02819678d39bc..baec18029a1da 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -914,6 +914,7 @@ protected void doRun() { ) ), e -> { + logger.error("Clone failed during development", e); logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId); failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 38bb1a7490a58..86b79a9e10126 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -92,6 +92,7 @@ public class ThreadPool implements ReportingService, Scheduler { public static class Names { public static final String SAME = "same"; public static final String GENERIC = "generic"; + public static final String SNAPSHOT_DELETION = "snapshot_deletion"; @Deprecated public static final String LISTENER = "listener"; public static final String GET = "get"; @@ -165,6 +166,7 @@ public static ThreadPoolType fromType(String type) { HashMap map = new HashMap<>(); map.put(Names.SAME, ThreadPoolType.DIRECT); map.put(Names.GENERIC, ThreadPoolType.SCALING); + map.put(Names.SNAPSHOT_DELETION, ThreadPoolType.SCALING); map.put(Names.LISTENER, ThreadPoolType.FIXED); map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.ANALYZE, ThreadPoolType.FIXED); @@ -234,6 +236,7 @@ public ThreadPool( final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512); builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); + builders.put(Names.SNAPSHOT_DELETION, new ScalingExecutorBuilder(Names.SNAPSHOT_DELETION, 1, 500, TimeValue.timeValueSeconds(30))); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
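Editor's note: the new snapshot_deletion pool registered above is a scaling executor (core 1, max 500, 30 s keep-alive) that the repository code in this patch uses for stale blob and stale index cleanup instead of the SNAPSHOT pool, presumably to keep bulk deletes from competing with snapshot creation work. A minimal consumption sketch follows; threadPool.executor(...) and threadPool.info(...).getMax() are the ThreadPool APIs already used in the diff, while pendingDeletionBatches and drainNextDeletionBatch are hypothetical stand-ins for the queue and body of executeStaleShardDelete.

    // Fan deletion work out on the snapshot_deletion pool, bounded by the pool's
    // configured maximum and by the amount of pending work.
    ExecutorService executor = threadPool.executor(ThreadPool.Names.SNAPSHOT_DELETION);
    int workers = Math.min(
        threadPool.info(ThreadPool.Names.SNAPSHOT_DELETION).getMax(), // 500 as registered above
        pendingDeletionBatches.size()                                 // hypothetical work queue
    );
    for (int i = 0; i < workers; ++i) {
        executor.execute(() -> drainNextDeletionBatch(pendingDeletionBatches)); // hypothetical helper
    }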