diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93a98de2c5af8..3d15401e4f819 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
+- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java
new file mode 100644
index 0000000000000..46912de17f213
--- /dev/null
+++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java
@@ -0,0 +1,47 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.action.admin.indices.tiering;
+
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.cluster.routing.allocation.RoutingAllocation;
+import org.opensearch.index.IndexModule;
+
+/**
+ * Utility class for tiering operations
+ *
+ * @opensearch.internal
+ */
+public class TieringUtils {
+
+ /**
+ * Checks if the specified shard is a partial shard by
+ * checking the INDEX_STORE_LOCALITY_SETTING for its index.
+ * see {@link #isPartialIndex(IndexMetadata)}
+ * @param shard ShardRouting object representing the shard
+ * @param allocation RoutingAllocation object representing the allocation
+ * @return true if the shard is a partial shard, false otherwise
+ */
+ public static boolean isPartialShard(ShardRouting shard, RoutingAllocation allocation) {
+ IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index());
+ return isPartialIndex(indexMetadata);
+ }
+
+ /**
+ * Checks if the specified index is a partial index by
+ * checking the INDEX_STORE_LOCALITY_SETTING for the index.
+ *
+ * @param indexMetadata the metadata of the index
+ * @return true if the index is a partial index, false otherwise
+ */
+ public static boolean isPartialIndex(final IndexMetadata indexMetadata) {
+ return IndexModule.DataLocalityType.PARTIAL.name()
+ .equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
+ }
+}
diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java
index db10ad61c7d6d..647e993339476 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java
@@ -11,6 +11,9 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
+import org.opensearch.common.util.FeatureFlags;
+
+import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;
/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
@@ -58,6 +61,7 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
- return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
+ return indexMetadata.isRemoteSnapshot()
+ || (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialIndex(indexMetadata)) ? REMOTE_CAPABLE : LOCAL_ONLY;
}
}
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
index adb8ee2cf7e85..7f6a7790d1db0 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java
@@ -30,6 +30,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
+import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.PriorityComparator;
import java.util.ArrayList;
@@ -45,6 +46,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
+import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialShard;
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
/**
@@ -552,6 +554,16 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
}
}
+ /**
+ * Checks if the shard can be skipped from the local shard balancer operations
+ * @param shardRouting the shard to be checked
+ * @return true if the shard can be skipped, false otherwise
+ */
+ private boolean canShardBeSkipped(ShardRouting shardRouting) {
+ return (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))
+ && !(FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shardRouting, allocation)));
+ }
+
/**
* Move started shards that can not be allocated to a node anymore
*
@@ -603,7 +615,7 @@ void moveShards() {
ShardRouting shardRouting = it.next();
- if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
+ if (canShardBeSkipped(shardRouting)) {
continue;
}
@@ -669,7 +681,7 @@ void moveShards() {
*/
@Override
MoveDecision decideMove(final ShardRouting shardRouting) {
- if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
+ if (canShardBeSkipped(shardRouting)) {
return MoveDecision.NOT_TAKEN;
}
@@ -758,7 +770,9 @@ private Map buildModelFromAssigned()
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
- if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
+ if ((RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation))
+ || (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shard, allocation)))
+ && shard.state() != RELOCATING) {
node.addShard(shard);
++totalShardCount;
if (logger.isTraceEnabled()) {
diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java
index 76f9f44077ad8..493d23b57d271 100644
--- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java
+++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java
@@ -87,6 +87,36 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
return canAllocate(shardRouting, node, allocation);
}
+ @Override
+ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+ RoutingPool targetPool = RoutingPool.getShardPool(shardRouting, allocation);
+ RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shardRouting.currentNodeId()));
+ if (RoutingPool.REMOTE_CAPABLE.equals(targetPool) && targetPool != currentNodePool) {
+ logger.debug(
+ "Shard: [{}] has current pool: [{}], target pool: [{}]. Cannot remain on node: [{}]",
+ shardRouting.shortSummary(),
+ currentNodePool.name(),
+ RoutingPool.REMOTE_CAPABLE.name(),
+ node.node()
+ );
+ return allocation.decision(
+ Decision.NO,
+ NAME,
+ "Shard %s is allocated on a different pool %s than the target pool %s",
+ shardRouting.shortSummary(),
+ currentNodePool,
+ targetPool
+ );
+ }
+ return allocation.decision(
+ Decision.YES,
+ NAME,
+ "Routing pools are compatible. Shard pool: [%s], node pool: [%s]",
+ currentNodePool,
+ targetPool
+ );
+ }
+
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex());
return canAllocateInTargetPool(indexMetadata, node, allocation);
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java
new file mode 100644
index 0000000000000..8d45ebd2781b1
--- /dev/null
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java
@@ -0,0 +1,128 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing.allocation;
+
+import org.opensearch.Version;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.routing.RoutingNode;
+import org.opensearch.cluster.routing.RoutingNodes;
+import org.opensearch.cluster.routing.RoutingPool;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.IndexModule;
+import org.opensearch.test.FeatureFlagSetter;
+import org.junit.Before;
+
+import static org.opensearch.cluster.routing.RoutingPool.LOCAL_ONLY;
+import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
+import static org.opensearch.cluster.routing.RoutingPool.getIndexPool;
+import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING;
+
+public class ShardsTieringAllocationTests extends TieringAllocationBaseTestCase {
+
+ @Before
+ public void setup() {
+ FeatureFlagSetter.set(FeatureFlags.TIERED_REMOTE_INDEX);
+ }
+
+ public void testShardsInLocalPool() {
+ int localOnlyNodes = 5;
+ int remoteCapableNodes = 3;
+ int localIndices = 5;
+ int remoteIndices = 0;
+ ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
+ AllocationService service = this.createRemoteCapableAllocationService();
+ // assign shards to respective nodes
+ clusterState = allocateShardsAndBalance(clusterState, service);
+ RoutingNodes routingNodes = clusterState.getRoutingNodes();
+ RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
+ assertEquals(0, routingNodes.unassigned().size());
+
+ for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
+ assertFalse(shard.unassigned());
+ RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
+ assertEquals(LOCAL_ONLY, shardPool);
+ }
+ }
+
+ public void testShardsInRemotePool() {
+ int localOnlyNodes = 7;
+ int remoteCapableNodes = 3;
+ int localIndices = 0;
+ int remoteIndices = 13;
+ ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
+ AllocationService service = this.createRemoteCapableAllocationService();
+ // assign shards to respective nodes
+ clusterState = allocateShardsAndBalance(clusterState, service);
+ RoutingNodes routingNodes = clusterState.getRoutingNodes();
+ RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
+ assertEquals(0, routingNodes.unassigned().size());
+
+ for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
+ assertFalse(shard.unassigned());
+ RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
+ assertEquals(REMOTE_CAPABLE, shardPool);
+ }
+ }
+
+ public void testShardsWithTiering() {
+ int localOnlyNodes = 15;
+ int remoteCapableNodes = 13;
+ int localIndices = 10;
+ int remoteIndices = 0;
+ ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
+ AllocationService service = this.createRemoteCapableAllocationService();
+ // assign shards to respective nodes
+ clusterState = allocateShardsAndBalance(clusterState, service);
+ // put indices in the hot to warm tiering state
+ clusterState = updateIndexMetadataForTiering(
+ clusterState,
+ localIndices,
+ IndexModule.TieringState.HOT_TO_WARM.name(),
+ IndexModule.DataLocalityType.PARTIAL.name()
+ );
+ // trigger shard relocation
+ clusterState = allocateShardsAndBalance(clusterState, service);
+ RoutingNodes routingNodes = clusterState.getRoutingNodes();
+ RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
+ assertEquals(0, routingNodes.unassigned().size());
+
+ for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
+ assertFalse(shard.unassigned());
+ RoutingNode node = routingNodes.node(shard.currentNodeId());
+ RoutingPool nodePool = RoutingPool.getNodePool(node);
+ RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
+ assertEquals(RoutingPool.REMOTE_CAPABLE, shardPool);
+ assertEquals(nodePool, shardPool);
+ }
+ }
+
+ public void testShardPoolForPartialIndices() {
+ String index = "test-index";
+ IndexMetadata indexMetadata = IndexMetadata.builder(index)
+ .settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()))
+ .numberOfShards(PRIMARIES)
+ .numberOfReplicas(REPLICAS)
+ .build();
+ RoutingPool indexPool = getIndexPool(indexMetadata);
+ assertEquals(REMOTE_CAPABLE, indexPool);
+ }
+
+ public void testShardPoolForFullIndices() {
+ String index = "test-index";
+ IndexMetadata indexMetadata = IndexMetadata.builder(index)
+ .settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()))
+ .numberOfShards(PRIMARIES)
+ .numberOfReplicas(REPLICAS)
+ .build();
+ RoutingPool indexPool = getIndexPool(indexMetadata);
+ assertEquals(LOCAL_ONLY, indexPool);
+ }
+}
diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java
new file mode 100644
index 0000000000000..aba6fe74e0634
--- /dev/null
+++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java
@@ -0,0 +1,47 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.cluster.routing.allocation;
+
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.common.SuppressForbidden;
+import org.opensearch.common.settings.Settings;
+
+import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING;
+import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE;
+
+@SuppressForbidden(reason = "feature flag overrides")
+public abstract class TieringAllocationBaseTestCase extends RemoteShardsBalancerBaseTestCase {
+
+ public ClusterState updateIndexMetadataForTiering(
+ ClusterState clusterState,
+ int localIndices,
+ String tieringState,
+ String dataLocality
+ ) {
+ Metadata.Builder mb = Metadata.builder(clusterState.metadata());
+ for (int i = 0; i < localIndices; i++) {
+ IndexMetadata indexMetadata = clusterState.metadata().index(getIndexName(i, false));
+ Settings settings = indexMetadata.getSettings();
+ mb.put(
+ IndexMetadata.builder(indexMetadata)
+ .settings(
+ Settings.builder()
+ .put(settings)
+ .put(settings)
+ .put(INDEX_TIERING_STATE.getKey(), tieringState)
+ .put(INDEX_STORE_LOCALITY_SETTING.getKey(), dataLocality)
+ )
+ );
+ }
+ Metadata metadata = mb.build();
+ return ClusterState.builder(clusterState).metadata(metadata).build();
+ }
+}