Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support to fetch changes from Lucene store while migrating from/to r… #1369

Merged
merged 3 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ testClusters {
testDistribution = "ARCHIVE"
}
int debugPort = 5005
// Enable the experimental remote store migration feature flag on the test clusters so that migration scenarios can be tested
systemProperty('opensearch.experimental.feature.remote_store.migration.enabled','true')

if (_numNodes > 1) numberOfNodes = _numNodes
//numberOfNodes = 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
// We don't need a retention lease translog deletion policy for clusters that are remote store enabled,
// or that are migrating to/from remote store, as we fetch the operations directly from lucene in such cases.
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) {
Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
Expand All @@ -87,18 +87,18 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
// TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" }
throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few miliseconds...")
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo)

var ops: List<Translog.Operation> = listOf()
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
Expand Down Expand Up @@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating))
}
}
}

private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long {
// We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. For clusters that are remote store
// enabled, or that are migrating to/from remote store, the semantics are slightly different and we can't use
// lastSyncedGlobalCheckpoint, so we fall back to lastKnownGlobalCheckpoint in such cases.
return if (isRemoteStoreEnabled) {
return if (isRemoteEnabledOrMigrating) {
indexShard.lastKnownGlobalCheckpoint
} else {
indexShard.lastSyncedGlobalCheckpoint
Expand Down Expand Up @@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
// Primary shard when the cluster is remote store enabled or migrating; otherwise a random active or initializing shard
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt()
else shardIt.activeInitializingShardsRandomIt()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.core.common.Strings
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
import org.opensearch.common.settings.Settings
import org.opensearch.core.common.Strings
import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import org.opensearch.cluster.service.ClusterService
import org.opensearch.node.Node
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
import org.opensearch.node.remotestore.RemoteStoreNodeService
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
import org.opensearch.replication.action.changes.TransportGetChangesAction
import java.io.UnsupportedEncodingException
import java.nio.file.Files
import java.nio.file.Path
import java.util.Locale
Expand Down Expand Up @@ -161,4 +161,8 @@ object ValidationUtil {
return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
}

/**
 * Reports whether the cluster is remote store enabled, or is currently configured with the
 * remote store `MIXED` compatibility mode.
 *
 * The compatibility mode is only read from the cluster settings when the cluster is not already
 * reported as remote store enabled (the `||` short-circuits).
 *
 * @param clusterService provides the settings that are inspected
 * @return `true` if [isRemoteStoreEnabledCluster] returns `true`, or if
 * [RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING] currently resolves to
 * [RemoteStoreNodeService.CompatibilityMode.MIXED]; `false` otherwise
 */
fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean {
    return isRemoteStoreEnabledCluster(clusterService) ||
        clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING) ==
        RemoteStoreNodeService.CompatibilityMode.MIXED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ import java.nio.file.Files
import java.util.*
import java.util.concurrent.TimeUnit
import org.opensearch.bootstrap.BootstrapInfo
import org.opensearch.cluster.service.ClusterService
import org.opensearch.index.mapper.Mapping
import org.opensearch.indices.replication.common.ReplicationType
import org.opensearch.replication.util.ValidationUtil


@MultiClusterAnnotations.ClusterConfigurations(
Expand Down Expand Up @@ -1255,6 +1258,62 @@ class StartReplicationIT: MultiClusterRestTestCase() {
)
}

/**
 * Verifies that replication works when the leader cluster is switched to the remote store
 * `mixed` compatibility mode, and that the leader stats then report all operations as read
 * from lucene and none from the translog.
 */
fun `test operations are fetched from lucene when leader is in mixed mode`() {

    val leaderClient = getClientForCluster(LEADER)
    val followerClient = getClientForCluster(FOLLOWER)

    // create index on leader cluster
    val settings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .build()
    val createIndexResponse = leaderClient.indices().create(
        CreateIndexRequest(leaderIndexName).settings(settings),
        RequestOptions.DEFAULT
    )
    assertThat(createIndexResponse.isAcknowledged).isTrue()

    // Update leader cluster settings to enable mixed mode and set migration direction to remote_store
    // NOTE(review): these persistent settings are not reverted in this test — confirm the base test
    // class resets cluster settings between tests.
    val leaderClusterUpdateSettingsRequest = Request("PUT", "_cluster/settings")
    val entityAsString = """
        {
          "persistent": {
            "remote_store.compatibility_mode": "mixed",
            "migration.direction" : "remote_store"
          }
        }""".trimMargin()

    leaderClusterUpdateSettingsRequest.entity = StringEntity(entityAsString,ContentType.APPLICATION_JSON)
    val updateSettingResponse = leaderClient.lowLevelClient.performRequest(leaderClusterUpdateSettingsRequest)
    assertEquals(HttpStatus.SC_OK.toLong(), updateSettingResponse.statusLine.statusCode.toLong())

    //create connection and start replication
    createConnectionBetweenClusters(FOLLOWER, LEADER)

    followerClient.startReplication(
        StartReplicationRequest("source", leaderIndexName, followerIndexName),
        TimeValue.timeValueSeconds(10),
        true
    )

    //Index documents on leader index
    val docCount = 50
    for (i in 1..docCount) {
        val sourceMap = mapOf("name" to randomAlphaOfLength(5))
        leaderClient.index(IndexRequest(leaderIndexName).id(i.toString()).source(sourceMap), RequestOptions.DEFAULT)
    }

    // Verify that all the documents are replicated to follower index and are fetched from lucene
    assertBusy({
        val stats = leaderClient.leaderStats()
        assertThat(stats.size).isEqualTo(9)
        assertThat(stats.getValue("num_replicated_indices").toString()).isEqualTo("1")
        assertThat(stats.getValue("operations_read").toString()).isEqualTo(docCount.toString())
        assertThat(stats.getValue("operations_read_lucene").toString()).isEqualTo(docCount.toString())
        assertThat(stats.getValue("operations_read_translog").toString()).isEqualTo("0")
        // A bare assertThat(boolean) verifies nothing; the terminal isTrue() performs the check.
        assertThat(stats.containsKey("index_stats")).isTrue()
    }, 60L, TimeUnit.SECONDS)
}

private fun excludeAllClusterNodes(clusterName: String) {
val transientSettingsRequest = Request("PUT", "_cluster/settings")
Expand Down
Loading