From dd54b1b5bd2a72b32a0123867ecdec204c6dc9bd Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 7 Sep 2023 19:30:57 +0530 Subject: [PATCH] Add node performance trackers and performance collector service Signed-off-by: Bharathwaj G --- CHANGELOG.md | 2 +- .../admin/cluster/node/stats/NodeStats.java | 25 ++- .../cluster/node/stats/NodesStatsRequest.java | 3 +- .../node/stats/TransportNodesStatsAction.java | 3 +- .../stats/TransportClusterStatsAction.java | 1 + .../common/settings/ClusterSettings.java | 7 + .../node/DownstreamNodePerfStats.java | 64 ++++++ .../main/java/org/opensearch/node/Node.java | 19 +- .../java/org/opensearch/node/NodeService.java | 17 +- .../node/PerformanceCollectorService.java | 163 +++++++++++++++ .../tracker/AbstractAverageUsageTracker.java | 69 +++++++ .../tracker/AverageCpuUsageTracker.java | 35 ++++ .../tracker/AverageMemoryUsageTracker.java | 39 ++++ .../tracker/NodePerformanceTracker.java | 194 ++++++++++++++++++ .../throttling/tracker/package-info.java | 12 ++ .../cluster/node/stats/NodeStatsTests.java | 44 ++++ .../opensearch/cluster/DiskUsageTests.java | 6 + .../PerformanceCollectorServiceTests.java | 163 +++++++++++++++ .../tracker/NodePerformanceTrackerTests.java | 69 +++++++ .../MockInternalClusterInfoService.java | 1 + .../opensearch/test/InternalTestCluster.java | 1 + 21 files changed, 929 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/opensearch/node/DownstreamNodePerfStats.java create mode 100644 server/src/main/java/org/opensearch/node/PerformanceCollectorService.java create mode 100644 server/src/main/java/org/opensearch/throttling/tracker/AbstractAverageUsageTracker.java create mode 100644 server/src/main/java/org/opensearch/throttling/tracker/AverageCpuUsageTracker.java create mode 100644 server/src/main/java/org/opensearch/throttling/tracker/AverageMemoryUsageTracker.java create mode 100644 server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java create mode 100644 
server/src/main/java/org/opensearch/throttling/tracker/package-info.java create mode 100644 server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java create mode 100644 server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index fe90492f3d309..18fb4510f8372 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679)) - Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466) [#9289](https://github.com/opensearch-project/OpenSearch/pull/9289)) - Added crypto-kms plugin to provide AWS KMS based key providers for encryption/decryption. ([#8465](https://github.com/opensearch-project/OpenSearch/pull/8465)) - +- Added Performance collector service and node performance stats tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 - Bump `forbiddenapis` from 3.3 to 3.4 diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index dd36b3b8db3ab..a1f6fc801c30a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -56,6 +56,7 @@ import org.opensearch.monitor.os.OsStats; import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; +import org.opensearch.node.DownstreamNodePerfStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import 
org.opensearch.search.backpressure.stats.SearchBackpressureStats;
@@ -142,6 +143,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
     @Nullable
     private SearchPipelineStats searchPipelineStats;
 
+    @Nullable
+    private DownstreamNodePerfStats downstreamNodePerfStats;
+
     public NodeStats(StreamInput in) throws IOException {
         super(in);
         timestamp = in.readVLong();
@@ -198,6 +202,11 @@ public NodeStats(StreamInput in) throws IOException {
         } else {
             searchPipelineStats = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_2_11_0)) { // mirrors the version gate used in writeTo
+            downstreamNodePerfStats = in.readOptionalWriteable(DownstreamNodePerfStats::new);
+        } else {
+            downstreamNodePerfStats = null; // older senders never write this field, so nothing may be consumed from the stream
+        }
     }
 
     public NodeStats(
@@ -216,6 +225,7 @@ public NodeStats(
         @Nullable DiscoveryStats discoveryStats,
         @Nullable IngestStats ingestStats,
         @Nullable AdaptiveSelectionStats adaptiveSelectionStats,
+        @Nullable DownstreamNodePerfStats downstreamNodePerfStats,
         @Nullable ScriptCacheStats scriptCacheStats,
         @Nullable IndexingPressureStats indexingPressureStats,
         @Nullable ShardIndexingPressureStats shardIndexingPressureStats,
@@ -240,7 +250,7 @@ public NodeStats(
         this.scriptStats = scriptStats;
         this.discoveryStats = discoveryStats;
         this.ingestStats = ingestStats;
-        this.adaptiveSelectionStats = adaptiveSelectionStats;
+        this.downstreamNodePerfStats = downstreamNodePerfStats;
         this.scriptCacheStats = scriptCacheStats;
         this.indexingPressureStats = indexingPressureStats;
         this.shardIndexingPressureStats = shardIndexingPressureStats;
@@ -250,6 +260,7 @@ public NodeStats(
         this.fileCacheStats = fileCacheStats;
         this.taskCancellationStats = taskCancellationStats;
         this.searchPipelineStats = searchPipelineStats;
+        this.adaptiveSelectionStats = adaptiveSelectionStats;
     }
 
     public long getTimestamp() {
@@ -344,6 +355,11 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
         return adaptiveSelectionStats;
     }
 
+    @Nullable
+    public DownstreamNodePerfStats
getNodesPerformanceStats() { + return downstreamNodePerfStats; + } + @Nullable public ScriptCacheStats getScriptCacheStats() { return scriptCacheStats; @@ -430,6 +446,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_9_0)) { out.writeOptionalWriteable(searchPipelineStats); } + if (out.getVersion().onOrAfter(Version.V_2_11_0)) { + out.writeOptionalWriteable(downstreamNodePerfStats); + } } @Override @@ -520,7 +539,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getSearchPipelineStats() != null) { getSearchPipelineStats().toXContent(builder, params); } - + if (getNodesPerformanceStats() != null) { + getNodesPerformanceStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index b0caa469033eb..e9948f0781d68 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -213,7 +213,8 @@ public enum Metric { WEIGHTED_ROUTING_STATS("weighted_routing"), FILE_CACHE_STATS("file_cache"), TASK_CANCELLATION("task_cancellation"), - SEARCH_PIPELINE("search_pipeline"); + SEARCH_PIPELINE("search_pipeline"), + NODES_PERFORMANCE_STATS("nodes_performance_stats"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 615abbaef845d..90272ec7e1318 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ 
-124,7 +124,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics), NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics), NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics), - NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics) + NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics), + NodesStatsRequest.Metric.NODES_PERFORMANCE_STATS.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 18098bc31432f..d8323e209be23 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -168,6 +168,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8c04e01a75fb4..575b3afbb312b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -152,6 +152,7 @@ import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.tracker.NodePerformanceTracker; import org.opensearch.transport.ProxyConnectionStrategy; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; @@ -651,6 +652,12 @@ public void apply(Settings value, Settings current, Settings previous) { 
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING, SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS, + // Settings related to admission control + NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING, + NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING, + NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, + NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, + // Settings related to Searchable Snapshots Node.NODE_SEARCH_CACHE_SIZE_SETTING, FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING, diff --git a/server/src/main/java/org/opensearch/node/DownstreamNodePerfStats.java b/server/src/main/java/org/opensearch/node/DownstreamNodePerfStats.java new file mode 100644 index 0000000000000..497fb9f8d3f0f --- /dev/null +++ b/server/src/main/java/org/opensearch/node/DownstreamNodePerfStats.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.node;
+
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class DownstreamNodePerfStats implements Writeable, ToXContentFragment { // node id -> latest performance sample, as shipped in node stats
+    private final Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodePerfStats;
+
+    public DownstreamNodePerfStats(Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodePerfStats) {
+        this.nodePerfStats = nodePerfStats;
+    }
+
+    public DownstreamNodePerfStats(StreamInput in) throws IOException {
+        this.nodePerfStats = in.readMap(StreamInput::readString, PerformanceCollectorService.NodePerformanceStatistics::new);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeMap(this.nodePerfStats, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
+    }
+
+    public Map<String, PerformanceCollectorService.NodePerformanceStatistics> getNodePerfStats() {
+        return nodePerfStats;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject("nodes_performance_stats");
+        for (Map.Entry<String, PerformanceCollectorService.NodePerformanceStatistics> entry : nodePerfStats.entrySet()) {
+            builder.startObject(entry.getKey());
+            PerformanceCollectorService.NodePerformanceStatistics perfStats = entry.getValue();
+            if (perfStats != null) {
+                // percentages are rendered as one-decimal strings; elapsed_time is the sample's age measured against this node's clock
+                builder.field("cpu_usage_percent", String.format(Locale.ROOT, "%.1f", perfStats.cpuPercent));
+                builder.field("memory_usage_percent", String.format(Locale.ROOT, "%.1f", perfStats.memoryPercent));
+                builder.field("io_usage_percent", String.format(Locale.ROOT, "%.1f", perfStats.ioUtilizationPercent));
+                builder.field(
+                    "elapsed_time",
+                    new TimeValue(System.currentTimeMillis() - perfStats.timestamp, TimeUnit.MILLISECONDS).toString()
+                );
+            }
+            builder.endObject();
+        }
+        builder.endObject();
+        return builder;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index d34d4ed3bed3e..b8cdbe9545405 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -229,6 +229,7 @@
 import org.opensearch.threadpool.ExecutorBuilder;
 import org.opensearch.threadpool.RunnableTaskExecutionListener;
 import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.throttling.tracker.NodePerformanceTracker;
 import org.opensearch.transport.RemoteClusterService;
 import org.opensearch.transport.Transport;
 import org.opensearch.transport.TransportInterceptor;
@@ -775,6 +776,15 @@ protected Node(
                 remoteStoreStatsTrackerFactory
             );
 
+            final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(clusterService);
+
+            final NodePerformanceTracker nodePerformanceTracker = new NodePerformanceTracker(
+                performanceCollectorService,
+                threadPool,
+                settings,
+                clusterService.getClusterSettings()
+            );
+
             final AliasValidator aliasValidator = new AliasValidator();
 
             final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
@@ -1059,7 +1069,9 @@ protected Node(
                 searchBackpressureService,
                 searchPipelineService,
                 fileCache,
-                taskCancellationMonitoringService
+                taskCancellationMonitoringService,
+                nodePerformanceTracker,
+                performanceCollectorService
             );
 
             final SearchService searchService = newSearchService(
@@ -1192,6 +1204,8 @@ protected Node(
                 b.bind(RerouteService.class).toInstance(rerouteService);
                 b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
                 b.bind(FsHealthService.class).toInstance(fsHealthService);
+                b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService);
+                b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker);
                 b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); @@ -1308,6 +1322,7 @@ public Node start() throws NodeValidationException { nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); + nodeService.getNodePerformanceTracker().start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); @@ -1464,6 +1479,7 @@ private Node stop() { injector.getInstance(FsHealthService.class).stop(); nodeService.getMonitorService().stop(); nodeService.getSearchBackpressureService().stop(); + nodeService.getNodePerformanceTracker().stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); @@ -1523,6 +1539,7 @@ public synchronized void close() throws IOException { toClose.add(() -> stopWatch.stop().start("monitor")); toClose.add(nodeService.getMonitorService()); toClose.add(nodeService.getSearchBackpressureService()); + toClose.add(nodeService.getNodePerformanceTracker()); toClose.add(() -> stopWatch.stop().start("fsHealth")); toClose.add(injector.getInstance(FsHealthService.class)); toClose.add(() -> stopWatch.stop().start("gateway")); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 2688b894cb9a7..a501dc54c8378 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -59,6 +59,7 @@ import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.tasks.TaskCancellationMonitoringService; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.tracker.NodePerformanceTracker; import org.opensearch.transport.TransportService; import java.io.Closeable; @@ -83,10 +84,12 @@ public class NodeService implements 
Closeable { private final ScriptService scriptService; private final HttpServerTransport httpServerTransport; private final ResponseCollectorService responseCollectorService; + private final PerformanceCollectorService performanceCollectorService; private final SearchTransportService searchTransportService; private final IndexingPressureService indexingPressureService; private final AggregationUsageService aggregationUsageService; private final SearchBackpressureService searchBackpressureService; + private final NodePerformanceTracker nodePerformanceTracker; private final SearchPipelineService searchPipelineService; private final ClusterService clusterService; private final Discovery discovery; @@ -114,7 +117,9 @@ public class NodeService implements Closeable { SearchBackpressureService searchBackpressureService, SearchPipelineService searchPipelineService, FileCache fileCache, - TaskCancellationMonitoringService taskCancellationMonitoringService + TaskCancellationMonitoringService taskCancellationMonitoringService, + NodePerformanceTracker nodePerformanceTracker, + PerformanceCollectorService performanceCollectorService ) { this.settings = settings; this.threadPool = threadPool; @@ -137,6 +142,8 @@ public class NodeService implements Closeable { this.clusterService = clusterService; this.fileCache = fileCache; this.taskCancellationMonitoringService = taskCancellationMonitoringService; + this.nodePerformanceTracker = nodePerformanceTracker; + this.performanceCollectorService = performanceCollectorService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); } @@ -217,7 +224,8 @@ public NodeStats stats( boolean weightedRoutingStats, boolean fileCacheStats, boolean taskCancellation, - boolean searchPipelineStats + boolean searchPipelineStats, + boolean nodesPerfStats ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like 
refresh/merge/flush/indexing stats) @@ -237,6 +245,7 @@ public NodeStats stats( discoveryStats ? discovery.stats() : null, ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null, + nodesPerfStats ? performanceCollectorService.stats() : null, scriptCache ? scriptService.cacheStats() : null, indexingPressure ? this.indexingPressureService.nodeStats() : null, shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null, @@ -261,6 +270,10 @@ public SearchBackpressureService getSearchBackpressureService() { return searchBackpressureService; } + public NodePerformanceTracker getNodePerformanceTracker() { + return nodePerformanceTracker; + } + public TaskCancellationMonitoringService getTaskCancellationMonitoringService() { return taskCancellationMonitoringService; } diff --git a/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java new file mode 100644 index 0000000000000..41e37a9ca7fd1 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.node;
+
+import org.opensearch.cluster.ClusterChangedEvent;
+import org.opensearch.cluster.ClusterStateListener;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.util.concurrent.ConcurrentCollections;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Collects node level performance statistics (cpu, memory, IO) keyed by node id and makes them available to the
+ * coordinator node to aid in throttling, ranking etc. Entries are dropped when the cluster state reports a node as removed.
+ */
+public class PerformanceCollectorService implements ClusterStateListener {
+    private final ConcurrentMap<String, PerformanceCollectorService.NodePerformanceStatistics> nodeIdToPerfStats = ConcurrentCollections
+        .newConcurrentMap();
+
+    public PerformanceCollectorService(ClusterService clusterService) {
+        clusterService.addListener(this); // registers for cluster state changes so removed nodes can be evicted
+    }
+
+    @Override
+    public void clusterChanged(ClusterChangedEvent event) {
+        if (event.nodesRemoved()) {
+            for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
+                removeNode(removedNode.getId());
+            }
+        }
+    }
+
+    void removeNode(String nodeId) {
+        nodeIdToPerfStats.remove(nodeId);
+    }
+
+    public void addNodePerfStatistics(String nodeId, double cpuUsage, double ioUtilization, double memoryUsage, long timestamp) {
+        nodeIdToPerfStats.compute(nodeId, (id, ns) -> {
+            if (ns == null) {
+                return new PerformanceCollectorService.NodePerformanceStatistics(nodeId, cpuUsage, ioUtilization, memoryUsage, timestamp);
+            } else {
+                ns.cpuPercent = cpuUsage; // existing entry is updated in place rather than replaced
+                ns.memoryPercent = memoryUsage;
+                ns.ioUtilizationPercent = ioUtilization;
+                ns.timestamp = timestamp;
+                return ns;
+            }
+        });
+    }
+
+    /**
+     * Get a copy of all node statistics (each value is cloned) which will be used for node stats
+     */
+    public Map<String, PerformanceCollectorService.NodePerformanceStatistics> getAllNodeStatistics() {
+        Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
+        nodeIdToPerfStats.forEach((k, v) -> { nodeStats.put(k, new PerformanceCollectorService.NodePerformanceStatistics(v)); });
+        return nodeStats;
+    }
+
+    /**
+     * Optionally return a copy of the {@code NodePerformanceStatistics} for the given nodeid, if
+     * performance stats information exists for the given node. Returns an empty
+     * {@code Optional} if the node was not found.
+     */
+    public Optional<NodePerformanceStatistics> getNodeStatistics(final String nodeId) {
+        return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(ns -> new NodePerformanceStatistics(ns));
+    }
+
+    public DownstreamNodePerfStats stats() {
+        return new DownstreamNodePerfStats(getAllNodeStatistics());
+    }
+
+    /**
+     * This represents the performance stats of a node along with the timestamp at which the stats object was created
+     * in the respective node
+     */
+    public static class NodePerformanceStatistics implements Writeable {
+        final String nodeId;
+        long timestamp;
+        double cpuPercent;
+        double ioUtilizationPercent;
+        double memoryPercent;
+
+        public NodePerformanceStatistics(
+            String nodeId,
+            double cpuPercent,
+            double ioUtilizationPercent,
+            double memoryPercent,
+            long timestamp
+        ) {
+            this.nodeId = nodeId;
+            this.cpuPercent = cpuPercent;
+            this.ioUtilizationPercent = ioUtilizationPercent;
+            this.memoryPercent = memoryPercent;
+            this.timestamp = timestamp;
+        }
+
+        public NodePerformanceStatistics(StreamInput in) throws IOException {
+            this.nodeId = in.readString(); // field order here must match writeTo
+            this.cpuPercent = in.readDouble();
+            this.ioUtilizationPercent = in.readDouble();
+            this.memoryPercent = in.readDouble();
+            this.timestamp = in.readLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(this.nodeId);
+            out.writeDouble(this.cpuPercent);
+            out.writeDouble(this.ioUtilizationPercent);
+            out.writeDouble(this.memoryPercent);
+            out.writeLong(this.timestamp);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder("NodePerformanceStatistics[");
+            sb.append(nodeId).append("](");
+            sb.append("CPU percent: ").append(String.format(Locale.ROOT, "%.1f", cpuPercent));
+            sb.append(", IO utilization percent: ").append(String.format(Locale.ROOT, "%.1f", ioUtilizationPercent));
+            sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryPercent));
+            sb.append(", Timestamp: ").append(timestamp); // was printing memoryPercent under the Timestamp label
+            sb.append(")");
+            return sb.toString();
+        }
+
+        NodePerformanceStatistics(NodePerformanceStatistics nodeStats) {
+            this(nodeStats.nodeId, nodeStats.cpuPercent, nodeStats.ioUtilizationPercent, nodeStats.memoryPercent, nodeStats.timestamp);
+        }
+
+        public double getMemoryPercent() {
+            return memoryPercent;
+        }
+
+        public double getCpuPercent() {
+            return cpuPercent;
+        }
+
+        public double getIoUtilizationPercent() {
+            return ioUtilizationPercent;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AbstractAverageUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AbstractAverageUsageTracker.java
new file mode 100644
index 0000000000000..b1e1eab521820
--- /dev/null
+++ b/server/src/main/java/org/opensearch/throttling/tracker/AbstractAverageUsageTracker.java
@@ -0,0 +1,69 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.throttling.tracker;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.common.util.MovingAverage;
+import org.opensearch.threadpool.Scheduler;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base class for sliding window resource usage trackers: polls {@link #getUsage()} at a fixed delay and averages the samples
+ */
+public abstract class AbstractAverageUsageTracker extends AbstractLifecycleComponent {
+    private static final Logger LOGGER = LogManager.getLogger(AbstractAverageUsageTracker.class);
+
+    private final ThreadPool threadPool;
+    private final TimeValue pollingInterval;
+    private final AtomicReference<MovingAverage> observations = new AtomicReference<>();
+
+    private volatile Scheduler.Cancellable scheduledFuture;
+
+    public AbstractAverageUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
+        this.threadPool = threadPool;
+        this.pollingInterval = pollingInterval;
+        this.setWindowDuration(windowDuration);
+    }
+
+    public abstract long getUsage();
+
+    public double getAverage() {
+        return observations.get().getAverage();
+    }
+
+    public void setWindowDuration(TimeValue windowDuration) {
+        int windowSize = (int) Math.max(1L, windowDuration.nanos() / pollingInterval.nanos()); // duration < interval would give 0 samples
+        LOGGER.debug("updated window size: {}", windowSize);
+        observations.set(new MovingAverage(windowSize)); // swaps in a fresh average; samples recorded so far are discarded
+    }
+
+    @Override
+    protected void doStart() {
+        scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
+            long usage = getUsage();
+            observations.get().record(usage);
+        }, pollingInterval, ThreadPool.Names.GENERIC);
+    }
+
+    @Override
+    protected void doStop() {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel();
+        }
+    }
+
+    @Override
+    protected void doClose() throws IOException {}
+}
diff --git
a/server/src/main/java/org/opensearch/throttling/tracker/AverageCpuUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AverageCpuUsageTracker.java
new file mode 100644
index 0000000000000..7f960aaaa1089
--- /dev/null
+++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageCpuUsageTracker.java
@@ -0,0 +1,35 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.throttling.tracker;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.monitor.process.ProcessProbe;
+import org.opensearch.threadpool.ThreadPool;
+
+/**
+ * AverageCpuUsageTracker tracks the average process CPU usage by sampling {@link ProcessProbe} every (pollingInterval)
+ * and keeping track of the rolling average over a defined time window (windowDuration).
+ */
+public class AverageCpuUsageTracker extends AbstractAverageUsageTracker {
+    private static final Logger LOGGER = LogManager.getLogger(AverageCpuUsageTracker.class);
+
+    public AverageCpuUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
+        super(threadPool, pollingInterval, windowDuration);
+    }
+
+    @Override
+    public long getUsage() {
+        long usage = ProcessProbe.getInstance().getProcessCpuPercent(); // NOTE(review): presumably negative when unavailable - confirm
+        LOGGER.debug("Recording cpu usage: {}%", usage);
+        return usage;
+    }
+
+}
diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AverageMemoryUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AverageMemoryUsageTracker.java
new file mode 100644
index 0000000000000..0c35f80d7f7bc
--- /dev/null
+++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageMemoryUsageTracker.java
@@ -0,0 +1,39 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.throttling.tracker;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+
+/**
+ * AverageMemoryUsageTracker tracks the average JVM heap usage (used * 100 / max) by polling it every (pollingInterval)
+ * and keeping track of the rolling average over a defined time window (windowDuration).
+ */
+public class AverageMemoryUsageTracker extends AbstractAverageUsageTracker {
+
+    private static final Logger LOGGER = LogManager.getLogger(AverageMemoryUsageTracker.class);
+
+    private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();
+
+    public AverageMemoryUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
+        super(threadPool, pollingInterval, windowDuration);
+    }
+
+    @Override
+    public long getUsage() {
+        long usage = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() * 100 / MEMORY_MX_BEAN.getHeapMemoryUsage().getMax();
+        LOGGER.debug("Recording memory usage: {}%", usage); // NOTE(review): two heap snapshots are taken above; getMax() may be -1 if undefined
+        return usage;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java
new file mode 100644
index 0000000000000..8757271b5c8ff
--- /dev/null
+++ b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java
@@ -0,0 +1,194 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.throttling.tracker;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.node.PerformanceCollectorService;
+import org.opensearch.threadpool.Scheduler;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+
+/**
+ * This tracks the performance of node resources such as CPU, IO and memory.
+ * <p>
+ * A scheduled task periodically reads the averaged CPU and heap usage from the underlying usage trackers
+ * and publishes them to the {@link PerformanceCollectorService} under {@link #LOCAL_NODE}.
+ */
+public class NodePerformanceTracker extends AbstractLifecycleComponent {
+    // Written by the scheduled refresh task and exposed through public getters, hence volatile.
+    private volatile double cpuPercentUsed;
+    private volatile double memoryPercentUsed;
+    private volatile double ioPercentUsed;
+    private final ThreadPool threadPool;
+    private volatile Scheduler.Cancellable scheduledFuture;
+    private final Settings settings;
+    private final ClusterSettings clusterSettings;
+    private AverageCpuUsageTracker cpuUsageTracker;
+    private AverageMemoryUsageTracker memoryUsageTracker;
+    private final PerformanceCollectorService performanceCollectorService;
+    private static final Logger logger = LogManager.getLogger(NodePerformanceTracker.class);
+    private final TimeValue interval;
+    private static final long refreshIntervalInMills = 1000;
+
+    /** Delay, in milliseconds, between two refreshes of the published usage values. */
+    public static final Setting<Long> REFRESH_INTERVAL_MILLIS = Setting.longSetting(
+        "node.performance_tracker.interval_millis",
+        refreshIntervalInMills,
+        1,
+        Setting.Property.NodeScope
+    );
+
+    /** Node id under which this node's own statistics are recorded in the collector service. */
+    public static final String LOCAL_NODE = "LOCAL";
+
+    /** Polling interval handed to the CPU usage tracker. */
+    public static final Setting<TimeValue> GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting(
+        "node.global_cpu_usage.polling_interval",
+        TimeValue.timeValueMillis(500),
+        Setting.Property.NodeScope
+    );
+
+    /** Averaging window handed to the CPU usage tracker; dynamically updatable. */
+    public static final Setting<TimeValue> GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting(
+        "node.global_cpu_usage.window_duration",
+        TimeValue.timeValueSeconds(30),
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /** Polling interval handed to the JVM heap usage tracker. */
+    public static final Setting<TimeValue> GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting(
+        "node.global_jvmmp.polling_interval",
+        TimeValue.timeValueMillis(500),
+        Setting.Property.NodeScope
+    );
+
+    /** Averaging window handed to the JVM heap usage tracker; dynamically updatable. */
+    public static final Setting<TimeValue> GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting(
+        "node.global_jvmmp.window_duration",
+        TimeValue.timeValueSeconds(30),
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * Creates the tracker and its CPU and memory usage trackers; the refresh task itself is scheduled in {@link #doStart()}.
+     */
+    public NodePerformanceTracker(
+        PerformanceCollectorService performanceCollectorService,
+        ThreadPool threadPool,
+        Settings settings,
+        ClusterSettings clusterSettings
+    ) {
+        this.performanceCollectorService = performanceCollectorService;
+        this.threadPool = threadPool;
+        this.settings = settings;
+        this.clusterSettings = clusterSettings;
+        interval = new TimeValue(REFRESH_INTERVAL_MILLIS.get(settings));
+        initialize();
+    }
+
+    /** Returns the CPU averaging window configured in the node settings (previously returned the polling interval). */
+    public TimeValue getCpuWindow() {
+        return GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings);
+    }
+
+    /** Returns the CPU usage average captured by the most recent refresh. */
+    public double getCpuPercentUsed() {
+        return cpuPercentUsed;
+    }
+
+    /** Returns the IO usage captured by the most recent refresh; currently always 0. */
+    public double getIoPercentUsed() {
+        return ioPercentUsed;
+    }
+
+    /** Returns the heap usage average captured by the most recent refresh. */
+    public double getMemoryPercentUsed() {
+        return memoryPercentUsed;
+    }
+
+    /** Refreshes the cached averages and publishes them to the collector service under {@link #LOCAL_NODE}. */
+    void doRun() {
+        cpuPercentUsed = cpuUsageTracker.getAverage();
+        memoryPercentUsed = memoryUsageTracker.getAverage();
+        // IO utilization percentage - this will be completed after we enhance FS stats
+        ioPercentUsed = 0;
+        performanceCollectorService.addNodePerfStatistics(
+            LOCAL_NODE,
+            getCpuPercentUsed(),
+            getIoPercentUsed(),
+            getMemoryPercentUsed(),
+            System.currentTimeMillis()
+        );
+    }
+
+    /** Builds the usage trackers and registers consumers that apply dynamic window-duration updates to them. */
+    void initialize() {
+        cpuUsageTracker = new AverageCpuUsageTracker(
+            threadPool,
+            GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings),
+            GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings)
+        );
+
+        clusterSettings.addSettingsUpdateConsumer(
+            GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
+            duration -> cpuUsageTracker.setWindowDuration(duration)
+        );
+
+        memoryUsageTracker = new AverageMemoryUsageTracker(
+            threadPool,
+            GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings),
+            GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.get(settings)
+        );
+
+        clusterSettings.addSettingsUpdateConsumer(
+            GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,
+            duration -> memoryUsageTracker.setWindowDuration(duration)
+        );
+    }
+
+    /** Schedules the periodic refresh on the generic thread pool and starts both usage trackers. */
+    @Override
+    protected void doStart() {
+        scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
+            try {
+                doRun();
+            } catch (Exception e) {
+                logger.debug("failure in node performance tracker", e);
+            }
+        }, interval, ThreadPool.Names.GENERIC);
+        cpuUsageTracker.doStart();
+        memoryUsageTracker.doStart();
+    }
+
+    /** Cancels the periodic refresh, if one was scheduled, and stops both usage trackers. */
+    @Override
+    protected void doStop() {
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel();
+        }
+        cpuUsageTracker.doStop();
+        memoryUsageTracker.doStop();
+    }
+
+    /** Closes both usage trackers. */
+    @Override
+    protected void doClose() throws IOException {
+        cpuUsageTracker.doClose();
+        memoryUsageTracker.doClose();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/throttling/tracker/package-info.java b/server/src/main/java/org/opensearch/throttling/tracker/package-info.java
new file mode 100644
index 0000000000000..351e6168279c0
--- /dev/null
+++ b/server/src/main/java/org/opensearch/throttling/tracker/package-info.java
@@ -0,0 +1,12 @@
+/*
+ *
SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * Node level performance stats tracker package
+ */
+package org.opensearch.throttling.tracker;
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
index b50ce17a3d391..0e6ed8a1d7ae2 100644
--- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
+++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
@@ -58,6 +58,8 @@
 import org.opensearch.monitor.os.OsStats;
 import org.opensearch.monitor.process.ProcessStats;
 import org.opensearch.node.AdaptiveSelectionStats;
+import org.opensearch.node.DownstreamNodePerfStats;
+import org.opensearch.node.PerformanceCollectorService;
 import org.opensearch.node.ResponseCollectorService;
 import org.opensearch.script.ScriptCacheStats;
 import org.opensearch.script.ScriptStats;
@@ -392,6 +394,23 @@ public void testSerialization() throws IOException {
                         assertEquals(aStats.responseTime, bStats.responseTime, 0.01);
                     });
                 }
+                DownstreamNodePerfStats downstreamNodePerfStats = nodeStats.getNodesPerformanceStats();
+                DownstreamNodePerfStats deserializedNodePerfStats = deserializedNodeStats.getNodesPerformanceStats();
+                if (downstreamNodePerfStats == null) {
+                    assertNull(deserializedNodePerfStats);
+                } else {
+                    // Compare every original entry with its deserialized counterpart (not with itself).
+                    downstreamNodePerfStats.getNodePerfStats().forEach((k, v) -> {
+                        PerformanceCollectorService.NodePerformanceStatistics aPerfStats = downstreamNodePerfStats.getNodePerfStats()
+                            .get(k);
+                        PerformanceCollectorService.NodePerformanceStatistics bPerfStats = deserializedNodePerfStats.getNodePerfStats()
+                            .get(k);
+                        assertEquals(aPerfStats.getMemoryPercent(), bPerfStats.getMemoryPercent(), 0.0);
+                        assertEquals(aPerfStats.getIoUtilizationPercent(), bPerfStats.getIoUtilizationPercent(), 0.0);
+                        assertEquals(aPerfStats.getCpuPercent(), bPerfStats.getCpuPercent(), 0.0);
+                        assertEquals(aPerfStats.getTimestamp(), bPerfStats.getTimestamp());
+                    });
+                }
                 ScriptCacheStats scriptCacheStats = nodeStats.getScriptCacheStats();
                 ScriptCacheStats deserializedScriptCacheStats = deserializedNodeStats.getScriptCacheStats();
                 if (scriptCacheStats == null) {
@@ -754,6 +773,26 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
             }
             adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats);
         }
+        DownstreamNodePerfStats downstreamNodePerfStats = null;
+        if (frequently()) {
+            int numNodes = randomIntBetween(0, 10);
+            Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodePerfStats = new HashMap<>();
+            for (int i = 0; i < numNodes; i++) {
+                String nodeId = randomAlphaOfLengthBetween(3, 10);
+                // add node calculations
+                if (frequently()) {
+                    PerformanceCollectorService.NodePerformanceStatistics stats = new PerformanceCollectorService.NodePerformanceStatistics(
+                        nodeId,
+                        randomDoubleBetween(1.0, 100.0, true),
+                        randomDoubleBetween(1.0, 100.0, true),
+                        randomDoubleBetween(1.0, 100.0, true),
+                        System.currentTimeMillis()
+                    );
+                    nodePerfStats.put(nodeId, stats);
+                }
+            }
+            downstreamNodePerfStats = new DownstreamNodePerfStats(nodePerfStats);
+        }
         ClusterManagerThrottlingStats clusterManagerThrottlingStats = null;
         if (frequently()) {
             clusterManagerThrottlingStats = new ClusterManagerThrottlingStats();
@@ -785,6 +824,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
             discoveryStats,
             ingestStats,
             adaptiveSelectionStats,
+            downstreamNodePerfStats,
             scriptCacheStats,
             null,
             null,
diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java
index
8ba965b3df1ab..64949cf861f70 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -190,6 +190,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -216,6 +217,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -242,6 +244,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -299,6 +302,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -325,6 +329,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -351,6 +356,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java new file mode 100644 index 0000000000000..30db11a1a2a1d --- /dev/null +++ b/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.node;
+
+import org.opensearch.cluster.ClusterChangedEvent;
+import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.common.transport.TransportAddress;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+
+import static org.hamcrest.Matchers.greaterThan;
+
+/** Tests for {@link PerformanceCollectorService}: recording, lookup, concurrent updates and node removal. */
+public class PerformanceCollectorServiceTests extends OpenSearchTestCase {
+
+    private ClusterService clusterService;
+    private PerformanceCollectorService collector;
+    private ThreadPool threadpool;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        threadpool = new TestThreadPool("performance_collector_tests");
+        clusterService = new ClusterService(
+            Settings.EMPTY,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
+            threadpool
+        );
+        collector = new PerformanceCollectorService(clusterService);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        threadpool.shutdownNow();
+    }
+
+    /** Recorded statistics are returned both by the bulk accessor and by the per-node lookup. */
+    public void testNodePerformanceStats() {
+        collector.addNodePerfStatistics("node1", 99, 98, 97, System.currentTimeMillis());
+        Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodeStats = collector.getAllNodeStatistics();
+        assertTrue(nodeStats.containsKey("node1"));
+        assertEquals(99.0, nodeStats.get("node1").cpuPercent, 0.0);
+        assertEquals(98.0, nodeStats.get("node1").ioUtilizationPercent, 0.0);
+        assertEquals(97.0, nodeStats.get("node1").memoryPercent, 0.0);
+
+        Optional<PerformanceCollectorService.NodePerformanceStatistics> nodePerformanceStatistics = collector.getNodeStatistics("node1");
+
+        assertTrue(nodePerformanceStatistics.isPresent());
+        assertEquals(99.0, nodePerformanceStatistics.get().cpuPercent, 0.0);
+        assertEquals(98.0, nodePerformanceStatistics.get().ioUtilizationPercent, 0.0);
+        assertEquals(97.0, nodePerformanceStatistics.get().memoryPercent, 0.0);
+
+        nodePerformanceStatistics = collector.getNodeStatistics("node2");
+        assertTrue(nodePerformanceStatistics.isEmpty());
+    }
+
+    /*
+     * Test that concurrently adding values and removing nodes does not cause exceptions
+     */
+    public void testConcurrentAddingAndRemovingNodes() throws Exception {
+        String[] nodes = new String[] { "a", "b", "c", "d" };
+
+        final CountDownLatch latch = new CountDownLatch(5);
+
+        Runnable f = () -> {
+            latch.countDown();
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                fail("should not be interrupted");
+            }
+            final int iterations = randomIntBetween(100, 200); // pick the bound once instead of on every pass
+            for (int i = 0; i < iterations; i++) {
+                if (randomBoolean()) {
+                    collector.removeNode(randomFrom(nodes));
+                }
+                collector.addNodePerfStatistics(
+                    randomFrom(nodes),
+                    randomIntBetween(1, 100),
+                    randomIntBetween(1, 100),
+                    randomIntBetween(1, 100),
+                    System.currentTimeMillis()
+                );
+            }
+        };
+
+        // Four workers take four latch slots; the test thread releases the fifth.
+        Thread[] workers = new Thread[4];
+        for (int i = 0; i < workers.length; i++) {
+            workers[i] = new Thread(f);
+            workers[i].start();
+        }
+        latch.countDown();
+        for (Thread worker : workers) {
+            worker.join();
+        }
+
+        final Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodeStats = collector.getAllNodeStatistics();
+        logger.info("--> got stats: {}", nodeStats);
+        for (String nodeId : nodes) {
+            if (nodeStats.containsKey(nodeId)) {
+                assertThat(nodeStats.get(nodeId).memoryPercent, greaterThan(0.0));
+                assertThat(nodeStats.get(nodeId).cpuPercent, greaterThan(0.0));
+                assertThat(nodeStats.get(nodeId).ioUtilizationPercent, greaterThan(0.0));
+            }
+        }
+    }
+
+    /** A cluster-changed event in which node2 has left must drop node2's statistics and keep node1's. */
+    public void testNodeRemoval() throws Exception {
+        collector.addNodePerfStatistics(
+            "node1",
+            randomIntBetween(1, 100),
+            randomIntBetween(1, 100),
+            randomIntBetween(1, 100),
+            System.currentTimeMillis()
+        );
+        collector.addNodePerfStatistics(
+            "node2",
+            randomIntBetween(1, 100),
+            randomIntBetween(1, 100),
+            randomIntBetween(1, 100),
+            System.currentTimeMillis()
+        );
+
+        ClusterState previousState = ClusterState.builder(new ClusterName("cluster"))
+            .nodes(
+                DiscoveryNodes.builder()
+                    .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node1"))
+                    .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9201), "node2"))
+            )
+            .build();
+        ClusterState newState = ClusterState.builder(previousState)
+            .nodes(DiscoveryNodes.builder(previousState.nodes()).remove("node2"))
+            .build();
+        ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState);
+
+        collector.clusterChanged(event);
+        final Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodeStats = collector.getAllNodeStatistics();
+        assertTrue(nodeStats.containsKey("node1"));
+        assertFalse(nodeStats.containsKey("node2"));
+    }
+}
diff --git a/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java b/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java
new file mode 100644
index 0000000000000..61c24da0be751
--- /dev/null
+++ b/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java
@@ -0,0 +1,69 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.throttling.tracker;
+
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.node.PerformanceCollectorService;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests that {@link NodePerformanceTracker} publishes its readings to the collector service. */
+public class NodePerformanceTrackerTests extends OpenSearchTestCase {
+    ThreadPool threadPool;
+
+    @Before
+    public void setup() {
+        threadPool = new TestThreadPool(getClass().getName());
+    }
+
+    @After
+    public void cleanup() {
+        ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS);
+    }
+
+    private ClusterService mockClusterService() {
+        ClusterService clusterService = mock(ClusterService.class);
+        ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
+        return clusterService;
+    }
+
+    /** Starts the tracker, waits for a non-zero memory reading, then checks the collector holds the same value. */
+    public void testStats() throws Exception {
+        PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(mockClusterService());
+        NodePerformanceTracker tracker = new NodePerformanceTracker(
+            performanceCollectorService,
+            threadPool,
+            Settings.EMPTY,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+        );
+        tracker.start();
+        assertBusy(() -> assertThat(tracker.getMemoryPercentUsed(), greaterThan(0.0)), 5, TimeUnit.SECONDS);
+        // cpu percent used is mostly 0, so skipping assertion for that
+        tracker.stop();
+        // Read only after stopping so a refresh cannot slip in between this read and the collector lookup.
+        double memory = tracker.getMemoryPercentUsed();
+        assertTrue(performanceCollectorService.getNodeStatistics(NodePerformanceTracker.LOCAL_NODE).isPresent());
+        PerformanceCollectorService.NodePerformanceStatistics perfStats = performanceCollectorService.getNodeStatistics(
+            NodePerformanceTracker.LOCAL_NODE
+        ).get();
+        assertEquals(memory, perfStats.getMemoryPercent(), 0.0);
+        assertTrue(performanceCollectorService.getNodeStatistics("Invalid").isEmpty());
+    }
+}
diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
index 6354cf18e8b62..3e0d19c5505d1 100644
--- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
+++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java
@@ -112,6 +112,7 @@ List adjustNodesStats(List nodesStats) {
                 nodeStats.getDiscoveryStats(),
                 nodeStats.getIngestStats(),
                 nodeStats.getAdaptiveSelectionStats(),
+                nodeStats.getNodesPerformanceStats(),
                 nodeStats.getScriptCacheStats(),
                 nodeStats.getIndexingPressureStats(),
                 nodeStats.getShardIndexingPressureStats(),
diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
index d3e24ccd90500..938bf0c1271c9 100644
--- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java
@@ -2718,6 +2718,7 @@ public void ensureEstimatedStats() {
                         false,
                         false,
                         false,
+                        false,
                         false
                     );
                 assertThat(