Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.1] Handling OpenSearchRejectExecuteException Exception (#1004) #1023

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/main/kotlin/org/opensearch/replication/ReplicationException.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,28 @@
package org.opensearch.replication

import org.opensearch.OpenSearchException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ShardOperationFailedException
import org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE
import org.opensearch.index.shard.ShardId
import org.opensearch.rest.RestStatus

/**
* Base class replication exceptions. Note: Replication process may throw exceptions that do not derive from this such as
* [org.opensearch.ResourceAlreadyExistsException], [org.opensearch.index.IndexNotFoundException] or
* [org.opensearch.index.shard.ShardNotFoundException].
*/
class ReplicationException: OpenSearchException {
class ReplicationException: OpenSearchStatusException {

constructor(message: String, vararg args: Any) : super(message, *args)
constructor(message: String, status : RestStatus, cause: Throwable, vararg args: Any) : super(message, status, cause, *args)

constructor(message: String, cause: Throwable, vararg args: Any) : super(message, cause, *args)
constructor(message: String, vararg args: Any) : super(message, RestStatus.INTERNAL_SERVER_ERROR, *args)

constructor(message: String, shardFailures: Array<ShardOperationFailedException>) : super(message) {
constructor(message: String, status: RestStatus, vararg args: Any) : super(message, status, *args)

constructor(cause: Throwable, status: RestStatus, vararg args: Any) : super(cause.message, status, *args)

constructor(message: String, shardFailures: Array<ShardOperationFailedException>): super(message, shardFailures.firstOrNull()?.status()?:RestStatus.INTERNAL_SERVER_ERROR) {
shardFailures.firstOrNull()?.let {
setShard(ShardId(it.index(), INDEX_UUID_NA_VALUE, it.shardId()))
// Add first failure as cause and rest as suppressed...
Expand Down
16 changes: 9 additions & 7 deletions src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_readers_per_shard", 2, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_writers_per_shard", 2, 1,
Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_PARALLEL_READ_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.follower.poll_interval", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1),
TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope)
val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.autofollow.fetch_poll_interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30),
Expand Down Expand Up @@ -342,14 +344,14 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,

override fun getSettings(): List<Setting<*>> {
return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE,
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL,
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING,
REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE)
REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD,
REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS,
REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL,
REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL,
REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING,
REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE,
REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD)
}

override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry,
clusterService: ClusterService, recoverySettings: RecoverySettings): Map<String, Repository.Factory> {
val repoFactory = Repository.Factory { repoMetadata: RepositoryMetadata ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ open class ReplicationSettings(clusterService: ClusterService) {
@Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings)
@Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings)
@Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD)
@Volatile var writersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD)
@Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE)
@Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL)
@Volatile var autofollowFetchPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL)
Expand All @@ -41,6 +42,7 @@ open class ReplicationSettings(clusterService: ClusterService) {
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE) { value: ByteSizeValue -> this.chunkSize = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS) { value: Int -> this.concurrentFileChunks = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) { value: Int -> this.readersPerShard = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) { value: Int -> this.writersPerShard = value}
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) { batchSize = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) { pollDuration = it }
clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION) { leaseRenewalMaxFailureDuration = it }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
// Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job.
val rateLimiter = Semaphore(replicationSettings.readersPerShard)
val sequencer = TranslogSequencer(scope, replicationMetadata, followerShardId, leaderAlias, leaderShardId.indexName,
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats)
TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats, replicationSettings.writersPerShard)

val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
Expand Down Expand Up @@ -251,7 +251,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1)
logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}")
changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1)

// Propagate 4xx exceptions up the chain and halt replication as they are irrecoverable
val range4xx = 400.rangeTo(499)
if (e is OpenSearchException &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@ import org.opensearch.replication.util.suspendExecuteWithRetries
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import org.opensearch.client.Client
import org.opensearch.OpenSearchException
import org.opensearch.action.support.TransportActions
import org.opensearch.common.logging.Loggers
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId
import java.util.ArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import org.opensearch.rest.RestStatus


/**
* A TranslogSequencer allows multiple producers of [Translog.Operation]s to write them in sequence number order to an
Expand All @@ -50,7 +55,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
private val followerShardId: ShardId,
private val leaderAlias: String, private val leaderIndexName: String,
private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long,
private val followerClusterStats: FollowerClusterStats) {
private val followerClusterStats: FollowerClusterStats, writersPerShard : Int) {

private val unAppliedChanges = ConcurrentHashMap<Long, GetChangesResponse>()
private val log = Loggers.getLogger(javaClass, followerShardId)!!
Expand All @@ -59,11 +64,14 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
val followerIndexService = indicesService.indexServiceSafe(followerShardId.index)
val indexShard = followerIndexService.getShard(followerShardId.id)

private val sequencer = scope.actor<Unit>(capacity = Channel.UNLIMITED) {
private val sequencer = scope.actor<Unit>(capacity = 0) {

// Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will
// raise the same exception. See [SendChannel.close] method for details.
val rateLimiter = Semaphore(writersPerShard)
var highWatermark = initialSeqNo
for (m in channel) {
rateLimiter.acquire()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be here, can you move it to before the while statement.

while (unAppliedChanges.containsKey(highWatermark + 1)) {
val next = unAppliedChanges.remove(highWatermark + 1)!!
val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes,
Expand All @@ -73,26 +81,55 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
var relativeStartNanos = System.nanoTime()
val retryOnExceptions = ArrayList<Class<*>>()
retryOnExceptions.add(MappingNotAvailableException::class.java)
var tryReplay = true
try {
while (tryReplay) {
tryReplay = false
try {
val replayResponse = client.suspendExecuteWithRetries(
replicationMetadata,
ReplayChangesAction.INSTANCE,
replayRequest,
log = log,
retryOn = retryOnExceptions
)
if (replayResponse.shardInfo.failed > 0) {
replayResponse.shardInfo.failures.forEachIndexed { i, failure ->
log.error("Failed replaying changes. Failure:$i:$failure}")
}
followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(
replayResponse.shardInfo.failed.toLong()
)
throw ReplicationException(
"failed to replay changes",
replayResponse.shardInfo.failures
)
}

val replayResponse = client.suspendExecuteWithRetries(
replicationMetadata,
ReplayChangesAction.INSTANCE,
replayRequest,
log = log,
retryOn = retryOnExceptions
)
if (replayResponse.shardInfo.failed > 0) {
replayResponse.shardInfo.failures.forEachIndexed { i, failure ->
log.error("Failed replaying changes. Failure:$i:$failure}")
val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(
TimeUnit.NANOSECONDS.toMillis(tookInNanos)
)
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(
replayRequest.changes.size.toLong()
)
} catch (e: OpenSearchException) {
if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass)
|| TransportActions.isShardNotAvailableException(e)
// This waits for the dependencies to load and retry. Helps during boot-up
|| e.status().status >= 500
|| e.status() == RestStatus.TOO_MANY_REQUESTS)) {
tryReplay = true
}
else {
log.error("Got non-retriable Exception:${e.message} with status:${e.status()}")
throw e
}
}
}
followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(replayResponse.shardInfo.failed.toLong())
throw ReplicationException("failed to replay changes", replayResponse.shardInfo.failures)
} finally {
rateLimiter.release()
}

val tookInNanos = System.nanoTime() - relativeStartNanos
followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos))
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong())
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
}
highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark
}
Expand All @@ -105,6 +142,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
completed.await()
}


suspend fun send(changes : GetChangesResponse) {
unAppliedChanges[changes.fromSeqNo] = changes
sequencer.send(Unit)
Expand Down
25 changes: 19 additions & 6 deletions src/main/kotlin/org/opensearch/replication/util/Extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.action.index.IndexRequestBuilder
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.TransportActions
import org.opensearch.client.Client
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.shard.ShardId
Expand All @@ -43,6 +44,7 @@ import org.opensearch.transport.NodeDisconnectedException
import org.opensearch.transport.NodeNotConnectedException
import java.io.PrintWriter
import java.io.StringWriter
import java.lang.Exception

/*
* Extension function to use the store object
Expand Down Expand Up @@ -110,7 +112,8 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
defaultContext: Boolean = false): Resp {
var currentBackoff = backoff
retryOn.addAll(defaultRetryableExceptions())
repeat(numberOfRetries - 1) {
var retryException: Exception
repeat(numberOfRetries - 1) { index ->
try {
return suspendExecute(replicationMetadata, action, req,
injectSecurityContext = injectSecurityContext, defaultContext = defaultContext)
Expand All @@ -122,19 +125,29 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
// This waits for the dependencies to load and retry. Helps during boot-up
|| e.status().status >= 500
|| e.status() == RestStatus.TOO_MANY_REQUESTS)) {
log.warn("Encountered a failure while executing in $req. Retrying in ${currentBackoff/1000} seconds" +
".", e)
delay(currentBackoff)
currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut)
retryException = e;
} else {
throw e
}
} catch (e: OpenSearchRejectedExecutionException) {
if(index < numberOfRetries-2) {
retryException = e;
}
else {
throw ReplicationException(e, RestStatus.TOO_MANY_REQUESTS)
}
}
log.warn(
"Encountered a failure while executing in $req. Retrying in ${currentBackoff / 1000} seconds" +
".", retryException
)
delay(currentBackoff)
currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut)

}
return suspendExecute(replicationMetadata, action, req,
injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) // last attempt
}

/**
* Restore shard from leader cluster with retries.
* Only specified error are retried
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class TranslogSequencerTests : OpenSearchTestCase() {
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats)
client, startSeqNo, stats, 2)

// Send requests out of order (shuffled seqNo) and await for them to be processed.
var batchSeqNo = startSeqNo
Expand Down