From 89915b65dc51126189dd42d74c82d2cbac0cb154 Mon Sep 17 00:00:00 2001 From: SwethaGuptha <156877431+SwethaGuptha@users.noreply.github.com> Date: Fri, 6 Sep 2024 16:37:20 +0530 Subject: [PATCH] Optimise memory for rest cluster health calls with in-line shard aggregations for levels cluster and indices. (#15492) Signed-off-by: Swetha Guptha --- CHANGELOG.md | 1 + .../opensearch/cluster/ClusterHealthIT.java | 163 ++++++++++++++++++ .../RemoteStoreMigrationSettingsUpdateIT.java | 3 +- .../cluster/health/ClusterHealthRequest.java | 25 +++ .../health/ClusterHealthRequestBuilder.java | 5 + .../cluster/health/ClusterHealthResponse.java | 43 +++++ .../health/TransportClusterHealthAction.java | 14 +- .../stats/TransportClusterStatsAction.java | 3 +- .../cluster/health/ClusterIndexHealth.java | 120 ++++++++++++- .../cluster/health/ClusterShardHealth.java | 48 ++++-- .../cluster/health/ClusterStateHealth.java | 104 ++++++++++- .../routing/allocation/AllocationService.java | 7 +- .../cluster/RestClusterHealthAction.java | 1 + .../health/ClusterIndexHealthTests.java | 27 ++- .../health/ClusterShardHealthTests.java | 122 +++++++++++++ .../health/ClusterStateHealthTests.java | 29 +++- 16 files changed, 674 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index deb9b7d0bcad7..e09dc9d192b4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637)) - [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145)) - Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292)) +- Memory optimisations in _cluster/health API ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterHealthIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterHealthIT.java index 0304e00a49070..33f101cce2382 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterHealthIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/ClusterHealthIT.java @@ -37,6 +37,8 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterIndexHealth; +import org.opensearch.cluster.health.ClusterShardHealth; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.service.ClusterService; @@ -49,6 +51,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -439,4 +442,164 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS completionFuture.actionGet(TimeValue.timeValueSeconds(30)); } } + + public void testHealthWithClusterLevelAppliedAtTransportLayer() { + createIndex( + "test1", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + ensureGreen(); + ClusterHealthResponse healthResponse = client().admin() + .cluster() + .prepareHealth() + .setApplyLevelAtTransportLayer(true) + .execute() + .actionGet(); + assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus()); + assertTrue(healthResponse.getIndices().isEmpty()); + assertEquals(1, healthResponse.getActiveShards()); + assertEquals(1, healthResponse.getActivePrimaryShards()); + assertEquals(0, healthResponse.getUnassignedShards()); + assertEquals(0, healthResponse.getInitializingShards()); + assertEquals(0, healthResponse.getRelocatingShards()); + assertEquals(0, healthResponse.getDelayedUnassignedShards()); + } + + public void testHealthWithIndicesLevelAppliedAtTransportLayer() { + createIndex( + "test1", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + ensureGreen(); + ClusterHealthResponse healthResponse = client().admin() + .cluster() + .prepareHealth() + .setLevel("indices") + .setApplyLevelAtTransportLayer(true) + .execute() + .actionGet(); + assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus()); + + assertEquals(1, healthResponse.getActiveShards()); + assertEquals(1, healthResponse.getActivePrimaryShards()); + assertEquals(0, healthResponse.getUnassignedShards()); + assertEquals(0, healthResponse.getInitializingShards()); + assertEquals(0, healthResponse.getRelocatingShards()); + assertEquals(0, healthResponse.getDelayedUnassignedShards()); + + Map indices = healthResponse.getIndices(); + assertFalse(indices.isEmpty()); + assertEquals(1, indices.size()); + for (Map.Entry indicesHealth : indices.entrySet()) { + String indexName = indicesHealth.getKey(); + assertEquals("test1", indexName); + ClusterIndexHealth indicesHealthValue = indicesHealth.getValue(); + assertEquals(1, indicesHealthValue.getActiveShards()); + assertEquals(1, indicesHealthValue.getActivePrimaryShards()); + assertEquals(0, indicesHealthValue.getInitializingShards()); + assertEquals(0, indicesHealthValue.getUnassignedShards()); + assertEquals(0, indicesHealthValue.getDelayedUnassignedShards()); + assertEquals(0, indicesHealthValue.getRelocatingShards()); + assertEquals(ClusterHealthStatus.GREEN, indicesHealthValue.getStatus()); + assertTrue(indicesHealthValue.getShards().isEmpty()); + } + } + + public void testHealthWithShardLevelAppliedAtTransportLayer() { + int dataNodes = internalCluster().getDataNodeNames().size(); + int greenClusterReplicaCount = dataNodes - 1; + int yellowClusterReplicaCount = dataNodes; + + createIndex( + "test1", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, greenClusterReplicaCount) + .build() + ); + ensureGreen(TimeValue.timeValueSeconds(120), "test1"); + createIndex( + "test2", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, greenClusterReplicaCount) + .build() + ); + ensureGreen(TimeValue.timeValueSeconds(120)); + client().admin() + .indices() + .prepareUpdateSettings() + .setIndices("test2") + .setSettings(Settings.builder().put("index.number_of_replicas", yellowClusterReplicaCount).build()) + .execute() + .actionGet(); + ClusterHealthResponse healthResponse = client().admin() + .cluster() + .prepareHealth() + .setLevel("shards") + .setApplyLevelAtTransportLayer(true) + .execute() + .actionGet(); + assertEquals(ClusterHealthStatus.YELLOW, healthResponse.getStatus()); + + assertEquals(2 * dataNodes, healthResponse.getActiveShards()); + assertEquals(2, healthResponse.getActivePrimaryShards()); + assertEquals(1, healthResponse.getUnassignedShards()); + assertEquals(0, healthResponse.getInitializingShards()); + assertEquals(0, healthResponse.getRelocatingShards()); + assertEquals(0, healthResponse.getDelayedUnassignedShards()); + + Map indices = healthResponse.getIndices(); + assertFalse(indices.isEmpty()); + assertEquals(2, indices.size()); + for (Map.Entry indicesHealth : indices.entrySet()) { + String indexName = indicesHealth.getKey(); + boolean indexHasMoreReplicas = indexName.equals("test2"); + ClusterIndexHealth indicesHealthValue = indicesHealth.getValue(); + assertEquals(dataNodes, indicesHealthValue.getActiveShards()); + assertEquals(1, indicesHealthValue.getActivePrimaryShards()); + assertEquals(0, indicesHealthValue.getInitializingShards()); + assertEquals(indexHasMoreReplicas ? 1 : 0, indicesHealthValue.getUnassignedShards()); + assertEquals(0, indicesHealthValue.getDelayedUnassignedShards()); + assertEquals(0, indicesHealthValue.getRelocatingShards()); + assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, indicesHealthValue.getStatus()); + Map shards = indicesHealthValue.getShards(); + assertFalse(shards.isEmpty()); + assertEquals(1, shards.size()); + for (Map.Entry shardHealth : shards.entrySet()) { + ClusterShardHealth clusterShardHealth = shardHealth.getValue(); + assertEquals(dataNodes, clusterShardHealth.getActiveShards()); + assertEquals(indexHasMoreReplicas ? 1 : 0, clusterShardHealth.getUnassignedShards()); + assertEquals(0, clusterShardHealth.getDelayedUnassignedShards()); + assertEquals(0, clusterShardHealth.getRelocatingShards()); + assertEquals(0, clusterShardHealth.getInitializingShards()); + assertTrue(clusterShardHealth.isPrimaryActive()); + assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, clusterShardHealth.getStatus()); + } + } + } + + public void testHealthWithAwarenessAttributesLevelAppliedAtTransportLayer() { + createIndex( + "test1", + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() + ); + ensureGreen(); + ClusterHealthResponse healthResponse = client().admin() + .cluster() + .prepareHealth() + .setLevel("awareness_attributes") + .setApplyLevelAtTransportLayer(true) + .execute() + .actionGet(); + assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus()); + assertTrue(healthResponse.getIndices().isEmpty()); + assertNotNull(healthResponse.getClusterAwarenessHealth()); + assertEquals(1, healthResponse.getActiveShards()); + assertEquals(1, healthResponse.getActivePrimaryShards()); + assertEquals(0, healthResponse.getUnassignedShards()); + assertEquals(0, healthResponse.getInitializingShards()); + assertEquals(0, healthResponse.getRelocatingShards()); + assertEquals(0, healthResponse.getDelayedUnassignedShards()); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java index c701a8d92c336..4a6cb28b6db00 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteStoreMigrationSettingsUpdateIT.java @@ -11,6 +11,7 @@ import org.opensearch.client.Client; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsException; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -109,7 +110,7 @@ public void testNewRestoredIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMix setDirection(REMOTE_STORE.direction); String restoredIndexName2 = TEST_INDEX + "-restored2"; restoreSnapshot(snapshotRepoName, snapshotName, restoredIndexName2); - ensureGreen(restoredIndexName2); + ensureGreen(TimeValue.timeValueSeconds(90), restoredIndexName2); logger.info("Verify that restored index is non remote-backed"); assertRemoteStoreBackedIndex(restoredIndexName2); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java index ec8b01d853da6..55d3e2089b2a8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -76,6 +76,17 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest 0) { diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java index a9a3756755265..87dd77aca20c6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java @@ -171,4 +171,9 @@ public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNo request.ensureNodeWeighedIn(ensureNodeCommissioned); return this; } + + public ClusterHealthRequestBuilder setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) { + request.setApplyLevelAtTransportLayer(applyLevelAtTransportLayer); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java index 1a27f161343e8..2dcfb58c3d7b8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthResponse.java @@ -237,6 +237,26 @@ public ClusterHealthResponse( this.clusterHealthStatus = clusterStateHealth.getStatus(); } + public ClusterHealthResponse( + String clusterName, + String[] concreteIndices, + ClusterHealthRequest clusterHealthRequest, + ClusterState clusterState, + int numberOfPendingTasks, + int numberOfInFlightFetch, + TimeValue taskMaxWaitingTime + ) { + this.clusterName = clusterName; + this.numberOfPendingTasks = numberOfPendingTasks; + this.numberOfInFlightFetch = numberOfInFlightFetch; + this.taskMaxWaitingTime = taskMaxWaitingTime; + this.clusterStateHealth = clusterHealthRequest.isApplyLevelAtTransportLayer() + ? new ClusterStateHealth(clusterState, concreteIndices, clusterHealthRequest.level()) + : new ClusterStateHealth(clusterState, concreteIndices); + this.clusterHealthStatus = clusterStateHealth.getStatus(); + this.delayedUnassignedShards = clusterStateHealth.getDelayedUnassignedShards(); + } + // Awareness Attribute health public ClusterHealthResponse( String clusterName, @@ -261,6 +281,29 @@ public ClusterHealthResponse( this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName); } + public ClusterHealthResponse( + String clusterName, + ClusterHealthRequest clusterHealthRequest, + ClusterState clusterState, + ClusterSettings clusterSettings, + String[] concreteIndices, + String awarenessAttributeName, + int numberOfPendingTasks, + int numberOfInFlightFetch, + TimeValue taskMaxWaitingTime + ) { + this( + clusterName, + concreteIndices, + clusterHealthRequest, + clusterState, + numberOfPendingTasks, + numberOfInFlightFetch, + taskMaxWaitingTime + ); + this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName); + } + /** * For XContent Parser and serialization tests */ diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java index f69f462372888..8f0202e6d1ed0 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -52,7 +52,6 @@ import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.NodeWeighedAwayException; -import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.WeightedRoutingUtils; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; @@ -487,7 +486,12 @@ private ClusterHealthResponse clusterHealth( TimeValue pendingTaskTimeInQueue ) { if (logger.isTraceEnabled()) { - logger.trace("Calculating health based on state version [{}]", clusterState.version()); + logger.trace( + "Calculating health based on state version [{}] for health level [{}] and applyLevelAtTransportLayer set to [{}]", + clusterState.version(), + request.level(), + request.isApplyLevelAtTransportLayer() + ); } String[] concreteIndices; @@ -496,13 +500,13 @@ private ClusterHealthResponse clusterHealth( concreteIndices = clusterState.getMetadata().getConcreteAllIndices(); return new ClusterHealthResponse( clusterState.getClusterName().value(), + request, clusterState, clusterService.getClusterSettings(), concreteIndices, awarenessAttribute, numberOfPendingTasks, numberOfInFlightFetch, - UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue ); } @@ -514,10 +518,10 @@ private ClusterHealthResponse clusterHealth( ClusterHealthResponse response = new ClusterHealthResponse( clusterState.getClusterName().value(), Strings.EMPTY_ARRAY, + request, clusterState, numberOfPendingTasks, numberOfInFlightFetch, - UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue ); response.setStatus(ClusterHealthStatus.RED); @@ -527,10 +531,10 @@ private ClusterHealthResponse clusterHealth( return new ClusterHealthResponse( clusterState.getClusterName().value(), concreteIndices, + request, clusterState, numberOfPendingTasks, numberOfInFlightFetch, - UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index be7d41a7ba75e..a49ca2035783c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -34,6 +34,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.action.FailedNodeException; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.node.info.NodeInfo; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.indices.stats.CommonStats; @@ -209,7 +210,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq ClusterHealthStatus clusterStatus = null; if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) { - clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus(); + clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus(); } return new ClusterStatsNodeResponse( diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java index 19c64965e6941..77d96cb0af792 100644 --- a/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java +++ b/server/src/main/java/org/opensearch/cluster/health/ClusterIndexHealth.java @@ -32,9 +32,11 @@ package org.opensearch.cluster.health; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.ParseField; import org.opensearch.core.common.io.stream.StreamInput; @@ -141,6 +143,7 @@ public final class ClusterIndexHealth implements Iterable, W private final int relocatingShards; private final int initializingShards; private final int unassignedShards; + private int delayedUnassignedShards; private final int activePrimaryShards; private final ClusterHealthStatus status; private final Map shards; @@ -163,6 +166,7 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT int computeRelocatingShards = 0; int computeInitializingShards = 0; int computeUnassignedShards = 0; + int computeDelayedUnassignedShards = 0; for (ClusterShardHealth shardHealth : shards.values()) { if (shardHealth.isPrimaryActive()) { computeActivePrimaryShards++; @@ -171,13 +175,9 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT computeRelocatingShards += shardHealth.getRelocatingShards(); computeInitializingShards += shardHealth.getInitializingShards(); computeUnassignedShards += shardHealth.getUnassignedShards(); + computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards(); - if (shardHealth.getStatus() == ClusterHealthStatus.RED) { - computeStatus = ClusterHealthStatus.RED; - } else if (shardHealth.getStatus() == ClusterHealthStatus.YELLOW && computeStatus != ClusterHealthStatus.RED) { - // do not override an existing red - computeStatus = ClusterHealthStatus.YELLOW; - } + computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus); } if (shards.isEmpty()) { // might be since none has been created yet (two phase index creation) computeStatus = ClusterHealthStatus.RED; @@ -189,6 +189,110 @@ public ClusterIndexHealth(final IndexMetadata indexMetadata, final IndexRoutingT this.relocatingShards = computeRelocatingShards; this.initializingShards = computeInitializingShards; this.unassignedShards = computeUnassignedShards; + this.delayedUnassignedShards = computeDelayedUnassignedShards; + } + + public ClusterIndexHealth( + final IndexMetadata indexMetadata, + final IndexRoutingTable indexRoutingTable, + final ClusterHealthRequest.Level healthLevel + ) { + this.index = indexMetadata.getIndex().getName(); + this.numberOfShards = indexMetadata.getNumberOfShards(); + this.numberOfReplicas = indexMetadata.getNumberOfReplicas(); + + shards = new HashMap<>(); + + // update the index status + ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN; + int computeActivePrimaryShards = 0; + int computeActiveShards = 0; + int computeRelocatingShards = 0; + int computeInitializingShards = 0; + int computeUnassignedShards = 0; + int computeDelayedUnassignedShards = 0; + + boolean isShardLevelHealthRequired = healthLevel == ClusterHealthRequest.Level.SHARDS; + if (isShardLevelHealthRequired) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + int shardId = indexShardRoutingTable.shardId().id(); + ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, indexShardRoutingTable); + if (shardHealth.isPrimaryActive()) { + computeActivePrimaryShards++; + } + computeActiveShards += shardHealth.getActiveShards(); + computeRelocatingShards += shardHealth.getRelocatingShards(); + computeInitializingShards += shardHealth.getInitializingShards(); + computeUnassignedShards += shardHealth.getUnassignedShards(); + computeDelayedUnassignedShards += shardHealth.getDelayedUnassignedShards(); + computeStatus = getIndexHealthStatus(shardHealth.getStatus(), computeStatus); + shards.put(shardId, shardHealth); + } + } else { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + int activeShardsPerShardId = 0; + + List shardRoutings = indexShardRoutingTable.shards(); + int shardRoutingCountPerShardId = shardRoutings.size(); + for (int index = 0; index < shardRoutingCountPerShardId; index++) { + ShardRouting shardRouting = shardRoutings.get(index); + if (shardRouting.active()) { + computeActiveShards++; + activeShardsPerShardId++; + if (shardRouting.relocating()) { + computeRelocatingShards++; + } + } else if (shardRouting.initializing()) { + computeInitializingShards++; + } else if (shardRouting.unassigned()) { + computeUnassignedShards++; + if (shardRouting.unassignedInfo().isDelayed()) { + computeDelayedUnassignedShards++; + } + } + } + ShardRouting primaryShard = indexShardRoutingTable.primaryShard(); + if (primaryShard.active()) { + computeActivePrimaryShards++; + } + ClusterHealthStatus shardHealth = ClusterShardHealth.getShardHealth( + primaryShard, + activeShardsPerShardId, + shardRoutingCountPerShardId + ); + computeStatus = getIndexHealthStatus(shardHealth, computeStatus); + } + } + + if (indexRoutingTable.shards() != null && indexRoutingTable.shards().isEmpty()) { + // might be since none has been created yet (two phase index creation) + computeStatus = ClusterHealthStatus.RED; + } + + this.status = computeStatus; + this.activePrimaryShards = computeActivePrimaryShards; + this.activeShards = computeActiveShards; + this.relocatingShards = computeRelocatingShards; + this.initializingShards = computeInitializingShards; + this.unassignedShards = computeUnassignedShards; + this.delayedUnassignedShards = computeDelayedUnassignedShards; + + } + + public static ClusterHealthStatus getIndexHealthStatus(ClusterHealthStatus shardHealth, ClusterHealthStatus computeStatus) { + switch (shardHealth) { + case RED: + return ClusterHealthStatus.RED; + case YELLOW: + // do not override an existing red + if (computeStatus != ClusterHealthStatus.RED) { + return ClusterHealthStatus.YELLOW; + } else { + return ClusterHealthStatus.RED; + } + default: + return computeStatus; + } } public ClusterIndexHealth(final StreamInput in) throws IOException { @@ -269,6 +373,10 @@ public int getUnassignedShards() { return unassignedShards; } + public int getDelayedUnassignedShards() { + return delayedUnassignedShards; + } + public ClusterHealthStatus getStatus() { return status; } diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java index 1fe88f65248c2..ace4537a5e291 100644 --- a/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java +++ b/server/src/main/java/org/opensearch/cluster/health/ClusterShardHealth.java @@ -50,6 +50,7 @@ import org.opensearch.core.xcontent.XContentParser; import java.io.IOException; +import java.util.List; import java.util.Locale; import java.util.Objects; @@ -109,6 +110,7 @@ public final class ClusterShardHealth implements Writeable, ToXContentFragment { private final int relocatingShards; private final int initializingShards; private final int unassignedShards; + private int delayedUnassignedShards; private final boolean primaryActive; public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardRoutingTable) { @@ -117,7 +119,10 @@ public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardR int computeRelocatingShards = 0; int computeInitializingShards = 0; int computeUnassignedShards = 0; - for (ShardRouting shardRouting : shardRoutingTable) { + int computeDelayedUnassignedShards = 0; + List shardRoutings = shardRoutingTable.shards(); + for (int index = 0; index < shardRoutings.size(); index++) { + ShardRouting shardRouting = shardRoutings.get(index); if (shardRouting.active()) { computeActiveShards++; if (shardRouting.relocating()) { @@ -128,24 +133,18 @@ public ClusterShardHealth(final int shardId, final IndexShardRoutingTable shardR computeInitializingShards++; } else if (shardRouting.unassigned()) { computeUnassignedShards++; + if (shardRouting.unassignedInfo() != null && shardRouting.unassignedInfo().isDelayed()) { + computeDelayedUnassignedShards++; + } } } - ClusterHealthStatus computeStatus; final ShardRouting primaryRouting = shardRoutingTable.primaryShard(); - if (primaryRouting.active()) { - if (computeActiveShards == shardRoutingTable.size()) { - computeStatus = ClusterHealthStatus.GREEN; - } else { - computeStatus = ClusterHealthStatus.YELLOW; - } - } else { - computeStatus = getInactivePrimaryHealth(primaryRouting); - } - this.status = computeStatus; + this.status = getShardHealth(primaryRouting, computeActiveShards, shardRoutingTable.size()); this.activeShards = computeActiveShards; this.relocatingShards = computeRelocatingShards; this.initializingShards = computeInitializingShards; this.unassignedShards = computeUnassignedShards; + this.delayedUnassignedShards = computeDelayedUnassignedShards; this.primaryActive = primaryRouting.active(); } @@ -208,6 +207,10 @@ public int getUnassignedShards() { return unassignedShards; } + public int getDelayedUnassignedShards() { + return delayedUnassignedShards; + } + @Override public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(shardId); @@ -219,6 +222,27 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeBoolean(primaryActive); } + /** + * Computes the shard health of an index. + *

+ * Shard health is GREEN when all primary and replica shards of the indices are active. + * Shard health is YELLOW when primary shard is active but at-least one replica shard is inactive. + * Shard health is RED when the primary is not active. + *

+ */ + public static ClusterHealthStatus getShardHealth(final ShardRouting primaryRouting, final int activeShards, final int totalShards) { + assert primaryRouting != null : "Primary shard routing can't be null"; + if (primaryRouting.active()) { + if (activeShards == totalShards) { + return ClusterHealthStatus.GREEN; + } else { + return ClusterHealthStatus.YELLOW; + } + } else { + return getInactivePrimaryHealth(primaryRouting); + } + } + /** * Checks if an inactive primary shard should cause the cluster health to go RED. *

diff --git a/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java b/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java index 083159bffdc2b..f6cfdd3c42e0c 100644 --- a/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java +++ b/server/src/main/java/org/opensearch/cluster/health/ClusterStateHealth.java @@ -31,6 +31,7 @@ package org.opensearch.cluster.health; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.IndexRoutingTable; @@ -63,6 +64,7 @@ public final class ClusterStateHealth implements Iterable, W private final int activePrimaryShards; private final int initializingShards; private final int unassignedShards; + private int delayedUnassignedShards; private final double activeShardsPercent; private final ClusterHealthStatus status; private final Map indices; @@ -76,6 +78,10 @@ public ClusterStateHealth(final ClusterState clusterState) { this(clusterState, clusterState.metadata().getConcreteAllIndices()); } + public ClusterStateHealth(final ClusterState clusterState, final ClusterHealthRequest.Level clusterHealthLevel) { + this(clusterState, clusterState.metadata().getConcreteAllIndices(), clusterHealthLevel); + } + /** * Creates a new ClusterStateHealth instance considering the current cluster state and the provided index names. * @@ -105,6 +111,7 @@ public ClusterStateHealth(final ClusterState clusterState, final String[] concre int computeRelocatingShards = 0; int computeInitializingShards = 0; int computeUnassignedShards = 0; + int computeDelayedUnassignedShards = 0; for (ClusterIndexHealth indexHealth : indices.values()) { computeActivePrimaryShards += indexHealth.getActivePrimaryShards(); @@ -112,10 +119,76 @@ public ClusterStateHealth(final ClusterState clusterState, final String[] concre computeRelocatingShards += indexHealth.getRelocatingShards(); computeInitializingShards += indexHealth.getInitializingShards(); computeUnassignedShards += indexHealth.getUnassignedShards(); - if (indexHealth.getStatus() == ClusterHealthStatus.RED) { - computeStatus = ClusterHealthStatus.RED; - } else if (indexHealth.getStatus() == ClusterHealthStatus.YELLOW && computeStatus != ClusterHealthStatus.RED) { - computeStatus = ClusterHealthStatus.YELLOW; + computeDelayedUnassignedShards += indexHealth.getDelayedUnassignedShards(); + computeStatus = getClusterHealthStatus(indexHealth, computeStatus); + } + + if (clusterState.blocks().hasGlobalBlockWithStatus(RestStatus.SERVICE_UNAVAILABLE)) { + computeStatus = ClusterHealthStatus.RED; + } + + this.status = computeStatus; + this.activePrimaryShards = computeActivePrimaryShards; + this.activeShards = computeActiveShards; + this.relocatingShards = computeRelocatingShards; + this.initializingShards = computeInitializingShards; + this.unassignedShards = computeUnassignedShards; + this.delayedUnassignedShards = computeDelayedUnassignedShards; + + // shortcut on green + if (ClusterHealthStatus.GREEN.equals(computeStatus)) { + this.activeShardsPercent = 100; + } else { + List shardRoutings = clusterState.getRoutingTable().allShards(); + int activeShardCount = 0; + int totalShardCount = 0; + for (ShardRouting shardRouting : shardRoutings) { + if (shardRouting.active()) activeShardCount++; + totalShardCount++; + } + this.activeShardsPercent = (((double) activeShardCount) / totalShardCount) * 100; + } + } + + public ClusterStateHealth( + final ClusterState clusterState, + final String[] concreteIndices, + final ClusterHealthRequest.Level healthLevel + ) { + numberOfNodes = clusterState.nodes().getSize(); + numberOfDataNodes = clusterState.nodes().getDataNodes().size(); + hasDiscoveredClusterManager = clusterState.nodes().getClusterManagerNodeId() != null; + indices = new HashMap<>(); + boolean isIndexOrShardLevelHealthRequired = healthLevel == ClusterHealthRequest.Level.INDICES + || healthLevel == ClusterHealthRequest.Level.SHARDS; + + ClusterHealthStatus computeStatus = ClusterHealthStatus.GREEN; + int computeActivePrimaryShards = 0; + int computeActiveShards = 0; + int computeRelocatingShards = 0; + int computeInitializingShards = 0; + int computeUnassignedShards = 0; + int computeDelayedUnassignedShards = 0; + + for (String index : concreteIndices) { + IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index); + IndexMetadata indexMetadata = clusterState.metadata().index(index); + if (indexRoutingTable == null) { + continue; + } + + ClusterIndexHealth indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable, healthLevel); + computeActivePrimaryShards += indexHealth.getActivePrimaryShards(); + computeActiveShards += indexHealth.getActiveShards(); + computeRelocatingShards += indexHealth.getRelocatingShards(); + computeInitializingShards += indexHealth.getInitializingShards(); + computeUnassignedShards += indexHealth.getUnassignedShards(); + computeDelayedUnassignedShards += indexHealth.getDelayedUnassignedShards(); + computeStatus = getClusterHealthStatus(indexHealth, computeStatus); + + if (isIndexOrShardLevelHealthRequired) { + // Store ClusterIndexHealth only when the health is requested at Index or Shard level + indices.put(indexHealth.getIndex(), indexHealth); } } @@ -129,9 +202,10 @@ public ClusterStateHealth(final ClusterState clusterState, final String[] concre this.relocatingShards = computeRelocatingShards; this.initializingShards = computeInitializingShards; this.unassignedShards = computeUnassignedShards; + this.delayedUnassignedShards = computeDelayedUnassignedShards; // shortcut on green - if (computeStatus.equals(ClusterHealthStatus.GREEN)) { + if (ClusterHealthStatus.GREEN.equals(computeStatus)) { this.activeShardsPercent = 100; } else { List shardRoutings = clusterState.getRoutingTable().allShards(); @@ -145,6 +219,22 @@ public ClusterStateHealth(final ClusterState clusterState, final String[] concre } } + private static ClusterHealthStatus getClusterHealthStatus(ClusterIndexHealth indexHealth, ClusterHealthStatus computeStatus) { + switch (indexHealth.getStatus()) { + case RED: + return ClusterHealthStatus.RED; + case YELLOW: + // do not override an existing red + if (computeStatus != ClusterHealthStatus.RED) { + return ClusterHealthStatus.YELLOW; + } else { + return ClusterHealthStatus.RED; + } + default: + return computeStatus; + } + } + public ClusterStateHealth(final StreamInput in) throws IOException { activePrimaryShards = in.readVInt(); activeShards = in.readVInt(); @@ -213,6 +303,10 @@ public int getUnassignedShards() { return unassignedShards; } + public int getDelayedUnassignedShards() { + return delayedUnassignedShards; + } + public int getNumberOfNodes() { return this.numberOfNodes; } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index e29a81a2c131f..78f17c9ff212b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.ClusterState; @@ -196,7 +197,11 @@ public ClusterState applyStartedShards(ClusterState clusterState, List { + public void testClusterShardGreenHealth() { + String indexName = "test"; + int shardID = 0; + Index index = new Index(indexName, UUID.randomUUID().toString()); + ShardId shardId = new ShardId(index, shardID); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, shardID, "node_0", null, true, ShardRoutingState.STARTED) + ); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, shardID, "node_1", null, false, ShardRoutingState.STARTED) + ); + IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build(); + ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable); + assertEquals(2, clusterShardHealth.getActiveShards()); + assertEquals(0, clusterShardHealth.getInitializingShards()); + assertEquals(0, clusterShardHealth.getRelocatingShards()); + assertEquals(0, clusterShardHealth.getUnassignedShards()); + assertEquals(0, clusterShardHealth.getDelayedUnassignedShards()); + assertEquals(ClusterHealthStatus.GREEN, clusterShardHealth.getStatus()); + assertTrue(clusterShardHealth.isPrimaryActive()); + } + + public void testClusterShardYellowHealth() { + String indexName = "test"; + int shardID = 0; + Index index = new Index(indexName, UUID.randomUUID().toString()); + ShardId shardId = new ShardId(index, shardID); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, shardID, "node_0", null, true, ShardRoutingState.STARTED) + ); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, shardID, "node_1", "node_5", false, ShardRoutingState.RELOCATING) + ); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, shardID, "node_2", null, false, ShardRoutingState.INITIALIZING) + ); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, shardID, null, null, false, ShardRoutingState.UNASSIGNED) + ); + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned( + shardId, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo( + UnassignedInfo.Reason.NODE_LEFT, + "node_4 left", + null, + 0, + 0, + 0, + true, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet() + ) + ) + ); + IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build(); + ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable); + assertEquals(2, clusterShardHealth.getActiveShards()); + assertEquals(1, clusterShardHealth.getInitializingShards()); + assertEquals(1, clusterShardHealth.getRelocatingShards()); + assertEquals(2, clusterShardHealth.getUnassignedShards()); + assertEquals(1, clusterShardHealth.getDelayedUnassignedShards()); + assertEquals(ClusterHealthStatus.YELLOW, clusterShardHealth.getStatus()); + assertTrue(clusterShardHealth.isPrimaryActive()); + } + + public void testClusterShardRedHealth() { + String indexName = "test"; + int shardID = 0; + Index index = new Index(indexName, UUID.randomUUID().toString()); + ShardId shardId = new ShardId(index, shardID); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard( + ShardRouting.newUnassigned( + shardId, + true, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo( + UnassignedInfo.Reason.NODE_LEFT, + "node_4 left", + null, + 0, + 0, + 0, + false, + UnassignedInfo.AllocationStatus.DECIDERS_NO, + Collections.emptySet() + ) + ) + ); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, shardID, null, null, false, ShardRoutingState.UNASSIGNED) + ); + IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build(); + ClusterShardHealth clusterShardHealth = new ClusterShardHealth(shardID, indexShardRoutingTable); + assertEquals(0, clusterShardHealth.getActiveShards()); + assertEquals(0, clusterShardHealth.getInitializingShards()); + assertEquals(0, clusterShardHealth.getRelocatingShards()); + assertEquals(2, clusterShardHealth.getUnassignedShards()); + assertEquals(0, clusterShardHealth.getDelayedUnassignedShards()); + assertEquals(ClusterHealthStatus.RED, clusterShardHealth.getStatus()); + assertFalse(clusterShardHealth.isPrimaryActive()); + } + + public void testShardRoutingNullCheck() { + assertThrows(AssertionError.class, () -> ClusterShardHealth.getShardHealth(null, 0, 0)); + } + @Override protected ClusterShardHealth doParseInstance(XContentParser parser) throws IOException { return ClusterShardHealth.fromXContent(parser); diff --git a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java index 795dc8a624e38..67e2fb59b0d74 100644 --- a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java @@ -73,6 +73,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -83,6 +84,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.opensearch.action.admin.cluster.health.ClusterHealthRequest.Level.CLUSTER; +import static org.opensearch.action.admin.cluster.health.ClusterHealthRequest.Level.INDICES; +import static org.opensearch.action.admin.cluster.health.ClusterHealthRequest.Level.SHARDS; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; import static org.hamcrest.CoreMatchers.allOf; @@ -225,7 +229,13 @@ public void testClusterHealth() throws IOException { clusterStateHealth.hasDiscoveredClusterManager(), equalTo(clusterService.state().nodes().getClusterManagerNodeId() != null) ); - assertClusterHealth(clusterStateHealth, counter); + assertClusterHealth(clusterStateHealth, counter, SHARDS); + clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices, ClusterHealthRequest.Level.CLUSTER); + assertClusterHealth(clusterStateHealth, counter, CLUSTER); + clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices, INDICES); + assertClusterHealth(clusterStateHealth, counter, INDICES); + clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices, SHARDS); + assertClusterHealth(clusterStateHealth, counter, SHARDS); } public void testClusterHealthOnIndexCreation() { @@ -586,7 +596,11 @@ private boolean primaryInactiveDueToRecovery(final String indexName, final Clust return true; } - private void assertClusterHealth(ClusterStateHealth clusterStateHealth, RoutingTableGenerator.ShardCounter counter) { + private void assertClusterHealth( + ClusterStateHealth clusterStateHealth, + RoutingTableGenerator.ShardCounter counter, + ClusterHealthRequest.Level level + ) { assertThat(clusterStateHealth.getStatus(), equalTo(counter.status())); assertThat(clusterStateHealth.getActiveShards(), equalTo(counter.active)); assertThat(clusterStateHealth.getActivePrimaryShards(), equalTo(counter.primaryActive)); @@ -594,5 +608,16 @@ private void assertClusterHealth(ClusterStateHealth clusterStateHealth, RoutingT assertThat(clusterStateHealth.getRelocatingShards(), equalTo(counter.relocating)); assertThat(clusterStateHealth.getUnassignedShards(), equalTo(counter.unassigned)); assertThat(clusterStateHealth.getActiveShardsPercent(), is(allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(100.0)))); + if (level.equals(INDICES) || level.equals(SHARDS)) { + assertTrue(clusterStateHealth.getIndices().size() > 0); + if (level.equals(SHARDS)) { + Collection clusterIndexHealths = clusterStateHealth.getIndices().values(); + for (ClusterIndexHealth clusterIndexHealth : clusterIndexHealths) { + assertFalse(clusterIndexHealth.getShards().isEmpty()); + } + } + } else { + assertTrue(clusterStateHealth.getIndices().isEmpty()); + } } }