diff --git a/build.gradle b/build.gradle
index c6658dd5..7563e849 100644
--- a/build.gradle
+++ b/build.gradle
@@ -380,6 +380,8 @@ testClusters {
         testDistribution = "ARCHIVE"
     }

     int debugPort = 5005
+    // Enable the experimental remote store migration feature so migration scenarios can be tested
+    systemProperty('opensearch.experimental.feature.remote_store.migration.enabled','true')
     if (_numNodes > 1) numberOfNodes = _numNodes
     //numberOfNodes = 3
diff --git a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
index 792ebe27..3790c3c0 100644
--- a/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
+++ b/src/main/kotlin/org/opensearch/replication/ReplicationPlugin.kt
@@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
     override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
        // We don't need a retention lease translog deletion policy for remote store enabled clusters as
        // we fetch the operations directly from lucene in such cases.
-        return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
+        return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) {
            Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
                ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
            })
diff --git a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
index 392555b5..4e364906 100644
--- a/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
+++ b/src/main/kotlin/org/opensearch/replication/action/changes/TransportGetChangesAction.kt
@@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
            indexMetric.lastFetchTime.set(relativeStartNanos)

            val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
-            val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
-            if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
+            val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService)
+            if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
                // There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
                // the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
                // should catch and start a new poll.
@@ -87,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
                // At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
                // to the translog, which means we can't return those changes. Return to the caller to retry.
                // TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
-                if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
-                    assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
+                if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
+                    assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" }
                    throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
                }
            }

            relativeStartNanos = System.nanoTime()
            // At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
-            val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)
+            val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo)

            var ops: List<Translog.Operation> = listOf()
-            var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
+            var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false
            if(fetchFromTranslog) {
                try {
                    ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
@@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
            indexMetric.ops.addAndGet(ops.size.toLong())
            ops.stream().forEach{op ->
                indexMetric.bytesRead.addAndGet(op.estimateSize())
            }
-            GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
+            GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating))
        }
    }

-    private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
+    private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long {
        // We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. In case of remote store
        // enabled clusters, the semantics are slightly different, and we can't use lastSyncedGlobalCheckpoint. Falling back to
        // lastKnownGlobalCheckpoint in such cases.
-        return if (isRemoteStoreEnabled) {
+        return if (isRemoteEnabledOrMigrating) {
            indexShard.lastKnownGlobalCheckpoint
        } else {
            indexShard.lastSyncedGlobalCheckpoint
@@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
        override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
            val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
            // Random active shards
-            return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
+            return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt()
            else shardIt.activeInitializingShardsRandomIt()
        }
    }
\ No newline at end of file
diff --git a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
index afbcdf60..91d012c2 100644
--- a/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
+++ b/src/main/kotlin/org/opensearch/replication/util/ValidationUtil.kt
@@ -17,18 +17,18 @@ import org.opensearch.Version
 import org.opensearch.cluster.ClusterState
 import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.metadata.MetadataCreateIndexService
-import org.opensearch.core.common.Strings
+import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.ValidationException
 import org.opensearch.common.settings.Settings
+import org.opensearch.core.common.Strings
 import org.opensearch.env.Environment
 import org.opensearch.index.IndexNotFoundException
-import java.io.UnsupportedEncodingException
-import org.opensearch.cluster.service.ClusterService
 import org.opensearch.node.Node
 import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
+import org.opensearch.node.remotestore.RemoteStoreNodeService
 import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
 import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
-import org.opensearch.replication.action.changes.TransportGetChangesAction
+import java.io.UnsupportedEncodingException
 import java.nio.file.Files
 import java.nio.file.Path
 import java.util.Locale
@@ -161,4 +161,8 @@ object ValidationUtil {
        return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
    }

+    fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean {
+        return isRemoteStoreEnabledCluster(clusterService) ||
+                clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED)
+    }
 }
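Note: the isRemoteEnabledOrMigrating helper introduced above treats a cluster whose remote_store.compatibility_mode is MIXED the same as a fully remote-store-enabled cluster, so during a migration the leader serves changes from Lucene instead of the translog. A cluster enters that state through the remote_store.compatibility_mode and migration.direction settings, which the new integration test below applies via a raw JSON request. For reference, a minimal sketch of the same settings update using the high-level REST client is shown here; it is illustrative only, not part of this change, and the enableRemoteStoreMigration name and client parameter are placeholders.

    import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
    import org.opensearch.client.RequestOptions
    import org.opensearch.client.RestHighLevelClient
    import org.opensearch.common.settings.Settings

    // Hypothetical helper (not part of this diff): puts the target cluster into mixed
    // compatibility mode with the migration direction set to remote_store, mirroring the
    // JSON body used by the integration test below.
    fun enableRemoteStoreMigration(client: RestHighLevelClient) {
        val request = ClusterUpdateSettingsRequest().persistentSettings(
            Settings.builder()
                .put("remote_store.compatibility_mode", "mixed")
                .put("migration.direction", "remote_store")
                .build()
        )
        val response = client.cluster().putSettings(request, RequestOptions.DEFAULT)
        check(response.isAcknowledged) { "Cluster settings update was not acknowledged" }
    }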
diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt
index 0fa3c5ab..a0238b93 100644
--- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt
+++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt
@@ -79,7 +79,10 @@ import java.nio.file.Files
 import java.util.*
 import java.util.concurrent.TimeUnit
 import org.opensearch.bootstrap.BootstrapInfo
+import org.opensearch.cluster.service.ClusterService
 import org.opensearch.index.mapper.Mapping
+import org.opensearch.indices.replication.common.ReplicationType
+import org.opensearch.replication.util.ValidationUtil


 @MultiClusterAnnotations.ClusterConfigurations(
@@ -1255,6 +1258,62 @@ class StartReplicationIT: MultiClusterRestTestCase() {
        )
    }
+    fun `test operations are fetched from lucene when leader is in mixed mode`() {
+
+        val leaderClient = getClientForCluster(LEADER)
+        val followerClient = getClientForCluster(FOLLOWER)
+
+        // create index on leader cluster
+        val settings = Settings.builder()
+            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .build()
+        val createIndexResponse = leaderClient.indices().create(
+            CreateIndexRequest(leaderIndexName).settings(settings),
+            RequestOptions.DEFAULT
+        )
+        assertThat(createIndexResponse.isAcknowledged).isTrue()
+
+        // Update leader cluster settings to enable mixed mode and set migration direction to remote_store
+        val leaderClusterUpdateSettingsRequest = Request("PUT", "_cluster/settings")
+        val entityAsString = """
+            {
+              "persistent": {
+                "remote_store.compatibility_mode": "mixed",
+                "migration.direction" : "remote_store"
+              }
+            }""".trimMargin()
+
+        leaderClusterUpdateSettingsRequest.entity = StringEntity(entityAsString, ContentType.APPLICATION_JSON)
+        val updateSettingResponse = leaderClient.lowLevelClient.performRequest(leaderClusterUpdateSettingsRequest)
+        assertEquals(HttpStatus.SC_OK.toLong(), updateSettingResponse.statusLine.statusCode.toLong())
+
+        // create connection and start replication
+        createConnectionBetweenClusters(FOLLOWER, LEADER)
+
+        followerClient.startReplication(
+            StartReplicationRequest("source", leaderIndexName, followerIndexName),
+            TimeValue.timeValueSeconds(10),
+            true
+        )
+
+        // Index documents on leader index
+        val docCount = 50
+        for (i in 1..docCount) {
+            val sourceMap = mapOf("name" to randomAlphaOfLength(5))
+            leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
+        }
+
+        // Verify that all the documents are replicated to follower index and are fetched from lucene
+        assertBusy({
+            val stats = leaderClient.leaderStats()
+            assertThat(stats.size).isEqualTo(9)
+            assertThat(stats.getValue("num_replicated_indices").toString()).isEqualTo("1")
+            assertThat(stats.getValue("operations_read").toString()).isEqualTo(docCount.toString())
+            assertThat(stats.getValue("operations_read_lucene").toString()).isEqualTo(docCount.toString())
+            assertThat(stats.getValue("operations_read_translog").toString()).isEqualTo("0")
+            assertThat(stats.containsKey("index_stats")).isTrue()
+        }, 60L, TimeUnit.SECONDS)
+    }

    private fun excludeAllClusterNodes(clusterName: String) {
        val transientSettingsRequest = Request("PUT", "_cluster/settings")