Skip to content

Commit

Permalink
URI path filtering support in cluster stats API
Browse files Browse the repository at this point in the history
Signed-off-by: Swetha Guptha <[email protected]>
  • Loading branch information
Swetha Guptha committed Sep 19, 2024
1 parent 12ff5ed commit 9b04911
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.SubMetrics;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
Expand Down Expand Up @@ -505,6 +506,112 @@ public void testNodeRolesWithDataNodeLegacySettings() throws ExecutionException,
assertEquals(expectedNodesRoles, Set.of(getNodeRoles(client, 0), getNodeRoles(client, 1)));
}

public void testClusterStatsMetricFiltering() {
internalCluster().startNode();
ensureGreen();

client().admin().indices().prepareCreate("test1").setMapping("{\"properties\":{\"foo\":{\"type\": \"keyword\"}}}").get();

ClusterStatsResponse response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.get();
assertNotNull(response.getIndicesStats());
assertNotNull(response.getNodesStats());
assertNotNull(response.getIndicesStats().getMappings());
assertNotNull(response.getIndicesStats().getAnalysis());
assertNotNull(response.getNodesStats().getJvm());
assertNotNull(response.getNodesStats().getOs());

response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(SubMetrics.allSubMetrics(ClusterStatsRequest.Metric.INDICES))
.get();
assertNotNull(response.getIndicesStats());
assertNotNull(response.getIndicesStats().getShards());
assertNotNull(response.getIndicesStats().getDocs());
assertNotNull(response.getIndicesStats().getMappings());
assertNotNull(response.getIndicesStats().getAnalysis());
assertNull(response.getNodesStats());

response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(SubMetrics.allSubMetrics(ClusterStatsRequest.Metric.NODES))
.get();
assertNotNull(response.getNodesStats());
assertNotNull(response.getNodesStats().getJvm());
assertNotNull(response.getNodesStats().getOs());
assertNotNull(response.getNodesStats().getPlugins());
assertNotNull(response.getNodesStats().getFs());
assertNull(response.getIndicesStats());

response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(SubMetrics.MAPPINGS.metricName(), SubMetrics.ANALYSIS.metricName()))
.get();
assertNotNull(response.getIndicesStats());
assertNotNull(response.getIndicesStats().getMappings());
assertNotNull(response.getIndicesStats().getAnalysis());
assertNull(response.getIndicesStats().getShards());
assertNull(response.getIndicesStats().getDocs());
assertNull(response.getNodesStats());

response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of(SubMetrics.OS.metricName(), SubMetrics.PROCESS.metricName()))
.get();
assertNotNull(response.getNodesStats());
assertNotNull(response.getNodesStats().getOs());
assertNotNull(response.getNodesStats().getProcess());
assertNull(response.getNodesStats().getPlugins());
assertNull(response.getNodesStats().getFs());
assertNull(response.getIndicesStats());

response = client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(
Set.of(
SubMetrics.OS.metricName(),
SubMetrics.PROCESS.metricName(),
SubMetrics.MAPPINGS.metricName(),
SubMetrics.ANALYSIS.metricName()
)
)
.get();
assertNotNull(response.getIndicesStats());
assertNotNull(response.getIndicesStats().getMappings());
assertNotNull(response.getIndicesStats().getAnalysis());
assertNotNull(response.getNodesStats());
assertNotNull(response.getNodesStats().getOs());
assertNotNull(response.getNodesStats().getProcess());
assertNull(response.getNodesStats().getPlugins());
assertNull(response.getNodesStats().getFs());
assertNull(response.getIndicesStats().getShards());
assertNull(response.getIndicesStats().getDocs());

assertThrows(
IllegalStateException.class,
() -> client().admin()
.cluster()
.prepareClusterStats()
.useAggregatedNodeLevelResponses(randomBoolean())
.requestMetrics(Set.of("random_metric"))
.get()
);

}

private Map<String, Integer> getExpectedCounts(
int dataRoleCount,
int masterRoleCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.SubMetrics;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.xcontent.ToXContentFragment;
Expand All @@ -47,6 +49,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/**
* Cluster Stats per index
Expand All @@ -68,14 +72,57 @@ public class ClusterStatsIndices implements ToXContentFragment {
private MappingStats mappings;

public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) {
this(SubMetrics.allSubMetrics(Metric.INDICES), nodeResponses, mappingStats, analysisStats);

}

public ClusterStatsIndices(
Set<String> requestedMetrics,
List<ClusterStatsNodeResponse> nodeResponses,
MappingStats mappingStats,
AnalysisStats analysisStats
) {
Map<String, ShardStats> countsPerIndex = new HashMap<>();

this.docs = new DocsStats();
this.store = new StoreStats();
this.fieldData = new FieldDataStats();
this.queryCache = new QueryCacheStats();
this.completion = new CompletionStats();
this.segments = new SegmentsStats();
Consumer<DocsStats> docsStatsConsumer = (docs) -> {
if (SubMetrics.DOCS.containedIn(requestedMetrics)) {
if (this.docs == null) this.docs = new DocsStats();
this.docs.add(docs);
}
};
Consumer<StoreStats> storeStatsConsumer = (store) -> {
if (SubMetrics.STORE.containedIn(requestedMetrics)) {
if (this.store == null) this.store = new StoreStats();
this.store.add(store);
}
};
Consumer<FieldDataStats> fieldDataConsumer = (fieldDataStats) -> {
if (SubMetrics.FIELDDATA.containedIn(requestedMetrics)) {
if (this.fieldData == null) this.fieldData = new FieldDataStats();
this.fieldData.add(fieldDataStats);
}
};

Consumer<QueryCacheStats> queryCacheStatsConsumer = (queryCacheStats) -> {
if (SubMetrics.QUERY_CACHE.containedIn(requestedMetrics)) {
if (this.queryCache == null) this.queryCache = new QueryCacheStats();
this.queryCache.add(queryCacheStats);
}
};

Consumer<CompletionStats> completionStatsConsumer = (completionStats) -> {
if (SubMetrics.COMPLETION.containedIn(requestedMetrics)) {
if (this.completion == null) this.completion = new CompletionStats();
this.completion.add(completionStats);
}
};

Consumer<SegmentsStats> segmentsStatsConsumer = (segmentsStats) -> {
if (SubMetrics.SEGMENTS.containedIn(requestedMetrics)) {
if (this.segments == null) this.segments = new SegmentsStats();
this.segments.add(segmentsStats);
}
};

for (ClusterStatsNodeResponse r : nodeResponses) {
// Aggregated response from the node
Expand All @@ -92,12 +139,12 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
}
}

docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
docsStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.docs);
storeStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.store);
fieldDataConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCacheStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completionStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.completion);
segmentsStatsConsumer.accept(r.getAggregatedNodeLevelStats().commonStats.segments);
} else {
// Default response from the node
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
Expand All @@ -113,21 +160,23 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
docsStatsConsumer.accept(shardCommonStats.docs);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
storeStatsConsumer.accept(shardCommonStats.store);
fieldDataConsumer.accept(shardCommonStats.fieldData);
queryCacheStatsConsumer.accept(shardCommonStats.queryCache);
completionStatsConsumer.accept(shardCommonStats.completion);
segmentsStatsConsumer.accept(shardCommonStats.segments);
}
}
}

shards = new ShardStats();
indexCount = countsPerIndex.size();
for (final ShardStats indexCountsCursor : countsPerIndex.values()) {
shards.addIndexShardCount(indexCountsCursor);
if (SubMetrics.SHARDS.containedIn(requestedMetrics)) {
shards = new ShardStats();
for (final ShardStats indexCountsCursor : countsPerIndex.values()) {
shards.addIndexShardCount(indexCountsCursor);
}
}

this.mappings = mappingStats;
Expand Down Expand Up @@ -186,13 +235,27 @@ static final class Fields {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.COUNT, indexCount);
shards.toXContent(builder, params);
docs.toXContent(builder, params);
store.toXContent(builder, params);
fieldData.toXContent(builder, params);
queryCache.toXContent(builder, params);
completion.toXContent(builder, params);
segments.toXContent(builder, params);
if (shards != null) {
shards.toXContent(builder, params);
}
if (docs != null) {
docs.toXContent(builder, params);
}
if (store != null) {
store.toXContent(builder, params);
}
if (fieldData != null) {
fieldData.toXContent(builder, params);
}
if (queryCache != null) {
queryCache.toXContent(builder, params);
}
if (completion != null) {
completion.toXContent(builder, params);
}
if (segments != null) {
segments.toXContent(builder, params);
}
if (mappings != null) {
mappings.toXContent(builder, params);
}
Expand Down
Loading

0 comments on commit 9b04911

Please sign in to comment.