Skip to content

Commit

Permalink
Add the missing follower checkpoint stat update and correct the follower stats test case
Browse files (browse the repository at this point in the history)
Signed-off-by: sricharanvuppu <[email protected]>
  • Loading branch information
sricharanvuppu committed Sep 26, 2023
1 parent 453bd02 commit 7aa6fde
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
val rateLimiter = Semaphore(writersPerShard)
var highWatermark = initialSeqNo
for (m in channel) {
rateLimiter.acquire()
while (unAppliedChanges.containsKey(highWatermark + 1)) {
val next = unAppliedChanges.remove(highWatermark + 1)!!
val replayRequest = ReplayChangesRequest(followerShardId, next.changes, next.maxSeqNoOfUpdatesOrDeletes,
leaderAlias, leaderIndexName)
leaderAlias, leaderIndexName)
replayRequest.parentTask = parentTaskId
rateLimiter.acquire()
launch {
var relativeStartNanos = System.nanoTime()
val retryOnExceptions = ArrayList<Class<*>>()
Expand Down Expand Up @@ -113,15 +113,16 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
followerClusterStats.stats[followerShardId]!!.opsWritten.addAndGet(
replayRequest.changes.size.toLong()
)
followerClusterStats.stats[followerShardId]!!.followerCheckpoint = indexShard.localCheckpoint
} catch (e: OpenSearchException) {
if (e !is IndexNotFoundException && (retryOnExceptions.contains(e.javaClass)
|| TransportActions.isShardNotAvailableException(e)
// This waits for the dependencies to load and retry. Helps during boot-up
|| e.status().status >= 500
|| e.status() == RestStatus.TOO_MANY_REQUESTS)) {
tryReplay = true
}
else {
tryReplay = true
}
else {
log.error("Got non-retriable Exception:${e.message} with status:${e.status()}")
throw e
}
Expand All @@ -147,4 +148,4 @@ class TranslogSequencer(scope: CoroutineScope, private val replicationMetadata:
unAppliedChanges[changes.fromSeqNo] = changes
sequencer.send(Unit)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -865,49 +865,48 @@ class StartReplicationIT: MultiClusterRestTestCase() {
}, 60L, TimeUnit.SECONDS)
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/cross-cluster-replication/issues/176")
fun `test follower stats`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
val leaderIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader"
val followerIndexName2 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower"
val leaderIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"leader"
val followerIndexName3 = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)+"follower"
// val followerIndex2 = "follower_index_2"
// val followerIndex3 = "follower_index_3"
createConnectionBetweenClusters(FOLLOWER, LEADER)
val createIndexResponse = leaderClient.indices().create(
CreateIndexRequest(leaderIndexName),
RequestOptions.DEFAULT
CreateIndexRequest(leaderIndexName),
RequestOptions.DEFAULT
)
assertThat(createIndexResponse.isAcknowledged).isTrue()
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName, followerIndexName),
TimeValue.timeValueSeconds(10),
true
StartReplicationRequest("source", leaderIndexName, followerIndexName),
TimeValue.timeValueSeconds(10),
true
)
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName2, followerIndexName2),
TimeValue.timeValueSeconds(10),
true
StartReplicationRequest("source", leaderIndexName, followerIndexName2),
TimeValue.timeValueSeconds(10),
true
)
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName3, followerIndexName3),
TimeValue.timeValueSeconds(10),
true
StartReplicationRequest("source", leaderIndexName, followerIndexName3),
TimeValue.timeValueSeconds(10),
true
)
val docCount = 50
for (i in 1..docCount) {
val sourceMap = mapOf("name" to randomAlphaOfLength(5))
leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
}
followerClient.pauseReplication(followerIndexName2)
val stats = followerClient.followerStats()
followerClient.stopReplication(followerIndexName3)
var stats = followerClient.followerStats()
assertThat(stats.getValue("num_syncing_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_paused_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("num_failed_indices").toString()).isEqualTo("0")
assertThat(stats.getValue("num_shard_tasks").toString()).isEqualTo("1")
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
assertBusy({
stats = followerClient.followerStats()
assertThat(stats.getValue("operations_written").toString()).isEqualTo("50")
}, 60, TimeUnit.SECONDS)
assertThat(stats.getValue("operations_read").toString()).isEqualTo("50")
assertThat(stats.getValue("failed_read_requests").toString()).isEqualTo("0")
assertThat(stats.getValue("failed_write_requests").toString()).isEqualTo("0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@

package org.opensearch.replication.task.shard

import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.test.runBlockingTest
Expand All @@ -27,21 +35,13 @@ import org.opensearch.index.shard.IndexShard
import org.opensearch.index.shard.ShardId
import org.opensearch.index.translog.Translog
import org.opensearch.indices.IndicesService
import org.opensearch.replication.action.changes.GetChangesResponse
import org.opensearch.replication.action.replay.ReplayChangesAction
import org.opensearch.replication.action.replay.ReplayChangesRequest
import org.opensearch.replication.action.replay.ReplayChangesResponse
import org.opensearch.replication.metadata.ReplicationOverallState
import org.opensearch.replication.metadata.store.ReplicationContext
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.metadata.store.ReplicationStoreMetadataType
import org.opensearch.replication.util.indicesService
import org.opensearch.tasks.TaskId.EMPTY_TASK_ID
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.OpenSearchTestCase.randomList
import org.opensearch.test.client.NoOpClient
import java.util.Locale


@ObsoleteCoroutinesApi
class TranslogSequencerTests : OpenSearchTestCase() {

Expand Down Expand Up @@ -72,7 +72,7 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val leaderIndex = "leaderIndex"
val followerShardId = ShardId("follower", "follower_uuid", 0)
val replicationMetadata = ReplicationMetadata(leaderAlias, ReplicationStoreMetadataType.INDEX.name, ReplicationOverallState.RUNNING.name, "test user",
ReplicationContext(followerShardId.indexName, null), ReplicationContext(leaderIndex, null), Settings.EMPTY)
ReplicationContext(followerShardId.indexName, null), ReplicationContext(leaderIndex, null), Settings.EMPTY)
val client = RequestCapturingClient()
init {
closeAfterSuite(client)
Expand All @@ -94,7 +94,7 @@ class TranslogSequencerTests : OpenSearchTestCase() {
Mockito.`when`(indicesService.indexServiceSafe(followerShardId.index)).thenReturn(followerIndexService)
Mockito.`when`(followerIndexService.getShard(followerShardId.id)).thenReturn(indexShard)
val sequencer = TranslogSequencer(this, replicationMetadata, followerShardId, leaderAlias, leaderIndex, EMPTY_TASK_ID,
client, startSeqNo, stats, 2)
client, startSeqNo, stats, 2)

// Send requests out of order (shuffled seqNo) and await for them to be processed.
var batchSeqNo = startSeqNo
Expand All @@ -120,7 +120,7 @@ class TranslogSequencerTests : OpenSearchTestCase() {
val changes = randomList(1, randomIntBetween(1, 512)) {
seqNo = seqNo.inc()
Translog.Index(randomAlphaOfLength(10).toLowerCase(Locale.ROOT), seqNo,
1L, "{}".toByteArray(Charsets.UTF_8))
1L, "{}".toByteArray(Charsets.UTF_8))
}
return Pair(GetChangesResponse(changes, startSeqNo.inc(), startSeqNo, -1), seqNo)
}
Expand Down

0 comments on commit 7aa6fde

Please sign in to comment.