diff --git a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt index 7adfc8aa..acc08e43 100644 --- a/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt +++ b/src/main/kotlin/org/opensearch/replication/repository/RemoteClusterRestoreLeaderService.kt @@ -114,8 +114,8 @@ class RemoteClusterRestoreLeaderService @Inject constructor(private val indicesS var fromSeqNo = RetentionLeaseActions.RETAIN_ALL // Adds the retention lease for fromSeqNo for the next stage of the replication. - retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo, - request.followerShardId, RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC) + retentionLeaseHelper.addRetentionLease(request.leaderShardId, fromSeqNo, request.followerShardId, + RemoteClusterRepository.REMOTE_CLUSTER_REPO_REQ_TIMEOUT_IN_MILLI_SEC) /** * At this point, it should be safe to release retention lock as the retention lease diff --git a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt index e755a7be..29116c51 100644 --- a/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt +++ b/src/main/kotlin/org/opensearch/replication/seqno/RemoteClusterRetentionLeaseHelper.kt @@ -22,6 +22,7 @@ import org.opensearch.index.seqno.RetentionLeaseActions import org.opensearch.index.seqno.RetentionLeaseAlreadyExistsException import org.opensearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException import org.opensearch.index.seqno.RetentionLeaseNotFoundException +import org.opensearch.index.shard.IndexShard import org.opensearch.index.shard.ShardId import org.opensearch.replication.metadata.store.ReplicationMetadata import org.opensearch.replication.repository.RemoteClusterRepository @@ -175,22 +176,47 @@ class RemoteClusterRetentionLeaseHelper constructor(var followerClusterNameWithU } } + public fun attemptRetentionLeaseRemoval(leaderShardId: ShardId, followerShardId: ShardId, timeout: Long) { + val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) + val request = RetentionLeaseActions.RemoveRequest(leaderShardId, retentionLeaseId) + try { + client.execute(RetentionLeaseActions.Remove.INSTANCE, request).actionGet(timeout) + log.info("Removed retention lease with id - $retentionLeaseId") + } catch(e: RetentionLeaseNotFoundException) { + // log error and bail + log.error(e.stackTraceToString()) + } catch (e: Exception) { + // We are not bubbling up the exception as the stop action/ task cleanup should succeed + // even if we fail to remove the retention lease from leader cluster + log.error("Exception in removing retention lease", e) + } + } + /** * Remove these once the callers are moved to above APIs */ public fun addRetentionLease(leaderShardId: ShardId, seqNo: Long, - followerShardId: ShardId, timeout: Long) { + followerShardId: ShardId, timeout: Long) { val retentionLeaseId = retentionLeaseIdForShard(followerClusterNameWithUUID, followerShardId) val request = RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, seqNo, retentionLeaseSource) - try { - client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout) - } catch (e: RetentionLeaseAlreadyExistsException) { - log.error(e.stackTraceToString()) - log.info("Renew retention 
lease as it already exists $retentionLeaseId with $seqNo")
-            // Only one retention lease should exists for the follower shard
-            // Ideally, this should have got cleaned-up
-            renewRetentionLease(leaderShardId, seqNo, followerShardId, timeout)
+        var canRetry = true
+        while (true) {
+            try {
+                log.info("Adding retention lease $retentionLeaseId")
+                client.execute(RetentionLeaseActions.Add.INSTANCE, request).actionGet(timeout)
+                break
+            } catch (e: RetentionLeaseAlreadyExistsException) {
+                log.info("Found a stale retention lease $retentionLeaseId on leader.")
+                if (canRetry) {
+                    canRetry = false
+                    attemptRetentionLeaseRemoval(leaderShardId, followerShardId, timeout)
+                    log.info("Cleared stale retention lease $retentionLeaseId on leader. Retrying...")
+                } else {
+                    log.error(e.stackTraceToString())
+                    throw e
+                }
+            }
         }
     }
diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt
index 2796eb43..4d2537ad 100644
--- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt
+++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt
@@ -290,7 +290,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
     private suspend fun pollShardTaskStatus(): IndexReplicationState {
         val failedShardTasks = findAllReplicationFailedShardTasks(followerIndexName, clusterService.state())
         if (failedShardTasks.isNotEmpty()) {
-            log.info("Failed shard tasks - ", failedShardTasks)
+            log.info("Failed shard tasks - $failedShardTasks")
             var msg = ""
             for ((shard, task) in failedShardTasks) {
                 val taskState = task.state
diff --git a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
index 7b58d419..f08b2c6b 100644
--- a/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
+++ b/src/main/kotlin/org/opensearch/replication/task/shard/ShardReplicationTask.kt
@@ -218,6 +218,10 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
 
         val changeTracker = ShardReplicationChangesTracker(indexShard, replicationSettings)
         followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
+        // In case the shard task starts on a new node and there are no active writes on the leader shard, the leader checkpoint
+        // never gets initialized and defaults to 0. To get around this, we set the leaderCheckpoint to the follower shard's
+        // localCheckpoint, as the leader checkpoint is guaranteed to be at or ahead of it.
+ followerClusterStats.stats[followerShardId]!!.leaderCheckpoint = indexShard.localCheckpoint coroutineScope { while (isActive) { rateLimiter.acquire() diff --git a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt index 0e3e3208..dc7b11c0 100644 --- a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt @@ -11,7 +11,6 @@ package org.opensearch.replication -import com.nhaarman.mockitokotlin2.stub import org.opensearch.replication.MultiClusterAnnotations.ClusterConfiguration import org.opensearch.replication.MultiClusterAnnotations.ClusterConfigurations import org.opensearch.replication.MultiClusterAnnotations.getAnnotationsFromClass @@ -21,6 +20,7 @@ import org.apache.http.HttpHost import org.apache.http.HttpStatus import org.apache.http.client.config.RequestConfig import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity import org.apache.http.impl.nio.client.HttpAsyncClientBuilder import org.apache.http.message.BasicHeader import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy @@ -512,6 +512,28 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() { return OpenSearchRestTestCase.entityAsList(client.performRequest(Request("GET", endpoint))) } + protected fun deleteConnection(fromClusterName: String, connectionName: String="source") { + val fromCluster = getNamedCluster(fromClusterName) + val persistentConnectionRequest = Request("PUT", "_cluster/settings") + + val entityAsString = """ + { + "persistent": { + "cluster": { + "remote": { + "$connectionName": { + "seeds": null + } + } + } + } + }""".trimMargin() + + persistentConnectionRequest.entity = StringEntity(entityAsString, ContentType.APPLICATION_JSON) + val persistentConnectionResponse = fromCluster.lowLevelClient.performRequest(persistentConnectionRequest) + assertEquals(HttpStatus.SC_OK.toLong(), persistentConnectionResponse.statusLine.statusCode.toLong()) + } + protected fun createConnectionBetweenClusters(fromClusterName: String, toClusterName: String, connectionName: String="source") { val toCluster = getNamedCluster(toClusterName) val fromCluster = getNamedCluster(fromClusterName) @@ -635,12 +657,22 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() { followerClient.cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT) } -// TODO Find a way to skip tests when tests are run for remote clusters + // TODO Find a way to skip tests when tests are run for remote clusters protected fun checkifIntegTestRemote(): Boolean { val systemProperties = BootstrapInfo.getSystemProperties() val integTestRemote = systemProperties.get("tests.integTestRemote") as String? 
        return integTestRemote.equals("true")
     }
 
+    protected fun docCount(cluster: RestHighLevelClient, indexName: String) : Int {
+        val persistentConnectionRequest = Request("GET", "/$indexName/_search?pretty&q=*")
+
+        val persistentConnectionResponse = cluster.lowLevelClient.performRequest(persistentConnectionRequest)
+        val statusResponse: Map<String, Map<String, Map<String, Any>>> = OpenSearchRestTestCase.entityAsMap(persistentConnectionResponse) as Map<String, Map<String, Map<String, Any>>>
+        return statusResponse["hits"]?.get("total")?.get("value") as Int
+    }
+    protected fun deleteIndex(testCluster: RestHighLevelClient, indexName: String) {
+        testCluster.lowLevelClient.performRequest(Request("DELETE", indexName))
+    }
 }
diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStopThenRestartIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStopThenRestartIT.kt
new file mode 100644
index 00000000..33ebb790
--- /dev/null
+++ b/src/test/kotlin/org/opensearch/replication/integ/rest/ReplicationStopThenRestartIT.kt
@@ -0,0 +1,62 @@
+package org.opensearch.replication.integ.rest
+
+import org.opensearch.replication.MultiClusterRestTestCase
+import org.opensearch.replication.MultiClusterAnnotations
+import org.opensearch.replication.StartReplicationRequest
+import org.opensearch.replication.startReplication
+import org.opensearch.replication.stopReplication
+import org.assertj.core.api.Assertions
+import org.opensearch.client.RequestOptions
+import org.opensearch.client.indices.CreateIndexRequest
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+
+
+@MultiClusterAnnotations.ClusterConfigurations(
+        MultiClusterAnnotations.ClusterConfiguration(clusterName = LEADER),
+        MultiClusterAnnotations.ClusterConfiguration(clusterName = FOLLOWER)
+)
+
+class ReplicationStopThenRestartIT : MultiClusterRestTestCase() {
+    private val leaderIndexName = "leader_index"
+    private val followerIndexName = "follower_index"
+
+    fun `test replication works after unclean stop and start`() {
+        val followerClient = getClientForCluster(FOLLOWER)
+        val leaderClient = getClientForCluster(LEADER)
+        changeTemplate(LEADER)
+        createConnectionBetweenClusters(FOLLOWER, LEADER)
+        val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
+        Assertions.assertThat(createIndexResponse.isAcknowledged).isTrue()
+        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
+        insertDocToIndex(LEADER, "1", "dummy data 1", leaderIndexName)
+        insertDocToIndex(LEADER, "2", "dummy data 1", leaderIndexName)
+
+        assertBusy ({
+            try {
+                Assert.assertEquals(2, docCount(followerClient, followerIndexName))
+            } catch (ex: Exception) {
+                ex.printStackTrace()
+                Assert.fail("Exception while querying follower cluster; failing so assertBusy retries")
+            }
+        }, 1, TimeUnit.MINUTES)
+
+
+        deleteConnection(FOLLOWER)
+        followerClient.stopReplication(followerIndexName, shouldWait = true)
+        deleteIndex(followerClient, followerIndexName)
+
+        createConnectionBetweenClusters(FOLLOWER, LEADER)
+        insertDocToIndex(LEADER, "3", "dummy data 1", leaderIndexName)
+        insertDocToIndex(LEADER, "4", "dummy data 1", leaderIndexName)
+        followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName))
+
+        assertBusy ({
+            try {
+                Assert.assertEquals(4, docCount(followerClient, followerIndexName))
+            } catch (ex: Exception) {
+                Assert.fail("Exception while querying follower cluster; failing so assertBusy retries")
+            }
+        }, 1, TimeUnit.MINUTES)
+    }
+}
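
Review note: the behavioural core of this change is the retry-once flow in addRetentionLease — attempt the add, and on RetentionLeaseAlreadyExistsException remove the stale lease left behind by an earlier (uncleanly stopped) follower task, then retry exactly once. The standalone Kotlin sketch below restates that control flow for reviewers; addLease and removeStaleLease are hypothetical stand-ins rather than the plugin's API, and IllegalStateException stands in for RetentionLeaseAlreadyExistsException.

// Illustrative sketch only: mirrors the add-then-retry-once shape of addRetentionLease.
// addLease / removeStaleLease are hypothetical stand-ins for the RetentionLeaseActions
// Add / Remove calls issued against the leader cluster.
fun addLeaseWithSingleRetry(addLease: () -> Unit, removeStaleLease: () -> Unit) {
    var canRetry = true
    while (true) {
        try {
            addLease()        // attempt to add the retention lease on the leader shard
            return            // success: stop
        } catch (e: IllegalStateException) {   // stands in for RetentionLeaseAlreadyExistsException
            if (!canRetry) throw e             // second failure: surface the error to the caller
            canRetry = false
            removeStaleLease()                 // clear the stale lease, then loop to retry the add
        }
    }
}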