Skip to content

Commit

Permalink
Experiment of snapshots with hashed prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
ashking94 committed Aug 4, 2024
1 parent 98d5f0d commit c752a16
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,77 @@ public PathInput build() {
}
}

/**
* Wrapper class for the snapshot aware input required to generate path for optimised snapshot paths. This input is
* composed of the parent inputs, shard id, and static indices.
*
* @opensearch.internal
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public static class SnapshotShardPathInput extends BasePathInput {
private final String shardId;

public SnapshotShardPathInput(BlobPath basePath, String indexUUID, String shardId) {
super(basePath, indexUUID);
this.shardId = shardId;
}

public SnapshotShardPathInput(Builder builder) {
super(builder);
this.shardId = Objects.requireNonNull(builder.shardId);
}

String shardId() {
return shardId;
}

@Override
BlobPath fixedSubPath() {
return BlobPath.cleanPath().add("indices").add(super.fixedSubPath()).add(shardId);
}

/**
* Returns a new builder for {@link PathInput}.
*/
public static Builder builder() {
return new Builder();
}

/**
* Builder for {@link PathInput}.
*
* @opensearch.internal
*/
@PublicApi(since = "2.14.0")
@ExperimentalApi
public static class Builder extends BasePathInput.Builder<Builder> {
private String shardId;

public Builder basePath(BlobPath basePath) {
super.basePath = basePath;
return this;
}

public Builder indexUUID(String indexUUID) {
super.indexUUID = indexUUID;
return this;
}

public Builder shardId(String shardId) {
this.shardId = shardId;
return this;
}

@Override
protected Builder self() {
return this;
}

public SnapshotShardPathInput build() {
return new SnapshotShardPathInput(this);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStorePathStrategy.BasePathInput;
import org.opensearch.index.remote.RemoteStorePathStrategy.SnapshotShardPathInput;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
Expand Down Expand Up @@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final SetOnce<BlobContainer> blobContainer = new SetOnce<>();

private final SetOnce<BlobContainer> rootBlobContainer = new SetOnce<>();

private final SetOnce<BlobStore> blobStore = new SetOnce<>();

protected final ClusterService clusterService;
Expand Down Expand Up @@ -813,6 +816,22 @@ protected BlobContainer blobContainer() {
return blobContainer;
}

    /**
     * Maintains a single lazy instance of a {@link BlobContainer} anchored at the blob store root
     * ({@code BlobPath.cleanPath()}). Used where blob names are built as paths relative to the
     * repository root rather than relative to the repository base path (see the stale shard
     * delete flow in this file).
     */
    private BlobContainer rootBlobContainer() {
        assertSnapshotOrGenericThread();

        // Double-checked locking over SetOnce: only the first caller pays the container creation cost.
        BlobContainer rootBlobContainer = this.rootBlobContainer.get();
        if (rootBlobContainer == null) {
            synchronized (lock) {
                // Re-check under the lock in case another thread initialized it between the
                // first read and lock acquisition.
                rootBlobContainer = this.rootBlobContainer.get();
                if (rootBlobContainer == null) {
                    rootBlobContainer = blobStore().blobContainer(BlobPath.cleanPath());
                    this.rootBlobContainer.set(rootBlobContainer);
                }
            }
        }
        return rootBlobContainer;
    }

/**
* Maintains single lazy instance of {@link BlobStore}.
* Public for testing.
Expand Down Expand Up @@ -1087,6 +1106,7 @@ private void doDeleteShardSnapshots(
foundIndices,
rootBlobs,
updatedRepoData,
repositoryData,
remoteStoreLockManagerFactory,
afterCleanupsListener
);
Expand All @@ -1112,6 +1132,7 @@ private void doDeleteShardSnapshots(
foundIndices,
rootBlobs,
newRepoData,
repositoryData,
remoteStoreLockManagerFactory,
afterCleanupsListener
);
Expand Down Expand Up @@ -1142,6 +1163,7 @@ private void cleanupUnlinkedRootAndIndicesBlobs(
Map<String, BlobContainer> foundIndices,
Map<String, BlobMetadata> rootBlobs,
RepositoryData updatedRepoData,
RepositoryData oldRepoData,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<Void> listener
) {
Expand All @@ -1150,6 +1172,7 @@ private void cleanupUnlinkedRootAndIndicesBlobs(
foundIndices,
rootBlobs,
updatedRepoData,
oldRepoData,
remoteStoreLockManagerFactory,
ActionListener.map(listener, ignored -> null)
);
Expand Down Expand Up @@ -1280,6 +1303,8 @@ private void executeStaleShardDelete(
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
GroupedActionListener<Void> listener
) throws InterruptedException {
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
Expand All @@ -1289,12 +1314,13 @@ private void executeStaleShardDelete(
List<String> eligibleFilesToDelete = new ArrayList<>();
for (String fileToDelete : filesToDelete) {
if (fileToDelete.contains(SHALLOW_SNAPSHOT_PREFIX)) {
String[] fileToDeletePath = fileToDelete.split("/");
String relativeFileToDeletePath = fileToDelete.substring(basePathLen);
String[] fileToDeletePath = relativeFileToDeletePath.split("/");
String indexId = fileToDeletePath[1];
String shardId = fileToDeletePath[2];
String shallowSnapBlob = fileToDeletePath[3];
String snapshotUUID = extractShallowSnapshotUUID(shallowSnapBlob).orElseThrow();
BlobContainer shardContainer = blobStore().blobContainer(indicesPath().add(indexId).add(shardId));
BlobContainer shardContainer = shardContainer(indexId, shardId);
try {
releaseRemoteStoreLockAndCleanup(shardId, snapshotUUID, shardContainer, remoteStoreLockManagerFactory);
eligibleFilesToDelete.add(fileToDelete);
Expand All @@ -1311,7 +1337,7 @@ private void executeStaleShardDelete(
}
}
// Deleting the shard blobs
deleteFromContainer(blobContainer(), eligibleFilesToDelete);
deleteFromContainer(rootBlobContainer(), eligibleFilesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
Expand Down Expand Up @@ -1474,7 +1500,7 @@ private List<String> resolveFilesToDelete(
Collection<ShardSnapshotMetaDeleteResult> deleteResults
) {
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
// final int basePathLen = basePath.length();
final Map<IndexId, Collection<String>> indexMetaGenerations = oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(
snapshotIds
);
Expand All @@ -1484,10 +1510,11 @@ private List<String> resolveFilesToDelete(
}), indexMetaGenerations.entrySet().stream().flatMap(entry -> {
final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString();
return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id));
})).map(absolutePath -> {
assert absolutePath.startsWith(basePath);
return absolutePath.substring(basePathLen);
}).collect(Collectors.toList());
})).collect(Collectors.toList());
// .map(absolutePath -> {
// assert absolutePath.startsWith(basePath);
// return absolutePath.substring(basePathLen);
// }).collect(Collectors.toList());
}

/**
Expand All @@ -1509,6 +1536,7 @@ private void cleanupStaleBlobs(
Map<String, BlobContainer> foundIndices,
Map<String, BlobMetadata> rootBlobs,
RepositoryData newRepoData,
RepositoryData oldRepoData,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
ActionListener<DeleteResult> listener
) {
Expand All @@ -1535,7 +1563,14 @@ private void cleanupStaleBlobs(
if (foundIndices.keySet().equals(survivingIndexIds)) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
cleanupStaleIndices(foundIndices, survivingIndexIds, remoteStoreLockManagerFactory, groupedListener);
cleanupStaleIndices(
foundIndices,
survivingIndexIds,
remoteStoreLockManagerFactory,
groupedListener,
oldRepoData,
deletedSnapshots
);
}
}

Expand Down Expand Up @@ -1589,6 +1624,7 @@ public void cleanup(
foundIndices,
rootBlobs,
repositoryData,
repositoryData,
remoteStoreLockManagerFactory,
ActionListener.map(listener, RepositoryCleanupResult::new)
),
Expand Down Expand Up @@ -1682,7 +1718,9 @@ private void cleanupStaleIndices(
Map<String, BlobContainer> foundIndices,
Set<String> survivingIndexIds,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
GroupedActionListener<DeleteResult> listener
GroupedActionListener<DeleteResult> listener,
RepositoryData oldRepoData,
Collection<SnapshotId> deletedSnapshots
) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
Expand All @@ -1706,7 +1744,13 @@ private void cleanupStaleIndices(
foundIndices.size() - survivingIndexIds.size()
);
for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, groupedListener);
executeOneStaleIndexDelete(
staleIndicesToDelete,
remoteStoreLockManagerFactory,
groupedListener,
oldRepoData,
deletedSnapshots
);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
Expand All @@ -1729,7 +1773,9 @@ private static boolean isIndexPresent(ClusterService clusterService, String inde
private void executeOneStaleIndexDelete(
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
GroupedActionListener<DeleteResult> listener
GroupedActionListener<DeleteResult> listener,
RepositoryData oldRepoData,
Collection<SnapshotId> deletedSnapshots
) throws InterruptedException {
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry != null) {
Expand All @@ -1753,9 +1799,41 @@ private void executeOneStaleIndexDelete(
}
}
}
} else {
String indexToBeDeleted = indexEntry.getKey();
// The indices map within RepositoryData should have just 1 IndexId which has same id as the one
// being deleted. Hence we are adding an assertion here. We still let the deletion happen as
// usual considering there can be more than 1 matching IndexIds.
List<IndexId> indexIds = oldRepoData.getIndices()
.values()
.stream()
.filter(idx -> idx.getId().equals(indexToBeDeleted))
.collect(Collectors.toList());
if (indexIds.size() > 1) {
logger.warn("There are more than 1 matching index ids [{}]", indexIds);
}
assert indexIds.size() == 1 : "There should be exact 1 match of IndexId";
for (SnapshotId snId : deletedSnapshots) {
for (IndexId idx : indexIds) {
String indexMetaGeneration = oldRepoData.indexMetaDataGenerations().indexMetaBlobId(snId, idx);
final BlobContainer indexContainer = indexContainer(idx);
IndexMetadata indexMetadata = INDEX_METADATA_FORMAT.read(
indexContainer,
indexMetaGeneration,
namedXContentRegistry
);
int numOfShards = indexMetadata.getNumberOfShards();
for (int i = 0; i < numOfShards; i++) {
deleteResult.add(shardContainer(idx, i).delete());
}
}
}
}
// TODO - We need to do a metadata lookup and delete the shard level folders.
// TODO - Shallow snapshot only has shallow snap file. Need to check if there will be more changes
// to handle shallow snapshot deletion.
// Deleting the index folder
deleteResult = indexEntry.getValue().delete();
deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
} catch (IOException e) {
logger.warn(
Expand All @@ -1772,7 +1850,7 @@ private void executeOneStaleIndexDelete(
logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e);
}

executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener);
executeOneStaleIndexDelete(staleIndicesToDelete, remoteStoreLockManagerFactory, listener, oldRepoData, deletedSnapshots);
return deleteResult;
}));
}
Expand Down Expand Up @@ -1963,7 +2041,13 @@ private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
}

public BlobContainer shardContainer(IndexId indexId, int shardId) {
return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId)));
return shardContainer(indexId.getId(), String.valueOf(shardId));
}

    /**
     * Resolves the {@link BlobContainer} for a snapshot shard using the hashed-prefix remote store
     * path strategy built from the repository base path, index id and shard id.
     *
     * NOTE(review): the path type ({@code HASHED_PREFIX}) and hash algorithm
     * ({@code FNV_1A_COMPOSITE_1}) are hard-coded for this experiment — confirm whether they
     * should instead be driven by repository/cluster settings before merging.
     * NOTE(review): the {@code indexId} here is passed as the {@code indexUUID} of the path input —
     * verify this is the intended identifier for path construction.
     */
    private BlobContainer shardContainer(String indexId, String shardId) {
        BasePathInput pathInput = SnapshotShardPathInput.builder().basePath(basePath()).indexUUID(indexId).shardId(shardId).build();
        BlobPath shardPath = HASHED_PREFIX.path(pathInput, FNV_1A_COMPOSITE_1);
        return blobStore().blobContainer(shardPath);
    }

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ protected void doRun() {
)
),
e -> {
logger.error("Clone failed during development", e);
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
Expand Down

0 comments on commit c752a16

Please sign in to comment.