diff --git a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java index f7c781ac1..5b144ac12 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/model/MetricsModel.java @@ -464,11 +464,6 @@ public class MetricsModel { MetricUnits.MILLISECOND.toString(), AllMetrics.ShardIndexingPressureDimension.values())); - // Search Back Pressure Metrics - allMetricsInitializer.put( - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString(), - new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values())); ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java b/src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java deleted file mode 100644 index a4c81a53b..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.rca.configs; - - -import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; - -public class SearchBackPressureRcaConfig { - public static final String CONFIG_NAME = "search-back-pressure-rca-policy"; - - /* Metadata fields for thresholds */ - public static final String INCREASE_THRESHOLD_BY_JVM_STR = "increase_jvm"; - public static final String DECREASE_THRESHOLD_BY_JVM_STR = "decrease_jvm"; - - public static final int SLIDING_WINDOW_SIZE_IN_MINS = 1; - - // Interval period in seconds - public static final long DEFAULT_EVALUATION_INTERVAL_IN_S = 60; - - /* interval period to call operate() */ - public static final long EVAL_INTERVAL_IN_S = 5; - - /* Increase Threshold */ - // node max heap usage in last 60 secs is less than 70% - public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD = 70; - private Integer maxHeapIncreasePercentageThreshold; - - // cancellationCount due to heap is more than 50% of all task cancellations in shard level - public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 50; - private Integer maxShardHeapCancellationPercentageThreshold; - - // cancellationCount due to heap is more than 50% of all task cancellations in task level - public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 50; - private Integer maxTaskHeapCancellationPercentageThreshold; - - /* Decrease Threshold */ - // node min heap usage in last 60 secs is more than 80% - public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD = 80; - private Integer minHeapDecreasePercentageThreshold; - - // cancellationCount due to heap is less than 30% of all task cancellations in shard level - public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 30; - private Integer minShardHeapCancellationPercentageThreshold; - - // cancellationCount due to heap is less than 30% of all task cancellations in task level - public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 30; - private Integer minTaskHeapCancellationPercentageThreshold; - - public SearchBackPressureRcaConfig(final RcaConf conf) { - // (s) -> s > 0 is the validator, if validated, fields from conf file will be returned, - // else, default value gets returned - maxHeapIncreasePercentageThreshold = - conf.readRcaConfig( - CONFIG_NAME, - SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_INCREASE_FIELD.toString(), - DEFAULT_MAX_HEAP_INCREASE_THRESHOLD, - (s) -> s >= 0 && s <= 100, - Integer.class); - maxShardHeapCancellationPercentageThreshold = - conf.readRcaConfig( - CONFIG_NAME, - SearchBackPressureRcaConfigKeys.MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD - .toString(), - DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD, - (s) -> s >= 0 && s <= 100, - Integer.class); - maxTaskHeapCancellationPercentageThreshold = - conf.readRcaConfig( - CONFIG_NAME, - SearchBackPressureRcaConfigKeys.MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD - .toString(), - DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD, - (s) -> s >= 0 && s <= 100, - Integer.class); - minHeapDecreasePercentageThreshold = - conf.readRcaConfig( - CONFIG_NAME, - SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_DECREASE_FIELD.toString(), - DEFAULT_MIN_HEAP_DECREASE_THRESHOLD, - (s) -> s >= 0 && s <= 100, - Integer.class); - minShardHeapCancellationPercentageThreshold = - conf.readRcaConfig( - CONFIG_NAME, - SearchBackPressureRcaConfigKeys.MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD - .toString(), - DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD, - (s) -> s >= 0 && s <= 100, - Integer.class); - minTaskHeapCancellationPercentageThreshold = - conf.readRcaConfig( - CONFIG_NAME, - SearchBackPressureRcaConfigKeys.MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD - .toString(), - DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD, - (s) -> s >= 0 && s <= 100, - Integer.class); - } - - // Getters for private field - public Integer getMaxHeapIncreasePercentageThreshold() { - return maxHeapIncreasePercentageThreshold; - } - - public Integer getMaxShardHeapCancellationPercentageThreshold() { - return maxShardHeapCancellationPercentageThreshold; - } - - public Integer getMaxTaskHeapCancellationPercentageThreshold() { - return maxTaskHeapCancellationPercentageThreshold; - } - - public Integer getMinHeapDecreasePercentageThreshold() { - return minHeapDecreasePercentageThreshold; - } - - public Integer getMinShardHeapCancellationPercentageThreshold() { - return minShardHeapCancellationPercentageThreshold; - } - - public Integer getMinTaskHeapCancellationPercentageThreshold() { - return minTaskHeapCancellationPercentageThreshold; - } - - // name for the configuration field - public enum SearchBackPressureRcaConfigKeys { - MAX_HEAP_USAGE_INCREASE_FIELD("max-heap-usage-increase"), - MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD("max-shard-heap-cancellation-percentage"), - MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD("max-task-heap-cancellation-percentage"), - MAX_HEAP_USAGE_DECREASE_FIELD("max-heap-usage-decrease"), - MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD("min-shard-heap-cancellation-percentage"), - MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD("min-task-heap-cancellation-percentage"); - - private final String value; - - SearchBackPressureRcaConfigKeys(final String value) { - this.value = value; - } - - @Override - public String toString() { - return this.value; - } - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/Searchbp_Stats.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/Searchbp_Stats.java deleted file mode 100644 index e655e4edc..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/metrics/Searchbp_Stats.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.rca.framework.api.metrics; - - -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; -import org.opensearch.performanceanalyzer.rca.framework.api.Metric; - -public class Searchbp_Stats extends Metric { - public Searchbp_Stats(long evaluationIntervalSeconds) { - super( - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(), - evaluationIntervalSeconds); - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java index 03db4299b..876cd4ca1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/api/summaries/ResourceUtil.java @@ -135,24 +135,6 @@ public class ResourceUtil { .setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE) .setMetricEnum(MetricEnum.CACHE_MAX_SIZE) .build(); - /* - * searchbackpressure related resource - * SEARCHBACKPRESSURE_SHARD resource indicate a searchbackpressure unhealthy resource unit is caused by shard level cancellation - */ - public static final Resource SEARCHBACKPRESSURE_SHARD = - Resource.newBuilder() - .setResourceEnum(ResourceEnum.SEARCHBP) - .setMetricEnum(MetricEnum.SEARCHBP_SHARD) - .build(); - - /* - * SEARCHBACKPRESSURE_TASK resource indicate a searchbackpressure unhealthy resource unit is caused by task level cancellation - */ - public static final Resource SEARCHBACKPRESSURE_TASK = - Resource.newBuilder() - .setResourceEnum(ResourceEnum.SEARCHBP) - .setMetricEnum(MetricEnum.SEARCHBP_TASK) - .build(); /** * Read the resourceType name from the ResourceType object diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java index 0ff06f0af..4005e1c15 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/framework/core/RcaConf.java @@ -51,7 +51,6 @@ import org.opensearch.performanceanalyzer.rca.configs.HotShardRcaConfig; import org.opensearch.performanceanalyzer.rca.configs.OldGenContendedRcaConfig; import org.opensearch.performanceanalyzer.rca.configs.QueueRejectionRcaConfig; -import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; import org.opensearch.performanceanalyzer.rca.configs.ShardRequestCacheRcaConfig; import org.opensearch.performanceanalyzer.rca.framework.api.summaries.bucket.BasicBucketCalculator; import org.opensearch.performanceanalyzer.rca.framework.api.summaries.bucket.BucketCalculator; @@ -233,10 +232,6 @@ public OldGenContendedRcaConfig getOldGenContendedRcaConfig() { return new OldGenContendedRcaConfig(this); } - public SearchBackPressureRcaConfig getSearchBackPressureRcaConfig() { - return new SearchBackPressureRcaConfig(this); - } - public T readRcaConfig( String rcaName, String key, T defaultValue, Class clazz) { return readRcaConfig(rcaName, key, defaultValue, (s) -> true, clazz); diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java index 1b11c014e..80763befb 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/OpenSearchAnalysisGraph.java @@ -41,7 +41,6 @@ import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Heap_Max; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Heap_Used; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.IndexWriter_Memory; -import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Searchbp_Stats; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_RejectedReqs; import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Thread_Blocked_Time; @@ -86,8 +85,6 @@ import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.LargeHeapClusterRca; import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.OldGenContendedRca; import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.OldGenReclamationRca; -import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureClusterRCA; -import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureRCA; import org.opensearch.performanceanalyzer.rca.store.rca.temperature.NodeTemperatureRca; import org.opensearch.performanceanalyzer.rca.store.rca.temperature.dimension.CpuUtilDimensionTemperatureRca; import org.opensearch.performanceanalyzer.rca.store.rca.temperature.dimension.HeapAllocRateTemperatureRca; @@ -120,9 +117,6 @@ public void construct() { MetricsDB.AVG, AllMetrics.CommonDimension.OPERATION.toString()); - // SearchBackpressure Metric - Metric searchbp_Stats = new Searchbp_Stats(EVALUATION_INTERVAL_SECONDS); - heapUsed.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); @@ -147,9 +141,6 @@ public void construct() { threadWaitedTime.addTag( RcaConsts.RcaTagConstants.TAG_LOCUS, RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); - searchbp_Stats.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); addLeaf(heapUsed); addLeaf(gcEvent); @@ -159,7 +150,6 @@ public void construct() { addLeaf(cpuUtilizationGroupByOperation); addLeaf(threadBlockedTime); addLeaf(threadWaitedTime); - addLeaf(searchbp_Stats); // add node stats metrics List nodeStatsMetrics = constructNodeStatsMetrics(); @@ -443,28 +433,6 @@ public void construct() { shardRequestCacheClusterRca, highHeapUsageClusterRca)); - // Search Back Pressure Service RCA enabled - SearchBackPressureRCA searchBackPressureRCA = - new SearchBackPressureRCA(RCA_PERIOD, heapMax, heapUsed, searchbp_Stats); - searchBackPressureRCA.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE); - searchBackPressureRCA.addAllUpstreams(Arrays.asList(heapMax, heapUsed, searchbp_Stats)); - - // Search Back Pressure Service Cluster RCA enabled - SearchBackPressureClusterRCA searchBackPressureClusterRCA = - new SearchBackPressureClusterRCA(RCA_PERIOD, searchBackPressureRCA); - searchBackPressureClusterRCA.addTag( - RcaConsts.RcaTagConstants.TAG_LOCUS, - RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE); - searchBackPressureClusterRCA.addAllUpstreams( - Collections.singletonList(searchBackPressureRCA)); - searchBackPressureClusterRCA.addTag( - RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM, - RcaConsts.RcaTagConstants.LOCUS_DATA_NODE); - - // TODO: Add SearchBackPressure Decider - AdmissionControlDecider admissionControlDecider = buildAdmissionControlDecider(heapUsed, heapMax); diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java index e2056eec8..fc3558339 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java +++ b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/OldGenRca.java @@ -222,7 +222,7 @@ protected boolean isOldGenCollectorCMS() { return true; } - /** Sliding window to check the minimal old gen usage within a given time frame */ + /** Sliding window to check the minimal olg gen usage within a given time frame */ public static class MinOldGenSlidingWindow extends SlidingWindow { public MinOldGenSlidingWindow(int SLIDING_WINDOW_SIZE_IN_TIMESTAMP, TimeUnit timeUnit) { @@ -250,60 +250,4 @@ public double readMin() { return Double.NaN; } } - - /** - * Sliding window to check the max/min old gen usage within a given time frame - * - * @param isMinSlidingWindow true if the sliding window is for min usage, false for max usage - * Provides a more general framework than MinOldGenSlidingWindow as this sliding window can - * be implemented as minSlidingWindow or maxSlidingWindow depending on the need. - */ - public static class MinMaxSlidingWindow extends SlidingWindow { - boolean isMinSlidingWindow; - - public MinMaxSlidingWindow( - int SLIDING_WINDOW_SIZE_IN_TIMESTAMP, - TimeUnit timeUnit, - boolean isMinSlidingWindow) { - super(SLIDING_WINDOW_SIZE_IN_TIMESTAMP, timeUnit); - this.isMinSlidingWindow = isMinSlidingWindow; - } - - @Override - public void next(SlidingWindowData e) { - if (isMinSlidingWindow) { - // monotonically decreasing sliding window - while (!windowDeque.isEmpty() - && windowDeque.peekFirst().getValue() >= e.getValue()) { - windowDeque.pollFirst(); - } - } else { - // monotonically increasing sliding window - while (!windowDeque.isEmpty() - && windowDeque.peekFirst().getValue() < e.getValue()) { - windowDeque.pollFirst(); - } - } - - windowDeque.addFirst(e); - while (!windowDeque.isEmpty() - && TimeUnit.MILLISECONDS.toSeconds( - e.getTimeStamp() - windowDeque.peekLast().getTimeStamp()) - > SLIDING_WINDOW_SIZE) { - windowDeque.pollLast(); - } - } - - /* - * read last element in the window - * if the sliding window is MinSlidingWindow then returns the min element - * else return the max element in the deque - */ - public double readLastElementInWindow() { - if (!windowDeque.isEmpty()) { - return windowDeque.peekLast().getValue(); - } - return Double.NaN; - } - } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java deleted file mode 100644 index a7b95bada..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureClusterRCA.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure; - - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.performanceanalyzer.rca.framework.api.Rca; -import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import org.opensearch.performanceanalyzer.rca.store.rca.cluster.BaseClusterRca; - -public class SearchBackPressureClusterRCA extends BaseClusterRca { - - public static final String RCA_TABLE_NAME = SearchBackPressureClusterRCA.class.getSimpleName(); - private static final Logger LOG = LogManager.getLogger(SearchBackPressureClusterRCA.class); - - public >> SearchBackPressureClusterRCA( - final int rcaPeriod, final R SearchBackPressureRCA) { - super(rcaPeriod, SearchBackPressureRCA); - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java deleted file mode 100644 index d274e6f52..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java +++ /dev/null @@ -1,520 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure; - -import static org.opensearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil.readDataFromSqlResult; -import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_SHARD; -import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_TASK; - -import java.time.Clock; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jooq.Field; -import org.jooq.impl.DSL; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; -import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage; -import org.opensearch.performanceanalyzer.metricsdb.MetricsDB; -import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; -import org.opensearch.performanceanalyzer.rca.framework.api.Metric; -import org.opensearch.performanceanalyzer.rca.framework.api.Rca; -import org.opensearch.performanceanalyzer.rca.framework.api.Resources; -import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.SlidingWindow; -import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.SlidingWindowData; -import org.opensearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext; -import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; -import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; -import org.opensearch.performanceanalyzer.rca.framework.api.persist.SQLParsingUtil; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; -import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf; -import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails; -import org.opensearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper; -import org.opensearch.performanceanalyzer.rca.store.rca.OldGenRca.MinMaxSlidingWindow; -import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.model.SearchBackPressureRCAMetric; - -public class SearchBackPressureRCA extends Rca> { - private static final Logger LOG = LogManager.getLogger(SearchBackPressureRCA.class); - private static final double BYTES_TO_GIGABYTES = Math.pow(1024, 3); - private static final long EVAL_INTERVAL_IN_S = SearchBackPressureRcaConfig.EVAL_INTERVAL_IN_S; - private static final double CONVERT_BYTES_TO_MEGABYTES = Math.pow(1024, 2); - - // Key metrics used to determine RCA Flow Unit health status - private final Metric heapUsed; - private final Metric heapMax; - private final Metric searchbp_Stats; - - // default value for heapUsed and heapMax - private static final double DEFAULT_HEAP_VAL = 0.0; - - /* - * threshold to increase heap limits - */ - private long heapUsedIncreaseThreshold; - // shard-level searchbp heap cancellation increase threshold - private long heapShardCancellationIncreaseMaxThreshold; - - // task-level searchbp heap cancellation increase threshold - private long heapTaskCancellationIncreaseMaxThreshold; - - /* - * threshold to decrease heap limits - */ - private long heapUsedDecreaseThreshold; - // shard-level searchbp heap cancellation decrease threshold - private long heapShardCancellationDecreaseMinThreashold; - - // task-level searchbp heap cancellation decrease threshold - private long heapTaskCancellationDecreaseMinThreashold; - - /* - * Sliding Window - * SearchBackPressureRCA keep track the continous performance of 3 key metrics - * TaskJVMCancellationPercent/ShardJVMCancellationPercent/HeapUsagePercent - */ - private final SlidingWindow taskJVMCancellationSlidingWindow; - private final SlidingWindow shardJVMCancellationSlidingWindow; - private final MinMaxSlidingWindow minHeapUsageSlidingWindow; - private final MinMaxSlidingWindow maxHeapUsageSlidingWindow; - - // Sliding Window Interval - private static final int SLIDING_WINDOW_SIZE_IN_MINS = - SearchBackPressureRcaConfig.SLIDING_WINDOW_SIZE_IN_MINS; - private static final int SLIDING_WINDOW_SIZE_IN_SECS = SLIDING_WINDOW_SIZE_IN_MINS * 60; - - // currentIterationNumber to check the samples has been taken, only emit flow units when - // currentIterationNumber equals to - // rcaPeriod - private long currentIterationNumber; - - // Required amount of RCA period this RCA needs to run before sending out a flowunit - private final int rcaPeriod; - - // Current time - protected Clock clock; - - public SearchBackPressureRCA( - final int rcaPeriod, final M heapMax, final M heapUsed, M searchbp_Stats) { - super(EVAL_INTERVAL_IN_S); - this.heapUsed = heapUsed; - this.heapMax = heapMax; - this.rcaPeriod = rcaPeriod; - this.clock = Clock.systemUTC(); - this.searchbp_Stats = searchbp_Stats; - - // threshold for heap usage - this.heapUsedIncreaseThreshold = - SearchBackPressureRcaConfig.DEFAULT_MAX_HEAP_INCREASE_THRESHOLD; - this.heapUsedDecreaseThreshold = - SearchBackPressureRcaConfig.DEFAULT_MIN_HEAP_DECREASE_THRESHOLD; - - /* - * threshold for search back pressure service stats - * currently, only consider the percentage of JVM Usage cancellation count compared to the total cancellation count - */ - this.heapShardCancellationIncreaseMaxThreshold = - SearchBackPressureRcaConfig.DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD; - this.heapTaskCancellationIncreaseMaxThreshold = - SearchBackPressureRcaConfig.DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD; - - this.heapShardCancellationDecreaseMinThreashold = - SearchBackPressureRcaConfig.DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD; - this.heapTaskCancellationDecreaseMinThreashold = - SearchBackPressureRcaConfig.DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD; - - // sliding window for heap usage - this.minHeapUsageSlidingWindow = - new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, true); - this.maxHeapUsageSlidingWindow = - new MinMaxSlidingWindow(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES, false); - - // sliding window for JVM - this.shardJVMCancellationSlidingWindow = - new SlidingWindow<>(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES); - this.taskJVMCancellationSlidingWindow = - new SlidingWindow<>(SLIDING_WINDOW_SIZE_IN_MINS, TimeUnit.MINUTES); - - LOG.debug("SearchBackPressureRCA initialized"); - } - - /* - * generateFlowUnitListFromWire() compute the flow units from other hosts in the cluster - * for a given Metric and try to send the subscription requests - * to stale or new hosts in cluster if need be - */ - @Override - public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) { - final List flowUnitMessages = - args.getWireHopper().readFromWire(args.getNode()); - final List> flowUnitList = new ArrayList<>(); - LOG.info("rca: Executing fromWire: {}", this.getClass().getSimpleName()); - for (FlowUnitMessage flowUnitMessage : flowUnitMessages) { - flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage)); - } - setFlowUnits(flowUnitList); - } - - /* - * operate() evaluates the current stats against threshold - * Unhealthy Flow Units is a marker that this resource at current instance is not healthy - * Autotune decision would be made by downstream classes - */ - @Override - public ResourceFlowUnit operate() { - currentIterationNumber += 1; - ResourceContext context = null; - long currentTimeMillis = System.currentTimeMillis(); - - // read key metrics into searchBackPressureRCAMetric for easier management - SearchBackPressureRCAMetric searchBackPressureRCAMetric = getSearchBackPressureRCAMetric(); - - // print out oldGenUsed and maxOldGen - LOG.debug( - "SearchBackPressureRCA: oldGenUsed: {} maxOldGen: {}, heapUsedPercentage: {}, searchbpShardCancellationCount: {}, searchbpTaskCancellationCount: {}, searchbpJVMShardCancellationCount: {}, searchbpJVMTaskCancellationCount: {}", - searchBackPressureRCAMetric.getUsedHeap(), - searchBackPressureRCAMetric.getMaxHeap(), - searchBackPressureRCAMetric.getHeapUsagePercent(), - searchBackPressureRCAMetric.getSearchbpShardCancellationCount(), - searchBackPressureRCAMetric.getSearchbpTaskCancellationCount(), - searchBackPressureRCAMetric.getSearchbpJVMShardCancellationCount(), - searchBackPressureRCAMetric.getSearchbpJVMTaskCancellationCount()); - - updateAllSlidingWindows(searchBackPressureRCAMetric, currentTimeMillis); - - LOG.debug("SearchBackPressureRCA currentIterationNumber is {}", currentIterationNumber); - // if currentIterationNumber matches the rca period, emit the flow unit - if (currentIterationNumber == this.rcaPeriod) { - LOG.debug( - "SearchBackPressureRCA currentIterationNumber in rcaPeriod is {}", - currentIterationNumber); - currentTimeMillis = System.currentTimeMillis(); - - // reset currentIterationNumber - currentIterationNumber = 0; - - double maxHeapUsagePercentage = maxHeapUsageSlidingWindow.readLastElementInWindow(); - double minHeapUsagePercentage = minHeapUsageSlidingWindow.readLastElementInWindow(); - double avgShardJVMCancellationPercentage = shardJVMCancellationSlidingWindow.readAvg(); - double avgTaskJVMCancellationPercentage = taskJVMCancellationSlidingWindow.readAvg(); - - LOG.debug( - "SearchBackPressureRCA: maxHeapUsagePercentage: {}, minHeapUsagePercentage: {}, SearchBackPressureRCA: avgShardJVMCancellationPercentage: {}, SearchBackPressureRCA: avgTaskJVMCancellationPercentage: {}", - maxHeapUsagePercentage, - minHeapUsagePercentage, - avgShardJVMCancellationPercentage, - avgTaskJVMCancellationPercentage); - InstanceDetails instanceDetails = getInstanceDetails(); - HotNodeSummary nodeSummary = - new HotNodeSummary( - instanceDetails.getInstanceId(), instanceDetails.getInstanceIp()); - - /* - * 2 cases we send Unhealthy ResourceContext when we need to autotune the threshold - * (increase) node max heap usage in last 60 secs is less than 70% and cancellationCountPercentage due to heap is more than 50% of all task cancellations - * (decrease) node min heap usage in last 60 secs is more than 80% and cancellationCountPercetange due to heap is less than 30% of all task cancellations - */ - boolean maxHeapBelowIncreaseThreshold = - maxHeapUsagePercentage < heapUsedIncreaseThreshold; - boolean minHeapAboveDecreaseThreshold = - minHeapUsagePercentage > heapUsedDecreaseThreshold; - boolean shardHeapCancellationPercentageAboveThreshold = - avgShardJVMCancellationPercentage > heapShardCancellationIncreaseMaxThreshold; - boolean shardHeapCancellationPercentageBelowThreshold = - avgShardJVMCancellationPercentage < heapShardCancellationDecreaseMinThreashold; - boolean taskHeapCancellationPercentageAboveThreshold = - avgTaskJVMCancellationPercentage > heapTaskCancellationIncreaseMaxThreshold; - boolean taskHeapCancellationPercentageBelowThreshold = - avgTaskJVMCancellationPercentage < heapTaskCancellationDecreaseMinThreashold; - - // shard level thresholds - boolean increaseJVMThresholdMetByShard = - maxHeapBelowIncreaseThreshold && shardHeapCancellationPercentageAboveThreshold; - boolean decreaseJVMThresholdMetByShard = - minHeapAboveDecreaseThreshold && shardHeapCancellationPercentageBelowThreshold; - - // task level thresholds - boolean increaseJVMThresholdMetByTask = - maxHeapBelowIncreaseThreshold && taskHeapCancellationPercentageAboveThreshold; - boolean decreaseJVMThresholdMetByTask = - minHeapAboveDecreaseThreshold && taskHeapCancellationPercentageBelowThreshold; - - // Generate a flow unit with an Unhealthy ResourceContext - LOG.debug( - "Increase/Decrease Condition Meet for Shard, maxHeapUsagePercentage: {} is less than threshold: {}, avgShardJVMCancellationPercentage: {} is bigger than heapShardCancellationIncreaseMaxThreshold: {}", - maxHeapUsagePercentage, - heapUsedIncreaseThreshold, - avgShardJVMCancellationPercentage, - heapShardCancellationIncreaseMaxThreshold); - - if (increaseJVMThresholdMetByShard || decreaseJVMThresholdMetByShard) { - context = new ResourceContext(Resources.State.UNHEALTHY); - HotResourceSummary resourceSummary; - // metadata fields indicate the reason for Unhealthy Resource Unit - if (increaseJVMThresholdMetByShard) { - resourceSummary = - new HotResourceSummary( - SEARCHBACKPRESSURE_SHARD, - 0, - 0, - 0, - SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR); - } else { - resourceSummary = - new HotResourceSummary( - SEARCHBACKPRESSURE_SHARD, - 0, - 0, - 0, - SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR); - } - - nodeSummary.appendNestedSummary(resourceSummary); - return new ResourceFlowUnit<>( - currentTimeMillis, - context, - nodeSummary, - !instanceDetails.getIsClusterManager()); - } else if (increaseJVMThresholdMetByTask || decreaseJVMThresholdMetByTask) { - LOG.debug( - "Increase/Decrease Condition Meet for Task, maxHeapUsagePercentage: {} is less than threshold: {}, avgShardJVMCancellationPercentage: {} is bigger than heapShardCancellationIncreaseMaxThreshold: {}", - maxHeapUsagePercentage, - heapUsedIncreaseThreshold, - avgTaskJVMCancellationPercentage, - heapTaskCancellationIncreaseMaxThreshold); - - context = new ResourceContext(Resources.State.UNHEALTHY); - HotResourceSummary resourceSummary; - if (increaseJVMThresholdMetByTask) { - resourceSummary = - new HotResourceSummary( - SEARCHBACKPRESSURE_TASK, - 0, - 0, - 0, - SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR); - } else { - resourceSummary = - new HotResourceSummary( - SEARCHBACKPRESSURE_TASK, - 0, - 0, - 0, - SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR); - } - - nodeSummary.appendNestedSummary(resourceSummary); - return new ResourceFlowUnit<>( - currentTimeMillis, - context, - nodeSummary, - !instanceDetails.getIsClusterManager()); - } else { - // if autotune is not triggered, return healthy state - context = new ResourceContext(Resources.State.HEALTHY); - return new ResourceFlowUnit<>( - currentTimeMillis, - context, - nodeSummary, - !instanceDetails.getIsClusterManager()); - } - - } else { - // Return Empty ResourceFlowUnit if none of the thresholds is met - LOG.debug("Empty FlowUnit returned for SearchbackPressureRCA"); - currentTimeMillis = System.currentTimeMillis(); - return new ResourceFlowUnit<>(currentTimeMillis); - } - } - - /** - * Get the Heap Related Stats (Heap Used and Heap Size in gigabytes) - * - * @param isHeapUsed is true meaning get the value of used heap in gigabytes otherwise, meaning - * get the value of max heap in gigabytes - */ - public double getHeapStats(boolean isHeapUsed) { - double heapStats = DEFAULT_HEAP_VAL; - List heapStatsMetrics; - if (isHeapUsed == true) { - if (heapUsed == null) { - throw new IllegalStateException( - "RCA: " - + this.name() - + "was not configured in the graph to " - + "take heap_Used as a metric. Please check the analysis graph!"); - } - - heapStatsMetrics = heapUsed.getFlowUnits(); - } else { - if (heapMax == null) { - throw new IllegalStateException( - "RCA: " - + this.name() - + "was not configured in the graph to " - + "take heap_Max as a metric. Please check the analysis graph!"); - } - - heapStatsMetrics = heapMax.getFlowUnits(); - } - - for (MetricFlowUnit heapStatsMetric : heapStatsMetrics) { - if (heapStatsMetric.isEmpty()) { - continue; - } - - double ret = - SQLParsingUtil.readDataFromSqlResult( - heapStatsMetric.getData(), - AllMetrics.HeapDimension.MEM_TYPE.getField(), - AllMetrics.GCType.HEAP.toString(), - MetricsDB.MAX); - if (Double.isNaN(ret)) { - LOG.error( - "Failed to parse metric in FlowUnit from {}", - heapUsed.getClass().getName()); - } else { - heapStats = ret / CONVERT_BYTES_TO_MEGABYTES; - } - } - - return heapStats; - } - - private SearchBackPressureRCAMetric getSearchBackPressureRCAMetric() { - // Get Heap Usage related metrics - double prevHeapUsage = getHeapStats(true); - double maxHeapSize = getHeapStats(false); - - // Log prevHeapUsage and maxHeapSize - LOG.debug("prevHeapUsage: {}, maxHeapSize: {}", prevHeapUsage, maxHeapSize); - - // Get SearchBack Pressure related metrics from stats type field - Field searchbp_stats_type_field = - DSL.field( - DSL.name( - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM - .toString()), - String.class); - - double searchbpShardCancellationCount = - getMetric( - this.searchbp_Stats, - searchbp_stats_type_field, - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString()); - double searchbpTaskCancellationCount = - getMetric( - this.searchbp_Stats, - searchbp_stats_type_field, - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT - .toString()); - double searchbpJVMShardCancellationCount = - getMetric( - this.searchbp_Stats, - searchbp_stats_type_field, - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - double searchbpJVMTaskCancellationCount = - getMetric( - this.searchbp_Stats, - searchbp_stats_type_field, - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString()); - - return new SearchBackPressureRCAMetric( - prevHeapUsage, - maxHeapSize, - searchbpShardCancellationCount, - searchbpTaskCancellationCount, - searchbpJVMShardCancellationCount, - searchbpJVMTaskCancellationCount); - } - - private double getMetric(M metric, Field field, String fieldName) { - if (metric == null) { - throw new IllegalStateException( - "RCA: " - + this.name() - + "was not configured in the graph to " - + "take " - + metric.name() - + " as a metric. Please check the analysis graph!"); - } - - double response = 0.0; - for (MetricFlowUnit flowUnit : metric.getFlowUnits()) { - if (!flowUnit.isEmpty()) { - double metricResponse = - readDataFromSqlResult(flowUnit.getData(), field, fieldName, MetricsDB.MAX); - LOG.debug("Searchbp metricResponse is: {}", metricResponse); - if (!Double.isNaN(metricResponse) && metricResponse >= 0.0) { - response = metricResponse; - } - } - } - LOG.debug("Searchbp response is: {}", response); - return response; - } - - /** - * read threshold values from rca.conf - * - * @param conf RcaConf object - */ - @Override - public void readRcaConf(RcaConf conf) { - final SearchBackPressureRcaConfig config = conf.getSearchBackPressureRcaConfig(); - - // threshold read from config file - this.heapUsedIncreaseThreshold = config.getMaxHeapIncreasePercentageThreshold(); - LOG.debug( - "SearchBackPressureRCA heapUsedIncreaseThreshold is set to {}", - this.heapUsedIncreaseThreshold); - this.heapShardCancellationIncreaseMaxThreshold = - config.getMaxShardHeapCancellationPercentageThreshold(); - this.heapTaskCancellationIncreaseMaxThreshold = - config.getMaxTaskHeapCancellationPercentageThreshold(); - this.heapUsedDecreaseThreshold = config.getMinHeapDecreasePercentageThreshold(); - this.heapShardCancellationDecreaseMinThreashold = - config.getMinShardHeapCancellationPercentageThreshold(); - this.heapTaskCancellationDecreaseMinThreashold = - config.getMinTaskHeapCancellationPercentageThreshold(); - } - - /* - * Update Stats for all Sliding Windows - */ - private void updateAllSlidingWindows( - SearchBackPressureRCAMetric searchBackPressureRCAMetric, long currentTimeMillis) { - double prevheapUsagePercentage = searchBackPressureRCAMetric.getHeapUsagePercent(); - if (!Double.isNaN(prevheapUsagePercentage)) { - minHeapUsageSlidingWindow.next( - new SlidingWindowData(currentTimeMillis, prevheapUsagePercentage)); - maxHeapUsageSlidingWindow.next( - new SlidingWindowData(currentTimeMillis, prevheapUsagePercentage)); - } - - double shardJVMCancellationPercentage = - searchBackPressureRCAMetric.getShardJVMCancellationPercent(); - if (!Double.isNaN(shardJVMCancellationPercentage)) { - shardJVMCancellationSlidingWindow.next( - new SlidingWindowData(currentTimeMillis, shardJVMCancellationPercentage)); - } - - double taskJVMCancellationPercentage = - searchBackPressureRCAMetric.getTaskJVMCancellationPercent(); - if (!Double.isNaN(taskJVMCancellationPercentage)) { - taskJVMCancellationSlidingWindow.next( - new SlidingWindowData(currentTimeMillis, taskJVMCancellationPercentage)); - } - } -} diff --git a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/model/SearchBackPressureRCAMetric.java b/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/model/SearchBackPressureRCAMetric.java deleted file mode 100644 index ef74e1763..000000000 --- a/src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/model/SearchBackPressureRCAMetric.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.model; - -public class SearchBackPressureRCAMetric { - private final double usedHeap; - private final double maxHeap; - private final double searchbpShardCancellationCount; - private final double searchbpTaskCancellationCount; - private final double searchbpJVMShardCancellationCount; - private final double searchbpJVMTaskCancellationCount; - - public SearchBackPressureRCAMetric( - double usedHeap, - double maxHeap, - double searchbpShardCancellationCount, - double searchbpTaskCancellationCount, - double searchbpJVMShardCancellationCount, - double searchbpJVMTaskCancellationCount) { - this.usedHeap = usedHeap; - this.maxHeap = maxHeap; - this.searchbpShardCancellationCount = searchbpShardCancellationCount; - this.searchbpTaskCancellationCount = searchbpTaskCancellationCount; - this.searchbpJVMShardCancellationCount = searchbpJVMShardCancellationCount; - this.searchbpJVMTaskCancellationCount = searchbpJVMTaskCancellationCount; - } - - public double getUsedHeap() { - return usedHeap; - } - - public double getMaxHeap() { - return maxHeap; - } - - public double getSearchbpShardCancellationCount() { - return searchbpShardCancellationCount; - } - - public double getSearchbpTaskCancellationCount() { - return searchbpTaskCancellationCount; - } - - public double getSearchbpJVMShardCancellationCount() { - return searchbpJVMShardCancellationCount; - } - - public double getSearchbpJVMTaskCancellationCount() { - return searchbpJVMTaskCancellationCount; - } - - public double getHeapUsagePercent() { - if (this.getMaxHeap() == 0) { - return 0; - } - return 100 * this.getUsedHeap() / this.getMaxHeap(); - } - - public double getShardJVMCancellationPercent() { - if (this.getSearchbpShardCancellationCount() == 0) { - return 0; - } - return 100 - * this.getSearchbpJVMShardCancellationCount() - / this.getSearchbpShardCancellationCount(); - } - - public double getTaskJVMCancellationPercent() { - if (this.getSearchbpTaskCancellationCount() == 0) { - return 0; - } - return 100 - * this.getSearchbpJVMTaskCancellationCount() - / this.getSearchbpTaskCancellationCount(); - } - - public boolean hasValues() { - return this.getUsedHeap() != 0 && this.getMaxHeap() != 0; - } - - @Override - public String toString() { - return "HeapMetric{" - + "usedHeap=" - + usedHeap - + ", maxHeap=" - + maxHeap - + ", searchbpShardCancellationCount=" - + searchbpShardCancellationCount - + ", searchbpTaskCancellationCount=" - + searchbpTaskCancellationCount - + ", searchbpJVMShardCancellationCount=" - + searchbpJVMShardCancellationCount - + ", searchbpJVMTaskCancellationCount=" - + searchbpJVMTaskCancellationCount - + '}'; - } -} diff --git a/src/main/proto/inter_node_rpc_service.proto b/src/main/proto/inter_node_rpc_service.proto index 89ea45d93..fe5c864c9 100644 --- a/src/main/proto/inter_node_rpc_service.proto +++ b/src/main/proto/inter_node_rpc_service.proto @@ -77,9 +77,6 @@ enum ResourceEnum { // Heap HEAP = 20 [(additional_fields).name = "heap"]; - // Search Back Pressure - SEARCHBP = 21 [(additional_fields).name = "search back pressure"]; - } enum MetricEnum { @@ -109,9 +106,6 @@ enum MetricEnum { OLD_GEN_USAGE_AFTER_FULL_GC = 31 [(additional_fields).name = "full gc", (additional_fields).description = "old gen usage after full gc in mb"]; // GC FULL_GC = 32 [(additional_fields).name = "full gc", (additional_fields).description = "full gc pause time in ms"]; - // Searchbp - SEARCHBP_SHARD = 33 [(additional_fields).name = "searchbackpressure shard", (additional_fields).description = "default value to indicate an unhealthy resource unit is from shard-level cancellation"]; - SEARCHBP_TASK = 34 [(additional_fields).name = "searchbackpressure task", (additional_fields).description = "default value to indicate an unhealthy resource unit is from task-level cancellation"]; } /* diff --git a/src/test/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRcaTest.java b/src/test/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRcaTest.java deleted file mode 100644 index f371064e9..000000000 --- a/src/test/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRcaTest.java +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; -import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_SHARD; -import static org.opensearch.performanceanalyzer.rca.framework.api.summaries.ResourceUtil.SEARCHBACKPRESSURE_TASK; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.stream.IntStream; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics; -import org.opensearch.performanceanalyzer.metricsdb.MetricsDB; -import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig; -import org.opensearch.performanceanalyzer.rca.framework.api.Metric; -import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit; -import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit; -import org.opensearch.performanceanalyzer.rca.framework.api.metrics.MetricTestHelper; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary; -import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotResourceSummary; - -public class SearchBackPressureRcaTest { - // Mock Metrics - @Mock private Metric mockHeapUsed; - - @Mock private Metric mockHeapMax; - - @Mock private Metric mockGcType; - - @Mock private Metric mockSearchbpStats; - - // every 5s operate() gets initiated - private static final int RCA_PERIOD = 5; - - private SearchBackPressureRCA testRca; - private MetricTestHelper metricTestHelper; - private static final double DEFAULT_MAX_HEAP_SIZE = 4294967296.0; - - // mock heap metric columns - private final List heapTableColumns = - Arrays.asList( - AllMetrics.HeapDimension.MEM_TYPE.toString(), - MetricsDB.SUM, - MetricsDB.AVG, - MetricsDB.MIN, - MetricsDB.MAX); - - // mock search back pressure metric columns - private final List searchbpTableColumns = - Arrays.asList( - AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TYPE_DIM.toString(), - MetricsDB.SUM, - MetricsDB.AVG, - MetricsDB.MIN, - MetricsDB.MAX); - - // dummy field to create a mock gcType Metric - private static final String CMS_COLLECTOR = "ConcurrentMarkSweep"; - - /* - * initialization before running any test - * - */ - @Before - public void setup() throws Exception { - initMocks(this); - this.metricTestHelper = new MetricTestHelper(RCA_PERIOD); - setupMockHeapMetric(mockHeapUsed, 80.0); - setupMockHeapMetric(mockHeapMax, 100.0); - - // set up SearchBp_Stats table - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); - - this.testRca = - new SearchBackPressureRCA(RCA_PERIOD, mockHeapMax, mockHeapUsed, mockSearchbpStats); - } - - /* - * Test SearchBackPressure RCA returns empty resourceFlowUnit if counter is less than the rcaPeriod - */ - @Test - public void testSearchBpGetResourceContextLessRcaPeriod() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.8); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); - - ResourceFlowUnit flowUnit = testRca.operate(); - - // counter = 1 - // counter needs to equal to RcaPeriod (5 in this case) to get nonempty resourceflowunit - assertTrue(flowUnit.isEmpty()); - } - - /* - * Test SearchBackPressure RCA returns nonempty resourceFlowUnit if counter equals to rcaPeriod - */ - @Test - public void testSearchBpGetResourceContextEqualRcaPeriod() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.8); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - - // counter = RCA_PERIOD - // counter needs to equal to RcaPeriod (5 in this case) to get nonempty resourceflowunit - assertFalse(flowUnit.isEmpty()); - } - - /* - * Test SearchBackPressure RCA returns healthy nonempty flow units if the settings does not trigger autotune - * Meeting None of Increasing or Decreasing Threshold for both shard/task level - */ - @Test - public void testSearchBpGetHealthyFlowUnit() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.8); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 7.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertTrue(flowUnit.getResourceContext().isHealthy()); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by increasing threshold - * Increasing threshold: - * node max heap usage in last 60 secs is less than 70% - * cancellationCount due to heap is more than 50% of all task cancellations (Shard-Level) - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitByShardIncreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.3); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 4.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by increasing threshold - * Increasing threshold: - * node max heap usage in last 60 secs is less than 70% - * cancellationCount due to heap is more than 50% of all task cancellations (Task-Level). - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitByTaskIncreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.3); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 4.0, 8.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by decreasing threshold - * decreasing threshold: - * node min heap usage in last 60 secs is more than 80% - * cancellationCount due to heap is less than 30% of all task cancellations (Shard-Level) - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitByShardDecreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.9); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 2.0, 8.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units if the settings does trigger autotune by decreasing threshold - * decreasing threshold: - * node min heap usage in last 60 secs is more than 80% - * cancellationCount due to heap is less than 30% of all task cancellations (Task-Level) - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitByTaskDecreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.9); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 2.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource - * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in shard-level in decrease threshold - * decreasing threshold: - * node min heap usage in last 60 secs is more than 80% - * cancellationCount due to heap is less than 30% of all task cancellations (Shard-Level) - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitInShardLevelByDecreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.95); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 2.0, 8.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - - HotNodeSummary hotNodeSummary = flowUnit.getSummary(); - List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); - boolean found_shard_resource = - hotResourceSummaries.stream() - .anyMatch( - hotResourceSummary -> - (hotResourceSummary.getResource() - == SEARCHBACKPRESSURE_SHARD) - && (hotResourceSummary.getMetaData() - == SearchBackPressureRcaConfig - .DECREASE_THRESHOLD_BY_JVM_STR)); - - assertTrue(found_shard_resource); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource - * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in shard-level in increase threshold - * Increasing threshold: - * node max heap usage in last 60 secs is less than 70% - * cancellationCount due to heap is more than 50% of all task cancellations (Shard-Level) - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitInShardLevelByIncreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.5); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 2.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - - HotNodeSummary hotNodeSummary = flowUnit.getSummary(); - List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); - boolean found_shard_resource_and_increase_metadata = - hotResourceSummaries.stream() - .anyMatch( - hotResourceSummary -> - (hotResourceSummary.getResource() - == SEARCHBACKPRESSURE_SHARD) - && (hotResourceSummary.getMetaData() - == SearchBackPressureRcaConfig - .INCREASE_THRESHOLD_BY_JVM_STR)); - - assertTrue(found_shard_resource_and_increase_metadata); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource - * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in task-level - * decreasing threshold: - * node min heap usage in last 60 secs is more than 80% - * cancellationCount due to heap is less than 30% of all task cancellations (Task-Level) - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitInTaskLevelByDecreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.9); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 8.0, 2.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - - HotNodeSummary hotNodeSummary = flowUnit.getSummary(); - List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); - boolean found_task_resource = - hotResourceSummaries.stream() - .anyMatch( - hotResourceSummary -> - (hotResourceSummary.getResource() - == SEARCHBACKPRESSURE_TASK) - && (hotResourceSummary.getMetaData() - == SearchBackPressureRcaConfig - .DECREASE_THRESHOLD_BY_JVM_STR)); - - assertTrue(found_task_resource); - } - - /* - * Test SearchBackPressure RCA returns unhealthy nonempty flow units with a HotResourceSummary of SEARCHBACKPRESSURE_SHARD Resource - * indicating the autotune (unhealthy resource unit) is caused by meeting the threshold in shard-level - * Increasing threshold: - * node max heap usage in last 60 secs is less than 70% - * cancellationCount due to heap is more than 50% of all task cancellations (Task-Level) - */ - @Test - public void testSearchBpGetUnHealthyFlowUnitInTaskLevelByIncreaseThreshold() { - setupMockHeapMetric(mockHeapMax, DEFAULT_MAX_HEAP_SIZE); - setupMockHeapMetric(mockHeapUsed, DEFAULT_MAX_HEAP_SIZE * 0.5); - setupMockSearchbpStats(mockSearchbpStats, 10.0, 10.0, 2.0, 8.0); - IntStream.range(0, RCA_PERIOD - 1).forEach(i -> testRca.operate()); - - ResourceFlowUnit flowUnit = testRca.operate(); - assertFalse(flowUnit.isEmpty()); - assertFalse(flowUnit.getResourceContext().isHealthy()); - - HotNodeSummary hotNodeSummary = flowUnit.getSummary(); - List hotResourceSummaries = hotNodeSummary.getHotResourceSummaryList(); - boolean found_task_resource = - hotResourceSummaries.stream() - .anyMatch( - hotResourceSummary -> - (hotResourceSummary.getResource() - == SEARCHBACKPRESSURE_TASK) - && (hotResourceSummary.getMetaData() - == SearchBackPressureRcaConfig - .INCREASE_THRESHOLD_BY_JVM_STR)); - - assertTrue(found_task_resource); - } - - private void setupMockHeapMetric(final Metric metric, final double val) { - String valString = Double.toString(val); - List data = - Arrays.asList( - AllMetrics.GCType.HEAP.toString(), - valString, - valString, - valString, - valString); - when(metric.getFlowUnits()) - .thenReturn( - Collections.singletonList( - new MetricFlowUnit( - 0, - metricTestHelper.createTestResult( - heapTableColumns, data)))); - } - - private void setupMockSearchbpStats( - final Metric metric, - final double searchbpShardCancellationCount, - final double searchbpTaskCancellationCount, - final double searchbpJVMShardCancellationCount, - final double searchbpJVMTaskCancellationCount) { - String searchbpShardCancellationCountStr = Double.toString(searchbpShardCancellationCount); - String searchbpTaskCancellationCountStr = Double.toString(searchbpTaskCancellationCount); - String searchbpJVMShardCancellationCountStr = - Double.toString(searchbpJVMShardCancellationCount); - String searchbpJVMTaskCancellationCountStr = - Double.toString(searchbpJVMTaskCancellationCount); - - // add searchbpShardCancellationCountStr row - List searchbpShardCancellationCountRow = - Arrays.asList( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT - .toString(), - searchbpShardCancellationCountStr, - searchbpShardCancellationCountStr, - searchbpShardCancellationCountStr, - searchbpShardCancellationCountStr); - - // add searchbpTaskCancellationCountStr row - List searchbpTaskCancellationCountRow = - Arrays.asList( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_CANCELLATIONCOUNT - .toString(), - searchbpTaskCancellationCountStr, - searchbpTaskCancellationCountStr, - searchbpTaskCancellationCountStr, - searchbpTaskCancellationCountStr); - - // add searchbpJVMShardCancellationCountStr row - List searchbpJVMShardCancellationCountRow = - Arrays.asList( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString(), - searchbpJVMShardCancellationCountStr, - searchbpJVMShardCancellationCountStr, - searchbpJVMShardCancellationCountStr, - searchbpJVMShardCancellationCountStr); - - // add searchbpJVMTaskCancellationCountStr row - List searchbpJVMTaskCancellationCountRow = - Arrays.asList( - AllMetrics.SearchBackPressureStatsValue - .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT - .toString(), - searchbpJVMTaskCancellationCountStr, - searchbpJVMTaskCancellationCountStr, - searchbpJVMTaskCancellationCountStr, - searchbpJVMTaskCancellationCountStr); - - List flowUnits = - Arrays.asList( - new MetricFlowUnit( - 0, - metricTestHelper.createTestResult( - searchbpTableColumns, searchbpShardCancellationCountRow)), - new MetricFlowUnit( - 0, - metricTestHelper.createTestResult( - searchbpTableColumns, searchbpTaskCancellationCountRow)), - new MetricFlowUnit( - 0, - metricTestHelper.createTestResult( - searchbpTableColumns, - searchbpJVMShardCancellationCountRow)), - new MetricFlowUnit( - 0, - metricTestHelper.createTestResult( - searchbpTableColumns, - searchbpJVMTaskCancellationCountRow))); - - when(metric.getFlowUnits()).thenReturn(flowUnits); - } -}