Skip to content

Commit

Permalink
Add node performance trackers and performance collector service
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Sep 7, 2023
1 parent d58943d commit dd54b1b
Show file tree
Hide file tree
Showing 21 changed files with 929 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466) [#9289](https://github.com/opensearch-project/OpenSearch/pull/9289))
- Added crypto-kms plugin to provide AWS KMS based key providers for encryption/decryption. ([#8465](https://github.com/opensearch-project/OpenSearch/pull/8465))

- Added Performance collector service and node performance stats tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890))
### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
- Bump `forbiddenapis` from 3.3 to 3.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.DownstreamNodePerfStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -142,6 +143,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchPipelineStats searchPipelineStats;

@Nullable
private DownstreamNodePerfStats downstreamNodePerfStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -198,6 +202,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchPipelineStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_11_0)) {
downstreamNodePerfStats = in.readOptionalWriteable(DownstreamNodePerfStats::new);
} else {
downstreamNodePerfStats = in.readOptionalWriteable(DownstreamNodePerfStats::new);
}
}

public NodeStats(
Expand All @@ -216,6 +225,7 @@ public NodeStats(
@Nullable DiscoveryStats discoveryStats,
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable DownstreamNodePerfStats downstreamNodePerfStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
Expand All @@ -240,7 +250,7 @@ public NodeStats(
this.scriptStats = scriptStats;
this.discoveryStats = discoveryStats;
this.ingestStats = ingestStats;
this.adaptiveSelectionStats = adaptiveSelectionStats;
this.downstreamNodePerfStats = downstreamNodePerfStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
Expand All @@ -250,6 +260,7 @@ public NodeStats(
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
this.adaptiveSelectionStats = adaptiveSelectionStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -344,6 +355,11 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
return adaptiveSelectionStats;
}

@Nullable
public DownstreamNodePerfStats getNodesPerformanceStats() {
return downstreamNodePerfStats;
}

@Nullable
public ScriptCacheStats getScriptCacheStats() {
return scriptCacheStats;
Expand Down Expand Up @@ -430,6 +446,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_2_11_0)) {
out.writeOptionalWriteable(downstreamNodePerfStats);
}
}

@Override
Expand Down Expand Up @@ -520,7 +539,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}

if (getNodesPerformanceStats() != null) {
getNodesPerformanceStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ public enum Metric {
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");
SEARCH_PIPELINE("search_pipeline"),
NODES_PERFORMANCE_STATS("nodes_performance_stats");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.NODES_PERFORMANCE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.tracker.NodePerformanceTracker;
import org.opensearch.transport.ProxyConnectionStrategy;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -651,6 +652,12 @@ public void apply(Settings value, Settings current, Settings previous) {
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to admission control
NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING,
NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_POLLING_INTERVAL_SETTING,
NodePerformanceTracker.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
NodePerformanceTracker.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.node;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class DownstreamNodePerfStats implements Writeable, ToXContentFragment {
private final Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodePerfStats;

public DownstreamNodePerfStats(Map<String, PerformanceCollectorService.NodePerformanceStatistics> nodePerfStats) {
this.nodePerfStats = nodePerfStats;
}

public DownstreamNodePerfStats(StreamInput in) throws IOException {
this.nodePerfStats = in.readMap(StreamInput::readString, PerformanceCollectorService.NodePerformanceStatistics::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.nodePerfStats, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
}

public Map<String, PerformanceCollectorService.NodePerformanceStatistics> getNodePerfStats() {
return nodePerfStats;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes_performance_stats");
for (String nodeId : nodePerfStats.keySet()) {
builder.startObject(nodeId);
PerformanceCollectorService.NodePerformanceStatistics perfStats = nodePerfStats.get(nodeId);
if (perfStats != null) {

builder.field("cpu_usage_percent", String.format(Locale.ROOT, "%.1f", perfStats.cpuPercent));
builder.field("memory_usage_percent", String.format(Locale.ROOT, "%.1f", perfStats.memoryPercent));
builder.field("io_usage_percent", String.format(Locale.ROOT, "%.1f", perfStats.ioUtilizationPercent));
builder.field(
"elapsed_time",
new TimeValue(System.currentTimeMillis() - perfStats.timestamp, TimeUnit.MILLISECONDS).toString()
);
}
builder.endObject();
}
builder.endObject();
return builder;
}
}
19 changes: 18 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.tracker.NodePerformanceTracker;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
Expand Down Expand Up @@ -775,6 +776,15 @@ protected Node(
remoteStoreStatsTrackerFactory
);

final PerformanceCollectorService performanceCollectorService = new PerformanceCollectorService(clusterService);

final NodePerformanceTracker nodePerformanceTracker = new NodePerformanceTracker(
performanceCollectorService,
threadPool,
settings,
clusterService.getClusterSettings()
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -1059,7 +1069,9 @@ protected Node(
searchBackpressureService,
searchPipelineService,
fileCache,
taskCancellationMonitoringService
taskCancellationMonitoringService,
nodePerformanceTracker,
performanceCollectorService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1192,6 +1204,8 @@ protected Node(
b.bind(RerouteService.class).toInstance(rerouteService);
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
b.bind(FsHealthService.class).toInstance(fsHealthService);
b.bind(PerformanceCollectorService.class).toInstance(performanceCollectorService);
b.bind(NodePerformanceTracker.class).toInstance(nodePerformanceTracker);
b.bind(SystemIndices.class).toInstance(systemIndices);
b.bind(IdentityService.class).toInstance(identityService);
b.bind(Tracer.class).toInstance(tracer);
Expand Down Expand Up @@ -1308,6 +1322,7 @@ public Node start() throws NodeValidationException {
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
nodeService.getNodePerformanceTracker().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1464,6 +1479,7 @@ private Node stop() {
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
nodeService.getNodePerformanceTracker().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
Expand Down Expand Up @@ -1523,6 +1539,7 @@ public synchronized void close() throws IOException {
toClose.add(() -> stopWatch.stop().start("monitor"));
toClose.add(nodeService.getMonitorService());
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(nodeService.getNodePerformanceTracker());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
Expand Down
17 changes: 15 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.tracker.NodePerformanceTracker;
import org.opensearch.transport.TransportService;

import java.io.Closeable;
Expand All @@ -83,10 +84,12 @@ public class NodeService implements Closeable {
private final ScriptService scriptService;
private final HttpServerTransport httpServerTransport;
private final ResponseCollectorService responseCollectorService;
private final PerformanceCollectorService performanceCollectorService;
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
private final SearchBackpressureService searchBackpressureService;
private final NodePerformanceTracker nodePerformanceTracker;
private final SearchPipelineService searchPipelineService;
private final ClusterService clusterService;
private final Discovery discovery;
Expand Down Expand Up @@ -114,7 +117,9 @@ public class NodeService implements Closeable {
SearchBackpressureService searchBackpressureService,
SearchPipelineService searchPipelineService,
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService
TaskCancellationMonitoringService taskCancellationMonitoringService,
NodePerformanceTracker nodePerformanceTracker,
PerformanceCollectorService performanceCollectorService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -137,6 +142,8 @@ public class NodeService implements Closeable {
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.nodePerformanceTracker = nodePerformanceTracker;
this.performanceCollectorService = performanceCollectorService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -217,7 +224,8 @@ public NodeStats stats(
boolean weightedRoutingStats,
boolean fileCacheStats,
boolean taskCancellation,
boolean searchPipelineStats
boolean searchPipelineStats,
boolean nodesPerfStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -237,6 +245,7 @@ public NodeStats stats(
discoveryStats ? discovery.stats() : null,
ingest ? ingestService.stats() : null,
adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null,
nodesPerfStats ? performanceCollectorService.stats() : null,
scriptCache ? scriptService.cacheStats() : null,
indexingPressure ? this.indexingPressureService.nodeStats() : null,
shardIndexingPressure ? this.indexingPressureService.shardStats(indices) : null,
Expand All @@ -261,6 +270,10 @@ public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

public NodePerformanceTracker getNodePerformanceTracker() {
return nodePerformanceTracker;
}

public TaskCancellationMonitoringService getTaskCancellationMonitoringService() {
return taskCancellationMonitoringService;
}
Expand Down
Loading

0 comments on commit dd54b1b

Please sign in to comment.