Skip to content

Commit

Permalink
Optimise memory for rest cluster health calls with in-line shard aggr…
Browse files Browse the repository at this point in the history
…egations for levels cluster and indices. (opensearch-project#15492)

Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
SwethaGuptha authored and dk2k committed Oct 16, 2024
1 parent d4f0c2c commit 0eaa026
Show file tree
Hide file tree
Showing 16 changed files with 674 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145))
- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292))
- Memory optimisations in _cluster/health API ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.health.ClusterIndexHealth;
import org.opensearch.cluster.health.ClusterShardHealth;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -49,6 +51,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -439,4 +442,164 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
completionFuture.actionGet(TimeValue.timeValueSeconds(30));
}
}

public void testHealthWithClusterLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}

public void testHealthWithIndicesLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("indices")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());

assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(1, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
assertEquals("test1", indexName);
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(1, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
assertTrue(indicesHealthValue.getShards().isEmpty());
}
}

public void testHealthWithShardLevelAppliedAtTransportLayer() {
int dataNodes = internalCluster().getDataNodeNames().size();
int greenClusterReplicaCount = dataNodes - 1;
int yellowClusterReplicaCount = dataNodes;

createIndex(
"test1",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, greenClusterReplicaCount)
.build()
);
ensureGreen(TimeValue.timeValueSeconds(120), "test1");
createIndex(
"test2",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, greenClusterReplicaCount)
.build()
);
ensureGreen(TimeValue.timeValueSeconds(120));
client().admin()
.indices()
.prepareUpdateSettings()
.setIndices("test2")
.setSettings(Settings.builder().put("index.number_of_replicas", yellowClusterReplicaCount).build())
.execute()
.actionGet();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("shards")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.YELLOW, healthResponse.getStatus());

assertEquals(2 * dataNodes, healthResponse.getActiveShards());
assertEquals(2, healthResponse.getActivePrimaryShards());
assertEquals(1, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());

Map<String, ClusterIndexHealth> indices = healthResponse.getIndices();
assertFalse(indices.isEmpty());
assertEquals(2, indices.size());
for (Map.Entry<String, ClusterIndexHealth> indicesHealth : indices.entrySet()) {
String indexName = indicesHealth.getKey();
boolean indexHasMoreReplicas = indexName.equals("test2");
ClusterIndexHealth indicesHealthValue = indicesHealth.getValue();
assertEquals(dataNodes, indicesHealthValue.getActiveShards());
assertEquals(1, indicesHealthValue.getActivePrimaryShards());
assertEquals(0, indicesHealthValue.getInitializingShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, indicesHealthValue.getUnassignedShards());
assertEquals(0, indicesHealthValue.getDelayedUnassignedShards());
assertEquals(0, indicesHealthValue.getRelocatingShards());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, indicesHealthValue.getStatus());
Map<Integer, ClusterShardHealth> shards = indicesHealthValue.getShards();
assertFalse(shards.isEmpty());
assertEquals(1, shards.size());
for (Map.Entry<Integer, ClusterShardHealth> shardHealth : shards.entrySet()) {
ClusterShardHealth clusterShardHealth = shardHealth.getValue();
assertEquals(dataNodes, clusterShardHealth.getActiveShards());
assertEquals(indexHasMoreReplicas ? 1 : 0, clusterShardHealth.getUnassignedShards());
assertEquals(0, clusterShardHealth.getDelayedUnassignedShards());
assertEquals(0, clusterShardHealth.getRelocatingShards());
assertEquals(0, clusterShardHealth.getInitializingShards());
assertTrue(clusterShardHealth.isPrimaryActive());
assertEquals(indexHasMoreReplicas ? ClusterHealthStatus.YELLOW : ClusterHealthStatus.GREEN, clusterShardHealth.getStatus());
}
}
}

public void testHealthWithAwarenessAttributesLevelAppliedAtTransportLayer() {
createIndex(
"test1",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen();
ClusterHealthResponse healthResponse = client().admin()
.cluster()
.prepareHealth()
.setLevel("awareness_attributes")
.setApplyLevelAtTransportLayer(true)
.execute()
.actionGet();
assertEquals(ClusterHealthStatus.GREEN, healthResponse.getStatus());
assertTrue(healthResponse.getIndices().isEmpty());
assertNotNull(healthResponse.getClusterAwarenessHealth());
assertEquals(1, healthResponse.getActiveShards());
assertEquals(1, healthResponse.getActivePrimaryShards());
assertEquals(0, healthResponse.getUnassignedShards());
assertEquals(0, healthResponse.getInitializingShards());
assertEquals(0, healthResponse.getRelocatingShards());
assertEquals(0, healthResponse.getDelayedUnassignedShards());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -109,7 +110,7 @@ public void testNewRestoredIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMix
setDirection(REMOTE_STORE.direction);
String restoredIndexName2 = TEST_INDEX + "-restored2";
restoreSnapshot(snapshotRepoName, snapshotName, restoredIndexName2);
ensureGreen(restoredIndexName2);
ensureGreen(TimeValue.timeValueSeconds(90), restoredIndexName2);

logger.info("Verify that restored index is non remote-backed");
assertRemoteStoreBackedIndex(restoredIndexName2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
*/
private Level level = Level.CLUSTER;

/**
* This flag will be used by the TransportClusterHealthAction to decide if indices/shards info is required in the ClusterHealthResponse or not.
* When the flag is disabled - indices/shard info will be returned in ClusterHealthResponse regardless of the health level requested.
* When the flag is enabled - indices/shards info will be set according to health level requested.
* For Level.CLUSTER (or) Level.AWARENESS_ATTRIBUTES - information on indices/shards will NOT be returned to the transport client
* For Level.INDICES - information on indices will be returned to the transport client.
* For Level.SHARDS - information on indices and shards will be returned to the transport client
* By default, the flag is disabled.
*/
private boolean applyLevelAtTransportLayer = false;

public ClusterHealthRequest() {}

public ClusterHealthRequest(String... indices) {
Expand Down Expand Up @@ -104,6 +115,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeWeighedIn = in.readBoolean();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
applyLevelAtTransportLayer = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -139,6 +153,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeWeighedIn);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(applyLevelAtTransportLayer);
}
}

@Override
Expand Down Expand Up @@ -337,6 +354,14 @@ public final boolean ensureNodeWeighedIn() {
return ensureNodeWeighedIn;
}

public boolean isApplyLevelAtTransportLayer() {
return applyLevelAtTransportLayer;
}

public void setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
this.applyLevelAtTransportLayer = applyLevelAtTransportLayer;
}

@Override
public ActionRequestValidationException validate() {
if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,9 @@ public final ClusterHealthRequestBuilder setEnsureNodeWeighedIn(boolean ensureNo
request.ensureNodeWeighedIn(ensureNodeCommissioned);
return this;
}

public ClusterHealthRequestBuilder setApplyLevelAtTransportLayer(boolean applyLevelAtTransportLayer) {
request.setApplyLevelAtTransportLayer(applyLevelAtTransportLayer);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,26 @@ public ClusterHealthResponse(
this.clusterHealthStatus = clusterStateHealth.getStatus();
}

public ClusterHealthResponse(
String clusterName,
String[] concreteIndices,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this.clusterName = clusterName;
this.numberOfPendingTasks = numberOfPendingTasks;
this.numberOfInFlightFetch = numberOfInFlightFetch;
this.taskMaxWaitingTime = taskMaxWaitingTime;
this.clusterStateHealth = clusterHealthRequest.isApplyLevelAtTransportLayer()
? new ClusterStateHealth(clusterState, concreteIndices, clusterHealthRequest.level())
: new ClusterStateHealth(clusterState, concreteIndices);
this.clusterHealthStatus = clusterStateHealth.getStatus();
this.delayedUnassignedShards = clusterStateHealth.getDelayedUnassignedShards();
}

// Awareness Attribute health
public ClusterHealthResponse(
String clusterName,
Expand All @@ -261,6 +281,29 @@ public ClusterHealthResponse(
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

public ClusterHealthResponse(
String clusterName,
ClusterHealthRequest clusterHealthRequest,
ClusterState clusterState,
ClusterSettings clusterSettings,
String[] concreteIndices,
String awarenessAttributeName,
int numberOfPendingTasks,
int numberOfInFlightFetch,
TimeValue taskMaxWaitingTime
) {
this(
clusterName,
concreteIndices,
clusterHealthRequest,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
taskMaxWaitingTime
);
this.clusterAwarenessHealth = new ClusterAwarenessHealth(clusterState, clusterSettings, awarenessAttributeName);
}

/**
* For XContent Parser and serialization tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.NodeWeighedAwayException;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.WeightedRoutingUtils;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -487,7 +486,12 @@ private ClusterHealthResponse clusterHealth(
TimeValue pendingTaskTimeInQueue
) {
if (logger.isTraceEnabled()) {
logger.trace("Calculating health based on state version [{}]", clusterState.version());
logger.trace(
"Calculating health based on state version [{}] for health level [{}] and applyLevelAtTransportLayer set to [{}]",
clusterState.version(),
request.level(),
request.isApplyLevelAtTransportLayer()
);
}

String[] concreteIndices;
Expand All @@ -496,13 +500,13 @@ private ClusterHealthResponse clusterHealth(
concreteIndices = clusterState.getMetadata().getConcreteAllIndices();
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
request,
clusterState,
clusterService.getClusterSettings(),
concreteIndices,
awarenessAttribute,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand All @@ -514,10 +518,10 @@ private ClusterHealthResponse clusterHealth(
ClusterHealthResponse response = new ClusterHealthResponse(
clusterState.getClusterName().value(),
Strings.EMPTY_ARRAY,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
response.setStatus(ClusterHealthStatus.RED);
Expand All @@ -527,10 +531,10 @@ private ClusterHealthResponse clusterHealth(
return new ClusterHealthResponse(
clusterState.getClusterName().value(),
concreteIndices,
request,
clusterState,
numberOfPendingTasks,
numberOfInFlightFetch,
UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue
);
}
Expand Down
Loading

0 comments on commit 0eaa026

Please sign in to comment.