Skip to content

Commit

Permalink
Optimise memory for rest cluster health calls with in-line shard aggr…
Browse files Browse the repository at this point in the history
…egations for levels cluster and indices.

Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Sep 5, 2024
1 parent 245fa47 commit 5f5f4f8
Show file tree
Hide file tree
Showing 15 changed files with 663 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471))
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292))
- Memory optimisations in _cluster/health API ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.health.ClusterShardHealth;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -49,6 +51,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -439,4 +442,155 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
}
}

public void testHealthWithClusterLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}

public void testHealthWithIndicesLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("indices")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());

assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(1, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
assertEquals("test1", indexName);
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(1, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
assertTrue(indicesHealthValue.getShards().isEmpty());
}
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15757")
public void testHealthWithShardLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(TimeValue.timeValueSeconds(120), "test1");
createIndex(
"test2",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
);
ensureGreen(TimeValue.timeValueSeconds(120));
client().admin()
.indices()
.prepareUpdateSettings()
.setIndices("test2")
.setSettings(Settings.builder().put("index.number_of_replicas", 2).build())
.execute()
.actionGet();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("shards")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.YELLOW, healthResponse.getStatus());

assertEquals(4, healthResponse.getActiveShards());
assertEquals(2, healthResponse.getActivePrimaryShards());
assertEquals(1, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(2, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
boolean indexHasMoreReplicas = indexName.equals("test2");
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(2, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
Map<Integer, ClusterShardHealth> shards = indicesHealthValue.getShards();
assertFalse(shards.isEmpty());
assertEquals(1, shards.size());
for (Map.Entry<Integer, ClusterShardHealth> shardHealth : shards.entrySet()) {
ClusterShardHealth clusterShardHealth = shardHealth.getValue();
assertEquals(2, clusterShardHealth.getActiveShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, clusterShardHealth.getUnassignedShards());
assertEquals(0, clusterShardHealth.getDelayedUnassignedShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertTrue(clusterShardHealth.isPrimaryActive());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, clusterShardHealth.getStatus());
}
}
}

public void testHealthWithAwarenessAttributesLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("awareness_attributes")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertNotNull(healthResponse.getClusterAwarenessHealth());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
*/
private Level level = Level.CLUSTER;

/**
* This flag will be used by the TransportClusterHealthAction to decide if indices/shards info is required in the ClusterHealthResponse or not.
* When the flag is disabled - indices/shard info will be returned in ClusterHealthResponse regardless of the health level requested.
* When the flag is enabled - indices/shards info will be set according to health level requested.
* For Level.CLUSTER (or) Level.AWARENESS_ATTRIBUTES - information on indices/shards will NOT be returned to the transport client
* For Level.INDICES - information on indices will be returned to the transport client.
* For Level.SHARDS - information on indices and shards will be returned to the transport client
* By default, the flag is disabled.
*/
private boolean applyLevelAtTransportLayer = false;

public ClusterHealthRequest() {}

public ClusterHealthRequest(String... indices) {
Expand Down Expand Up @@ -104,6 +115,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeWeighedIn = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
applyLevelAtTransportLayer = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -139,6 +153,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeWeighedIn);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(applyLevelAtTransportLayer);
}
}

@Override
Expand Down Expand Up @@ -337,6 +354,14 @@ public final boolean ensureNodeWeighedIn() {
return ensureNodeWeighedIn;
}

public boolean isApplyLevelAtTransportLayer() {
return applyLevelAtTransportLayer;
}

public void setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
this.applyLevelAtTransportLayer = applyLevelAtTransportLayer;
}

@Override
public ActionRequestValidationException validate() {
if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,9 @@ public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNo
request.ensureNodeWeighedIn(ensureNodeCommissioned);
return this;
}

public ClusterHealthRequestBuilder setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
request.setApplyLevelAtTransportLayer(applyLevelAtTransportLayer);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ public ClusterHealthResponse(
this.clusterHealthStatus = clusterStateHealth.getStatus();
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.taskMaxWaitingTime = taskMaxWaitingTime;
this.clusterStateHealth = clusterHealthRequest.isApplyLevelAtTransportLayer()
? new ClusterStateHealth(clusterState, concreteIndices, clusterHealthRequest.level())
: new ClusterStateHealth(clusterState, concreteIndices);
this.clusterHealthStatus = clusterStateHealth.getStatus();
this.delayedUnassignedShards = clusterStateHealth.getDelayedUnassignedShards();
}

// Awareness Attribute health
public ClusterHealthResponse(
String clusterName,
Expand All @@ -261,6 +281,29 @@ public ClusterHealthResponse(
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

public ClusterHealthResponse(
String clusterName,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this(
clusterName,
concreteIndices,
clusterHealthRequest,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
taskMaxWaitingTime
);
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

/**
* For XContent Parser and serialization tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.WeightedRoutingUtils;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -487,7 +486,12 @@ private ClusterHealthResponse clusterHealth(
TimeValue pendingTaskTimeInQueue
) {
if (logger.isTraceEnabled()) {
logger.trace("Calculating health based on state version [{}]", clusterState.version());
logger.trace(
"Calculating health based on state version [{}] for health level [{}] and applyLevelAtTransportLayer set to [{}]",
clusterState.version(),
request.level(),
request.isApplyLevelAtTransportLayer()
);
}

String[] concreteIndices;
Expand All @@ -496,13 +500,13 @@ private ClusterHealthResponse clusterHealth(
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
request,
clusterState,
clusterService.getClusterSettings(),
concreteIndices,
awarenessAttribute,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand All @@ -514,10 +518,10 @@ private ClusterHealthResponse clusterHealth(
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
response.setStatus(ClusterHealthStatus.RED);
Expand All @@ -527,10 +531,10 @@ private ClusterHealthResponse clusterHealth(
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
concreteIndices,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.indices.stats.CommonStats;
Expand Down Expand Up @@ -209,7 +210,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq

ClusterHealthStatus clusterStatus = null;
if (clusterService.state().nodes().isLocalNodeElectedClusterManager()) {
clusterStatus = new ClusterStateHealth(clusterService.state()).getStatus();
clusterStatus = new ClusterStateHealth(clusterService.state(), ClusterHealthRequest.Level.CLUSTER).getStatus();
}

return new ClusterStatsNodeResponse(
Expand Down
Loading

0 comments on commit 5f5f4f8

Please sign in to comment.