Skip to content

Commit

Permalink
SearchBackPressure Service Node/Cluster RCA (#437)
Browse files Browse the repository at this point in the history
* Remove log files and add DCO (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove extra files (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove styling difference (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove unnecessary file changes (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add RCA_Decider (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Extract Heap Usage from SQlitedb (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Extract required searchbp metrics for deciders (signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add SearchBackPressureRCA Metric (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Use SearchBackPressureRCAMetrics to aggregate metrics (signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add the conf file extracted part for SearchBackPressureRcaConfig.java (signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add MinMaxSlidingWindow in OldGenRca (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Rename SearchBackPressureClusterRCA and add it to AnalysisGraph (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add basic UTs for SearchBackPressureRCA cluster/node level (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add unhealthy/healthy stats UTs for SearchBackPressureRCA cluster/node level (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add healthy resource unit UT (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add UT s both shard/task level (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add a new SearchBp Resource Unit (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add UTs to test shard/task level resource include-ness (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove styling changes for Version.java (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Add metadata to resourceSummary (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Update to more general framework (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Refactor the MinMaxSlidingWindow and bug fix (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Refactor Heap Stats Metrics Getter(Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Refactor HeapUsed and HeapMax Getters (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Refactor operate() (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Refactor operate() and remove dead comments (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Merged Main (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Merged Main (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* remove trailing space in build.gradle (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* nit javadoc update (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* nit javadoc updates (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* nit javadoc updates (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Remove dead comments  (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* update javadoc  (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* LOG Level Change  (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

* Change from static class to enum (Signed-off-by: Jeffrey Liu [email protected])

Signed-off-by: CoderJeffrey <[email protected]>

---------

Signed-off-by: CoderJeffrey <[email protected]>
(cherry picked from commit 2b970e3)
  • Loading branch information
CoderJeffrey authored and github-actions[bot] committed Oct 5, 2023
1 parent d29ac81 commit dc86c3a
Show file tree
Hide file tree
Showing 12 changed files with 1,374 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,11 @@ public class MetricsModel {
MetricUnits.MILLISECOND.toString(),
AllMetrics.ShardIndexingPressureDimension.values()));

// Search Back Pressure Metrics
allMetricsInitializer.put(
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values()));
ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.rca.configs;


import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;

public class SearchBackPressureRcaConfig {
public static final String CONFIG_NAME = "search-back-pressure-rca-policy";

/* Metadata fields for thresholds */
public static final String INCREASE_THRESHOLD_BY_JVM_STR = "increase_jvm";
public static final String DECREASE_THRESHOLD_BY_JVM_STR = "decrease_jvm";

public static final int SLIDING_WINDOW_SIZE_IN_MINS = 1;

// Interval period in seconds
public static final long DEFAULT_EVALUATION_INTERVAL_IN_S = 60;

/* interval period to call operate() */
public static final long EVAL_INTERVAL_IN_S = 5;

/* Increase Threshold */
// node max heap usage in last 60 secs is less than 70%
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD = 70;
private Integer maxHeapIncreasePercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
private Integer maxShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in task level
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
private Integer maxTaskHeapCancellationPercentageThreshold;

/* Decrease Threshold */
// node min heap usage in last 60 secs is more than 80%
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD = 80;
private Integer minHeapDecreasePercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
private Integer minShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in task level
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
private Integer minTaskHeapCancellationPercentageThreshold;

public SearchBackPressureRcaConfig(final RcaConf conf) {
// (s) -> s > 0 is the validator, if validated, fields from conf file will be returned,
// else, default value gets returned
maxHeapIncreasePercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_INCREASE_FIELD.toString(),
DEFAULT_MAX_HEAP_INCREASE_THRESHOLD,
(s) -> s >= 0 && s <= 100,
Integer.class);
maxShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD,
(s) -> s >= 0 && s <= 100,
Integer.class);
maxTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD,
(s) -> s >= 0 && s <= 100,
Integer.class);
minHeapDecreasePercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_DECREASE_FIELD.toString(),
DEFAULT_MIN_HEAP_DECREASE_THRESHOLD,
(s) -> s >= 0 && s <= 100,
Integer.class);
minShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD,
(s) -> s >= 0 && s <= 100,
Integer.class);
minTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD,
(s) -> s >= 0 && s <= 100,
Integer.class);
}

// Getters for private field
public Integer getMaxHeapIncreasePercentageThreshold() {
return maxHeapIncreasePercentageThreshold;
}

public Integer getMaxShardHeapCancellationPercentageThreshold() {
return maxShardHeapCancellationPercentageThreshold;
}

public Integer getMaxTaskHeapCancellationPercentageThreshold() {
return maxTaskHeapCancellationPercentageThreshold;
}

public Integer getMinHeapDecreasePercentageThreshold() {
return minHeapDecreasePercentageThreshold;
}

public Integer getMinShardHeapCancellationPercentageThreshold() {
return minShardHeapCancellationPercentageThreshold;
}

public Integer getMinTaskHeapCancellationPercentageThreshold() {
return minTaskHeapCancellationPercentageThreshold;
}

// name for the configuration field
public enum SearchBackPressureRcaConfigKeys {
MAX_HEAP_USAGE_INCREASE_FIELD("max-heap-usage-increase"),
MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD("max-shard-heap-cancellation-percentage"),
MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD("max-task-heap-cancellation-percentage"),
MAX_HEAP_USAGE_DECREASE_FIELD("max-heap-usage-decrease"),
MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD("min-shard-heap-cancellation-percentage"),
MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD("min-task-heap-cancellation-percentage");

private final String value;

SearchBackPressureRcaConfigKeys(final String value) {
this.value = value;
}

@Override
public String toString() {
return this.value;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.rca.framework.api.metrics;


import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.rca.framework.api.Metric;

public class Searchbp_Stats extends Metric {
public Searchbp_Stats(long evaluationIntervalSeconds) {
super(
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_TABLE_NAME.toString(),
evaluationIntervalSeconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,24 @@ public class ResourceUtil {
.setResourceEnum(ResourceEnum.SHARD_REQUEST_CACHE)
.setMetricEnum(MetricEnum.CACHE_MAX_SIZE)
.build();
/*
* searchbackpressure related resource
* SEARCHBACKPRESSURE_SHARD resource indicate a searchbackpressure unhealthy resource unit is caused by shard level cancellation
*/
public static final Resource SEARCHBACKPRESSURE_SHARD =
Resource.newBuilder()
.setResourceEnum(ResourceEnum.SEARCHBP)
.setMetricEnum(MetricEnum.SEARCHBP_SHARD)
.build();

/*
* SEARCHBACKPRESSURE_TASK resource indicate a searchbackpressure unhealthy resource unit is caused by task level cancellation
*/
public static final Resource SEARCHBACKPRESSURE_TASK =
Resource.newBuilder()
.setResourceEnum(ResourceEnum.SEARCHBP)
.setMetricEnum(MetricEnum.SEARCHBP_TASK)
.build();

/**
* Read the resourceType name from the ResourceType object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.performanceanalyzer.rca.configs.HotShardRcaConfig;
import org.opensearch.performanceanalyzer.rca.configs.OldGenContendedRcaConfig;
import org.opensearch.performanceanalyzer.rca.configs.QueueRejectionRcaConfig;
import org.opensearch.performanceanalyzer.rca.configs.SearchBackPressureRcaConfig;
import org.opensearch.performanceanalyzer.rca.configs.ShardRequestCacheRcaConfig;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.bucket.BasicBucketCalculator;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.bucket.BucketCalculator;
Expand Down Expand Up @@ -232,6 +233,10 @@ public OldGenContendedRcaConfig getOldGenContendedRcaConfig() {
return new OldGenContendedRcaConfig(this);
}

public SearchBackPressureRcaConfig getSearchBackPressureRcaConfig() {
return new SearchBackPressureRcaConfig(this);
}

public <T> T readRcaConfig(
String rcaName, String key, T defaultValue, Class<? extends T> clazz) {
return readRcaConfig(rcaName, key, defaultValue, (s) -> true, clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Heap_Max;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Heap_Used;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.IndexWriter_Memory;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Searchbp_Stats;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_QueueCapacity;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.ThreadPool_RejectedReqs;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Thread_Blocked_Time;
Expand Down Expand Up @@ -85,6 +86,8 @@
import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.LargeHeapClusterRca;
import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.OldGenContendedRca;
import org.opensearch.performanceanalyzer.rca.store.rca.jvmsizing.OldGenReclamationRca;
import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureClusterRCA;
import org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure.SearchBackPressureRCA;
import org.opensearch.performanceanalyzer.rca.store.rca.temperature.NodeTemperatureRca;
import org.opensearch.performanceanalyzer.rca.store.rca.temperature.dimension.CpuUtilDimensionTemperatureRca;
import org.opensearch.performanceanalyzer.rca.store.rca.temperature.dimension.HeapAllocRateTemperatureRca;
Expand Down Expand Up @@ -117,6 +120,9 @@ public void construct() {
MetricsDB.AVG,
AllMetrics.CommonDimension.OPERATION.toString());

// SearchBackpressure Metric
Metric searchbp_Stats = new Searchbp_Stats(EVALUATION_INTERVAL_SECONDS);

heapUsed.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
Expand All @@ -141,6 +147,9 @@ public void construct() {
threadWaitedTime.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
searchbp_Stats.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);

addLeaf(heapUsed);
addLeaf(gcEvent);
Expand All @@ -150,6 +159,7 @@ public void construct() {
addLeaf(cpuUtilizationGroupByOperation);
addLeaf(threadBlockedTime);
addLeaf(threadWaitedTime);
addLeaf(searchbp_Stats);

// add node stats metrics
List<Metric> nodeStatsMetrics = constructNodeStatsMetrics();
Expand Down Expand Up @@ -433,6 +443,28 @@ public void construct() {
shardRequestCacheClusterRca,
highHeapUsageClusterRca));

// Search Back Pressure Service RCA enabled
SearchBackPressureRCA searchBackPressureRCA =
new SearchBackPressureRCA(RCA_PERIOD, heapMax, heapUsed, searchbp_Stats);
searchBackPressureRCA.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_DATA_CLUSTER_MANAGER_NODE);
searchBackPressureRCA.addAllUpstreams(Arrays.asList(heapMax, heapUsed, searchbp_Stats));

// Search Back Pressure Service Cluster RCA enabled
SearchBackPressureClusterRCA searchBackPressureClusterRCA =
new SearchBackPressureClusterRCA(RCA_PERIOD, searchBackPressureRCA);
searchBackPressureClusterRCA.addTag(
RcaConsts.RcaTagConstants.TAG_LOCUS,
RcaConsts.RcaTagConstants.LOCUS_CLUSTER_MANAGER_NODE);
searchBackPressureClusterRCA.addAllUpstreams(
Collections.singletonList(searchBackPressureRCA));
searchBackPressureClusterRCA.addTag(
RcaConsts.RcaTagConstants.TAG_AGGREGATE_UPSTREAM,
RcaConsts.RcaTagConstants.LOCUS_DATA_NODE);

// TODO: Add SearchBackPressure Decider

AdmissionControlDecider admissionControlDecider =
buildAdmissionControlDecider(heapUsed, heapMax);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected boolean isOldGenCollectorCMS() {
return true;
}

/** Sliding window to check the minimal olg gen usage within a given time frame */
/** Sliding window to check the minimal old gen usage within a given time frame */
public static class MinOldGenSlidingWindow extends SlidingWindow<SlidingWindowData> {

public MinOldGenSlidingWindow(int SLIDING_WINDOW_SIZE_IN_TIMESTAMP, TimeUnit timeUnit) {
Expand Down Expand Up @@ -250,4 +250,60 @@ public double readMin() {
return Double.NaN;
}
}

/**
* Sliding window to check the max/min old gen usage within a given time frame
*
* @param isMinSlidingWindow true if the sliding window is for min usage, false for max usage
* Provides a more general framework than MinOldGenSlidingWindow as this sliding window can
* be implemented as minSlidingWindow or maxSlidingWindow depending on the need.
*/
public static class MinMaxSlidingWindow extends SlidingWindow<SlidingWindowData> {
boolean isMinSlidingWindow;

public MinMaxSlidingWindow(
int SLIDING_WINDOW_SIZE_IN_TIMESTAMP,
TimeUnit timeUnit,
boolean isMinSlidingWindow) {
super(SLIDING_WINDOW_SIZE_IN_TIMESTAMP, timeUnit);
this.isMinSlidingWindow = isMinSlidingWindow;
}

@Override
public void next(SlidingWindowData e) {
if (isMinSlidingWindow) {
// monotonically decreasing sliding window
while (!windowDeque.isEmpty()
&& windowDeque.peekFirst().getValue() >= e.getValue()) {
windowDeque.pollFirst();
}
} else {
// monotonically increasing sliding window
while (!windowDeque.isEmpty()
&& windowDeque.peekFirst().getValue() < e.getValue()) {
windowDeque.pollFirst();
}
}

windowDeque.addFirst(e);
while (!windowDeque.isEmpty()
&& TimeUnit.MILLISECONDS.toSeconds(
e.getTimeStamp() - windowDeque.peekLast().getTimeStamp())
> SLIDING_WINDOW_SIZE) {
windowDeque.pollLast();
}
}

/*
* read last element in the window
* if the sliding window is MinSlidingWindow then returns the min element
* else return the max element in the deque
*/
public double readLastElementInWindow() {
if (!windowDeque.isEmpty()) {
return windowDeque.peekLast().getValue();
}
return Double.NaN;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.rca.store.rca.searchbackpressure;


import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.rca.framework.api.Rca;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.store.rca.cluster.BaseClusterRca;

public class SearchBackPressureClusterRCA extends BaseClusterRca {

public static final String RCA_TABLE_NAME = SearchBackPressureClusterRCA.class.getSimpleName();
private static final Logger LOG = LogManager.getLogger(SearchBackPressureClusterRCA.class);

public <R extends Rca<ResourceFlowUnit<HotNodeSummary>>> SearchBackPressureClusterRCA(
final int rcaPeriod, final R SearchBackPressureRCA) {
super(rcaPeriod, SearchBackPressureRCA);
}
}
Loading

0 comments on commit dc86c3a

Please sign in to comment.