diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationException.kt b/src/main/kotlin/org/opensearch/replication/ReplicationException.kt index 89d2456c..891be0a3 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationException.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationException.kt @@ -12,22 +12,28 @@ package org.opensearch.replication import org.opensearch.OpenSearchException +import org.opensearch.OpenSearchStatusException import org.opensearch.action.ShardOperationFailedException import org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE import org.opensearch.index.shard.ShardId +import org.opensearch.rest.RestStatus /** * Base class replication exceptions. Note: Replication process may throw exceptions that do not derive from this such as * [org.opensearch.ResourceAlreadyExistsException], [org.opensearch.index.IndexNotFoundException] or * [org.opensearch.index.shard.ShardNotFoundException]. */ -class ReplicationException: OpenSearchException { +class ReplicationException: OpenSearchStatusException { - constructor(message: String, vararg args: Any) : super(message, *args) + constructor(message: String, status : RestStatus, cause: Throwable, vararg args: Any) : super(message, status, cause, *args) - constructor(message: String, cause: Throwable, vararg args: Any) : super(message, cause, *args) + constructor(message: String, vararg args: Any) : super(message, RestStatus.INTERNAL_SERVER_ERROR, *args) - constructor(message: String, shardFailures: Array) : super(message) { + constructor(message: String, status: RestStatus, vararg args: Any) : super(message, status, *args) + + constructor(cause: Throwable, status: RestStatus, vararg args: Any) : super(cause.message, status, *args) + + constructor(message: String, shardFailures: Array): super(message, shardFailures.firstOrNull()?.status()?:RestStatus.INTERNAL_SERVER_ERROR) { shardFailures.firstOrNull()?.let { setShard(ShardId(it.index(), INDEX_UUID_NA_VALUE, it.shardId())) // Add first failure as cause and rest as suppressed... diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt index 3ea64fe0..4254412c 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt @@ -175,6 +175,8 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_readers_per_shard", 2, 1, Setting.Property.Dynamic, Setting.Property.NodeScope) + val REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD = Setting.intSetting("plugins.replication.follower.concurrent_writers_per_shard", 2, 1, + Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_PARALLEL_READ_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.follower.poll_interval", TimeValue.timeValueMillis(50), TimeValue.timeValueMillis(1), TimeValue.timeValueSeconds(1), Setting.Property.Dynamic, Setting.Property.NodeScope) val REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL = Setting.timeSetting ("plugins.replication.autofollow.fetch_poll_interval", TimeValue.timeValueSeconds(30), TimeValue.timeValueSeconds(30), @@ -346,14 +348,14 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin, override fun getSettings(): List> { return listOf(REPLICATED_INDEX_SETTING, REPLICATION_FOLLOWER_OPS_BATCH_SIZE, REPLICATION_LEADER_THREADPOOL_SIZE, - REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD, - REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS, - REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL, - REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL, - REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING, - REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE) + REPLICATION_LEADER_THREADPOOL_QUEUE_SIZE, REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD, + REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE, REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS, + REPLICATION_PARALLEL_READ_POLL_INTERVAL, REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL, + REPLICATION_AUTOFOLLOW_REMOTE_INDICES_RETRY_POLL_INTERVAL, REPLICATION_METADATA_SYNC_INTERVAL, + REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION, REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING, + REPLICATION_INDEX_TRANSLOG_RETENTION_SIZE, REPLICATION_FOLLOWER_BLOCK_START, REPLICATION_AUTOFOLLOW_CONCURRENT_REPLICATION_JOBS_TRIGGER_SIZE, + REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) } - override fun getInternalRepositories(env: Environment, namedXContentRegistry: NamedXContentRegistry, clusterService: ClusterService, recoverySettings: RecoverySettings): Map { val repoFactory = Repository.Factory { repoMetadata: RepositoryMetadata -> diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt index a6d1bbd3..2b516f8e 100644 --- a/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt +++ b/src/main/kotlin/org/opensearch/replication/ReplicationSettings.kt @@ -24,6 +24,7 @@ open class ReplicationSettings(clusterService: ClusterService) { @Volatile var chunkSize = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE.get(clusterService.settings) @Volatile var concurrentFileChunks = ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS.get(clusterService.settings) @Volatile var readersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) + @Volatile var writersPerShard = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) @Volatile var batchSize = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) @Volatile var pollDuration: TimeValue = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) @Volatile var autofollowFetchPollDuration = clusterService.clusterSettings.get(ReplicationPlugin.REPLICATION_AUTOFOLLOW_REMOTE_INDICES_POLL_INTERVAL) @@ -41,6 +42,7 @@ open class ReplicationSettings(clusterService: ClusterService) { clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_CHUNK_SIZE) { value: ByteSizeValue -> this.chunkSize = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_RECOVERY_PARALLEL_CHUNKS) { value: Int -> this.concurrentFileChunks = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_READERS_PER_SHARD) { value: Int -> this.readersPerShard = value} + clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_CONCURRENT_WRITERS_PER_SHARD) { value: Int -> this.writersPerShard = value} clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_FOLLOWER_OPS_BATCH_SIZE) { batchSize = it } clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_PARALLEL_READ_POLL_INTERVAL) { pollDuration = it } clusterSettings.addSettingsUpdateConsumer(ReplicationPlugin.REPLICATION_RETENTION_LEASE_MAX_FAILURE_DURATION) { leaseRenewalMaxFailureDuration = it } diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt index f08b2c6b..e393805e 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt @@ -214,7 +214,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: // Since this setting is not dynamic, setting update would only reflect after pause-resume or on a new replication job. val rateLimiter = Semaphore(replicationSettings.readersPerShard) val sequencer = TranslogSequencer(scope, replicationMetadata, followerShardId, leaderAlias, leaderShardId.indexName, - TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats) + TaskId(clusterService.nodeName, id), client, indexShard.localCheckpoint, followerClusterStats, replicationSettings.writersPerShard) val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings) followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint @@ -255,7 +255,6 @@ class ShardReplicationTask(id: Long, type: String, action: String, description: followerClusterStats.stats[followerShardId]!!.opsReadFailures.addAndGet(1) logInfo("Unable to get changes from seqNo: $fromSeqNo. ${e.stackTraceToString()}") changeTracker.updateBatchFetched(false, fromSeqNo, toSeqNo, fromSeqNo - 1,-1) - // Propagate 4xx exceptions up the chain and halt replication as they are irrecoverable val range4xx = 400.rangeTo(499) if (e is OpenSearchException && diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt index 38b625bf..fc14d0b5 100644 --- a/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt +++ b/src/main/kotlin/org/opensearch/replication/task/shard/TranslogSequencer.kt @@ -21,11 +21,14 @@ import org.opensearch.replication.util.suspendExecuteWithRetries import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ObsoleteCoroutinesApi -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.actor import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore import org.opensearch.client.Client +import org.opensearch.OpenSearchException +import org.opensearch.action.support.TransportActions import org.opensearch.common.logging.Loggers +import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.translog.Translog import org.opensearch.replication.util.indicesService @@ -33,6 +36,8 @@ import org.opensearch.tasks.TaskId import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.TimeUnit +import org.opensearch.rest.RestStatus + /** * A TranslogSequencer allows multiple producers of [Translog.Operation]s to write them in sequence number order to an @@ -50,7 +55,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: private val followerShardId: ShardId, private val leaderAlias: String, private val leaderIndexName: String, private val parentTaskId: TaskId, private val client: Client, initialSeqNo: Long, - private val followerClusterStats: FollowerClusterStats) { + private val followerClusterStats: FollowerClusterStats, writersPerShard : Int) { private val unAppliedChanges = ConcurrentHashMap() private val log = Loggers.getLogger(javaClass, followerShardId)!! @@ -59,11 +64,14 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: val followerIndexService = indicesService.indexServiceSafe(followerShardId.index) val indexShard = followerIndexService.getShard(followerShardId.id) - private val sequencer = scope.actor(capacity = Channel.UNLIMITED) { + private val sequencer = scope.actor(capacity = 0) { + // Exceptions thrown here will mark the channel as failed and the next attempt to send to the channel will // raise the same exception. See [SendChannel.close] method for details. + val rateLimiter = Semaphore(writersPerShard) var highWatermark = initialSeqNo for (m in channel) { + rateLimiter.acquire() while (unAppliedChanges.containsKey(highWatermark + 1)) { val next = unAppliedChanges.remove(highWatermark + 1)!! val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes, @@ -73,26 +81,55 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: var relativeStartNanos = System.nanoTime() val retryOnExceptions = ArrayList>() retryOnExceptions.add(MappingNotAvailableException::class.java) + var tryReplay = true + try { + while (tryReplay) { + tryReplay = false + try { + val replayResponse = client.suspendExecuteWithRetries( + replicationMetadata, + ReplayChangesAction.INSTANCE, + replayRequest, + log = log, + retryOn = retryOnExceptions + ) + if (replayResponse.shardInfo.failed > 0) { + replayResponse.shardInfo.failures.forEachIndexed { i, failure -> + log.error("Failed replaying changes. Failure:$i:$failure}") + } + followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet( + replayResponse.shardInfo.failed.toLong() + ) + throw ReplicationException( + "failed to replay changes", + replayResponse.shardInfo.failures + ) + } - val replayResponse = client.suspendExecuteWithRetries( - replicationMetadata, - ReplayChangesAction.INSTANCE, - replayRequest, - log = log, - retryOn = retryOnExceptions - ) - if (replayResponse.shardInfo.failed > 0) { - replayResponse.shardInfo.failures.forEachIndexed { i, failure -> - log.error("Failed replaying changes. Failure:$i:$failure}") + val tookInNanos = System.nanoTime() - relativeStartNanos + followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet( + TimeUnit.NANOSECONDS.toMillis(tookInNanos) + ) + followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet( + replayRequest.changes.size.toLong() + ) + } catch (e: OpenSearchException) { + if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass) + || TransportActions.isShardNotAvailableException(e) + // This waits for the dependencies to load and retry. Helps during boot-up + || e.status().status >= 500 + || e.status() == RestStatus.TOO_MANY_REQUESTS)) { + tryReplay = true + } + else { + log.error("Got non-retriable Exception:${e.message} with status:${e.status()}") + throw e + } + } } - followerClusterStats.stats[followerShardId]!!.opsWriteFailures.addAndGet(replayResponse.shardInfo.failed.toLong()) - throw ReplicationException("failed to replay changes", replayResponse.shardInfo.failures) + } finally { + rateLimiter.release() } - - val tookInNanos = System.nanoTime() - relativeStartNanos - followerClusterStats.stats[followerShardId]!!.totalWriteTime.addAndGet(TimeUnit.NANOSECONDS.toMillis(tookInNanos)) - followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(replayRequest.changes.size.toLong()) - followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint } highWatermark = next.changes.lastOrNull()?.seqNo() ?: highWatermark } @@ -105,6 +142,7 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata: completed.await() } + suspend fun send(changes : GetChangesResponse) { unAppliedChanges[changes.fromSeqNo] = changes sequencer.send(Unit) diff --git a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt index 96749f7b..643cc010 100644 --- a/src/main/kotlin/org/opensearch/replication/util/Extensions.kt +++ b/src/main/kotlin/org/opensearch/replication/util/Extensions.kt @@ -29,6 +29,7 @@ import org.opensearch.action.index.IndexResponse import org.opensearch.action.support.TransportActions import org.opensearch.client.Client import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException import org.opensearch.index.IndexNotFoundException import org.opensearch.index.shard.ShardId import org.opensearch.index.store.Store @@ -43,6 +44,7 @@ import org.opensearch.transport.NodeDisconnectedException import org.opensearch.transport.NodeNotConnectedException import java.io.PrintWriter import java.io.StringWriter +import java.lang.Exception /* * Extension function to use the store object @@ -110,7 +112,8 @@ suspend fun Client.suspendExecuteWith defaultContext: Boolean = false): Resp { var currentBackoff = backoff retryOn.addAll(defaultRetryableExceptions()) - repeat(numberOfRetries - 1) { + var retryException: Exception + repeat(numberOfRetries - 1) { index -> try { return suspendExecute(replicationMetadata, action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) @@ -122,19 +125,29 @@ suspend fun Client.suspendExecuteWith // This waits for the dependencies to load and retry. Helps during boot-up || e.status().status >= 500 || e.status() == RestStatus.TOO_MANY_REQUESTS)) { - log.warn("Encountered a failure while executing in $req. Retrying in ${currentBackoff/1000} seconds" + - ".", e) - delay(currentBackoff) - currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut) + retryException = e; } else { throw e } + } catch (e: OpenSearchRejectedExecutionException) { + if(index < numberOfRetries-2) { + retryException = e; + } + else { + throw ReplicationException(e, RestStatus.TOO_MANY_REQUESTS) + } } + log.warn( + "Encountered a failure while executing in $req. Retrying in ${currentBackoff / 1000} seconds" + + ".", retryException + ) + delay(currentBackoff) + currentBackoff = (currentBackoff * factor).toLong().coerceAtMost(maxTimeOut) + } return suspendExecute(replicationMetadata, action, req, injectSecurityContext = injectSecurityContext, defaultContext = defaultContext) // last attempt } - /** * Restore shard from leader cluster with retries. * Only specified error are retried diff --git a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt index ac377687..fe6ad1c8 100644 --- a/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt +++ b/src/test/kotlin/org/opensearch/replication/task/shard/TranslogSequencerTests.kt @@ -94,7 +94,7 @@ class TranslogSequencerTests : OpenSearchTestCase() { Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService) Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard) val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID, - client, startSeqNo, stats) + client, startSeqNo, stats, 2) // Send requests out of order (shuffled seqNo) and await for them to be processed. var batchSeqNo = startSeqNo