From 84379d52f62cbd6ef2280fef3fc8f0ceb1da5037 Mon Sep 17 00:00:00 2001 From: Dev Agarwal Date: Tue, 11 Jun 2024 18:56:46 +0530 Subject: [PATCH] [Backport 2.x] changes to merge PA-RTF (#662) * changes to merge PA-RTF (#651) * PA RTF merging init Signed-off-by: Atharva Sharma * working model Signed-off-by: Atharva Sharma * working model tip Signed-off-by: Atharva Sharma * functional model init Signed-off-by: Atharva Sharma * Migrated HeapMetricsCollector Signed-off-by: Atharva Sharma * Added RTFThreadPoolMetricsCollector Signed-off-by: Atharva Sharma * migrated NodeStats and DiskMetricsCollector Signed-off-by: Atharva Sharma * Added gauge data model for Heap_Max metric Signed-off-by: Atharva Sharma * implemented TelemetryAwarePlugin Signed-off-by: Atharva Sharma * Framework changes for PA RTF merging Signed-off-by: Atharva Sharma * refactored Signed-off-by: Atharva Sharma * spotless applied Signed-off-by: Atharva Sharma * Addressed small comments Signed-off-by: Atharva Sharma * Added different flag for RCA collectors Signed-off-by: Atharva Sharma * Addressed more comments Signed-off-by: Atharva Sharma * Added RTF collectors in config map Signed-off-by: Atharva Sharma * Added UTs Signed-off-by: Atharva Sharma * Added further UTs Signed-off-by: Atharva Sharma * Added dynamic control support to all collectors Signed-off-by: Atharva Sharma * fixed UT Signed-off-by: Atharva Sharma * refactoring Signed-off-by: Atharva Sharma * Revert "refactoring" This reverts commit 25d66e8bb3554d9d7674a87201beee0b08a2a33d. Signed-off-by: Atharva Sharma * Revert "fixed UT" This reverts commit 369bd957f473c52ae39284f1d7257e254be8d036. Signed-off-by: Atharva Sharma * Revert "Added dynamic control support to all collectors" This reverts commit 447e15f15084ce911b8944446e54d8d36e0e2a6f. Signed-off-by: Atharva Sharma * Adding two new collector interfaces Signed-off-by: Atharva Sharma * simplified interfaces Signed-off-by: Atharva Sharma * Added units and javadocs Signed-off-by: Atharva Sharma * Changes metrics semantic conventions Signed-off-by: Atharva Sharma * refactored Signed-off-by: Atharva Sharma * fixed UT Signed-off-by: Atharva Sharma * Added stats metrics for rtf collectors Signed-off-by: Atharva Sharma * reverted test delete Signed-off-by: Atharva Sharma * Fixes javadoc compilation issue Signed-off-by: Gagan Juneja --------- Signed-off-by: Atharva Sharma Signed-off-by: Gagan Juneja Co-authored-by: Gagan Juneja * Empty-Commit Signed-off-by: Dev Agarwal --------- Signed-off-by: Atharva Sharma Signed-off-by: Gagan Juneja Signed-off-by: Dev Agarwal Co-authored-by: Atharva Sharma <60044988+atharvasharma61@users.noreply.github.com> Co-authored-by: Gagan Juneja --- .../OpenSearchResources.java | 11 + .../PerformanceAnalyzerPlugin.java | 85 +++- .../collectors/ValueCalculator.java | 2 +- .../telemetry/RTFDisksCollector.java | 113 +++++ .../telemetry/RTFHeapMetricsCollector.java | 130 ++++++ ...RTFNodeStatsAllShardsMetricsCollector.java | 405 ++++++++++++++++++ .../RTFThreadPoolMetricsCollector.java | 228 ++++++++++ .../config/PerformanceAnalyzerController.java | 15 + .../PerformanceAnalyzerClusterSettings.java | 11 + ...manceAnalyzerCollectorsSettingHandler.java | 65 +++ ...erformanceAnalyzerClusterConfigAction.java | 21 +- .../performanceanalyzer/util/Utils.java | 8 + .../PerformanceAnalyzerPluginTests.java | 27 +- .../collectors/CollectorTestBase.java | 30 ++ .../telemetry/RTFDisksCollectorTests.java | 42 ++ .../RTFHeapMetricsCollectorTests.java | 68 +++ ...deStatsAllShardsMetricsCollectorTests.java | 138 ++++++ .../RTFThreadPoolMetricsCollectorTests.java | 91 ++++ ...AnalyzerCollectorsSettingHandlerTests.java | 41 ++ ...manceAnalyzerClusterConfigActionTests.java | 10 +- 20 files changed, 1512 insertions(+), 29 deletions(-) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollector.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollector.java create mode 100644 src/main/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandler.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/CollectorTestBase.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollectorTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollectorTests.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandlerTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/OpenSearchResources.java b/src/main/java/org/opensearch/performanceanalyzer/OpenSearchResources.java index c0540642..0ddab1a6 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/OpenSearchResources.java +++ b/src/main/java/org/opensearch/performanceanalyzer/OpenSearchResources.java @@ -11,6 +11,7 @@ import org.opensearch.core.indices.breaker.CircuitBreakerService; import org.opensearch.env.Environment; import org.opensearch.indices.IndicesService; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.ThreadPool; public final class OpenSearchResources { @@ -20,6 +21,7 @@ public final class OpenSearchResources { private CircuitBreakerService circuitBreakerService; private ClusterService clusterService; private IndicesService indicesService; + private MetricsRegistry metricsRegistry; private Settings settings; private Environment environment; private java.nio.file.Path configPath; @@ -32,6 +34,7 @@ private OpenSearchResources() { clusterService = null; settings = null; indicesService = null; + metricsRegistry = null; environment = null; configPath = null; pluginFileLocation = null; @@ -101,6 +104,14 @@ public void setIndicesService(IndicesService indicesService) { this.indicesService = indicesService; } + public MetricsRegistry getMetricsRegistry() { + return metricsRegistry; + } + + public void setMetricsRegistry(MetricsRegistry metricsRegistry) { + this.metricsRegistry = metricsRegistry; + } + public void setClient(final Client client) { this.client = client; } diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index f2fe6cc4..28421766 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -54,6 +54,10 @@ import org.opensearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector; import org.opensearch.performanceanalyzer.collectors.ShardStateCollector; import org.opensearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector; import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory; import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector; import org.opensearch.performanceanalyzer.commons.collectors.GCInfoCollector; @@ -73,6 +77,7 @@ import org.opensearch.performanceanalyzer.config.setting.handler.ConfigOverridesClusterSettingHandler; import org.opensearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler; import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerClusterSettingHandler; +import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerCollectorsSettingHandler; import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerClusterConfigAction; import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerConfigAction; import org.opensearch.performanceanalyzer.http_action.config.PerformanceAnalyzerOverridesClusterConfigAction; @@ -87,9 +92,11 @@ import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPlugin; +import org.opensearch.plugins.TelemetryAwarePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.script.ScriptService; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; @@ -97,7 +104,7 @@ import org.opensearch.watcher.ResourceWatcherService; public final class PerformanceAnalyzerPlugin extends Plugin - implements ActionPlugin, NetworkPlugin, SearchPlugin { + implements ActionPlugin, NetworkPlugin, SearchPlugin, TelemetryAwarePlugin { private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerPlugin.class); public static final String PLUGIN_NAME = "opensearch-performance-analyzer"; private static final String ADD_FAULT_DETECTION_METHOD = "addFaultDetectionListener"; @@ -107,6 +114,8 @@ public final class PerformanceAnalyzerPlugin extends Plugin private static SecurityManager sm = null; private final PerformanceAnalyzerClusterSettingHandler perfAnalyzerClusterSettingHandler; private final NodeStatsSettingHandler nodeStatsSettingHandler; + private final PerformanceAnalyzerCollectorsSettingHandler + performanceAnalyzerCollectorsSettingHandler; private final ConfigOverridesClusterSettingHandler configOverridesClusterSettingHandler; private final ConfigOverridesWrapper configOverridesWrapper; private final PerformanceAnalyzerController performanceAnalyzerController; @@ -165,7 +174,8 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa new ClusterSettingsManager( Arrays.asList( PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING, - PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING), + PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, + PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING), Collections.singletonList( PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING)); configOverridesClusterSettingHandler = @@ -188,27 +198,62 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa clusterSettingsManager.addSubscriberForIntSetting( PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, nodeStatsSettingHandler); + performanceAnalyzerCollectorsSettingHandler = + new PerformanceAnalyzerCollectorsSettingHandler( + performanceAnalyzerController, clusterSettingsManager); + clusterSettingsManager.addSubscriberForIntSetting( + PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING, + performanceAnalyzerCollectorsSettingHandler); + + scheduleTelemetryCollectors(); + scheduleRcaCollectors(); + + scheduledMetricCollectorsExecutor.start(); + + EventLog eventLog = new EventLog(); + EventLogFileHandler eventLogFileHandler = + new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation()); + new EventLogQueueProcessor( + eventLogFileHandler, + MetricsConfiguration.SAMPLING_INTERVAL, + QUEUE_PURGE_INTERVAL_MS, + performanceAnalyzerController) + .scheduleExecutor(); + } + + private void scheduleTelemetryCollectors() { + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFDisksCollector(performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFHeapMetricsCollector(performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFThreadPoolMetricsCollector( + performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFNodeStatsAllShardsMetricsCollector( + performanceAnalyzerController, configOverridesWrapper)); + } + + private void scheduleRcaCollectors() { scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new ThreadPoolMetricsCollector()); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector()); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new CacheConfigMetricsCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new CircuitBreakerCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new OSMetricsCollector()); - scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new HeapMetricsCollector()); - scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new NodeDetailsCollector(configOverridesWrapper)); - scheduledMetricCollectorsExecutor.addScheduledMetricCollector( - new NodeStatsAllShardsMetricsCollector(performanceAnalyzerController)); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new ClusterManagerServiceMetrics()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new ClusterManagerServiceEventMetrics()); - scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new DisksCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new NetworkInterfaceCollector()); - scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector(StatsCollector.instance()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new FaultDetectionMetricsCollector( @@ -222,6 +267,7 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa new AdmissionControlMetricsCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new ElectionTermCollector(performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector(new GCInfoCollector()); try { Class.forName(ShardIndexingPressureMetricsCollector.SHARD_INDEXING_PRESSURE_CLASS_NAME); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( @@ -231,17 +277,6 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa LOG.info( "Shard IndexingPressure not present in this OpenSearch version. Skipping ShardIndexingPressureMetricsCollector"); } - scheduledMetricCollectorsExecutor.start(); - - EventLog eventLog = new EventLog(); - EventLogFileHandler eventLogFileHandler = - new EventLogFileHandler(eventLog, PluginSettings.instance().getMetricsLocation()); - new EventLogQueueProcessor( - eventLogFileHandler, - MetricsConfiguration.SAMPLING_INTERVAL, - QUEUE_PURGE_INTERVAL_MS, - performanceAnalyzerController) - .scheduleExecutor(); } // - http level: bulk, search @@ -314,7 +349,8 @@ public List getRestHandlers( settings, restController, perfAnalyzerClusterSettingHandler, - nodeStatsSettingHandler); + nodeStatsSettingHandler, + performanceAnalyzerCollectorsSettingHandler); PerformanceAnalyzerOverridesClusterConfigAction paOverridesConfigClusterAction = new PerformanceAnalyzerOverridesClusterConfigAction( settings, @@ -341,12 +377,14 @@ public Collection createComponents( NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameExpressionResolver, - Supplier repositoriesServiceSupplier) { + Supplier repositoriesServiceSupplier, + Tracer tracer, + MetricsRegistry metricsRegistry) { OpenSearchResources.INSTANCE.setClusterService(clusterService); OpenSearchResources.INSTANCE.setThreadPool(threadPool); OpenSearchResources.INSTANCE.setEnvironment(environment); OpenSearchResources.INSTANCE.setClient(client); - + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); // ClusterSettingsManager needs ClusterService to have been created before we can // initialize it. This is the earliest point at which we know ClusterService is created. // So, call the initialize method here. @@ -374,6 +412,7 @@ public List> getSettings() { return Arrays.asList( PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING, PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, - PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING); + PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING, + PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/ValueCalculator.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/ValueCalculator.java index 83c0e59d..2242607d 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/ValueCalculator.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/ValueCalculator.java @@ -8,6 +8,6 @@ import org.opensearch.action.admin.indices.stats.ShardStats; @FunctionalInterface -interface ValueCalculator { +public interface ValueCalculator { long calculateValue(ShardStats shardStats); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollector.java new file mode 100644 index 00000000..14fb730d --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollector.java @@ -0,0 +1,113 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory; +import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics_generator.DiskMetricsGenerator; +import org.opensearch.performanceanalyzer.commons.metrics_generator.OSMetricsGenerator; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class RTFDisksCollector extends PerformanceAnalyzerMetricsCollector + implements TelemetryCollector { + + private Histogram diskWaitTimeMetrics; + private Histogram diskServiceRateMetrics; + private Histogram diskUtilizationMetrics; + private MetricsRegistry metricsRegistry; + private boolean metricsInitialised; + private static final Logger LOG = LogManager.getLogger(RTFDisksCollector.class); + private PerformanceAnalyzerController performanceAnalyzerController; + private ConfigOverridesWrapper configOverridesWrapper; + + public RTFDisksCollector( + PerformanceAnalyzerController performanceAnalyzerController, + ConfigOverridesWrapper configOverridesWrapper) { + super( + MetricsConfiguration.CONFIG_MAP.get(RTFDisksCollector.class).samplingInterval, + "RTFDisksCollector", + StatMetrics.RTF_DISKS_COLLECTOR_EXECUTION_TIME, + StatExceptionCode.RTF_DISK_METRICS_COLLECTOR_ERROR); + this.metricsInitialised = false; + this.performanceAnalyzerController = performanceAnalyzerController; + this.configOverridesWrapper = configOverridesWrapper; + } + + @Override + public void collectMetrics(long startTime) { + if (performanceAnalyzerController.isCollectorDisabled( + configOverridesWrapper, getCollectorName())) { + LOG.info("RTFDisksCollector is disabled. Skipping collection."); + return; + } + + OSMetricsGenerator generator = OSMetricsGeneratorFactory.getInstance(); + if (generator == null) { + LOG.error("could not get the instance of OSMetricsGeneratorFactory class"); + return; + } + + metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry == null) { + LOG.error("could not get the instance of MetricsRegistry class"); + return; + } + + LOG.debug("Executing collect metrics for RTFDisksCollector"); + + initialiseMetricsIfNeeded(); + DiskMetricsGenerator diskMetricsGenerator = generator.getDiskMetricsGenerator(); + diskMetricsGenerator.addSample(); + + recordMetrics(diskMetricsGenerator); + } + + public void recordMetrics(DiskMetricsGenerator diskMetricsGenerator) { + for (String disk : diskMetricsGenerator.getAllDisks()) { + Tags diskNameTag = + Tags.create().addTag(RTFMetrics.DiskDimension.DISK_NAME.toString(), disk); + double diskWaitTime = diskMetricsGenerator.getAwait(disk); + double diskServiceRate = diskMetricsGenerator.getServiceRate(disk); + double diskUtilization = diskMetricsGenerator.getDiskUtilization(disk); + diskWaitTimeMetrics.record(diskWaitTime, diskNameTag); + diskUtilizationMetrics.record(diskUtilization, diskNameTag); + diskServiceRateMetrics.record(diskServiceRate, diskNameTag); + } + } + + private void initialiseMetricsIfNeeded() { + if (metricsInitialised == false) { + diskWaitTimeMetrics = + metricsRegistry.createHistogram( + RTFMetrics.DiskValue.Constants.WAIT_VALUE, + "DiskWaitTimeMetrics", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + diskServiceRateMetrics = + metricsRegistry.createHistogram( + RTFMetrics.DiskValue.Constants.SRATE_VALUE, + "DiskServiceRateMetrics", + RTFMetrics.MetricUnits.MEGABYTE_PER_SEC.toString()); + diskUtilizationMetrics = + metricsRegistry.createHistogram( + RTFMetrics.DiskValue.Constants.UTIL_VALUE, + "DiskUtilizationMetrics", + RTFMetrics.MetricUnits.PERCENT.toString()); + metricsInitialised = true; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java new file mode 100644 index 00000000..6872089d --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import java.lang.management.MemoryUsage; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.jvm.GCMetrics; +import org.opensearch.performanceanalyzer.commons.jvm.HeapMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class RTFHeapMetricsCollector extends PerformanceAnalyzerMetricsCollector + implements TelemetryCollector { + private static final Logger LOG = LogManager.getLogger(RTFHeapMetricsCollector.class); + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(RTFHeapMetricsCollector.class).samplingInterval; + private Histogram gcCollectionEventMetrics; + private Histogram gcCollectionTimeMetrics; + private Histogram heapUsedMetrics; + private MetricsRegistry metricsRegistry; + private final String memTypeAttributeKey = "mem_type"; + private boolean metricsInitialised; + private PerformanceAnalyzerController performanceAnalyzerController; + private ConfigOverridesWrapper configOverridesWrapper; + + public RTFHeapMetricsCollector( + PerformanceAnalyzerController performanceAnalyzerController, + ConfigOverridesWrapper configOverridesWrapper) { + super( + SAMPLING_TIME_INTERVAL, + "RTFHeapMetricsCollector", + StatMetrics.RTF_HEAP_METRICS_COLLECTOR_EXECUTION_TIME, + StatExceptionCode.RTF_HEAP_METRICS_COLLECTOR_ERROR); + this.metricsInitialised = false; + this.performanceAnalyzerController = performanceAnalyzerController; + this.configOverridesWrapper = configOverridesWrapper; + } + + @Override + public void collectMetrics(long startTime) { + if (performanceAnalyzerController.isCollectorDisabled( + configOverridesWrapper, getCollectorName())) { + LOG.info("RTFDisksCollector is disabled. Skipping collection."); + return; + } + + metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry == null) { + LOG.error("could not get the instance of MetricsRegistry class"); + return; + } + + initialiseMetricsIfNeeded(); + GCMetrics.runGCMetrics(); + LOG.debug("Executing collect metrics for RTFHeapMetricsCollector"); + recordMetrics(); + } + + private void initialiseMetricsIfNeeded() { + if (metricsInitialised == false) { + gcCollectionEventMetrics = + metricsRegistry.createHistogram( + RTFMetrics.HeapValue.Constants.COLLECTION_COUNT_VALUE, + "GC Collection Event PA Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + gcCollectionTimeMetrics = + metricsRegistry.createHistogram( + RTFMetrics.HeapValue.Constants.COLLECTION_TIME_VALUE, + "GC Collection Time PA Metrics", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + + heapUsedMetrics = + metricsRegistry.createHistogram( + RTFMetrics.HeapValue.Constants.USED_VALUE, + "GC Heap Used PA Metrics", + RTFMetrics.MetricUnits.BYTE.toString()); + metricsInitialised = true; + } + } + + private void recordMetrics() { + Tags totYoungGCTag = + Tags.create() + .addTag( + RTFMetrics.HeapDimension.MEM_TYPE.getName(), + RTFMetrics.GCType.TOT_YOUNG_GC.toString()); + + Tags totFullGCTag = + Tags.create().addTag(memTypeAttributeKey, RTFMetrics.GCType.TOT_FULL_GC.toString()); + + gcCollectionEventMetrics.record(GCMetrics.getTotYoungGCCollectionCount(), totYoungGCTag); + + gcCollectionEventMetrics.record(GCMetrics.getTotFullGCCollectionCount(), totFullGCTag); + + gcCollectionTimeMetrics.record(GCMetrics.getTotYoungGCCollectionTime(), totYoungGCTag); + + gcCollectionTimeMetrics.record(GCMetrics.getTotFullGCCollectionTime(), totFullGCTag); + + for (Map.Entry> entry : + HeapMetrics.getMemoryUsageSuppliers().entrySet()) { + MemoryUsage memoryUsage = entry.getValue().get(); + heapUsedMetrics.record( + memoryUsage.getUsed(), + Tags.create().addTag(memTypeAttributeKey, entry.getKey())); + metricsRegistry.createGauge( + RTFMetrics.HeapValue.Constants.MAX_VALUE, + "Heap Max PA metrics", + "", + () -> (double) memoryUsage.getMax(), + Tags.create().addTag(memTypeAttributeKey, entry.getKey())); + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java new file mode 100644 index 00000000..63033dba --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java @@ -0,0 +1,405 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.RTF_NODESTATS_COLLECTION_ERROR; +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics.RTF_NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.admin.indices.stats.IndexShardStats; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.collectors.ValueCalculator; +import org.opensearch.performanceanalyzer.commons.collectors.MetricStatus; +import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class RTFNodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetricsCollector + implements TelemetryCollector { + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(RTFNodeStatsAllShardsMetricsCollector.class) + .samplingInterval; + private static final Logger LOG = + LogManager.getLogger(RTFNodeStatsAllShardsMetricsCollector.class); + private Map currentShards; + private Map currentPerShardStats; + private Map prevPerShardStats; + private MetricsRegistry metricsRegistry; + private Counter cacheQueryHitMetrics; + private Counter cacheQueryMissMetrics; + private Counter cacheQuerySizeMetrics; + private Counter cacheFieldDataEvictionMetrics; + private Counter cacheFieldDataSizeMetrics; + private Counter cacheRequestHitMetrics; + private Counter cacheRequestMissMetrics; + private Counter cacheRequestEvictionMetrics; + private Counter cacheRequestSizeMetrics; + private boolean metricsInitialised; + private PerformanceAnalyzerController performanceAnalyzerController; + private ConfigOverridesWrapper configOverridesWrapper; + + public RTFNodeStatsAllShardsMetricsCollector( + PerformanceAnalyzerController performanceAnalyzerController, + ConfigOverridesWrapper configOverridesWrapper) { + super( + SAMPLING_TIME_INTERVAL, + "RTFNodeStatsMetricsCollector", + RTF_NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME, + RTF_NODESTATS_COLLECTION_ERROR); + currentShards = new HashMap<>(); + prevPerShardStats = new HashMap<>(); + currentPerShardStats = new HashMap<>(); + this.metricsInitialised = false; + this.performanceAnalyzerController = performanceAnalyzerController; + this.configOverridesWrapper = configOverridesWrapper; + } + + private void populateCurrentShards() { + if (!currentShards.isEmpty()) { + prevPerShardStats.putAll(currentPerShardStats); + currentPerShardStats.clear(); + } + currentShards.clear(); + currentShards = Utils.getShards(); + } + + private static final ImmutableMap valueCalculators = + ImmutableMap.of( + RTFMetrics.ShardStatsValue.INDEXING_THROTTLE_TIME.toString(), + (shardStats) -> + shardStats + .getStats() + .getIndexing() + .getTotal() + .getThrottleTime() + .millis(), + RTFMetrics.ShardStatsValue.CACHE_QUERY_HIT.toString(), + (shardStats) -> shardStats.getStats().getQueryCache().getHitCount(), + RTFMetrics.ShardStatsValue.CACHE_QUERY_MISS.toString(), + (shardStats) -> shardStats.getStats().getQueryCache().getMissCount(), + RTFMetrics.ShardStatsValue.CACHE_QUERY_SIZE.toString(), + (shardStats) -> shardStats.getStats().getQueryCache().getMemorySizeInBytes(), + RTFMetrics.ShardStatsValue.CACHE_FIELDDATA_EVICTION.toString(), + (shardStats) -> shardStats.getStats().getFieldData().getEvictions(), + RTFMetrics.ShardStatsValue.CACHE_FIELDDATA_SIZE.toString(), + (shardStats) -> shardStats.getStats().getFieldData().getMemorySizeInBytes(), + RTFMetrics.ShardStatsValue.CACHE_REQUEST_HIT.toString(), + (shardStats) -> shardStats.getStats().getRequestCache().getHitCount(), + RTFMetrics.ShardStatsValue.CACHE_REQUEST_MISS.toString(), + (shardStats) -> shardStats.getStats().getRequestCache().getMissCount(), + RTFMetrics.ShardStatsValue.CACHE_REQUEST_EVICTION.toString(), + (shardStats) -> shardStats.getStats().getRequestCache().getEvictions(), + RTFMetrics.ShardStatsValue.CACHE_REQUEST_SIZE.toString(), + (shardStats) -> shardStats.getStats().getRequestCache().getMemorySizeInBytes()); + + @Override + public void collectMetrics(long startTime) { + if (performanceAnalyzerController.isCollectorDisabled( + configOverridesWrapper, getCollectorName())) { + LOG.info("RTFDisksCollector is disabled. Skipping collection."); + return; + } + IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService(); + if (indicesService == null) { + return; + } + + metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry == null) { + LOG.error("could not get the instance of MetricsRegistry class"); + return; + } + + LOG.debug("Executing collect metrics for RTFNodeStatsAllShardsMetricsCollector"); + initialiseMetricsIfNeeded(); + populateCurrentShards(); + populatePerShardStats(indicesService); + + for (Map.Entry currentShard : currentPerShardStats.entrySet()) { + ShardId shardId = (ShardId) currentShard.getKey(); + ShardStats currentShardStats = (ShardStats) currentShard.getValue(); + if (prevPerShardStats.size() == 0) { + // Populating value for the first run. + recordMetrics( + new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), + shardId.getIndexName(), + String.valueOf(shardId.id())); + continue; + } + ShardStats prevShardStats = prevPerShardStats.get(shardId); + if (prevShardStats == null) { + // Populate value for shards which are new and were not present in the previous + // run. + recordMetrics( + new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), + shardId.getIndexName(), + String.valueOf(shardId.id())); + continue; + } + NodeStatsMetricsAllShardsPerCollectionStatus prevValue = + new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); + NodeStatsMetricsAllShardsPerCollectionStatus currValue = + new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats); + populateDiffMetricValue(prevValue, currValue, shardId.getIndexName(), shardId.id()); + } + } + + private void initialiseMetricsIfNeeded() { + if (metricsInitialised == false) { + cacheQueryHitMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.QUEY_CACHE_HIT_COUNT_VALUE, + "CacheQueryHit Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + cacheQueryMissMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.QUERY_CACHE_MISS_COUNT_VALUE, + "CacheQueryMiss Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + cacheQuerySizeMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.QUERY_CACHE_IN_BYTES_VALUE, + "CacheQuerySize Metrics", + RTFMetrics.MetricUnits.BYTE.toString()); + + cacheFieldDataEvictionMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.FIELDDATA_EVICTION_VALUE, + "CacheFieldDataEviction Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + cacheFieldDataSizeMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.FIELD_DATA_IN_BYTES_VALUE, + "CacheFieldDataSize Metrics", + RTFMetrics.MetricUnits.BYTE.toString()); + + cacheRequestHitMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_HIT_COUNT_VALUE, + "CacheRequestHit Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + cacheRequestMissMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_MISS_COUNT_VALUE, + "CacheRequestMiss Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + cacheRequestEvictionMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_EVICTION_VALUE, + "CacheRequestEviction Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + cacheRequestSizeMetrics = + metricsRegistry.createCounter( + RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_IN_BYTES_VALUE, + "CacheRequestSize Metrics", + RTFMetrics.MetricUnits.BYTE.toString()); + metricsInitialised = true; + } + } + + public void populatePerShardStats(IndicesService indicesService) { + // Populate the shard stats per shard. + for (Map.Entry currentShard : currentShards.entrySet()) { + IndexShard currentIndexShard = (IndexShard) currentShard.getValue(); + IndexShardStats currentIndexShardStats = + Utils.indexShardStats( + indicesService, + currentIndexShard, + new CommonStatsFlags( + CommonStatsFlags.Flag.QueryCache, + CommonStatsFlags.Flag.FieldData, + CommonStatsFlags.Flag.RequestCache)); + for (ShardStats shardStats : currentIndexShardStats.getShards()) { + currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + } + } + } + + private void recordMetrics( + NodeStatsMetricsAllShardsPerCollectionStatus metrics, + String indexName, + String shardId) { + Tags nodeStatsMetricsTag = + Tags.create() + .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), indexName) + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId); + + cacheQueryMissMetrics.add(metrics.getQueryCacheMissCount(), nodeStatsMetricsTag); + cacheQuerySizeMetrics.add(metrics.getQueryCacheInBytes(), nodeStatsMetricsTag); + cacheQueryHitMetrics.add(metrics.getQueryCacheHitCount(), nodeStatsMetricsTag); + + cacheFieldDataEvictionMetrics.add(metrics.getFieldDataEvictions(), nodeStatsMetricsTag); + cacheFieldDataSizeMetrics.add(metrics.getFieldDataInBytes(), nodeStatsMetricsTag); + + cacheRequestEvictionMetrics.add(metrics.getRequestCacheEvictions(), nodeStatsMetricsTag); + cacheRequestHitMetrics.add(metrics.getRequestCacheHitCount(), nodeStatsMetricsTag); + cacheRequestMissMetrics.add(metrics.getRequestCacheMissCount(), nodeStatsMetricsTag); + cacheRequestSizeMetrics.add(metrics.getRequestCacheInBytes(), nodeStatsMetricsTag); + } + + public void populateDiffMetricValue( + NodeStatsMetricsAllShardsPerCollectionStatus prevValue, + NodeStatsMetricsAllShardsPerCollectionStatus currValue, + String indexName, + int shardId) { + + NodeStatsMetricsAllShardsPerCollectionStatus metrics = + new NodeStatsMetricsAllShardsPerCollectionStatus( + Math.max((currValue.queryCacheHitCount - prevValue.queryCacheHitCount), 0), + Math.max( + (currValue.queryCacheMissCount - prevValue.queryCacheMissCount), 0), + currValue.queryCacheInBytes, + Math.max((currValue.fieldDataEvictions - prevValue.fieldDataEvictions), 0), + currValue.fieldDataInBytes, + Math.max( + (currValue.requestCacheHitCount - prevValue.requestCacheHitCount), + 0), + Math.max( + (currValue.requestCacheMissCount - prevValue.requestCacheMissCount), + 0), + Math.max( + (currValue.requestCacheEvictions - prevValue.requestCacheEvictions), + 0), + currValue.requestCacheInBytes); + + recordMetrics(metrics, indexName, String.valueOf(shardId)); + } + + public static class NodeStatsMetricsAllShardsPerCollectionStatus extends MetricStatus { + + @JsonIgnore private ShardStats shardStats; + + private final long queryCacheHitCount; + private final long queryCacheMissCount; + private final long queryCacheInBytes; + private final long fieldDataEvictions; + private final long fieldDataInBytes; + private final long requestCacheHitCount; + private final long requestCacheMissCount; + private final long requestCacheEvictions; + private final long requestCacheInBytes; + + public NodeStatsMetricsAllShardsPerCollectionStatus(ShardStats shardStats) { + super(); + this.shardStats = shardStats; + + this.queryCacheHitCount = calculate(RTFMetrics.ShardStatsValue.CACHE_QUERY_HIT); + this.queryCacheMissCount = calculate(RTFMetrics.ShardStatsValue.CACHE_QUERY_MISS); + this.queryCacheInBytes = calculate(RTFMetrics.ShardStatsValue.CACHE_QUERY_SIZE); + this.fieldDataEvictions = + calculate(RTFMetrics.ShardStatsValue.CACHE_FIELDDATA_EVICTION); + this.fieldDataInBytes = calculate(RTFMetrics.ShardStatsValue.CACHE_FIELDDATA_SIZE); + this.requestCacheHitCount = calculate(RTFMetrics.ShardStatsValue.CACHE_REQUEST_HIT); + this.requestCacheMissCount = calculate(RTFMetrics.ShardStatsValue.CACHE_REQUEST_MISS); + this.requestCacheEvictions = + calculate(RTFMetrics.ShardStatsValue.CACHE_REQUEST_EVICTION); + this.requestCacheInBytes = calculate(RTFMetrics.ShardStatsValue.CACHE_REQUEST_SIZE); + } + + @SuppressWarnings("checkstyle:parameternumber") + public NodeStatsMetricsAllShardsPerCollectionStatus( + long queryCacheHitCount, + long queryCacheMissCount, + long queryCacheInBytes, + long fieldDataEvictions, + long fieldDataInBytes, + long requestCacheHitCount, + long requestCacheMissCount, + long requestCacheEvictions, + long requestCacheInBytes) { + super(); + this.shardStats = null; + + this.queryCacheHitCount = queryCacheHitCount; + this.queryCacheMissCount = queryCacheMissCount; + this.queryCacheInBytes = queryCacheInBytes; + this.fieldDataEvictions = fieldDataEvictions; + this.fieldDataInBytes = fieldDataInBytes; + this.requestCacheHitCount = requestCacheHitCount; + this.requestCacheMissCount = requestCacheMissCount; + this.requestCacheEvictions = requestCacheEvictions; + this.requestCacheInBytes = requestCacheInBytes; + } + + private long calculate(RTFMetrics.ShardStatsValue nodeMetric) { + return valueCalculators.get(nodeMetric.toString()).calculateValue(shardStats); + } + + @JsonIgnore + public ShardStats getShardStats() { + return shardStats; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.QUEY_CACHE_HIT_COUNT_VALUE) + public long getQueryCacheHitCount() { + return queryCacheHitCount; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.QUERY_CACHE_MISS_COUNT_VALUE) + public long getQueryCacheMissCount() { + return queryCacheMissCount; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.QUERY_CACHE_IN_BYTES_VALUE) + public long getQueryCacheInBytes() { + return queryCacheInBytes; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.FIELDDATA_EVICTION_VALUE) + public long getFieldDataEvictions() { + return fieldDataEvictions; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.FIELD_DATA_IN_BYTES_VALUE) + public long getFieldDataInBytes() { + return fieldDataInBytes; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_HIT_COUNT_VALUE) + public long getRequestCacheHitCount() { + return requestCacheHitCount; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_MISS_COUNT_VALUE) + public long getRequestCacheMissCount() { + return requestCacheMissCount; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_EVICTION_VALUE) + public long getRequestCacheEvictions() { + return requestCacheEvictions; + } + + @JsonProperty(RTFMetrics.ShardStatsValue.Constants.REQUEST_CACHE_IN_BYTES_VALUE) + public long getRequestCacheInBytes() { + return requestCacheInBytes; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollector.java new file mode 100644 index 00000000..46ad8ae9 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollector.java @@ -0,0 +1,228 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.RTF_THREADPOOL_METRICS_COLLECTOR_ERROR; +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.THREADPOOL_METRICS_COLLECTOR_ERROR; +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics.RTF_THREADPOOL_METRICS_COLLECTOR_EXECUTION_TIME; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ThreadPoolExecutor; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.util.concurrent.SizeBlockingQueue; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; +import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats; + +public class RTFThreadPoolMetricsCollector extends PerformanceAnalyzerMetricsCollector + implements TelemetryCollector { + + private static final Logger LOG = LogManager.getLogger(RTFThreadPoolMetricsCollector.class); + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(RTFThreadPoolMetricsCollector.class) + .samplingInterval; + private final Map statsRecordMap; + private Histogram threadPoolQueueSizeMetrics; + private Histogram threadPoolRejectedReqsMetrics; + private Histogram threadPoolTotalThreadsMetrics; + private Histogram threadPoolActiveThreadsMetrics; + + // Skipping ThreadPoolQueueLatencyMetrics since they are always emitting -1 in the original + // collector + // private Histogram ThreadPoolQueueLatencyMetrics; + private Histogram ThreadPoolQueueCapacityMetrics; + private MetricsRegistry metricsRegistry; + private boolean metricsInitialised; + private PerformanceAnalyzerController performanceAnalyzerController; + private ConfigOverridesWrapper configOverridesWrapper; + + public RTFThreadPoolMetricsCollector( + PerformanceAnalyzerController performanceAnalyzerController, + ConfigOverridesWrapper configOverridesWrapper) { + super( + SAMPLING_TIME_INTERVAL, + "RTFThreadPoolMetricsCollector", + RTF_THREADPOOL_METRICS_COLLECTOR_EXECUTION_TIME, + RTF_THREADPOOL_METRICS_COLLECTOR_ERROR); + statsRecordMap = new HashMap<>(); + this.metricsInitialised = false; + this.performanceAnalyzerController = performanceAnalyzerController; + this.configOverridesWrapper = configOverridesWrapper; + } + + @Override + public void collectMetrics(long startTime) { + if (performanceAnalyzerController.isCollectorDisabled( + configOverridesWrapper, getCollectorName())) { + LOG.info("RTFDisksCollector is disabled. Skipping collection."); + return; + } + + if (OpenSearchResources.INSTANCE.getThreadPool() == null) { + return; + } + + metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry == null) { + LOG.error("could not get the instance of MetricsRegistry class"); + return; + } + + LOG.debug("Executing collect metrics for RTFThreadPoolMetricsCollector"); + + initialiseMetricsIfNeeded(); + + Iterator statsIterator = + OpenSearchResources.INSTANCE.getThreadPool().stats().iterator(); + + while (statsIterator.hasNext()) { + ThreadPoolStats.Stats stats = statsIterator.next(); + long rejectionDelta = 0; + String threadPoolName = stats.getName(); + if (statsRecordMap.containsKey(threadPoolName)) { + ThreadPoolStatsRecord lastRecord = statsRecordMap.get(threadPoolName); + // if the timestamp in previous record is greater than 15s (3 * intervals), + // then the scheduler might hang or freeze due to long GC etc. We simply drop + // previous record here and set rejectionDelta to 0. + if (startTime - lastRecord.getTimestamp() <= SAMPLING_TIME_INTERVAL * 3L) { + rejectionDelta = stats.getRejected() - lastRecord.getRejected(); + // we might not run into this as rejection is a LongAdder which never decrement + // its count. + // regardless, let's set it to 0 to be safe. + if (rejectionDelta < 0) { + rejectionDelta = 0; + } + } + } + statsRecordMap.put( + threadPoolName, new ThreadPoolStatsRecord(startTime, stats.getRejected())); + final long finalRejectionDelta = rejectionDelta; + final int capacity = + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + try { + ThreadPool threadPool = + (ThreadPool) + FieldUtils.readField( + OpenSearchResources.INSTANCE + .getIndicesService(), + "threadPool", + true); + ThreadPoolExecutor threadPoolExecutor = + (ThreadPoolExecutor) + threadPool.executor(threadPoolName); + Object queue = threadPoolExecutor.getQueue(); + // TODO: we might want to read the capacity of + // SifiResizableBlockingQueue in the future. + // In order to do that we can create a new + // PerformanceAnalyzerLibrary package and push + // all the code which depends on core OpenSearch + // specific + // changes into that library. + if (queue instanceof SizeBlockingQueue) { + return ((SizeBlockingQueue) queue).capacity(); + } + } catch (Exception e) { + LOG.warn("Fail to read queue capacity via reflection"); + StatsCollector.instance() + .logException( + THREADPOOL_METRICS_COLLECTOR_ERROR); + } + return -1; + }); + + recordMetrics(stats, finalRejectionDelta, capacity); + } + } + + private void recordMetrics( + ThreadPoolStats.Stats stats, long finalRejectionDelta, int capacity) { + Tags threadPoolTypeTag = + Tags.create() + .addTag( + RTFMetrics.ThreadPoolDimension.THREAD_POOL_TYPE.toString(), + stats.getName()); + + threadPoolQueueSizeMetrics.record(stats.getQueue(), threadPoolTypeTag); + threadPoolRejectedReqsMetrics.record(finalRejectionDelta, threadPoolTypeTag); + threadPoolActiveThreadsMetrics.record(stats.getActive(), threadPoolTypeTag); + threadPoolTotalThreadsMetrics.record(stats.getThreads(), threadPoolTypeTag); + + if (capacity >= 0) { + ThreadPoolQueueCapacityMetrics.record(capacity, threadPoolTypeTag); + } + } + + private void initialiseMetricsIfNeeded() { + if (metricsInitialised == false) { + threadPoolQueueSizeMetrics = + metricsRegistry.createHistogram( + RTFMetrics.ThreadPoolValue.Constants.QUEUE_SIZE_VALUE, + "ThreadPool Queue Size Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + threadPoolRejectedReqsMetrics = + metricsRegistry.createHistogram( + RTFMetrics.ThreadPoolValue.Constants.REJECTED_VALUE, + "ThreadPool Rejected Reqs Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + threadPoolTotalThreadsMetrics = + metricsRegistry.createHistogram( + RTFMetrics.ThreadPoolValue.Constants.THREADS_COUNT_VALUE, + "ThreadPool Total Threads Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + threadPoolActiveThreadsMetrics = + metricsRegistry.createHistogram( + RTFMetrics.ThreadPoolValue.Constants.THREADS_ACTIVE_VALUE, + "ThreadPool Active Threads Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + + ThreadPoolQueueCapacityMetrics = + metricsRegistry.createHistogram( + RTFMetrics.ThreadPoolValue.Constants.QUEUE_CAPACITY_VALUE, + "ThreadPool Queue Capacity Metrics", + RTFMetrics.MetricUnits.COUNT.toString()); + metricsInitialised = true; + } + } + + private static class ThreadPoolStatsRecord { + private final long timestamp; + private final long rejected; + + ThreadPoolStatsRecord(long timestamp, long rejected) { + this.timestamp = timestamp; + this.rejected = rejected; + } + + public long getTimestamp() { + return timestamp; + } + + public long getRejected() { + return rejected; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java index 1fb6994d..faaebc7c 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/PerformanceAnalyzerController.java @@ -35,6 +35,7 @@ public class PerformanceAnalyzerController { "thread_contention_monitoring_enabled.conf"; private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerController.class); public static final int DEFAULT_NUM_OF_SHARDS_PER_COLLECTION = 0; + public static final int DEFAULT_COLLECTORS_SETTING_VALUE = 0; private boolean paEnabled; private boolean rcaEnabled; @@ -42,6 +43,7 @@ public class PerformanceAnalyzerController { private boolean batchMetricsEnabled; private boolean threadContentionMonitoringEnabled; private volatile int shardsPerCollection; + private volatile int collectorsSettingValue; private static final boolean paEnabledDefaultValue = false; private static final boolean rcaEnabledDefaultValue = true; private static final boolean loggingEnabledDefaultValue = false; @@ -58,6 +60,7 @@ public PerformanceAnalyzerController( initBatchMetricsStateFromConf(); initThreadContentionMonitoringStateFromConf(); shardsPerCollection = DEFAULT_NUM_OF_SHARDS_PER_COLLECTION; + collectorsSettingValue = DEFAULT_COLLECTORS_SETTING_VALUE; } /** @@ -115,6 +118,18 @@ public void updateNodeStatsShardsPerCollection(int value) { shardsPerCollection = value; } + /** + * Updates the collector mode setting + * + * @param value the desired collector mode value + */ + public void updateCollectorsSetting(int value) { + collectorsSettingValue = value; + if (scheduledMetricCollectorsExecutor != null) { + scheduledMetricCollectorsExecutor.setCollectorsSetting(collectorsSettingValue); + } + } + /** * Updates the state of performance analyzer(writer and engine). * diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java b/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java index 035817c7..e1775128 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java +++ b/src/main/java/org/opensearch/performanceanalyzer/config/setting/PerformanceAnalyzerClusterSettings.java @@ -39,6 +39,17 @@ public enum PerformanceAnalyzerFeatureBits { Setting.Property.NodeScope, Setting.Property.Dynamic); + /** + * Cluster setting controlling the collector mode : 0 - only RCA Collectors enabled (Default) 1 + * - only Telemetry Collectors enabled 2 - both RCA and Telemetry Collectors enabled + */ + public static final Setting PA_COLLECTORS_SETTING = + Setting.intSetting( + "cluster.metadata.perf_analyzer.collectors.mode", + 0, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + /** * Cluster setting controlling the config overrides to be applied on performance analyzer * components. diff --git a/src/main/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandler.java b/src/main/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandler.java new file mode 100644 index 00000000..a5034262 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandler.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.config.setting.handler; + +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.config.setting.ClusterSettingListener; +import org.opensearch.performanceanalyzer.config.setting.ClusterSettingsManager; +import org.opensearch.performanceanalyzer.config.setting.PerformanceAnalyzerClusterSettings; + +/* + * This class is responsible for handling the collector setting value updates through API calls. + * This also acts as a listener to the collector mode setting update + */ +public class PerformanceAnalyzerCollectorsSettingHandler + implements ClusterSettingListener { + private final PerformanceAnalyzerController controller; + private final ClusterSettingsManager clusterSettingsManager; + + private Integer currentClusterSetting = + PerformanceAnalyzerController.DEFAULT_COLLECTORS_SETTING_VALUE; + + public PerformanceAnalyzerCollectorsSettingHandler( + PerformanceAnalyzerController controller, + ClusterSettingsManager clusterSettingsManager) { + this.controller = controller; + this.clusterSettingsManager = clusterSettingsManager; + } + + /** + * Updates the Collectors mode setting across the cluster. + * + * @param value The desired collector mode amongst: 0 - only RCA Collectors enabled (Default) * + * 1 - only Telemetry Collectors enabled * 2 - both RCA and Telemetry Collectors enabled + */ + public void updateCollectorsSetting(final int value) { + clusterSettingsManager.updateSetting( + PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING, value); + } + + /** + * Handler that gets called when there is a new value for the setting that this listener is + * listening to. + * + * @param newSettingValue The value of the new setting. + */ + @Override + public void onSettingUpdate(final Integer newSettingValue) { + if (newSettingValue != null) { + currentClusterSetting = newSettingValue; + controller.updateCollectorsSetting(newSettingValue); + } + } + + /** + * Gets the current(last seen) cluster setting value. + * + * @return integer value for setting. + */ + public int getCollectorsEnabledSetting() { + return currentClusterSetting; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigAction.java b/src/main/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigAction.java index df7b503c..b1495134 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigAction.java +++ b/src/main/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigAction.java @@ -22,6 +22,7 @@ import org.opensearch.performanceanalyzer.commons.config.PluginSettings; import org.opensearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler; import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerClusterSettingHandler; +import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerCollectorsSettingHandler; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestController; @@ -40,6 +41,7 @@ public class PerformanceAnalyzerClusterConfigAction extends BaseRestHandler { "batchMetricsRetentionPeriodMinutes"; public static final String ENABLED = "enabled"; public static final String SHARDS_PER_COLLECTION = "shardsPerCollection"; + public static final String COLLECTORS_SETTING = "collectorsSetting"; public static final String PA_CLUSTER_CONFIG_PATH = RestConfig.PA_BASE_URI + "/cluster/config"; public static final String RCA_CLUSTER_CONFIG_PATH = @@ -115,14 +117,20 @@ public class PerformanceAnalyzerClusterConfigAction extends BaseRestHandler { private final PerformanceAnalyzerClusterSettingHandler clusterSettingHandler; private final NodeStatsSettingHandler nodeStatsSettingHandler; + private final PerformanceAnalyzerCollectorsSettingHandler + performanceAnalyzerCollectorsSettingHandler; public PerformanceAnalyzerClusterConfigAction( final Settings settings, final RestController restController, final PerformanceAnalyzerClusterSettingHandler clusterSettingHandler, - final NodeStatsSettingHandler nodeStatsSettingHandler) { + final NodeStatsSettingHandler nodeStatsSettingHandler, + final PerformanceAnalyzerCollectorsSettingHandler + performanceAnalyzerCollectorsSettingHandler) { this.clusterSettingHandler = clusterSettingHandler; this.nodeStatsSettingHandler = nodeStatsSettingHandler; + this.performanceAnalyzerCollectorsSettingHandler = + performanceAnalyzerCollectorsSettingHandler; } /** @@ -195,6 +203,14 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No (Integer) shardPerCollectionValue); } } + + if (map.containsKey(COLLECTORS_SETTING)) { + Object collectorsSettingValue = map.get(COLLECTORS_SETTING); + if (collectorsSettingValue instanceof Integer) { + performanceAnalyzerCollectorsSettingHandler.updateCollectorsSetting( + (Integer) collectorsSettingValue); + } + } } return channel -> { @@ -207,6 +223,9 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No ? clusterSettingHandler.getCurrentClusterSettingValueVerbose() : clusterSettingHandler.getCurrentClusterSettingValue()); builder.field(SHARDS_PER_COLLECTION, nodeStatsSettingHandler.getNodeStatsSetting()); + builder.field( + COLLECTORS_SETTING, + performanceAnalyzerCollectorsSettingHandler.getCollectorsEnabledSetting()); builder.field( BATCH_METRICS_RETENTION_PERIOD_MINUTES, PluginSettings.instance().getBatchMetricsRetentionPeriodMinutes()); diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index f62ce5ff..21f254ca 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -19,6 +19,10 @@ import org.opensearch.indices.IndicesService; import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.collectors.*; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector; import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics; @@ -42,6 +46,10 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put(ClusterApplierServiceStatsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ElectionTermCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ShardIndexingPressureMetricsCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put(RTFDisksCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put(RTFHeapMetricsCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put(RTFNodeStatsAllShardsMetricsCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put(RTFThreadPoolMetricsCollector.class, cdefault); } // These methods are utility functions for the Node Stat Metrics Collectors. These methods are diff --git a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java index e3045e1a..fc5f95fa 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPluginTests.java @@ -22,6 +22,7 @@ import org.opensearch.action.ActionRequest; import org.opensearch.action.support.ActionFilter; import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.ClusterManagerMetrics; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -43,6 +44,8 @@ import org.opensearch.plugins.ActionPlugin.ActionHandler; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.NoopMetricsRegistryFactory; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -64,6 +67,8 @@ public class PerformanceAnalyzerPluginTests extends OpenSearchTestCase { private ClusterSettings clusterSettings; private IdentityService identityService; + private MetricsRegistry metricsRegistry; + @Before public void setup() { initMocks(this); @@ -78,7 +83,19 @@ public void setup() { threadPool = new TestThreadPool("test"); nodeClient = new NodeClient(settings, threadPool); environment = TestEnvironment.newEnvironment(settings); - clusterService = new ClusterService(settings, clusterSettings, threadPool); + NoopMetricsRegistryFactory metricsRegistryFactory = new NoopMetricsRegistryFactory(); + metricsRegistry = metricsRegistryFactory.getMetricsRegistry(); + try { + metricsRegistryFactory.close(); + } catch (Exception e) { + e.printStackTrace(); + } + clusterService = + new ClusterService( + settings, + clusterSettings, + threadPool, + new ClusterManagerMetrics(metricsRegistry)); identityService = new IdentityService(Settings.EMPTY, List.of()); restController = new RestController( @@ -144,12 +161,15 @@ public void testCreateComponents() { null, null, null, - null); + null, + null, + metricsRegistry); assertEquals(1, components.size()); assertEquals(settings, OpenSearchResources.INSTANCE.getSettings()); assertEquals(threadPool, OpenSearchResources.INSTANCE.getThreadPool()); assertEquals(environment, OpenSearchResources.INSTANCE.getEnvironment()); assertEquals(nodeClient, OpenSearchResources.INSTANCE.getClient()); + assertEquals(metricsRegistry, OpenSearchResources.INSTANCE.getMetricsRegistry()); } @Test @@ -172,9 +192,10 @@ public void testGetTransports() { @Test public void testGetSettings() { List> list = plugin.getSettings(); - assertEquals(3, list.size()); + assertEquals(4, list.size()); assertEquals(PerformanceAnalyzerClusterSettings.COMPOSITE_PA_SETTING, list.get(0)); assertEquals(PerformanceAnalyzerClusterSettings.PA_NODE_STATS_SETTING, list.get(1)); assertEquals(PerformanceAnalyzerClusterSettings.CONFIG_OVERRIDES_SETTING, list.get(2)); + assertEquals(PerformanceAnalyzerClusterSettings.PA_COLLECTORS_SETTING, list.get(3)); } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/CollectorTestBase.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/CollectorTestBase.java new file mode 100644 index 00000000..26f6baa1 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/CollectorTestBase.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; + +public class CollectorTestBase { + protected PerformanceAnalyzerController mockController; + protected ConfigOverridesWrapper mockWrapper; + + @Before + public void setUp() throws Exception { + initMocks(this); + System.setProperty("performanceanalyzer.metrics.log.enabled", "false"); + mockController = mock(PerformanceAnalyzerController.class); + mockWrapper = mock(ConfigOverridesWrapper.class); + Mockito.when(mockController.isCollectorDisabled(any(), anyString())).thenReturn(false); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollectorTests.java new file mode 100644 index 00000000..4dc9bab8 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFDisksCollectorTests.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +import java.io.IOException; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.collectors.CollectorTestBase; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; + +public class RTFDisksCollectorTests extends CollectorTestBase { + private RTFDisksCollector rtfDisksCollector; + private static MetricsRegistry metricsRegistry; + private static Histogram testHistogram; + + @Before + public void init() { + MetricsConfiguration.CONFIG_MAP.put(RTFDisksCollector.class, MetricsConfiguration.cdefault); + System.setProperty("os.name", "Linux"); + metricsRegistry = mock(MetricsRegistry.class); + testHistogram = mock(Histogram.class); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())) + .thenReturn(testHistogram); + rtfDisksCollector = spy(new RTFDisksCollector(mockController, mockWrapper)); + } + + @Test + public void testCollectMetrics() throws IOException { + rtfDisksCollector.collectMetrics(System.currentTimeMillis()); + verify(rtfDisksCollector, atLeastOnce()).recordMetrics(any()); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java new file mode 100644 index 00000000..928a5a29 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeastOnce; + +import java.io.IOException; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.collectors.CollectorTestBase; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; + +public class RTFHeapMetricsCollectorTests extends CollectorTestBase { + private RTFHeapMetricsCollector rtfHeapMetricsCollector; + + private static MetricsRegistry metricsRegistry; + private static Histogram gcCollectionEventHistogram; + private static Histogram gcCollectionTimeHistogram; + private static Histogram heapUsedHistogram; + + @Before + public void init() { + MetricsConfiguration.CONFIG_MAP.put( + RTFHeapMetricsCollector.class, MetricsConfiguration.cdefault); + + metricsRegistry = mock(MetricsRegistry.class); + gcCollectionEventHistogram = mock(Histogram.class); + gcCollectionTimeHistogram = mock(Histogram.class); + heapUsedHistogram = mock(Histogram.class); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.contains( + RTFMetrics.HeapValue.Constants.COLLECTION_COUNT_VALUE)) { + return gcCollectionEventHistogram; + } else if (histogramName.contains( + RTFMetrics.HeapValue.Constants.COLLECTION_TIME_VALUE)) { + return gcCollectionTimeHistogram; + } + return heapUsedHistogram; + }); + rtfHeapMetricsCollector = new RTFHeapMetricsCollector(mockController, mockWrapper); + } + + @Test + public void testCollectMetrics() throws IOException { + rtfHeapMetricsCollector.collectMetrics(System.currentTimeMillis()); + verify(heapUsedHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(gcCollectionTimeHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(gcCollectionEventHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(metricsRegistry, atLeastOnce()) + .createGauge(anyString(), anyString(), anyString(), any(), any()); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java new file mode 100644 index 00000000..089a685b --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollectorTests.java @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeastOnce; + +import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.opensearch.indices.IndicesService; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +public class RTFNodeStatsAllShardsMetricsCollectorTests extends OpenSearchSingleNodeTestCase { + + private long startTimeInMills = 1153721339; + private static final String TEST_INDEX = "test"; + private RTFNodeStatsAllShardsMetricsCollector rtfNodeStatsAllShardsMetricsCollector; + private static MetricsRegistry metricsRegistry; + private static Counter cacheQueryHitCounter; + private static Counter cacheQueryMissCounter; + private static Counter cacheQuerySizeCounter; + private static Counter cacheFieldDataEvictionCounter; + private static Counter cacheFieldDataSizeCounter; + private static Counter cacheRequestHitCounter; + private static Counter cacheRequestMissCounter; + private static Counter cacheRequestEvictionCounter; + private static Counter cacheRequestSizeCounter; + + @Before + public void init() { + MetricsConfiguration.CONFIG_MAP.put( + RTFNodeStatsAllShardsMetricsCollector.class, MetricsConfiguration.cdefault); + + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + OpenSearchResources.INSTANCE.setIndicesService(indicesService); + metricsRegistry = mock(MetricsRegistry.class); + cacheFieldDataEvictionCounter = mock(Counter.class); + cacheFieldDataSizeCounter = mock(Counter.class); + cacheQueryHitCounter = mock(Counter.class); + cacheQueryMissCounter = mock(Counter.class); + cacheRequestEvictionCounter = mock(Counter.class); + cacheRequestMissCounter = mock(Counter.class); + cacheRequestHitCounter = mock(Counter.class); + cacheRequestSizeCounter = mock(Counter.class); + cacheQuerySizeCounter = mock(Counter.class); + + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String counterName = (String) invocationOnMock.getArguments()[0]; + if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .REQUEST_CACHE_IN_BYTES_VALUE)) { + return cacheRequestSizeCounter; + } else if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .REQUEST_CACHE_HIT_COUNT_VALUE)) { + return cacheRequestHitCounter; + } else if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .REQUEST_CACHE_MISS_COUNT_VALUE)) { + return cacheRequestMissCounter; + } else if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .REQUEST_CACHE_EVICTION_VALUE)) { + return cacheRequestEvictionCounter; + } else if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .FIELDDATA_EVICTION_VALUE)) { + return cacheFieldDataEvictionCounter; + } else if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .FIELD_DATA_IN_BYTES_VALUE)) { + return cacheFieldDataSizeCounter; + } else if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .QUERY_CACHE_IN_BYTES_VALUE)) { + return cacheQuerySizeCounter; + } else if (counterName.contains( + RTFMetrics.ShardStatsValue.Constants + .QUEY_CACHE_HIT_COUNT_VALUE)) { + return cacheQueryHitCounter; + } + return cacheQueryMissCounter; + }); + + ConfigOverridesWrapper mockWrapper = mock(ConfigOverridesWrapper.class); + PerformanceAnalyzerController mockController = mock(PerformanceAnalyzerController.class); + Mockito.when(mockController.isCollectorDisabled(any(), anyString())).thenReturn(false); + + rtfNodeStatsAllShardsMetricsCollector = + spy(new RTFNodeStatsAllShardsMetricsCollector(mockController, mockWrapper)); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + @Test + public void testCollectMetrics() throws IOException { + createIndex(TEST_INDEX); + rtfNodeStatsAllShardsMetricsCollector.collectMetrics(startTimeInMills); + verify(rtfNodeStatsAllShardsMetricsCollector, never()) + .populateDiffMetricValue(any(), any(), anyString(), anyInt()); + startTimeInMills += 500; + rtfNodeStatsAllShardsMetricsCollector.collectMetrics(startTimeInMills); + verify(rtfNodeStatsAllShardsMetricsCollector, times(1)) + .populateDiffMetricValue(any(), any(), anyString(), anyInt()); + verify(cacheFieldDataEvictionCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheFieldDataSizeCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheQueryMissCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheQueryHitCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheQuerySizeCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheRequestEvictionCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheRequestHitCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheRequestMissCounter, atLeastOnce()).add(anyDouble(), any()); + verify(cacheRequestSizeCounter, atLeastOnce()).add(anyDouble(), any()); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollectorTests.java new file mode 100644 index 00000000..895e8be7 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFThreadPoolMetricsCollectorTests.java @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeastOnce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.collectors.CollectorTestBase; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolStats; + +public class RTFThreadPoolMetricsCollectorTests extends CollectorTestBase { + private RTFThreadPoolMetricsCollector rtfThreadPoolMetricsCollector; + private static MetricsRegistry metricsRegistry; + private static Histogram threadPoolQueueSizeHistogram; + private static Histogram threadPoolRejectedReqsHistogram; + private static Histogram threadPoolTotalThreadsHistogram; + private static Histogram threadPoolActiveThreadsHistogram; + private static Histogram threadPoolQueueCapacityHistogram; + @Mock private ThreadPool mockThreadPool; + + @Before + public void init() { + MetricsConfiguration.CONFIG_MAP.put( + RTFThreadPoolMetricsCollector.class, MetricsConfiguration.cdefault); + metricsRegistry = mock(MetricsRegistry.class); + threadPoolQueueSizeHistogram = mock(Histogram.class); + threadPoolRejectedReqsHistogram = mock(Histogram.class); + threadPoolActiveThreadsHistogram = mock(Histogram.class); + threadPoolTotalThreadsHistogram = mock(Histogram.class); + threadPoolQueueCapacityHistogram = mock(Histogram.class); + + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + OpenSearchResources.INSTANCE.setThreadPool(mockThreadPool); + + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.contains( + RTFMetrics.ThreadPoolValue.Constants.QUEUE_SIZE_VALUE)) { + return threadPoolQueueSizeHistogram; + } else if (histogramName.contains( + RTFMetrics.ThreadPoolValue.Constants.REJECTED_VALUE)) { + return threadPoolRejectedReqsHistogram; + } else if (histogramName.contains( + RTFMetrics.ThreadPoolValue.Constants.THREADS_ACTIVE_VALUE)) { + return threadPoolActiveThreadsHistogram; + } else if (histogramName.contains( + RTFMetrics.ThreadPoolValue.Constants.QUEUE_CAPACITY_VALUE)) { + return threadPoolQueueCapacityHistogram; + } + return threadPoolTotalThreadsHistogram; + }); + + rtfThreadPoolMetricsCollector = + new RTFThreadPoolMetricsCollector(mockController, mockWrapper); + } + + @Test + public void testCollectMetrics() throws IOException { + when(mockThreadPool.stats()).thenReturn(generateThreadPoolStat()); + rtfThreadPoolMetricsCollector.collectMetrics(System.currentTimeMillis()); + verify(mockThreadPool, atLeastOnce()).stats(); + verify(threadPoolQueueSizeHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(threadPoolRejectedReqsHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(threadPoolActiveThreadsHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(threadPoolTotalThreadsHistogram, atLeastOnce()).record(anyDouble(), any()); + } + + private ThreadPoolStats generateThreadPoolStat() { + List stats = new ArrayList<>(); + stats.add(new ThreadPoolStats.Stats("write", 0, 0, 0, 2, 0, 0L, 20L)); + return new ThreadPoolStats(stats); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandlerTests.java b/src/test/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandlerTests.java new file mode 100644 index 00000000..8a86a235 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/config/setting/handler/PerformanceAnalyzerCollectorsSettingHandlerTests.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.config.setting.handler; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.config.setting.ClusterSettingsManager; + +public class PerformanceAnalyzerCollectorsSettingHandlerTests { + private PerformanceAnalyzerCollectorsSettingHandler handler; + @Mock private PerformanceAnalyzerController controller; + @Mock private ClusterSettingsManager clusterSettingsManager; + + @Before + public void init() { + initMocks(this); + handler = + new PerformanceAnalyzerCollectorsSettingHandler(controller, clusterSettingsManager); + } + + @Test + public void testOnSettingUpdate() { + Integer newSettingValue = null; + handler.onSettingUpdate(newSettingValue); + verify(controller, never()).updateCollectorsSetting(anyInt()); + + newSettingValue = 1; + handler.onSettingUpdate(newSettingValue); + verify(controller).updateCollectorsSetting(newSettingValue); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigActionTests.java b/src/test/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigActionTests.java index 2bd5d823..28fdebab 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigActionTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/http_action/config/PerformanceAnalyzerClusterConfigActionTests.java @@ -35,6 +35,7 @@ import org.opensearch.performanceanalyzer.config.setting.ClusterSettingsManager; import org.opensearch.performanceanalyzer.config.setting.handler.NodeStatsSettingHandler; import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerClusterSettingHandler; +import org.opensearch.performanceanalyzer.config.setting.handler.PerformanceAnalyzerCollectorsSettingHandler; import org.opensearch.rest.RestController; import org.opensearch.rest.RestRequest; import org.opensearch.test.rest.FakeRestChannel; @@ -52,6 +53,7 @@ public class PerformanceAnalyzerClusterConfigActionTests { private ClusterSettings clusterSettings; private PerformanceAnalyzerClusterSettingHandler clusterSettingHandler; private NodeStatsSettingHandler nodeStatsSettingHandler; + private PerformanceAnalyzerCollectorsSettingHandler performanceAnalyzerCollectorsSettingHandler; private IdentityService identityService; @Mock private PerformanceAnalyzerController controller; @@ -81,12 +83,16 @@ public void init() { clusterSettingHandler = new PerformanceAnalyzerClusterSettingHandler(controller, clusterSettingsManager); nodeStatsSettingHandler = new NodeStatsSettingHandler(controller, clusterSettingsManager); + performanceAnalyzerCollectorsSettingHandler = + new PerformanceAnalyzerCollectorsSettingHandler(controller, clusterSettingsManager); + configAction = new PerformanceAnalyzerClusterConfigAction( Settings.EMPTY, restController, clusterSettingHandler, - nodeStatsSettingHandler); + nodeStatsSettingHandler, + performanceAnalyzerCollectorsSettingHandler); restController.registerHandler(configAction); } @@ -167,6 +173,7 @@ private void testWithRequestPath(String requestPath) throws IOException { assertTrue(responseStr.contains(PerformanceAnalyzerClusterConfigAction.CURRENT)); assertTrue( responseStr.contains(PerformanceAnalyzerClusterConfigAction.SHARDS_PER_COLLECTION)); + assertTrue(responseStr.contains(PerformanceAnalyzerClusterConfigAction.COLLECTORS_SETTING)); assertTrue( responseStr.contains( PerformanceAnalyzerClusterConfigAction @@ -179,6 +186,7 @@ private FakeRestRequest buildRequest(String requestPath) throws IOException { .startObject() .field(PerformanceAnalyzerClusterConfigAction.ENABLED, true) .field(PerformanceAnalyzerClusterConfigAction.SHARDS_PER_COLLECTION, 1) + .field(PerformanceAnalyzerClusterConfigAction.COLLECTORS_SETTING, 2) .endObject(); return new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)