Skip to content

Commit

Permalink
Add prefix support to hashed prefix & infix path types on remote store
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Sep 2, 2024
1 parent b54e867 commit ed54727
Show file tree
Hide file tree
Showing 25 changed files with 237 additions and 53 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
Expand Down Expand Up @@ -109,7 +110,8 @@ public TransportCleanupRepositoryAction(
SnapshotsService snapshotsService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteStoreSettings remoteStoreSettings
) {
super(
CleanupRepositoryAction.NAME,
Expand All @@ -122,7 +124,7 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService, remoteStoreSettings);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;
import org.opensearch.index.remote.RemoteStorePathStrategy.ShardDataPathInput;
import org.opensearch.indices.RemoteStoreSettings;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class RemoteIndexPath implements ToXContentFragment {
private final Iterable<String> basePath;
private final PathType pathType;
private final PathHashAlgorithm pathHashAlgorithm;
private final RemoteStoreSettings remoteStoreSettings;

/**
* This keeps the map of paths that would be present in the content of the index path file. For eg - It is possible
Expand All @@ -82,7 +84,8 @@ public RemoteIndexPath(
Iterable<String> basePath,
PathType pathType,
PathHashAlgorithm pathHashAlgorithm,
Map<DataCategory, List<DataType>> pathCreationMap
Map<DataCategory, List<DataType>> pathCreationMap,
RemoteStoreSettings remoteStoreSettings
) {
if (Objects.isNull(pathCreationMap)
|| Objects.isNull(pathType)
Expand Down Expand Up @@ -119,6 +122,7 @@ public RemoteIndexPath(
this.pathType = pathType;
this.pathHashAlgorithm = pathHashAlgorithm;
this.pathCreationMap = pathCreationMap;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand Down Expand Up @@ -148,6 +152,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.shardId(Integer.toString(shardNo))
.dataCategory(dataCategory)
.dataType(type)
.fixedPrefix(
dataCategory == TRANSLOG
? remoteStoreSettings.getTranslogPathFixedPrefix()
: remoteStoreSettings.getSegmentsPathFixedPrefix()
)
.build();
builder.value(pathType.path(pathInput, pathHashAlgorithm).buildAsString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.gateway.remote.IndexMetadataUploadListener;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {
private final Settings settings;
private final boolean isRemoteDataAttributePresent;
private final boolean isTranslogSegmentRepoSame;
private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<RepositoriesService> repositoriesService;
private volatile TimeValue metadataUploadTimeout;

Expand All @@ -89,7 +91,8 @@ public RemoteIndexPathUploader(
ThreadPool threadPool,
Settings settings,
Supplier<RepositoriesService> repositoriesService,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
RemoteStoreSettings remoteStoreSettings
) {
super(threadPool, ThreadPool.Names.GENERIC);
this.settings = Objects.requireNonNull(settings);
Expand All @@ -100,6 +103,7 @@ public RemoteIndexPathUploader(
Objects.requireNonNull(clusterSettings);
metadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setMetadataUploadTimeout);
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand Down Expand Up @@ -208,7 +212,8 @@ private void writePathToRemoteStore(
basePath,
pathType,
hashAlgorithm,
pathCreationMap
pathCreationMap,
remoteStoreSettings
);
String fileName = generateFileName(indexUUID, idxMD.getVersion(), remoteIndexPath.getVersion());
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(remoteIndexPath, blobContainer, fileName, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return BlobPath.cleanPath().add(hashAlgorithm.hash(pathInput)).add(pathInput.basePath()).add(pathInput.fixedSubPath());
return BlobPath.cleanPath()
.add(pathInput.fixedPrefix())
.add(hashAlgorithm.hash(pathInput))
.add(pathInput.basePath())
.add(pathInput.fixedSubPath());
}

@Override
Expand All @@ -119,7 +123,7 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return pathInput.basePath().add(hashAlgorithm.hash(pathInput)).add(pathInput.fixedSubPath());
return pathInput.basePath().add(pathInput.fixedPrefix()).add(hashAlgorithm.hash(pathInput)).add(pathInput.fixedSubPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,12 @@ public BlobPath generatePath(PathInput pathInput) {
public static class PathInput {
private final BlobPath basePath;
private final String indexUUID;
private final String fixedPrefix;

public PathInput(Builder<?> builder) {
this.basePath = Objects.requireNonNull(builder.basePath);
this.indexUUID = Objects.requireNonNull(builder.indexUUID);
this.fixedPrefix = Objects.isNull(builder.fixedPrefix) ? "" : builder.fixedPrefix;
}

BlobPath basePath() {
Expand All @@ -97,6 +99,10 @@ String indexUUID() {
return indexUUID;
}

String fixedPrefix() {
return fixedPrefix;
}

BlobPath fixedSubPath() {
return BlobPath.cleanPath().add(indexUUID);
}
Expand Down Expand Up @@ -126,6 +132,7 @@ public void assertIsValid() {
public static class Builder<T extends Builder<T>> {
private BlobPath basePath;
private String indexUUID;
private String fixedPrefix;

public T basePath(BlobPath basePath) {
this.basePath = basePath;
Expand All @@ -137,6 +144,11 @@ public T indexUUID(String indexUUID) {
return self();
}

public T fixedPrefix(String fixedPrefix) {
this.fixedPrefix = fixedPrefix;
return self();
}

protected T self() {
return (T) this;
}
Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2857,7 +2857,7 @@ public void recoverFromLocalShards(
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener);
success = true;
} finally {
Expand All @@ -2872,13 +2872,13 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.initializing() : "can only start recovery on initializing shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromRemoteStore(this, listener);
}

Expand All @@ -2891,7 +2891,7 @@ public void restoreFromSnapshotAndRemoteStore(
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
+ recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
} catch (Exception e) {
listener.onFailure(e);
Expand All @@ -2903,7 +2903,7 @@ public void restoreFromRepository(Repository repository, ActionListener<Boolean>
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
+ recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger, remoteStoreSettings);
storeRecovery.recoverFromRepository(this, repository, listener);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.opensearch.index.translog.Checkpoint;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogHeader;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -99,10 +100,12 @@ final class StoreRecovery {

private final Logger logger;
private final ShardId shardId;
private final RemoteStoreSettings remoteStoreSettings;

StoreRecovery(ShardId shardId, Logger logger) {
StoreRecovery(ShardId shardId, Logger logger, RemoteStoreSettings remoteStoreSettings) {
this.logger = logger;
this.shardId = shardId;
this.remoteStoreSettings = remoteStoreSettings;
}

/**
Expand Down Expand Up @@ -397,7 +400,8 @@ void recoverFromSnapshotAndRemoteStore(

RemoteSegmentStoreDirectoryFactory directoryFactory = new RemoteSegmentStoreDirectoryFactory(
() -> repositoriesService,
threadPool
threadPool,
remoteStoreSettings
);
RemoteSegmentStoreDirectory sourceRemoteDirectory = (RemoteSegmentStoreDirectory) directoryFactory.newDirectory(
remoteStoreRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand All @@ -40,11 +41,17 @@
@PublicApi(since = "2.3.0")
public class RemoteSegmentStoreDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
private final Supplier<RepositoriesService> repositoriesService;
private final RemoteStoreSettings remoteStoreSettings;

private final ThreadPool threadPool;

public RemoteSegmentStoreDirectoryFactory(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) {
public RemoteSegmentStoreDirectoryFactory(
Supplier<RepositoriesService> repositoriesService,
ThreadPool threadPool,
RemoteStoreSettings remoteStoreSettings
) {
this.repositoriesService = repositoriesService;
this.remoteStoreSettings = remoteStoreSettings;
this.threadPool = threadPool;
}

Expand All @@ -71,6 +78,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
.shardId(shardIdStr)
.dataCategory(SEGMENTS)
.dataType(DATA)
.fixedPrefix(remoteStoreSettings.getSegmentsPathFixedPrefix())
.build();
// Derive the path for data directory of SEGMENTS
BlobPath dataPath = pathStrategy.generatePath(dataPathInput);
Expand All @@ -87,6 +95,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
.shardId(shardIdStr)
.dataCategory(SEGMENTS)
.dataType(METADATA)
.fixedPrefix(remoteStoreSettings.getSegmentsPathFixedPrefix())
.build();
// Derive the path for metadata directory of SEGMENTS
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
Expand All @@ -98,7 +107,8 @@ public Directory newDirectory(String repositoryName, String indexUUID, ShardId s
repositoryName,
indexUUID,
shardIdStr,
pathStrategy
pathStrategy,
remoteStoreSettings
);

return new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory, mdLockManager, threadPool, shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
Expand All @@ -31,9 +32,11 @@
@PublicApi(since = "2.8.0")
public class RemoteStoreLockManagerFactory {
private final Supplier<RepositoriesService> repositoriesService;
private final RemoteStoreSettings remoteStoreSettings;

public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesService) {
public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesService, RemoteStoreSettings remoteStoreSettings) {
this.repositoriesService = repositoriesService;
this.remoteStoreSettings = remoteStoreSettings;
}

public RemoteStoreLockManager newLockManager(
Expand All @@ -42,15 +45,16 @@ public RemoteStoreLockManager newLockManager(
String shardId,
RemoteStorePathStrategy pathStrategy
) {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId, pathStrategy);
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId, pathStrategy, remoteStoreSettings);
}

public static RemoteStoreMetadataLockManager newLockManager(
RepositoriesService repositoriesService,
String repositoryName,
String indexUUID,
String shardId,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
RemoteStoreSettings remoteStoreSettings
) {
try (Repository repository = repositoriesService.repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
Expand All @@ -62,6 +66,7 @@ public static RemoteStoreMetadataLockManager newLockManager(
.shardId(shardId)
.dataCategory(SEGMENTS)
.dataType(LOCK_FILES)
.fixedPrefix(remoteStoreSettings.getSegmentsPathFixedPrefix())
.build();
BlobPath lockDirectoryPath = pathStrategy.generatePath(lockFilesPathInput);
BlobContainer lockDirectoryBlobContainer = ((BlobStoreRepository) repository).blobStore().blobContainer(lockDirectoryPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
.shardId(shardIdStr)
.dataCategory(TRANSLOG)
.dataType(DATA)
.fixedPrefix(remoteStoreSettings.getTranslogPathFixedPrefix())
.build();
BlobPath dataPath = pathStrategy.generatePath(dataPathInput);
RemoteStorePathStrategy.ShardDataPathInput mdPathInput = RemoteStorePathStrategy.ShardDataPathInput.builder()
Expand All @@ -400,6 +401,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
.shardId(shardIdStr)
.dataCategory(TRANSLOG)
.dataType(METADATA)
.fixedPrefix(remoteStoreSettings.getTranslogPathFixedPrefix())
.build();
BlobPath mdPath = pathStrategy.generatePath(mdPathInput);
BlobStoreTransferService transferService = new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool);
Expand Down
Loading

0 comments on commit ed54727

Please sign in to comment.