Skip to content

Commit

Permalink
add logic to keep the heap node duress setting in sync with cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
kaushalmahi12 committed Dec 20, 2023
1 parent 510f2d6 commit e9b9585
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@


import io.netty.handler.codec.http.HttpMethod;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.core.Util;
Expand Down Expand Up @@ -54,6 +62,60 @@ public static void disablePA() throws InterruptedException {
throw new RuntimeException("Failed to disable PA after 5 attempts");
}

public static class ClusterSettings {

Check warning on line 65 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L65

Added line #L65 was not covered by tests
static List<String> clusterSettings = new ArrayList<>();

static final String CLUSTER_SETTINGS_URL =
"/_cluster/settings?flat_settings=true&include_defaults=true&pretty";

/** Refreshes the Cluster Settings */
private static void refreshClusterSettings() {
final HttpURLConnection urlConnection =
LocalhostConnectionUtil.createHTTPConnection(
CLUSTER_SETTINGS_URL, HttpMethod.GET);
try (final BufferedReader br =
new BufferedReader(new InputStreamReader(urlConnection.getInputStream()))) {

Check warning on line 77 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L77

Added line #L77 was not covered by tests
String line;
clusterSettings.clear();

Check warning on line 79 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L79

Added line #L79 was not covered by tests
while ((line = br.readLine()) != null) {
clusterSettings.add(line);

Check warning on line 81 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L81

Added line #L81 was not covered by tests
}
} catch (IOException e) {
LOG.warn("could not refresh the cluster settings");
}

Check warning on line 85 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L85

Added line #L85 was not covered by tests
}

/**
* @param settingName a string representing the setting name in cluster settings, expected
* values are flat settings, e,g; <code>search_backpressure.node_duress.heap_threshold
* </code>
* @param settingValRegex a regex value representing valid regex match for the setting val
* and should encapsulate the value in a group inside the string settingValRegex, e,g;
* "\"([0-9].[0-9]+)\"" to match any floating value with one leading digit
* @returns the value for the setting settingName if present e,g; "0.7" or else NULL
*/
public static String getClusterSettingValue(String settingName, String settingValRegex) {
refreshClusterSettings();
Pattern settingValPattern = Pattern.compile(settingValRegex);
Optional<String> setting =
clusterSettings.stream()
.filter(settingLine -> settingLine.contains(settingName))
.findFirst();
final String settingVal =
setting.map(
settingLine -> {
Matcher settingValMatcher =
settingValPattern.matcher(settingLine);

Check warning on line 108 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L107-L108

Added lines #L107 - L108 were not covered by tests
if (settingValMatcher.find()) {
return settingValMatcher.group(1);

Check warning on line 110 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L110

Added line #L110 was not covered by tests
}
return null;

Check warning on line 112 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L112

Added line #L112 was not covered by tests
})
.orElseGet(() -> "NULL");
return settingVal;
}
}

private static HttpURLConnection createHTTPConnection(String path, HttpMethod httpMethod) {
try {
String endPoint = "http://localhost:9200" + path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,41 @@ public class SearchBackPressureRcaConfig {

/* Increase Threshold */
// node max heap usage in last 60 secs is less than 70%
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD = 70;
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD_PERCENT = 80;
private Integer maxHeapIncreasePercentageThreshold;

// cancellation percent due to heap is more than 5% of all task completions at shard level
// (Taking 3 because we don't cancel more than 10% of all completions at any time)
// Basically this threshold tell that we are overcancelling the shard level tasks given max heap from last rca eval period is still
// Basically this threshold tell that we are overcancelling the shard level tasks given max heap
// from last rca eval period is still
// below or equal to DEFAULT_MAX_HEAP_INCREASE_THRESHOLD
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 5;
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT = 5;
private Integer maxShardHeapCancellationPercentageThreshold;

// cancellation percent due to heap is more than 5% of all task completions in
// SearchTask(co-ordinator) level (Taking 3 because we don't cancel more than 10% of all
// completions at any time)
// Basically this threshold tell that we are overcancelling the co-ordinator level tasks
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 5;
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT = 5;
private Integer maxTaskHeapCancellationPercentageThreshold;

/* Decrease Threshold */
// node min heap usage in last 60 secs is more than 80%
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD = 80;
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD_PERCENT = 90;
private Integer minHeapDecreasePercentageThreshold;

// cancellationCount due to heap is less than 3% of all task completions in shard level
// Basically this threshold tell that we are under cancelling the shard level tasks given min heap from last rca eval period is still
// Basically this threshold tell that we are under cancelling the shard level tasks given min
// heap from last rca eval period is still
// above or equal to DEFAULT_MIN_HEAP_DECREASE_THRESHOLD
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 3;
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT = 3;
private Integer minShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is less than 3% of all task completions in task level
// Basically this threshold tell that we are under cancelling the coordinator level tasks given min heap from last rca eval period is still
// Basically this threshold tell that we are under cancelling the coordinator level tasks given
// min heap from last rca eval period is still
// above or equal to DEFAULT_MIN_HEAP_DECREASE_THRESHOLD
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 3;
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT = 3;
private Integer minTaskHeapCancellationPercentageThreshold;

public SearchBackPressureRcaConfig(final RcaConf conf) {
Expand All @@ -66,46 +69,46 @@ public SearchBackPressureRcaConfig(final RcaConf conf) {
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_INCREASE_FIELD.toString(),
DEFAULT_MAX_HEAP_INCREASE_THRESHOLD,
DEFAULT_MAX_HEAP_INCREASE_THRESHOLD_PERCENT,

Check warning on line 72 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L72

Added line #L72 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
maxShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 80 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L80

Added line #L80 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
maxTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 88 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L88

Added line #L88 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
minHeapDecreasePercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_DECREASE_FIELD.toString(),
DEFAULT_MIN_HEAP_DECREASE_THRESHOLD,
DEFAULT_MIN_HEAP_DECREASE_THRESHOLD_PERCENT,

Check warning on line 95 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L95

Added line #L95 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
minShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 103 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L103

Added line #L103 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
minTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 111 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L111

Added line #L111 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.logging.log4j.Logger;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.opensearch.performanceanalyzer.LocalhostConnectionUtil;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.metricsdb.MetricsDB;
Expand All @@ -44,7 +45,12 @@ public class SearchBackPressureRCA extends Rca<ResourceFlowUnit<HotNodeSummary>>
private static final Logger LOG = LogManager.getLogger(SearchBackPressureRCA.class);
private static final double BYTES_TO_GIGABYTES = Math.pow(1024, 3);
private static final long EVAL_INTERVAL_IN_S = SearchBackPressureRcaConfig.EVAL_INTERVAL_IN_S;
private static final String SEARCH_BACKPRESSURE_HEAP_DURESS_KEY =
"search_backpressure.node_duress.heap_threshold";
private static final String SEARCH_BACKPRESSURE_HEAP_DURESS_VAL_REGEX = "([0-9].[0-9]+)";
private static final double CONVERT_BYTES_TO_MEGABYTES = Math.pow(1024, 2);
public static final int MAX_ALLOWED_HEAP = 90;
public static final double MAX_GAP_BW_BASELINE_HEAP_AND_MAX_ALLOWED = 0.1;

// Key metrics used to determine RCA Flow Unit health status
private final Metric heapUsed;
Expand Down Expand Up @@ -133,23 +139,23 @@ public <M extends Metric> SearchBackPressureRCA(

// threshold for heap usage
this.heapUsedIncreaseThreshold =
SearchBackPressureRcaConfig.DEFAULT_MAX_HEAP_INCREASE_THRESHOLD;
SearchBackPressureRcaConfig.DEFAULT_MAX_HEAP_INCREASE_THRESHOLD_PERCENT;
this.heapUsedDecreaseThreshold =
SearchBackPressureRcaConfig.DEFAULT_MIN_HEAP_DECREASE_THRESHOLD;
SearchBackPressureRcaConfig.DEFAULT_MIN_HEAP_DECREASE_THRESHOLD_PERCENT;

/*
* threshold for search back pressure service stats
* currently, only consider the percentage of JVM Usage cancellation count compared to the total cancellation count
*/
this.heapShardCancellationIncreaseMaxThreshold =
SearchBackPressureRcaConfig.DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD;
SearchBackPressureRcaConfig.DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT;
this.heapTaskCancellationIncreaseMaxThreshold =
SearchBackPressureRcaConfig.DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD;
SearchBackPressureRcaConfig.DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT;

this.heapShardCancellationDecreaseMinThreashold =
SearchBackPressureRcaConfig.DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD;
SearchBackPressureRcaConfig.DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT;
this.heapTaskCancellationDecreaseMinThreashold =
SearchBackPressureRcaConfig.DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD;
SearchBackPressureRcaConfig.DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT;

// sliding window for heap usage
this.minHeapUsageSlidingWindow =
Expand Down Expand Up @@ -188,6 +194,23 @@ public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
setFlowUnits(flowUnitList);
}

private long getUpdatedHeapUsedIncreaseThreshold() {
String val =
LocalhostConnectionUtil.ClusterSettings.getClusterSettingValue(
SEARCH_BACKPRESSURE_HEAP_DURESS_KEY,
SEARCH_BACKPRESSURE_HEAP_DURESS_VAL_REGEX);
final String SETTING_NOT_FOUND = "NULL";
// If there was an error fetching the threshold ignore for this run
if (val.equals(SETTING_NOT_FOUND)) {
LOG.warn("There was an error fetching the node duress heap settings value...");
return heapUsedIncreaseThreshold;
}
LOG.debug("successfully fetched the node duress heap threshold {}", val);

Check warning on line 208 in src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java#L208

Added line #L208 was not covered by tests
// this value will be sub-decimal e,g; 0.7
double floatVal = Double.parseDouble(val);
return (long) (floatVal * 100);

Check warning on line 211 in src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/searchbackpressure/SearchBackPressureRCA.java#L210-L211

Added lines #L210 - L211 were not covered by tests
}

/*
* operate() evaluates the current stats against threshold
* Unhealthy Flow Units is a marker that this resource at current instance is not healthy
Expand Down Expand Up @@ -229,6 +252,18 @@ public ResourceFlowUnit<HotNodeSummary> operate() {
// reset currentIterationNumber
currentIterationNumber = 0;

heapUsedIncreaseThreshold = getUpdatedHeapUsedIncreaseThreshold();
// We always want to maintain the gap in increase and decrease thresholds so that OOMs
// doesn't happen
// due to excessive traffic
heapUsedDecreaseThreshold =
Math.min(
MAX_ALLOWED_HEAP,
(long)
(heapUsedIncreaseThreshold
+ heapUsedDecreaseThreshold
* MAX_GAP_BW_BASELINE_HEAP_AND_MAX_ALLOWED));

double maxHeapUsagePercentage = maxHeapUsageSlidingWindow.readLastElementInWindow();
double minHeapUsagePercentage = minHeapUsageSlidingWindow.readLastElementInWindow();
double avgShardJVMCancellationPercentage = shardJVMCancellationSlidingWindow.readAvg();
Expand Down

0 comments on commit e9b9585

Please sign in to comment.