Adding support to fetch changes from Lucene store while migrating from/to remote store (opensearch-project#1369)

Signed-off-by: Shubh Sahu <[email protected]>
astute-decipher authored and Shubh Sahu committed Apr 24, 2024
1 parent d20ef56 commit af33c7e
Showing 5 changed files with 155 additions and 89 deletions.
2 changes: 2 additions & 0 deletions build.gradle
@@ -381,6 +381,8 @@ testClusters {
testDistribution = "ARCHIVE"
}
int debugPort = 5005
// Enable the experimental remote store migration flag so the test cluster can exercise migration paths
systemProperty('opensearch.experimental.feature.remote_store.migration.enabled','true')

if (_numNodes > 1) numberOfNodes = _numNodes
//numberOfNodes = 3
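Note: the property above is set on the test cluster's JVM. A minimal Kotlin sketch (not part of this commit; it assumes the same system property is also visible to the calling runtime) of checking the flag before exercising migration paths:

fun migrationFlagEnabled(): Boolean =
    System.getProperty("opensearch.experimental.feature.remote_store.migration.enabled")
        ?.toBoolean() == true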
@@ -387,7 +387,7 @@ internal class ReplicationPlugin : Plugin(), ActionPlugin, PersistentTaskPlugin,
override fun getCustomTranslogDeletionPolicyFactory(): Optional<TranslogDeletionPolicyFactory> {
// We don't need a retention lease translog deletion policy for remote store enabled or migrating clusters as
// we fetch the operations directly from Lucene in such cases.
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService) == false) {
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService) == false) {
Optional.of(TranslogDeletionPolicyFactory { indexSettings, retentionLeasesSupplier ->
ReplicationTranslogDeletionPolicy(indexSettings, retentionLeasesSupplier)
})
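The change above widens the condition that decides whether the plugin installs its retention-lease translog deletion policy. A simplified sketch of that gating (illustrative names and types, not the plugin's API): the policy is only needed for pure document-replication clusters, since remote-store-enabled or migrating clusters fetch operations from Lucene.

import java.util.Optional

// Sketch only: install a retention-lease policy just for pure docrep clusters.
fun <P : Any> translogPolicyFor(remoteEnabledOrMigrating: Boolean, makePolicy: () -> P): Optional<P> =
    if (!remoteEnabledOrMigrating) Optional.of(makePolicy()) // docrep: retain translog via leases
    else Optional.empty()                                    // remote/migrating: serve changes from Lucene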
@@ -77,8 +77,8 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.lastFetchTime.set(relativeStartNanos)

val indexShard = indicesService.indexServiceSafe(shardId.index).getShard(shardId.id)
val isRemoteStoreEnabled = ValidationUtil.isRemoteStoreEnabledCluster(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
val isRemoteEnabledOrMigrating = ValidationUtil.isRemoteEnabledOrMigrating(clusterService)
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
@@ -87,18 +87,18 @@
// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
// TODO: Figure out a better way to wait for the global checkpoint to be synced to the translog
if (lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled)}" }
if (lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating) < request.fromSeqNo) {
assert(gcp > lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)) { "Checkpoint didn't advance at all $gcp ${lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating)}" }
throw OpenSearchTimeoutException("global checkpoint not synced. Retry after a few milliseconds...")
}
}

relativeStartNanos = System.nanoTime()
// At this point lastSyncedGlobalCheckpoint is at least fromSeqNo
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled), request.toSeqNo)
val toSeqNo = min(lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating), request.toSeqNo)

var ops: List<Translog.Operation> = listOf()
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteStoreEnabled == false
var fetchFromTranslog = isTranslogPruningByRetentionLeaseEnabled(shardId) && isRemoteEnabledOrMigrating == false
if(fetchFromTranslog) {
try {
ops = translogService.getHistoryOfOperations(indexShard, request.fromSeqNo, toSeqNo)
@@ -136,16 +136,16 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
indexMetric.ops.addAndGet(ops.size.toLong())

ops.stream().forEach{op -> indexMetric.bytesRead.addAndGet(op.estimateSize()) }
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteStoreEnabled))
GetChangesResponse(ops, request.fromSeqNo, indexShard.maxSeqNoOfUpdatesOrDeletes, lastGlobalCheckpoint(indexShard, isRemoteEnabledOrMigrating))
}
}
}

private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteStoreEnabled: Boolean): Long {
private fun lastGlobalCheckpoint(indexShard: IndexShard, isRemoteEnabledOrMigrating: Boolean): Long {
// We rely on lastSyncedGlobalCheckpoint as it has been durably written to disk. For remote store
// enabled or migrating clusters the semantics are slightly different and we can't use lastSyncedGlobalCheckpoint,
// so we fall back to lastKnownGlobalCheckpoint in such cases.
return if (isRemoteStoreEnabled) {
return if (isRemoteEnabledOrMigrating) {
indexShard.lastKnownGlobalCheckpoint
} else {
indexShard.lastSyncedGlobalCheckpoint
@@ -173,7 +173,7 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
override fun shards(state: ClusterState, request: InternalRequest): ShardsIterator {
val shardIt = state.routingTable().shardRoutingTable(request.request().shardId)
// Remote store enabled or migrating clusters must read changes from the primary shard;
// otherwise pick a random active shard.
return if (ValidationUtil.isRemoteStoreEnabledCluster(clusterService)) shardIt.primaryShardIt()
return if (ValidationUtil.isRemoteEnabledOrMigrating(clusterService)) shardIt.primaryShardIt()
else shardIt.activeInitializingShardsRandomIt()
}
}
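Taken together, the edits in this action hinge on two decisions that now key off isRemoteEnabledOrMigrating instead of isRemoteStoreEnabled. A condensed sketch with illustrative types (not the plugin's API):

// Which checkpoint to trust: lastSyncedGlobalCheckpoint is durable on disk, but
// remote-store or migrating shards track it differently, so fall back to
// lastKnownGlobalCheckpoint there.
data class Checkpoints(val lastSynced: Long, val lastKnown: Long)

fun effectiveGlobalCheckpoint(cp: Checkpoints, remoteEnabledOrMigrating: Boolean): Long =
    if (remoteEnabledOrMigrating) cp.lastKnown else cp.lastSynced

// Where to read operations from: the translog path is taken only when
// retention-lease pruning is enabled AND the cluster is pure docrep;
// otherwise operations come from the Lucene store.
fun shouldFetchFromTranslog(pruningByRetentionLease: Boolean, remoteEnabledOrMigrating: Boolean): Boolean =
    pruningByRetentionLease && !remoteEnabledOrMigrating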
@@ -17,18 +17,18 @@ import org.opensearch.Version
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.MetadataCreateIndexService
import org.opensearch.core.common.Strings
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
import org.opensearch.common.settings.Settings
import org.opensearch.core.common.Strings
import org.opensearch.env.Environment
import org.opensearch.index.IndexNotFoundException
import java.io.UnsupportedEncodingException
import org.opensearch.cluster.service.ClusterService
import org.opensearch.node.Node
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute
import org.opensearch.node.remotestore.RemoteStoreNodeService
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_INDEX_SETTING
import org.opensearch.replication.ReplicationPlugin.Companion.KNN_PLUGIN_PRESENT_SETTING
import org.opensearch.replication.action.changes.TransportGetChangesAction
import java.io.UnsupportedEncodingException
import java.nio.file.Files
import java.nio.file.Path
import java.util.Locale
@@ -161,4 +161,8 @@ object ValidationUtil {
return clusterService.settings.getByPrefix(Node.NODE_ATTRIBUTES.key + RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX).isEmpty == false
}

fun isRemoteEnabledOrMigrating(clusterService: ClusterService): Boolean {
return isRemoteStoreEnabledCluster(clusterService) ||
clusterService.clusterSettings.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING).equals(RemoteStoreNodeService.CompatibilityMode.MIXED)
}
}
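The new helper is the pivot for every call site above. As a rough decision table (sketch only; CompatibilityMode below is a stand-in for RemoteStoreNodeService.CompatibilityMode): a cluster qualifies if its nodes carry remote store attributes, or if the cluster-wide compatibility mode is MIXED, meaning a docrep-to-remote (or reverse) migration is in flight.

enum class CompatibilityMode { STRICT, MIXED }

fun isRemoteOrMigrating(remoteStoreEnabled: Boolean, mode: CompatibilityMode): Boolean =
    remoteStoreEnabled || mode == CompatibilityMode.MIXED

// Example: a docrep cluster mid-migration still qualifies:
// isRemoteOrMigrating(remoteStoreEnabled = false, mode = CompatibilityMode.MIXED) == true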