Skip to content

Commit

Permalink
Restore snapshot changes for V2
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Sep 1, 2024
1 parent 770a791 commit 4a68ff5
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RestoreShallowSnapshotV2IT extends RemoteRestoreSnapshotIT {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.build();
}

@Override
protected void createRepository(String repoName, String type, Settings.Builder settings) {
logger.info("--> creating repository [{}] [{}]", repoName, type);
settings.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);
assertAcked(clusterAdmin().preparePutRepository(repoName).setType(type).setSettings(settings));
}

@Override
protected void createRepository(String repoName, String type, Path location) {
Settings.Builder settings = Settings.builder()
.put("location", location)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true);

createRepository(repoName, type, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ private Map<ShardId, IndexShardSnapshotStatus> snapshotShards(
// could not be taken due to partial being set to false.
shardSnapshotStatus = IndexShardSnapshotStatus.newFailed("skipped");
} else {
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo.snapshotId(), indexId, shardId);
shardSnapshotStatus = repository.getShardSnapshotStatus(snapshotInfo, indexId, shardId);
}
shardStatus.put(shardId, shardSnapshotStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import java.io.IOException;
import java.util.Objects;

import static org.opensearch.Version.CURRENT;

/**
* Represents the recovery source of a shard. Available recovery types are:
* <p>
Expand Down Expand Up @@ -265,8 +267,14 @@ public static class SnapshotRecoverySource extends RecoverySource {
private final boolean remoteStoreIndexShallowCopy;
private final String sourceRemoteStoreRepository;

private final long pinnedTimestamp;

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId) {
this(restoreUUID, snapshot, version, indexId, false, false, null);
this(restoreUUID, snapshot, version, indexId, false, false, null, 0L);
}

public SnapshotRecoverySource(String restoreUUID, Snapshot snapshot, Version version, IndexId indexId, long pinnedTimestamp) {
this(restoreUUID, snapshot, version, indexId, false, false, null, pinnedTimestamp);
}

public SnapshotRecoverySource(
Expand All @@ -285,6 +293,27 @@ public SnapshotRecoverySource(
this.isSearchableSnapshot = isSearchableSnapshot;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
this.pinnedTimestamp = 0L;
}

public SnapshotRecoverySource(
String restoreUUID,
Snapshot snapshot,
Version version,
IndexId indexId,
boolean isSearchableSnapshot,
boolean remoteStoreIndexShallowCopy,
@Nullable String sourceRemoteStoreRepository,
long pinnedTimestamp
) {
this.restoreUUID = restoreUUID;
this.snapshot = Objects.requireNonNull(snapshot);
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(indexId);
this.isSearchableSnapshot = isSearchableSnapshot;
this.remoteStoreIndexShallowCopy = remoteStoreIndexShallowCopy;
this.sourceRemoteStoreRepository = sourceRemoteStoreRepository;
this.pinnedTimestamp = pinnedTimestamp;
}

SnapshotRecoverySource(StreamInput in) throws IOException {
Expand All @@ -304,6 +333,11 @@ public SnapshotRecoverySource(
remoteStoreIndexShallowCopy = false;
sourceRemoteStoreRepository = null;
}
if (in.getVersion().onOrAfter(CURRENT)) {
pinnedTimestamp = in.readLong();
} else {
pinnedTimestamp = 0L;
}
}

public String restoreUUID() {
Expand Down Expand Up @@ -340,6 +374,10 @@ public boolean remoteStoreIndexShallowCopy() {
return remoteStoreIndexShallowCopy;
}

public long getPinnedTimestamp() {
return pinnedTimestamp;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
Expand All @@ -353,6 +391,10 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeBoolean(remoteStoreIndexShallowCopy);
out.writeOptionalString(sourceRemoteStoreRepository);
}
if (out.getVersion().onOrAfter(CURRENT)) {
out.writeLong(pinnedTimestamp);
}

}

@Override
Expand Down
76 changes: 66 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteStorePathStrategy;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.search.stats.ShardSearchStats;
Expand Down Expand Up @@ -2479,6 +2480,10 @@ private void loadGlobalCheckpointToReplicationTracker() throws IOException {
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
openEngineAndRecoverFromTranslog(true);
}

public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOException {
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
Expand All @@ -2499,7 +2504,16 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
loadGlobalCheckpointToReplicationTracker();
}

innerOpenEngineAndTranslog(replicationTracker);
if (isSnapshotV2Restore()) {
translogConfig.setDownloadRemoteTranslogOnInit(false);
}

innerOpenEngineAndTranslog(replicationTracker, syncFromRemote);

if (isSnapshotV2Restore()) {
translogConfig.setDownloadRemoteTranslogOnInit(true);
}

getEngine().translogManager()
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
}
Expand Down Expand Up @@ -2561,7 +2575,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
if (shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
} else if (isSnapshotV2Restore() == false) {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initiliazing remote translog we cannot skip downloading translog files,
Expand Down Expand Up @@ -2607,6 +2621,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

private boolean isSnapshotV2Restore() {
return routingEntry().recoverySource().getType() == RecoverySource.Type.SNAPSHOT
&& ((SnapshotRecoverySource) routingEntry().recoverySource()).getPinnedTimestamp() > 0;
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = fetchUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
Expand Down Expand Up @@ -2892,7 +2911,12 @@ public void restoreFromSnapshotAndRemoteStore(
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT : "invalid recovery type: "
+ recoveryState.getRecoverySource();
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState().getRecoverySource();
if (recoverySource.getPinnedTimestamp() != 0) {
storeRecovery.recoverShallowSnapshotV2(this, repository, repositoriesService, listener, threadPool);
} else {
storeRecovery.recoverFromSnapshotAndRemoteStore(this, repository, repositoriesService, listener, threadPool);
}
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down Expand Up @@ -5000,16 +5024,33 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting);
assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory;
Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository();
syncTranslogFilesFromRemoteTranslog(
repository,
shardId,
indexSettings.getRemoteStorePathStrategy(),
indexSettings().isTranslogMetadataEnabled(),
0
);
}

public void syncTranslogFilesFromRemoteTranslog(
Repository repository,
ShardId shardId,
RemoteStorePathStrategy remoteStorePathStrategy,
boolean isTranslogMetadataEnabled,
long timestamp
) throws IOException {
RemoteFsTranslog.download(
repository,
shardId,
getThreadPool(),
shardPath().resolveTranslog(),
indexSettings.getRemoteStorePathStrategy(),
remoteStorePathStrategy,
remoteStoreSettings,
logger,
shouldSeedRemoteStore(),
indexSettings().isTranslogMetadataEnabled()
isTranslogMetadataEnabled,
timestamp
);
}

Expand Down Expand Up @@ -5098,15 +5139,13 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
* Downloads segments from given remote segment store for a specific commit.
* @param overrideLocal flag to override local segment files with those in remote store
* @param sourceRemoteDirectory RemoteSegmentDirectory Instance from which we need to sync segments
* @param primaryTerm Primary Term for shard at the time of commit operation for which we are syncing segments
* @param commitGeneration commit generation at the time of commit operation for which we are syncing segments
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromGivenRemoteSegmentStore(
boolean overrideLocal,
RemoteSegmentStoreDirectory sourceRemoteDirectory,
long primaryTerm,
long commitGeneration
RemoteSegmentMetadata remoteSegmentMetadata,
boolean pinnedTimestamp
) throws IOException {
logger.trace("Downloading segments from given remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = null;
Expand Down Expand Up @@ -5142,12 +5181,29 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
overrideLocal,
() -> {}
);
if (segmentsNFile != null) {
if (pinnedTimestamp) {
final SegmentInfos infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
remoteSegmentMetadata.getGeneration()
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
// delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
// Extra segments will be wiped on engine open.
for (String file : List.of(store.directory().listAll())) {
if (file.startsWith(IndexFileNames.SEGMENTS)) {
store.deleteQuiet(file);
}
}
assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} else if (segmentsNFile != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentsNFile, IOContext.DEFAULT)
)
) {
long commitGeneration = SegmentInfos.generationFromSegmentsFileName(segmentsNFile);
SegmentInfos infosSnapshot = SegmentInfos.readCommit(store.directory(), indexInput, commitGeneration);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
if (remoteStore != null) {
Expand Down
Loading

0 comments on commit 4a68ff5

Please sign in to comment.