Skip to content

Commit

Permalink
Fix shard failure on flush during upload failures for remote indexes
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 9, 2023
1 parent 8bb11a6 commit a77b784
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.opensearch.OpenSearchException.OpenSearchExceptionHandleRegistry.registerExceptionHandle;
import static org.opensearch.OpenSearchException.UNKNOWN_VERSION_ADDED;
import static org.opensearch.Version.V_2_10_0;
import static org.opensearch.Version.V_2_11_0;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_2_4_0;
import static org.opensearch.Version.V_2_5_0;
Expand Down Expand Up @@ -1175,6 +1176,14 @@ public static void registerExceptions() {
)
);
registerExceptionHandle(new OpenSearchExceptionHandle(CryptoRegistryException.class, CryptoRegistryException::new, 171, V_2_10_0));
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.index.translog.transfer.TranslogUploadFailedException.class,
org.opensearch.index.translog.transfer.TranslogUploadFailedException::new,
172,
V_2_11_0
)
);
registerExceptionHandle(
new OpenSearchExceptionHandle(
org.opensearch.cluster.block.IndexCreateBlockException.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -1888,6 +1889,9 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
}

translogManager.trimUnreferencedReaders();
} catch (TranslogUploadFailedException e) {
// Do not fail engine as this is due to translog upload failure
throw new FlushFailedEngineException(shardId, e);
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.index.engine.LifecycleAware;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.translog.listener.TranslogEventListener;
import org.opensearch.index.translog.transfer.TranslogUploadFailedException;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -88,6 +89,9 @@ public void rollTranslogGeneration() throws TranslogException {
engineLifeCycleAware.ensureOpen();
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (TranslogUploadFailedException e) {
// Do not trigger the translogEventListener as it fails the Engine while this is only an issue with remote upload
throw e;
} catch (AlreadyClosedException e) {
translogEventListener.onFailure("translog roll generation failed", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L);
remoteTranslogTransferTracker.addUploadBytesFailed(metadataBytesToUpload);
// outer catch handles capturing stats on upload failure
throw exception;
throw new TranslogUploadFailedException(shardId, "Failed to upload " + tlogMetadata.getName(), exception);
}

remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L);
Expand All @@ -185,7 +185,10 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
translogTransferListener.onUploadComplete(transferSnapshot);
return true;
} else {
Exception ex = new IOException("Failed to upload " + exceptionList.size() + " files during transfer");
Exception ex = new TranslogUploadFailedException(
shardId,
"Failed to upload " + exceptionList.size() + " files during transfer"
);
exceptionList.forEach(ex::addSuppressed);
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.translog.TranslogException;

import java.io.IOException;

/**
 * Signals that uploading translog to the remote store did not succeed.
 * <p>
 * This is a dedicated {@link TranslogException} subtype, so callers can catch it separately
 * from other translog failures.
 *
 * @opensearch.internal
 */
public class TranslogUploadFailedException extends TranslogException {

    /**
     * Rebuilds the exception from a stream by delegating to the parent's stream constructor.
     *
     * @param in stream to read the exception from
     * @throws IOException propagated from the parent stream constructor
     */
    public TranslogUploadFailedException(final StreamInput in) throws IOException {
        super(in);
    }

    /**
     * Creates the exception with a message and the underlying failure that caused it.
     *
     * @param shardId shard the failed upload belongs to
     * @param message description of what failed to upload
     * @param cause   the original failure, kept as the cause of this exception
     */
    public TranslogUploadFailedException(final ShardId shardId, final String message, final Throwable cause) {
        super(shardId, message, cause);
    }

    /**
     * Creates the exception with a message and no underlying cause.
     *
     * @param shardId shard the failed upload belongs to
     * @param message description of what failed to upload
     */
    public TranslogUploadFailedException(final ShardId shardId, final String message) {
        super(shardId, message);
    }
}

0 comments on commit a77b784

Please sign in to comment.