Skip to content

Commit

Permalink
Add node performance trackers and performance collector service
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Sep 7, 2023
1 parent d58943d commit ae316e7
Show file tree
Hide file tree
Showing 12 changed files with 740 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466) [#9289](https://github.com/opensearch-project/OpenSearch/pull/9289))
- Added crypto-kms plugin to provide AWS KMS based key providers for encryption/decryption. ([#8465](https://github.com/opensearch-project/OpenSearch/pull/8465))

- Added Performance collector service and node performance stats tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890))
### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
- Bump `forbiddenapis` from 3.3 to 3.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.tracker.NodePerformanceTracker;
import org.opensearch.transport.ProxyConnectionStrategy;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -651,6 +652,12 @@ public void apply(Settings value, Settings current, Settings previous) {
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to admission control
NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING,
NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING,
NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
Expand Down
19 changes: 18 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.tracker.NodePerformanceTracker;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
Expand Down Expand Up @@ -775,6 +776,15 @@ protected Node(
remoteStoreStatsTrackerFactory
);

final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(clusterService);

final NodePerformanceTracker nodePerformanceTracker = new NodePerformanceTracker(
performanceCollectorService,
threadPool,
settings,
clusterService.getClusterSettings()
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -1059,7 +1069,9 @@ protected Node(
searchBackpressureService,
searchPipelineService,
fileCache,
taskCancellationMonitoringService
taskCancellationMonitoringService,
nodePerformanceTracker,
performanceCollectorService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1192,6 +1204,8 @@ protected Node(
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService);
b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
Expand Down Expand Up @@ -1308,6 +1322,7 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
nodeService.getNodePerformanceTracker().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1464,6 +1479,7 @@ private Node stop() {
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
nodeService.getNodePerformanceTracker().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
Expand Down Expand Up @@ -1523,6 +1539,7 @@ public synchronized void close() throws IOException {
toClose.add(() -> stopWatch.stop().start("monitor"));
toClose.add(nodeService.getMonitorService());
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(nodeService.getNodePerformanceTracker());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.tracker.NodePerformanceTracker;
import org.opensearch.transport.TransportService;

import java.io.Closeable;
Expand All @@ -83,10 +84,12 @@ public class NodeService implements Closeable {
private final ScriptService scriptService;
private final HttpServerTransport httpServerTransport;
private final ResponseCollectorService responseCollectorService;
private final PerformanceCollectorService performanceCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
private final SearchBackpressureService searchBackpressureService;
private final NodePerformanceTracker nodePerformanceTracker;
private final SearchPipelineService searchPipelineService;
private final ClusterService clusterService;
private final Discovery discovery;
Expand Down Expand Up @@ -114,7 +117,9 @@ public class NodeService implements Closeable {
SearchBackpressureService searchBackpressureService,
SearchPipelineService searchPipelineService,
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService
TaskCancellationMonitoringService taskCancellationMonitoringService,
NodePerformanceTracker nodePerformanceTracker,
PerformanceCollectorService performanceCollectorService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -137,6 +142,8 @@ public class NodeService implements Closeable {
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.nodePerformanceTracker = nodePerformanceTracker;
this.performanceCollectorService = performanceCollectorService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -261,6 +268,10 @@ public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

public NodePerformanceTracker getNodePerformanceTracker() {
return nodePerformanceTracker;
}

public TaskCancellationMonitoringService getTaskCancellationMonitoringService() {
return taskCancellationMonitoringService;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node;

import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;

/**
* This collects node level performance statistics such as cpu, memory, IO of each node and makes it available for
* coordinator node to aid in throttling, ranking etc
*/
public class PerformanceCollectorService implements ClusterStateListener {
private final ConcurrentMap<String, PerformanceCollectorService.NodePerformanceStatistics> nodeIdToPerfStats = ConcurrentCollections
.newConcurrentMap();

public PerformanceCollectorService(ClusterService clusterService) {
clusterService.addListener(this);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesRemoved()) {
for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) {
removeNode(removedNode.getId());
}
}
}

void removeNode(String nodeId) {
nodeIdToPerfStats.remove(nodeId);
}

public void addNodePerfStatistics(String nodeId, double cpuUsage, double ioUtilization, double memoryUsage, long timestamp) {
nodeIdToPerfStats.compute(nodeId, (id, ns) -> {
if (ns == null) {
return new PerformanceCollectorService.NodePerformanceStatistics(nodeId, cpuUsage, ioUtilization, memoryUsage, timestamp);
} else {
ns.cpuPercent = cpuUsage;
ns.memoryPercent = memoryUsage;
ns.ioUtilizationPercent = ioUtilization;
ns.timestamp = timestamp;
return ns;
}
});
}

/**
* Get all node statistics which will be used for node stats
*/
public Map<String, PerformanceCollectorService.NodePerformanceStatistics> getAllNodeStatistics() {
Map<String, NodePerformanceStatistics> nodeStats = new HashMap<>(nodeIdToPerfStats.size());
nodeIdToPerfStats.forEach((k, v) -> { nodeStats.put(k, new PerformanceCollectorService.NodePerformanceStatistics(v)); });
return nodeStats;
}

/**
* Optionally return a {@code NodePerformanceStatistics} for the given nodeid, if
* performance stats information exists for the given node. Returns an empty
* {@code Optional} if the node was not found.
*/
public Optional<NodePerformanceStatistics> getNodeStatistics(final String nodeId) {
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(ns -> new NodePerformanceStatistics(ns));
}

/**
* This represents the performance stats of a node along with the timestamp at which the stats object was created
* in the respective node
*/
public static class NodePerformanceStatistics implements Writeable {
final String nodeId;
long timestamp;
double cpuPercent;
double ioUtilizationPercent;
double memoryPercent;

public NodePerformanceStatistics(
String nodeId,
double cpuPercent,
double ioUtilizationPercent,
double memoryPercent,
long timestamp
) {
this.nodeId = nodeId;
this.cpuPercent = cpuPercent;
this.ioUtilizationPercent = ioUtilizationPercent;
this.memoryPercent = memoryPercent;
this.timestamp = timestamp;
}

NodePerformanceStatistics(NodePerformanceStatistics nodeStats) {
this(nodeStats.nodeId, nodeStats.cpuPercent, nodeStats.ioUtilizationPercent, nodeStats.memoryPercent, nodeStats.timestamp);
}

public double getMemoryPercent() {
return memoryPercent;
}

@Override
public void writeTo(StreamOutput out) throws IOException {

}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling.tracker;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.MovingAverage;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/**
* Base class for sliding window resource usage trackers
*/
public abstract class AbstractAverageUsageTracker extends AbstractLifecycleComponent {
private static final Logger LOGGER = LogManager.getLogger(AbstractAverageUsageTracker.class);

private final ThreadPool threadPool;
private final TimeValue pollingInterval;
private final AtomicReference<MovingAverage> observations = new AtomicReference<>();

private volatile Scheduler.Cancellable scheduledFuture;

public AbstractAverageUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
this.threadPool = threadPool;
this.pollingInterval = pollingInterval;
this.setWindowDuration(windowDuration);
}

public abstract long getUsage();

public double getAverage() {
return observations.get().getAverage();
}

public void setWindowDuration(TimeValue windowDuration) {
int windowSize = (int) (windowDuration.nanos() / pollingInterval.nanos());
LOGGER.debug("updated window size: {}", windowSize);
observations.set(new MovingAverage(windowSize));
}

@Override
protected void doStart() {
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
long usage = getUsage();
observations.get().record(usage);
}, pollingInterval, ThreadPool.Names.GENERIC);
}

@Override
protected void doStop() {
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
}

@Override
protected void doClose() throws IOException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling.tracker;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.threadpool.ThreadPool;

/**
* AverageCpuUsageTracker tracks the average CPU usage by polling the CPU usage every (pollingInterval)
* and keeping track of the rolling average over a defined time window (windowDuration).
*/
public class AverageCpuUsageTracker extends AbstractAverageUsageTracker {
private static final Logger LOGGER = LogManager.getLogger(AverageCpuUsageTracker.class);

public AverageCpuUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
super(threadPool, pollingInterval, windowDuration);
}

@Override
public long getUsage() {
long usage = ProcessProbe.getInstance().getProcessCpuPercent();
LOGGER.debug("Recording cpu usage: {}%", usage);
return usage;
}

}
Loading

0 comments on commit ae316e7

Please sign in to comment.