Skip to content

Commit

Permalink
Adding support to fetch changes from Lucene store while migrating fro…
Browse files Browse the repository at this point in the history
…m/to r… (#1369)

Signed-off-by: Shubh Sahu <[email protected]>
  • Loading branch information
astute-decipher committed Apr 23, 2024
1 parent 90fcb8b commit ff42012
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 15 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ testClusters {
testDistribution = "ARCHIVE"
}
int debugPort = 5005
//adding it to test migration
systemProperty('opensearch.experimental.feature.remote_store.migration.enabled','true')

if (_numNodes > 1) numberOfNodes = _numNodes
//numberOfNodes = 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
// We don't need a retention lease translog deletion policy for remote store enabled clusters as
// we fetch the operations directly from lucene in such cases.
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) {
Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
Expand All @@ -87,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
// TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" }
throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo)

var ops: List<Translog.Operation> = listOf()
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
Expand Down Expand Up @@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating))
}
}
}

private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long {
// We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store
// enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to
// lastKnownGlobalCheckpoint in such cases.
return if (isRemoteStoreEnabled) {
return if (isRemoteEnabledOrMigrating) {
indexShard.lastKnownGlobalCheckpoint
} else {
indexShard.lastSyncedGlobalCheckpoint
Expand Down Expand Up @@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
// Random active shards
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt()
else shardIt.activeInitializingShardsRandomIt()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.core.common.Strings
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
import org.opensearch.common.settings.Settings
import org.opensearch.core.common.Strings
import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import org.opensearch.cluster.service.ClusterService
import org.opensearch.node.Node
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
import org.opensearch.node.remotestore.RemoteStoreNodeService
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
import org.opensearch.replication.action.changes.TransportGetChangesAction
import java.io.UnsupportedEncodingException
import java.nio.file.Files
import java.nio.file.Path
import java.util.Locale
Expand Down Expand Up @@ -161,4 +161,8 @@ object ValidationUtil {
return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
}

fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean {
return isRemoteStoreEnabledCluster(clusterService) ||
clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ import java.nio.file.Files
import java.util.*
import java.util.concurrent.TimeUnit
import org.opensearch.bootstrap.BootstrapInfo
import org.opensearch.cluster.service.ClusterService
import org.opensearch.index.mapper.Mapping
import org.opensearch.indices.replication.common.ReplicationType
import org.opensearch.replication.util.ValidationUtil


@MultiClusterAnnotations.ClusterConfigurations(
Expand Down Expand Up @@ -1255,6 +1258,62 @@ class StartReplicationIT: MultiClusterRestTestCase() {
)
}

fun `test operations are fetched from lucene when leader is in mixed mode`() {

val leaderClient = getClientForCluster(LEADER)
val followerClient = getClientForCluster(FOLLOWER)

// create index on leader cluster
val settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build()
val createIndexResponse = leaderClient.indices().create(
CreateIndexRequest(leaderIndexName).settings(settings),
RequestOptions.DEFAULT
)
assertThat(createIndexResponse.isAcknowledged).isTrue()

// Update leader cluster settings to enable mixed mode and set migration direction to remote_store
val leaderClusterUpdateSettingsRequest = Request("PUT", "_cluster/settings")
val entityAsString = """
{
"persistent": {
"remote_store.compatibility_mode": "mixed",
"migration.direction" : "remote_store"
}
}""".trimMargin()

leaderClusterUpdateSettingsRequest.entity = StringEntity(entityAsString,ContentType.APPLICATION_JSON)
val updateSettingResponse = leaderClient.lowLevelClient.performRequest(leaderClusterUpdateSettingsRequest)
assertEquals(HttpStatus.SC_OK.toLong(), updateSettingResponse.statusLine.statusCode.toLong())

//create connection and start replication
createConnectionBetweenClusters(FOLLOWER, LEADER)

followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName, followerIndexName),
TimeValue.timeValueSeconds(10),
true
)

//Index documents on leader index
val docCount = 50
for (i in 1..docCount) {
val sourceMap = mapOf("name" to randomAlphaOfLength(5))
leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
}

// Verify that all the documents are replicated to follower index and are fetched from lucene
assertBusy({
val stats = leaderClient.leaderStats()
assertThat(stats.size).isEqualTo(9)
assertThat(stats.getValue("num_replicated_indices").toString()).isEqualTo("1")
assertThat(stats.getValue("operations_read").toString()).isEqualTo(docCount.toString())
assertThat(stats.getValue("operations_read_lucene").toString()).isEqualTo(docCount.toString())
assertThat(stats.getValue("operations_read_translog").toString()).isEqualTo("0")
assertThat(stats.containsKey("index_stats"))
}, 60L, TimeUnit.SECONDS)
}

private fun excludeAllClusterNodes(clusterName: String) {
val transientSettingsRequest = Request("PUT", "_cluster/settings")
Expand Down

0 comments on commit ff42012

Please sign in to comment.