Add prefix support to hashed prefix & infix path types on remote store
Signed-off-by: Ashish Singh <[email protected]>
ashking94 committed Sep 1, 2024
1 parent 03d9a24 commit 9b1a27a
Showing 23 changed files with 232 additions and 54 deletions.

@@ -53,6 +53,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
@@ -109,7 +110,8 @@ public TransportCleanupRepositoryAction(
SnapshotsService snapshotsService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteStoreSettings remoteStoreSettings
) {
super(
CleanupRepositoryAction.NAME,
@@ -122,7 +124,7 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService, remoteStoreSettings);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.

@@ -20,6 +20,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;
import org.opensearch.index.remote.RemoteStorePathStrategy.ShardDataPathInput;
import org.opensearch.indices.RemoteStoreSettings;

import java.io.IOException;
import java.util.Collections;
@@ -68,6 +69,7 @@ public class RemoteIndexPath implements ToXContentFragment {
private final Iterable<String> basePath;
private final PathType pathType;
private final PathHashAlgorithm pathHashAlgorithm;
private final RemoteStoreSettings remoteStoreSettings;

/**
* This keeps the map of paths that would be present in the content of the index path file. For eg - It is possible
@@ -82,7 +84,8 @@ public RemoteIndexPath(
Iterable<String> basePath,
PathType pathType,
PathHashAlgorithm pathHashAlgorithm,
Map<DataCategory, List<DataType>> pathCreationMap
Map<DataCategory, List<DataType>> pathCreationMap,
RemoteStoreSettings remoteStoreSettings
) {
if (Objects.isNull(pathCreationMap)
|| Objects.isNull(pathType)
@@ -119,6 +122,7 @@ public RemoteIndexPath(
this.pathType = pathType;
this.pathHashAlgorithm = pathHashAlgorithm;
this.pathCreationMap = pathCreationMap;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
@@ -148,6 +152,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.shardId(Integer.toString(shardNo))
.dataCategory(dataCategory)
.dataType(type)
.fixedPrefix(
dataCategory == TRANSLOG
? remoteStoreSettings.getTranslogPathFixedPrefix()
: remoteStoreSettings.getSegmentsPathFixedPrefix()
)
.build();
builder.value(pathType.path(pathInput, pathHashAlgorithm).buildAsString());
}

@@ -25,6 +25,7 @@
import org.opensearch.gateway.remote.IndexMetadataUploadListener;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
@@ -79,6 +80,7 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {
private final Settings settings;
private final boolean isRemoteDataAttributePresent;
private final boolean isTranslogSegmentRepoSame;
private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<RepositoriesService> repositoriesService;
private volatile TimeValue metadataUploadTimeout;

@@ -89,7 +91,8 @@ public RemoteIndexPathUploader(
ThreadPool threadPool,
Settings settings,
Supplier<RepositoriesService> repositoriesService,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
RemoteStoreSettings remoteStoreSettings
) {
super(threadPool, ThreadPool.Names.GENERIC);
this.settings = Objects.requireNonNull(settings);
@@ -100,6 +103,7 @@
Objects.requireNonNull(clusterSettings);
metadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setMetadataUploadTimeout);
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
@@ -208,7 +212,8 @@ private void writePathToRemoteStore(
basePath,
pathType,
hashAlgorithm,
pathCreationMap
pathCreationMap,
remoteStoreSettings
);
String fileName = generateFileName(indexUUID, idxMD.getVersion(), remoteIndexPath.getVersion());
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(remoteIndexPath, blobContainer, fileName, actionListener);

@@ -107,7 +107,11 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return BlobPath.cleanPath().add(hashAlgorithm.hash(pathInput)).add(pathInput.basePath()).add(pathInput.fixedSubPath());
return BlobPath.cleanPath()
.add(pathInput.fixedPrefix())
.add(hashAlgorithm.hash(pathInput))
.add(pathInput.basePath())
.add(pathInput.fixedSubPath());
}

@Override
@@ -119,7 +123,7 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return pathInput.basePath().add(hashAlgorithm.hash(pathInput)).add(pathInput.fixedSubPath());
return pathInput.basePath().add(pathInput.fixedPrefix()).add(hashAlgorithm.hash(pathInput)).add(pathInput.fixedSubPath());
}

@Override
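
For orientation, the sketch below is not part of the commit; it illustrates where the two hashed path types now place the fixed prefix. The class name, the literal values, and the choice of FNV_1A_BASE64 as the hash algorithm are illustrative assumptions, while the builder chain and the PathType.path(...) call mirror the usages visible elsewhere in this diff.

    import org.opensearch.common.blobstore.BlobPath;
    import org.opensearch.index.remote.RemoteStoreEnums.DataCategory;
    import org.opensearch.index.remote.RemoteStoreEnums.DataType;
    import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
    import org.opensearch.index.remote.RemoteStoreEnums.PathType;
    import org.opensearch.index.remote.RemoteStorePathStrategy.ShardDataPathInput;

    public class FixedPrefixPathSketch {
        public static void main(String[] args) {
            // Build a shard-level path input the same way the factories in this commit do.
            ShardDataPathInput input = ShardDataPathInput.builder()
                .basePath(BlobPath.cleanPath().add("repo-base"))
                .indexUUID("index-uuid")
                .shardId("0")
                .dataCategory(DataCategory.SEGMENTS)
                .dataType(DataType.DATA)
                .fixedPrefix("seg-prefix") // in the real code this comes from RemoteStoreSettings#getSegmentsPathFixedPrefix
                .build();

            // HASHED_PREFIX places the fixed prefix before the hash, roughly:
            //   seg-prefix/<hash>/repo-base/index-uuid/0/segments/data/
            System.out.println(PathType.HASHED_PREFIX.path(input, PathHashAlgorithm.FNV_1A_BASE64).buildAsString());

            // HASHED_INFIX keeps the base path first and injects the prefix before the hash, roughly:
            //   repo-base/seg-prefix/<hash>/index-uuid/0/segments/data/
            System.out.println(PathType.HASHED_INFIX.path(input, PathHashAlgorithm.FNV_1A_BASE64).buildAsString());
        }
    }

When no prefix is configured, PathInput defaults fixedPrefix to an empty string, as the builder change in the next hunk shows.
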
@@ -82,10 +82,12 @@ public BlobPath generatePath(PathInput pathInput) {
public static class PathInput {
private final BlobPath basePath;
private final String indexUUID;
private final String fixedPrefix;

public PathInput(Builder<?> builder) {
this.basePath = Objects.requireNonNull(builder.basePath);
this.indexUUID = Objects.requireNonNull(builder.indexUUID);
this.fixedPrefix = Objects.isNull(builder.fixedPrefix) ? "" : builder.fixedPrefix;
}

BlobPath basePath() {
@@ -96,6 +98,10 @@ String indexUUID() {
return indexUUID;
}

String fixedPrefix() {
return fixedPrefix;
}

BlobPath fixedSubPath() {
return BlobPath.cleanPath().add(indexUUID);
}
@@ -121,17 +127,23 @@ public void assertIsValid() {
public static class Builder<T extends Builder<T>> {
private BlobPath basePath;
private String indexUUID;
private String fixedPrefix;

public T basePath(BlobPath basePath) {
this.basePath = basePath;
return self();
}

public Builder indexUUID(String indexUUID) {
public T indexUUID(String indexUUID) {
this.indexUUID = indexUUID;
return self();
}

public T fixedPrefix(String fixedPrefix) {
this.fixedPrefix = fixedPrefix;
return self();
}

protected T self() {
return (T) this;
}
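
One subtle part of the hunk above is that indexUUID(...) now returns the self-type T instead of the raw Builder. Below is a hedged sketch of why that matters, assuming ShardDataPathInput.Builder extends PathInput.Builder as the usages elsewhere in this diff suggest; values and imports are the same placeholders as in the earlier sketch.

    // Before this commit, indexUUID(...) returned the base Builder, so a subclass-only setter
    // such as dataCategory(...) could not be chained after it. With the self-type T it can:
    ShardDataPathInput input = ShardDataPathInput.builder()
        .indexUUID("index-uuid")                    // now returns the concrete ShardDataPathInput builder
        .basePath(BlobPath.cleanPath().add("base"))
        .shardId("0")
        .dataCategory(DataCategory.TRANSLOG)
        .dataType(DataType.METADATA)
        .fixedPrefix("tlog-prefix")                 // placeholder; the real value comes from RemoteStoreSettings#getTranslogPathFixedPrefix
        .build();
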
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2857,7 +2857,7 @@ public void recoverFromLocalShards(
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener);
success = true;
} finally {
@@ -2872,13 +2872,13 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.initializing() : "can only start recovery on initializing shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromRemoteStore(this, listener);
}

@@ -2891,7 +2891,7 @@ public void restoreFromSnapshotAndRemoteStore(
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
+ recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
} catch (Exception e) {
listener.onFailure(e);
@@ -2903,7 +2903,7 @@ public void restoreFromRepository(Repository repository, ActionListener<Boolean>
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
+ recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromRepository(this, repository, listener);
} catch (Exception e) {
listener.onFailure(e);

@@ -67,6 +67,7 @@
import org.opensearch.index.translog.Checkpoint;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogHeader;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
@@ -99,10 +100,12 @@ final class StoreRecovery {

private final Logger logger;
private final ShardId shardId;
private final RemoteStoreSettings remoteStoreSettings;

StoreRecovery(ShardId shardId, Logger logger) {
StoreRecovery(ShardId shardId, Logger logger, RemoteStoreSettings remoteStoreSettings) {
this.logger = logger;
this.shardId = shardId;
this.remoteStoreSettings = remoteStoreSettings;
}

/**
@@ -397,7 +400,8 @@ void recoverFromSnapshotAndRemoteStore(

RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(
() -> repositoriesService,
threadPool
threadPool,
remoteStoreSettings
);
RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
remoteStoreRepository,

@@ -17,6 +17,7 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
@@ -40,11 +41,17 @@
@PublicApi(since = "2.3.0")
public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
private final Supplier<RepositoriesService> repositoriesService;
private final RemoteStoreSettings remoteStoreSettings;

private final ThreadPool threadPool;

public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) {
public RemoteSegmentStoreDirectoryFactory(
Supplier<RepositoriesService> repositoriesService,
ThreadPool threadPool,
RemoteStoreSettings remoteStoreSettings
) {
this.repositoriesService = repositoriesService;
this.remoteStoreSettings = remoteStoreSettings;
this.threadPool = threadPool;
}

@@ -71,6 +78,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
.shardId(shardIdStr)
.dataCategory(SEGMENTS)
.dataType(DATA)
.fixedPrefix(remoteStoreSettings.getSegmentsPathFixedPrefix())
.build();
// Derive the path for data directory of SEGMENTS
BlobPath dataPath = pathStrategy.generatePath(dataPathInput);
@@ -87,6 +95,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
.shardId(shardIdStr)
.dataCategory(SEGMENTS)
.dataType(METADATA)
.fixedPrefix(remoteStoreSettings.getSegmentsPathFixedPrefix())
.build();
// Derive the path for metadata directory of SEGMENTS
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
@@ -98,7 +107,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
repositoryName,
indexUUID,
shardIdStr,
pathStrategy
pathStrategy,
remoteStoreSettings
);

return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId);
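
A hedged wiring sketch for the updated RemoteSegmentStoreDirectoryFactory follows; the tail of the newDirectory signature is truncated in the hunk headers above, so the trailing parameters are assumptions, and the surrounding variables are placeholders assumed to be in scope.

    // The factory now carries RemoteStoreSettings so it can apply the segments fixed prefix to
    // the data, metadata and lock-file paths it derives.
    RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(
        () -> repositoriesService,   // Supplier<RepositoriesService>
        threadPool,
        remoteStoreSettings          // newly required constructor argument
    );
    Directory segmentDirectory = directoryFactory.newDirectory(
        "segment-repo",              // placeholder repository name
        "index-uuid",                // placeholder index UUID
        shardId,                     // ShardId of the shard whose segments live in the remote store
        pathStrategy                 // RemoteStorePathStrategy for the index (assumed parameter)
    );
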
@@ -13,6 +13,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
@@ -31,9 +32,11 @@
@PublicApi(since = "2.8.0")
public class RemoteStoreLockManagerFactory {
private final Supplier<RepositoriesService> repositoriesService;
private final RemoteStoreSettings remoteStoreSettings;

public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesService) {
public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesService, RemoteStoreSettings remoteStoreSettings) {
this.repositoriesService = repositoriesService;
this.remoteStoreSettings = remoteStoreSettings;
}

public RemoteStoreLockManager newLockManager(
@@ -42,15 +45,16 @@ public RemoteStoreLockManager newLockManager(
String shardId,
RemoteStorePathStrategy pathStrategy
) {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId, pathStrategy);
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId, pathStrategy, remoteStoreSettings);
}

public static RemoteStoreMetadataLockManager newLockManager(
RepositoriesService repositoriesService,
String repositoryName,
String indexUUID,
String shardId,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) {
try (Repository repository = repositoriesService.repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
@@ -62,6 +66,7 @@ public static RemoteStoreMetadataLockManager newLockManager(
.shardId(shardId)
.dataCategory(SEGMENTS)
.dataType(LOCK_FILES)
.fixedPrefix(remoteStoreSettings.getSegmentsPathFixedPrefix())
.build();
BlobPath lockDirectoryPath = pathStrategy.generatePath(lockFilesPathInput);
BlobContainer lockDirectoryBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(lockDirectoryPath);

@@ -310,6 +310,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
.shardId(shardIdStr)
.dataCategory(TRANSLOG)
.dataType(DATA)
.fixedPrefix(remoteStoreSettings.getTranslogPathFixedPrefix())
.build();
BlobPath dataPath = pathStrategy.generatePath(dataPathInput);
RemoteStorePathStrategy.ShardDataPathInput mdPathInput = RemoteStorePathStrategy.ShardDataPathInput.builder()
@@ -318,6 +319,7 @@
.shardId(shardIdStr)
.dataCategory(TRANSLOG)
.dataType(METADATA)
.fixedPrefix(remoteStoreSettings.getTranslogPathFixedPrefix())
.build();
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);