Skip to content

Commit

Permalink
Retry remote state download while bootstrap
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Sep 17, 2024
1 parent 2e98df3 commit 8f79550
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.UncategorizedExecutionException;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.RemoteClusterStateService;
Expand Down Expand Up @@ -312,6 +313,7 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {
updateIndexBlock(false, secondIndexName);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/pull/15950")
public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathThrowsException() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
Expand Down Expand Up @@ -342,7 +344,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
} catch (IOException e) {
throw new RuntimeException(e);
}
assertThrows(IllegalStateException.class, () -> addNewNodes(dataNodeCount, clusterManagerNodeCount));
assertThrows(UncategorizedExecutionException.class, () -> addNewNodes(dataNodeCount, clusterManagerNodeCount));
// Test is complete

// Starting a node without remote state to ensure test cleanup
Expand Down
57 changes: 48 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOError;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
Expand Down Expand Up @@ -175,15 +175,11 @@ public void start(
);
if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
// Remote Metadata should always override local disk Metadata
// if local disk Metadata's cluster uuid is UNKNOWN_UUID
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
new String[] {}
clusterState = restoreClusterStateWithRetries(
remoteStoreRestoreService,
clusterState,
lastKnownClusterUUID
);
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService, lastKnownClusterUUID);
Expand Down Expand Up @@ -258,6 +254,49 @@ public void start(
}
}

private ClusterState restoreClusterStateWithRetries(
RemoteStoreRestoreService remoteStoreRestoreService,
ClusterState clusterState,
String lastKnownClusterUUID
) {
int maxAttempts = 5;
int delayInMills = 100;
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return restoreClusterState(remoteStoreRestoreService, clusterState, lastKnownClusterUUID);
} catch (Exception e) {
if (attempt == maxAttempts) {
// Throw an Error so that the process is halted.
throw new IOError(e);
}
try {
TimeUnit.MILLISECONDS.sleep(delayInMills);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // Restore interrupted status
throw new RuntimeException(ie);

Check warning on line 276 in server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java#L274-L276

Added lines #L274 - L276 were not covered by tests
}
delayInMills = delayInMills * 2;
}
}
// This statement will never be reached.
return null;

Check warning on line 282 in server/src/main/java/org/opensearch/gateway/GatewayMetaState.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/gateway/GatewayMetaState.java#L282

Added line #L282 was not covered by tests
}

ClusterState restoreClusterState(
RemoteStoreRestoreService remoteStoreRestoreService,
ClusterState clusterState,
String lastKnownClusterUUID
) {
return remoteStoreRestoreService.restore(
// Remote Metadata should always override local disk Metadata
// if local disk Metadata's cluster uuid is UNKNOWN_UUID
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
new String[] {}
).getClusterState();
}

// exposed so it can be overridden by tests
ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) {
assert clusterState.nodes().getLocalNode() == null : "prepareInitialClusterState must only be called once";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,14 +1244,72 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I
}
}

private MockGatewayMetaState newGatewayForRemoteState(
public void testGatewayMetaStateRemoteStateDownloadRetries() throws IOException {
MockGatewayMetaState gateway = null;
MockGatewayMetaState gatewayMetaStateSpy = null;
try {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")).thenReturn("test-cluster-uuid");
RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenThrow(
new IllegalStateException("unable to download cluster state")
).thenReturn(RemoteRestoreResult.build("test-cluster-uuid", null, ClusterState.EMPTY_STATE));
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
gateway = initializeGatewayForRemoteState(true);
gatewayMetaStateSpy = Mockito.spy(gateway);
startGatewayForRemoteState(
gatewayMetaStateSpy,
remoteClusterStateService,
remoteStoreRestoreService,
persistedStateRegistry,
ClusterState.EMPTY_STATE
);
verify(gatewayMetaStateSpy, times(2)).restoreClusterState(Mockito.any(), Mockito.any(), Mockito.any());
} finally {
IOUtils.close(gatewayMetaStateSpy);
}
}

public void testGatewayMetaStateRemoteStateDownloadFailure() throws IOException {
MockGatewayMetaState gateway = null;
final MockGatewayMetaState gatewayMetaStateSpy;
try {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")).thenReturn("test-cluster-uuid");
RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenThrow(
new IllegalStateException("unable to download cluster state")
);
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
gateway = initializeGatewayForRemoteState(true);
gatewayMetaStateSpy = Mockito.spy(gateway);
assertThrows(
Error.class,
() -> startGatewayForRemoteState(
gatewayMetaStateSpy,
remoteClusterStateService,
remoteStoreRestoreService,
persistedStateRegistry,
ClusterState.EMPTY_STATE
)
);
verify(gatewayMetaStateSpy, times(5)).restoreClusterState(Mockito.any(), Mockito.any(), Mockito.any());
} finally {
IOUtils.close(gateway);
}
}

private MockGatewayMetaState initializeGatewayForRemoteState(boolean prepareFullState) {
return new MockGatewayMetaState(localNode, bigArrays, prepareFullState);
}

private MockGatewayMetaState startGatewayForRemoteState(
MockGatewayMetaState gateway,
RemoteClusterStateService remoteClusterStateService,
RemoteStoreRestoreService remoteStoreRestoreService,
PersistedStateRegistry persistedStateRegistry,
ClusterState currentState,
boolean prepareFullState
ClusterState currentState
) throws IOException {
MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays, prepareFullState);
String randomRepoName = "randomRepoName";
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
Expand Down Expand Up @@ -1305,6 +1363,24 @@ private MockGatewayMetaState newGatewayForRemoteState(
return gateway;
}

private MockGatewayMetaState newGatewayForRemoteState(
RemoteClusterStateService remoteClusterStateService,
RemoteStoreRestoreService remoteStoreRestoreService,
PersistedStateRegistry persistedStateRegistry,
ClusterState currentState,
boolean prepareFullState
) throws IOException {
MockGatewayMetaState gatewayMetaState = initializeGatewayForRemoteState(prepareFullState);
startGatewayForRemoteState(
gatewayMetaState,
remoteClusterStateService,
remoteStoreRestoreService,
persistedStateRegistry,
currentState
);
return gatewayMetaState;
}

private static BigArrays getBigArrays() {
return usually()
? BigArrays.NON_RECYCLING_INSTANCE
Expand Down

0 comments on commit 8f79550

Please sign in to comment.