Skip to content

Commit

Permalink
[Remote Store] Fix shard failure on flush due to upload timeout (open…
Browse files Browse the repository at this point in the history
  • Loading branch information
ashking94 authored Oct 26, 2023
1 parent 003b2cf commit fe8b2d5
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
Expand Down Expand Up @@ -156,14 +155,17 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans

try {
if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) {
Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete");
Exception ex = new TranslogUploadFailedException(
"Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"
);
exceptionList.forEach(ex::addSuppressed);
throw ex;
}
} catch (InterruptedException ex) {
exceptionList.forEach(ex::addSuppressed);
Exception exception = new TranslogUploadFailedException("Failed to upload " + transferSnapshot, ex);
exceptionList.forEach(exception::addSuppressed);
Thread.currentThread().interrupt();
throw ex;
throw exception;
}
if (exceptionList.isEmpty()) {
TransferFileSnapshot tlogMetadata = prepareMetadata(transferSnapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.SetOnce;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand All @@ -35,6 +36,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
Expand Down Expand Up @@ -180,6 +182,93 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
assertEquals(4, fileTransferTracker.allUploaded().size());
}

public void testTransferSnapshotOnUploadTimeout() throws Exception {
doAnswer(invocationOnMock -> {
Thread.sleep(31 * 1000);
return null;
}).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class));
FileTransferTracker fileTransferTracker = new FileTransferTracker(
new ShardId("index", "indexUUid", 0),
remoteTranslogTransferTracker
);
TranslogTransferManager translogTransferManager = new TranslogTransferManager(
shardId,
transferService,
remoteBaseTransferPath,
fileTransferTracker,
remoteTranslogTransferTracker
);
SetOnce<Exception> exception = new SetOnce<>();
translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() {
@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) {}

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
exception.set(ex);
}
});
assertNotNull(exception.get());
assertTrue(exception.get() instanceof TranslogUploadFailedException);
assertEquals("Timed out waiting for transfer of snapshot test-to-string to complete", exception.get().getMessage());
}

public void testTransferSnapshotOnThreadInterrupt() throws Exception {
SetOnce<Thread> uploadThread = new SetOnce<>();
doAnswer(invocationOnMock -> {
uploadThread.set(new Thread(() -> {
ActionListener<TransferFileSnapshot> listener = invocationOnMock.getArgument(2);
try {
Thread.sleep(31 * 1000);
} catch (InterruptedException ignore) {
List<TransferFileSnapshot> list = new ArrayList<>(invocationOnMock.getArgument(0));
listener.onFailure(new FileTransferException(list.get(0), ignore));
}
}));
uploadThread.get().start();
return null;
}).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class));
FileTransferTracker fileTransferTracker = new FileTransferTracker(
new ShardId("index", "indexUUid", 0),
remoteTranslogTransferTracker
);
TranslogTransferManager translogTransferManager = new TranslogTransferManager(
shardId,
transferService,
remoteBaseTransferPath,
fileTransferTracker,
remoteTranslogTransferTracker
);
SetOnce<Exception> exception = new SetOnce<>();

Thread thread = new Thread(() -> {
try {
translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() {
@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) {}

@Override
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
exception.set(ex);
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
});
thread.start();

Thread.sleep(1000);
// Interrupt the thread
thread.interrupt();
assertBusy(() -> {
assertNotNull(exception.get());
assertTrue(exception.get() instanceof TranslogUploadFailedException);
assertEquals("Failed to upload test-to-string", exception.get().getMessage());
});
uploadThread.get().interrupt();
}

private TransferSnapshot createTransferSnapshot() {
return new TransferSnapshot() {
@Override
Expand Down Expand Up @@ -232,6 +321,11 @@ public Set<TransferFileSnapshot> getTranslogFileSnapshots() {
public TranslogTransferMetadata getTranslogTransferMetadata() {
return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5));
}

@Override
public String toString() {
return "test-to-string";
}
};
}

Expand Down

0 comments on commit fe8b2d5

Please sign in to comment.