diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8c04e01a75fb4..575b3afbb312b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -152,6 +152,7 @@ import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.tracker.NodePerformanceTracker; import org.opensearch.transport.ProxyConnectionStrategy; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; @@ -651,6 +652,12 @@ public void apply(Settings value, Settings current, Settings previous) { SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING, SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS, + // Settings related to admission control + NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING, + NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING, + NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, + NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, + // Settings related to Searchable Snapshots Node.NODE_SEARCH_CACHE_SIZE_SETTING, FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d34d4ed3bed3e..b8cdbe9545405 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -229,6 +229,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.tracker.NodePerformanceTracker; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -775,6 +776,15 @@ protected Node( remoteStoreStatsTrackerFactory ); + final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(clusterService); + + final NodePerformanceTracker nodePerformanceTracker = new NodePerformanceTracker( + performanceCollectorService, + threadPool, + settings, + clusterService.getClusterSettings() + ); + final AliasValidator aliasValidator = new AliasValidator(); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices); @@ -1059,7 +1069,9 @@ protected Node( searchBackpressureService, searchPipelineService, fileCache, - taskCancellationMonitoringService + taskCancellationMonitoringService, + nodePerformanceTracker, + performanceCollectorService ); final SearchService searchService = newSearchService( @@ -1192,6 +1204,8 @@ protected Node( b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); + b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService); + b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker); b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); @@ -1308,6 +1322,7 @@ public Node start() throws NodeValidationException { nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); + nodeService.getNodePerformanceTracker().start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); @@ -1464,6 +1479,7 @@ private Node stop() { injector.getInstance(FsHealthService.class).stop(); nodeService.getMonitorService().stop(); nodeService.getSearchBackpressureService().stop(); + nodeService.getNodePerformanceTracker().stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); @@ -1523,6 +1539,7 @@ public synchronized void close() throws IOException { toClose.add(() -> stopWatch.stop().start("monitor")); toClose.add(nodeService.getMonitorService()); toClose.add(nodeService.getSearchBackpressureService()); + toClose.add(nodeService.getNodePerformanceTracker()); toClose.add(() -> stopWatch.stop().start("fsHealth")); toClose.add(injector.getInstance(FsHealthService.class)); toClose.add(() -> stopWatch.stop().start("gateway")); diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 2688b894cb9a7..e56580b6777ef 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -59,6 +59,7 @@ import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.tasks.TaskCancellationMonitoringService; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.tracker.NodePerformanceTracker; import org.opensearch.transport.TransportService; import java.io.Closeable; @@ -83,10 +84,12 @@ public class NodeService implements Closeable { private final ScriptService scriptService; private final HttpServerTransport httpServerTransport; private final ResponseCollectorService responseCollectorService; + private final PerformanceCollectorService performanceCollectorService; private final SearchTransportService searchTransportService; private final IndexingPressureService indexingPressureService; private final AggregationUsageService aggregationUsageService; private final SearchBackpressureService searchBackpressureService; + private final NodePerformanceTracker nodePerformanceTracker; private final SearchPipelineService searchPipelineService; private final ClusterService clusterService; private final Discovery discovery; @@ -114,7 +117,9 @@ public class NodeService implements Closeable { SearchBackpressureService searchBackpressureService, SearchPipelineService searchPipelineService, FileCache fileCache, - TaskCancellationMonitoringService taskCancellationMonitoringService + TaskCancellationMonitoringService taskCancellationMonitoringService, + NodePerformanceTracker nodePerformanceTracker, + PerformanceCollectorService performanceCollectorService ) { this.settings = settings; this.threadPool = threadPool; @@ -137,6 +142,8 @@ public class NodeService implements Closeable { this.clusterService = clusterService; this.fileCache = fileCache; this.taskCancellationMonitoringService = taskCancellationMonitoringService; + this.nodePerformanceTracker = nodePerformanceTracker; + this.performanceCollectorService = performanceCollectorService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); } @@ -261,6 +268,10 @@ public SearchBackpressureService getSearchBackpressureService() { return searchBackpressureService; } + public NodePerformanceTracker getNodePerformanceTracker() { + return nodePerformanceTracker; + } + public TaskCancellationMonitoringService getTaskCancellationMonitoringService() { return taskCancellationMonitoringService; } diff --git a/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java new file mode 100644 index 0000000000000..6b7085d1851c9 --- /dev/null +++ b/server/src/main/java/org/opensearch/node/PerformanceCollectorService.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node; + +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ConcurrentCollections; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentMap; + +/** + * This collects node level performance statistics such as cpu, memory, IO of each node and makes it available for + * coordinator node to aid in throttling, ranking etc + */ +public class PerformanceCollectorService implements ClusterStateListener { + private final ConcurrentMap nodeIdToPerfStats = ConcurrentCollections + .newConcurrentMap(); + + public PerformanceCollectorService(ClusterService clusterService) { + clusterService.addListener(this); + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.nodesRemoved()) { + for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { + removeNode(removedNode.getId()); + } + } + } + + void removeNode(String nodeId) { + nodeIdToPerfStats.remove(nodeId); + } + + public void addNodePerfStatistics(String nodeId, double cpuUsage, double ioUtilization, double memoryUsage, long timestamp) { + nodeIdToPerfStats.compute(nodeId, (id, ns) -> { + if (ns == null) { + return new PerformanceCollectorService.NodePerformanceStatistics(nodeId, cpuUsage, ioUtilization, memoryUsage, timestamp); + } else { + ns.cpuPercent = cpuUsage; + ns.memoryPercent = memoryUsage; + ns.ioUtilizationPercent = ioUtilization; + ns.timestamp = timestamp; + return ns; + } + }); + } + + public Map getAllNodeStatistics() { + Map nodeStats = new HashMap<>(nodeIdToPerfStats.size()); + nodeIdToPerfStats.forEach((k, v) -> { nodeStats.put(k, new PerformanceCollectorService.NodePerformanceStatistics(v)); }); + return nodeStats; + } + + /** + * Optionally return a {@code NodePerformanceStatistics} for the given nodeid, if + * performance stats information exists for the given node. Returns an empty + * {@code Optional} if the node was not found. + */ + public Optional getNodeStatistics(final String nodeId) { + return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(ns -> new NodePerformanceStatistics(ns)); + } + + public static class NodePerformanceStatistics { + final String nodeId; + long timestamp; + double cpuPercent; + double ioUtilizationPercent; + double memoryPercent; + + public NodePerformanceStatistics( + String nodeId, + double cpuPercent, + double ioUtilizationPercent, + double memoryPercent, + long timestamp + ) { + this.nodeId = nodeId; + this.cpuPercent = cpuPercent; + this.ioUtilizationPercent = ioUtilizationPercent; + this.memoryPercent = memoryPercent; + this.timestamp = timestamp; + } + + NodePerformanceStatistics(NodePerformanceStatistics nodeStats) { + this(nodeStats.nodeId, nodeStats.cpuPercent, nodeStats.ioUtilizationPercent, nodeStats.memoryPercent, nodeStats.timestamp); + } + + public double getMemoryPercent() { + return memoryPercent; + } + } + +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AbstractAverageUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AbstractAverageUsageTracker.java new file mode 100644 index 0000000000000..b1e1eab521820 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/AbstractAverageUsageTracker.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.MovingAverage; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Base class for sliding window resource usage trackers + */ +public abstract class AbstractAverageUsageTracker extends AbstractLifecycleComponent { + private static final Logger LOGGER = LogManager.getLogger(AbstractAverageUsageTracker.class); + + private final ThreadPool threadPool; + private final TimeValue pollingInterval; + private final AtomicReference observations = new AtomicReference<>(); + + private volatile Scheduler.Cancellable scheduledFuture; + + public AbstractAverageUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) { + this.threadPool = threadPool; + this.pollingInterval = pollingInterval; + this.setWindowDuration(windowDuration); + } + + public abstract long getUsage(); + + public double getAverage() { + return observations.get().getAverage(); + } + + public void setWindowDuration(TimeValue windowDuration) { + int windowSize = (int) (windowDuration.nanos() / pollingInterval.nanos()); + LOGGER.debug("updated window size: {}", windowSize); + observations.set(new MovingAverage(windowSize)); + } + + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + long usage = getUsage(); + observations.get().record(usage); + }, pollingInterval, ThreadPool.Names.GENERIC); + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + } + + @Override + protected void doClose() throws IOException {} +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AverageCpuUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AverageCpuUsageTracker.java new file mode 100644 index 0000000000000..7f960aaaa1089 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageCpuUsageTracker.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.monitor.process.ProcessProbe; +import org.opensearch.threadpool.ThreadPool; + +/** + * AverageCpuUsageTracker tracks the average CPU usage by polling the CPU usage every (pollingInterval) + * and keeping track of the rolling average over a defined time window (windowDuration). + */ +public class AverageCpuUsageTracker extends AbstractAverageUsageTracker { + private static final Logger LOGGER = LogManager.getLogger(AverageCpuUsageTracker.class); + + public AverageCpuUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) { + super(threadPool, pollingInterval, windowDuration); + } + + @Override + public long getUsage() { + long usage = ProcessProbe.getInstance().getProcessCpuPercent(); + LOGGER.debug("Recording cpu usage: {}%", usage); + return usage; + } + +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/AverageMemoryUsageTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/AverageMemoryUsageTracker.java new file mode 100644 index 0000000000000..0c35f80d7f7bc --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/AverageMemoryUsageTracker.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.ThreadPool; + +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; + +/** + * AverageMemoryUsageTracker tracks the average JVM usage by polling the JVM usage every (pollingInterval) + * and keeping track of the rolling average over a defined time window (windowDuration). + */ +public class AverageMemoryUsageTracker extends AbstractAverageUsageTracker { + + private static final Logger LOGGER = LogManager.getLogger(AverageMemoryUsageTracker.class); + + private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean(); + + public AverageMemoryUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) { + super(threadPool, pollingInterval, windowDuration); + } + + @Override + public long getUsage() { + long usage = MEMORY_MX_BEAN.getHeapMemoryUsage().getUsed() * 100 / MEMORY_MX_BEAN.getHeapMemoryUsage().getMax(); + LOGGER.debug("Recording memory usage: {}%", usage); + return usage; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java new file mode 100644 index 0000000000000..8757271b5c8ff --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/NodePerformanceTracker.java @@ -0,0 +1,194 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.PerformanceCollectorService; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; + +/** + * This tracks the performance of node resources such as CPU, IO and memory + */ +public class NodePerformanceTracker extends AbstractLifecycleComponent { + private double cpuPercentUsed; + private double memoryPercentUsed; + private double ioPercentUsed; + private ThreadPool threadPool; + private volatile Scheduler.Cancellable scheduledFuture; + private final Settings settings; + private final ClusterSettings clusterSettings; + private AverageCpuUsageTracker cpuUsageTracker; + private AverageMemoryUsageTracker memoryUsageTracker; + private PerformanceCollectorService performanceCollectorService; + private static final Logger logger = LogManager.getLogger(NodePerformanceTracker.class); + private final TimeValue interval; + private static final long refreshIntervalInMills = 1000; + public static final Setting REFRESH_INTERVAL_MILLIS = Setting.longSetting( + "node.performance_tracker.interval_millis", + refreshIntervalInMills, + 1, + Setting.Property.NodeScope + ); + + public static final String LOCAL_NODE = "LOCAL"; + + public static final Setting GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( + "node.global_cpu_usage.polling_interval", + TimeValue.timeValueMillis(500), + Setting.Property.NodeScope + ); + public static final Setting GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting( + "node.global_cpu_usage.window_duration", + TimeValue.timeValueSeconds(30), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( + "node.global_jvmmp.polling_interval", + TimeValue.timeValueMillis(500), + Setting.Property.NodeScope + ); + + public static final Setting GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING = Setting.positiveTimeSetting( + "node.global_jvmmp.window_duration", + TimeValue.timeValueSeconds(30), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public NodePerformanceTracker( + PerformanceCollectorService performanceCollectorService, + ThreadPool threadPool, + Settings settings, + ClusterSettings clusterSettings + ) { + this.performanceCollectorService = performanceCollectorService; + this.threadPool = threadPool; + this.settings = settings; + this.clusterSettings = clusterSettings; + interval = new TimeValue(REFRESH_INTERVAL_MILLIS.get(settings)); + initialize(); + } + + private double getAverageCpuUsed() { + return cpuUsageTracker.getAverage(); + } + + private double getAverageMemoryUsed() { + return memoryUsageTracker.getAverage(); + } + + private double getAverageIoUsed() { + // IO utilization percentage - this will be completed after we enhance FS stats + return 0; + } + + public TimeValue getCpuWindow() { + return GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings); + } + + private void setCpuPercentUsed(double cpuPercentUsed) { + this.cpuPercentUsed = cpuPercentUsed; + } + + private void setMemoryPercentUsed(double memoryPercentUsed) { + this.memoryPercentUsed = memoryPercentUsed; + } + + private void setIoPercentUsed(double ioPercentUsed) { + this.ioPercentUsed = ioPercentUsed; + } + + public double getCpuPercentUsed() { + return cpuPercentUsed; + } + + public double getIoPercentUsed() { + return ioPercentUsed; + } + + public double getMemoryPercentUsed() { + return memoryPercentUsed; + } + + void doRun() { + setCpuPercentUsed(getAverageCpuUsed()); + setMemoryPercentUsed(getAverageMemoryUsed()); + setIoPercentUsed(getAverageIoUsed()); + performanceCollectorService.addNodePerfStatistics( + LOCAL_NODE, + getCpuPercentUsed(), + getIoPercentUsed(), + getMemoryPercentUsed(), + System.currentTimeMillis() + ); + } + + void initialize() { + cpuUsageTracker = new AverageCpuUsageTracker( + threadPool, + GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings), + GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.get(settings) + ); + + clusterSettings.addSettingsUpdateConsumer( + GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING, + duration -> cpuUsageTracker.setWindowDuration(duration) + ); + + memoryUsageTracker = new AverageMemoryUsageTracker( + threadPool, + GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING.get(settings), + GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.get(settings) + ); + + clusterSettings.addSettingsUpdateConsumer( + GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING, + duration -> memoryUsageTracker.setWindowDuration(duration) + ); + } + + @Override + protected void doStart() { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + try { + doRun(); + } catch (Exception e) { + logger.debug("failure in search search backpressure", e); + } + }, interval, ThreadPool.Names.GENERIC); + cpuUsageTracker.doStart(); + memoryUsageTracker.doStart(); + } + + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + cpuUsageTracker.doStop(); + memoryUsageTracker.doStop(); + } + + @Override + protected void doClose() throws IOException { + cpuUsageTracker.doClose(); + memoryUsageTracker.doClose(); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/tracker/package-info.java b/server/src/main/java/org/opensearch/throttling/tracker/package-info.java new file mode 100644 index 0000000000000..351e6168279c0 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/tracker/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Node level performance stats tracker package + */ +package org.opensearch.throttling.tracker; diff --git a/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java new file mode 100644 index 0000000000000..30db11a1a2a1d --- /dev/null +++ b/server/src/test/java/org/opensearch/node/PerformanceCollectorServiceTests.java @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.node; + +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.Matchers.greaterThan; + +public class PerformanceCollectorServiceTests extends OpenSearchTestCase { + + private ClusterService clusterService; + private PerformanceCollectorService collector; + private ThreadPool threadpool; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadpool = new TestThreadPool("performance_collector_tests"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadpool + ); + collector = new PerformanceCollectorService(clusterService); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + threadpool.shutdownNow(); + } + + public void testNodePerformanceStats() { + collector.addNodePerfStatistics("node1", 99, 98, 97, System.currentTimeMillis()); + Map nodeStats = collector.getAllNodeStatistics(); + assertTrue(nodeStats.containsKey("node1")); + assertEquals(99.0, nodeStats.get("node1").cpuPercent, 0.0); + assertEquals(98.0, nodeStats.get("node1").ioUtilizationPercent, 0.0); + assertEquals(97.0, nodeStats.get("node1").memoryPercent, 0.0); + + Optional nodePerformanceStatistics = collector.getNodeStatistics("node1"); + + assertNotNull(nodePerformanceStatistics.get()); + assertEquals(99.0, nodePerformanceStatistics.get().cpuPercent, 0.0); + assertEquals(98.0, nodePerformanceStatistics.get().ioUtilizationPercent, 0.0); + assertEquals(97.0, nodePerformanceStatistics.get().memoryPercent, 0.0); + + nodePerformanceStatistics = collector.getNodeStatistics("node2"); + assertTrue(nodePerformanceStatistics.isEmpty()); + } + + /* + * Test that concurrently adding values and removing nodes does not cause exceptions + */ + public void testConcurrentAddingAndRemovingNodes() throws Exception { + String[] nodes = new String[] { "a", "b", "c", "d" }; + + final CountDownLatch latch = new CountDownLatch(5); + + Runnable f = () -> { + latch.countDown(); + try { + latch.await(); + } catch (InterruptedException e) { + fail("should not be interrupted"); + } + for (int i = 0; i < randomIntBetween(100, 200); i++) { + if (randomBoolean()) { + collector.removeNode(randomFrom(nodes)); + } + collector.addNodePerfStatistics( + randomFrom(nodes), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + System.currentTimeMillis() + ); + } + }; + + Thread t1 = new Thread(f); + Thread t2 = new Thread(f); + Thread t3 = new Thread(f); + Thread t4 = new Thread(f); + + t1.start(); + t2.start(); + t3.start(); + t4.start(); + latch.countDown(); + t1.join(); + t2.join(); + t3.join(); + t4.join(); + + final Map nodeStats = collector.getAllNodeStatistics(); + logger.info("--> got stats: {}", nodeStats); + for (String nodeId : nodes) { + if (nodeStats.containsKey(nodeId)) { + assertThat(nodeStats.get(nodeId).memoryPercent, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).cpuPercent, greaterThan(0.0)); + assertThat(nodeStats.get(nodeId).ioUtilizationPercent, greaterThan(0.0)); + } + } + } + + public void testNodeRemoval() throws Exception { + collector.addNodePerfStatistics( + "node1", + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + System.currentTimeMillis() + ); + collector.addNodePerfStatistics( + "node2", + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + System.currentTimeMillis() + ); + + ClusterState previousState = ClusterState.builder(new ClusterName("cluster")) + .nodes( + DiscoveryNodes.builder() + .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9200), "node1")) + .add(DiscoveryNode.createLocal(Settings.EMPTY, new TransportAddress(TransportAddress.META_ADDRESS, 9201), "node2")) + ) + .build(); + ClusterState newState = ClusterState.builder(previousState) + .nodes(DiscoveryNodes.builder(previousState.nodes()).remove("node2")) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState); + + collector.clusterChanged(event); + final Map nodeStats = collector.getAllNodeStatistics(); + assertTrue(nodeStats.containsKey("node1")); + assertFalse(nodeStats.containsKey("node2")); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java b/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java new file mode 100644 index 0000000000000..61c24da0be751 --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/tracker/NodePerformanceTrackerTests.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.tracker; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.PerformanceCollectorService; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThan; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class NodePerformanceTrackerTests extends OpenSearchTestCase { + ThreadPool threadPool; + + @Before + public void setup() { + threadPool = new TestThreadPool(getClass().getName()); + } + + @After + public void cleanup() { + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); + } + + private ClusterService mockClusterService() { + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + return clusterService; + } + + public void testStats() throws InterruptedException { + PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(mockClusterService()); + + NodePerformanceTracker tracker = new NodePerformanceTracker( + performanceCollectorService, + threadPool, + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + tracker.start(); + Thread.sleep(2000); + double memory = tracker.getMemoryPercentUsed(); + assertThat(tracker.getMemoryPercentUsed(), greaterThan(0.0)); + // cpu percent used is mostly 0, so skipping assertion for that + tracker.stop(); + assertTrue(performanceCollectorService.getNodeStatistics(NodePerformanceTracker.LOCAL_NODE).isPresent()); + PerformanceCollectorService.NodePerformanceStatistics perfStats = performanceCollectorService.getNodeStatistics( + NodePerformanceTracker.LOCAL_NODE + ).get(); + assertEquals(memory, perfStats.getMemoryPercent(), 0.0); + assertTrue(performanceCollectorService.getNodeStatistics("Invalid").isEmpty()); + } +}