Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add completion count into shard level and task level SBP RCA #517

Merged
merged 9 commits into from
Dec 22, 2023
8 changes: 8 additions & 0 deletions config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
"shard-request-cache-rca": {
"shard-request-cache-threshold" : 0.9
},
"search-back-pressure-rca": {
"max-heap-usage-increase": 70,
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
"max-shard-heap-cancellation-percentage": 5,
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
"max-task-heap-cancellation-percentage": 5,
"max-heap-usage-decrease": 80,
"min-shard-heap-cancellation-percentage": 3,
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
"min-task-heap-cancellation-percentage": 3
},
"admission-control-rca": {
"request-size": {
"heap-range": [
Expand Down
13 changes: 13 additions & 0 deletions config/rca_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@
// shard request cache rca
"shard-request-cache-rca": {
"shard-request-cache-threshold" : 0.9
},
"search-back-pressure-rca": {
"max-heap-usage-increase": 70,
"max-shard-heap-cancellation-percentage": 5,
"max-task-heap-cancellation-percentage": 5,
"max-heap-usage-decrease": 80,
"min-shard-heap-cancellation-percentage": 3,
"min-task-heap-cancellation-percentage": 3
}
},

Expand All @@ -98,6 +106,11 @@
"old-gen-threshold-level-one" : 0.6,
"old-gen-threshold-level-two" : 0.75,
"old-gen-threshold-level-three" : 0.9
},
"search-back-pressure-policy-config": {
"enabled": true,
"hour-breach-threshold": 1,
"searchbp-heap-stepsize-in-percentage": 5
}
},
// Action Configurations
Expand Down
2 changes: 1 addition & 1 deletion docker/opensearch.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cluster.name: "docker-cluster"
network.host: 0.0.0.0
network.host: 0.0.0.0
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public String summary() {
return summary.toJson();
}

@Override
public String toString() {
return summary();
}

public static final class Builder {
public static final boolean DEFAULT_CAN_UPDATE = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,18 @@ public SearchBackPressurePolicy(SearchBackPressureClusterRCA searchBackPressureC
* @param issue an issue with the application
*/
private void record(HotResourceSummary summary) {
LOG.debug("SearchBackPressurePolicy capturing resource summary: {}", summary);
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved

if (HEAP_SEARCHBP_SHARD_SIGNALS.contains(summary.getResource())) {
LOG.debug("Shard signal in SBP RCA summary...");
searchBackPressureIssue =
new SearchBackPressureShardIssue(
summary, searchBackPressureShardAlarmMonitorMap);
searchBackPressureIssue.recordIssueBySummaryType(summary);
}

if (HEAP_SEARCHBP_TASK_SIGNALS.contains(summary.getResource())) {
LOG.debug("Task signal in SBP RCA summary...");
searchBackPressureIssue =
new SearchBackPressureSearchTaskIssue(
summary, searchBackPressureTaskAlarmMonitorMap);
Expand All @@ -126,7 +130,9 @@ private void recordIssues() {
LOG.debug("No flow units in searchBackPressureClusterRCA");
return;
}

LOG.debug(
"SearchBackPressurePolicy flow units: {}",
searchBackPressureClusterRCA.getFlowUnits());
for (ResourceFlowUnit<HotClusterSummary> flowUnit :
searchBackPressureClusterRCA.getFlowUnits()) {
if (!flowUnit.hasResourceSummary()) {
Expand Down Expand Up @@ -248,7 +254,10 @@ public List<Action> evaluate() {
checkTaskAlarms(actions);

// print current size of the actions
LOG.debug("SearchBackPressurePolicy#evaluate() action size: {}", actions.size());
LOG.debug(
"SearchBackPressurePolicy#evaluate() action size: {} actions: {}",
actions.size(),
actions);

return actions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public SearchBackPressureShardIssue(

@Override
public void recordIssueBySummaryType(HotResourceSummary summary) {
if (summary.getMetaData() == SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR) {
LOG.debug("Recording issue by summary type..... summary: {}", summary);
if (summary.getMetaData()
.equalsIgnoreCase(SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR)) {
LOG.debug("recording increase-level issue for shard");
LOG.debug("size of the HashMap: {}", actionsAlarmMonitorMap.size());
actionsAlarmMonitorMap
Expand All @@ -33,7 +35,8 @@ public void recordIssueBySummaryType(HotResourceSummary summary) {
}

// decrease alarm for heap-related threshold
if (summary.getMetaData() == SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR) {
if (summary.getMetaData()
.equalsIgnoreCase(SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR)) {
LOG.debug("recording decrease-level issue for shard");
actionsAlarmMonitorMap
.get(SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_DECREASE_ALARM.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public class MetricsModel {

// Search Back Pressure Metrics
allMetricsInitializer.put(
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_TASK_STATS_CANCELLATION_COUNT
.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values()));
ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;

public class SearchBackPressureRcaConfig {
public static final String CONFIG_NAME = "search-back-pressure-rca-policy";
public static final String CONFIG_NAME = "search-back-pressure-rca";

/* Metadata fields for thresholds */
public static final String INCREASE_THRESHOLD_BY_JVM_STR = "increase_jvm";
Expand All @@ -28,25 +28,28 @@ public class SearchBackPressureRcaConfig {
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD = 70;
private Integer maxHeapIncreasePercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
// cancellation percent due to heap is more than 3% of all task completions in shard level
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
// (Taking 3 because we don't cancel more than 10% of all completions at any time)
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 5;
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
private Integer maxShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in task level
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
// cancellation percent due to heap is more than 5% of all task completions in
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
// SearchTask(co-ordinator) level (Taking 3 because we don't cancel more than 10% of all
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
// completions at any time)
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 5;
private Integer maxTaskHeapCancellationPercentageThreshold;

/* Decrease Threshold */
// node min heap usage in last 60 secs is more than 80%
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD = 80;
private Integer minHeapDecreasePercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
// cancellationCount due to heap is less than 3% of all task completions in shard level
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 3;
private Integer minShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in task level
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
// cancellationCount due to heap is less than 3% of all task completions in task level
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 3;
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
private Integer minTaskHeapCancellationPercentageThreshold;

public SearchBackPressureRcaConfig(final RcaConf conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ public ImmutableList<FlowUnitMessage> drainNode(final String graphNode) {
final List<FlowUnitMessage> tempList = new ArrayList<>();
BlockingQueue<FlowUnitMessage> existing = flowUnitMap.get(graphNode);
if (existing == null) {
LOG.debug("Nothing in the FlowUnitStore for vertex: {}", graphNode);
return ImmutableList.of();
}

existing.drainTo(tempList);
LOG.debug("Available flow units for vertex: {}, flowUnits: {}", graphNode, tempList);

return ImmutableList.copyOf(tempList);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
final List<FlowUnitMessage> flowUnitMessages =
args.getWireHopper().readFromWire(args.getNode());
final List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
LOG.info("rca: Executing fromWire: {}", this.getClass().getSimpleName());
LOG.info(
"rca: Executing fromWire: {}, remoteFlowUnits: {}",
this.getClass().getSimpleName(),
flowUnitMessages);
for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
}
Expand All @@ -201,14 +204,17 @@ public ResourceFlowUnit<HotNodeSummary> operate() {

// print out oldGenUsed and maxOldGen
LOG.debug(
"SearchBackPressureRCA: oldGenUsed: {} maxOldGen: {}, heapUsedPercentage: {}, searchbpShardCancellationCount: {}, searchbpTaskCancellationCount: {}, searchbpJVMShardCancellationCount: {}, searchbpJVMTaskCancellationCount: {}",
"SearchBackPressureRCA: oldGenUsed: {} maxOldGen: {}, heapUsedPercentage: {}, searchbpShardCancellationCount: {}, searchbpTaskCancellationCount: {}, searchbpJVMShardCancellationCount: {}, searchbpJVMTaskCancellationCount: {}"
+ ", searchShardTaskCompletionCount: {}, searchTaskCompletionCount: {}",
searchBackPressureRCAMetric.getUsedHeap(),
searchBackPressureRCAMetric.getMaxHeap(),
searchBackPressureRCAMetric.getHeapUsagePercent(),
searchBackPressureRCAMetric.getSearchbpShardCancellationCount(),
searchBackPressureRCAMetric.getSearchbpTaskCancellationCount(),
searchBackPressureRCAMetric.getSearchbpJVMShardCancellationCount(),
searchBackPressureRCAMetric.getSearchbpJVMTaskCancellationCount());
searchBackPressureRCAMetric.getSearchbpJVMTaskCancellationCount(),
searchBackPressureRCAMetric.getSearchShardTaskCompletionCount(),
searchBackPressureRCAMetric.getSearchTaskCompletionCount());

updateAllSlidingWindows(searchBackPressureRCAMetric, currentTimeMillis);

Expand Down Expand Up @@ -241,8 +247,8 @@ public ResourceFlowUnit<HotNodeSummary> operate() {

/*
* 2 cases we send Unhealthy ResourceContext when we need to autotune the threshold
* (increase) node max heap usage in last 60 secs is less than 70% and cancellationCountPercentage due to heap is more than 50% of all task cancellations
* (decrease) node min heap usage in last 60 secs is more than 80% and cancellationCountPercetange due to heap is less than 30% of all task cancellations
* (increase) node max heap usage in last 60 secs is less than 70% and cancellationCountPercentage due to heap is more than 5% of all task completions
* (decrease) node min heap usage in last 60 secs is more than 80% and cancellationCountPercetange due to heap is less than 3% of all task completions
*/
boolean maxHeapBelowIncreaseThreshold =
maxHeapUsagePercentage < heapUsedIncreaseThreshold;
Expand Down Expand Up @@ -270,14 +276,13 @@ public ResourceFlowUnit<HotNodeSummary> operate() {
minHeapAboveDecreaseThreshold && taskHeapCancellationPercentageBelowThreshold;

// Generate a flow unit with an Unhealthy ResourceContext
LOG.debug(
"Increase/Decrease Condition Meet for Shard, maxHeapUsagePercentage: {} is less than threshold: {}, avgShardJVMCancellationPercentage: {} is bigger than heapShardCancellationIncreaseMaxThreshold: {}",
maxHeapUsagePercentage,
heapUsedIncreaseThreshold,
avgShardJVMCancellationPercentage,
heapShardCancellationIncreaseMaxThreshold);

if (increaseJVMThresholdMetByShard || decreaseJVMThresholdMetByShard) {
LOG.debug(
"Increase/Decrease Condition Meet for Shard, maxHeapUsagePercentage: {} is less than threshold: {}, avgShardJVMCancellationPercentage: {} is bigger than heapShardCancellationIncreaseMaxThreshold: {}",
maxHeapUsagePercentage,
heapUsedIncreaseThreshold,
avgShardJVMCancellationPercentage,
heapShardCancellationIncreaseMaxThreshold);
context = new ResourceContext(Resources.State.UNHEALTHY);
HotResourceSummary resourceSummary;
// metadata fields indicate the reason for Unhealthy Resource Unit
Expand Down Expand Up @@ -432,35 +437,51 @@ private SearchBackPressureRCAMetric getSearchBackPressureRCAMetric() {
this.searchbp_Stats,
searchbp_stats_type_field,
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
.SEARCHBP_SHARD_TASK_STATS_CANCELLATION_COUNT
.toString());
double searchbpTaskCancellationCount =
getMetric(
this.searchbp_Stats,
searchbp_stats_type_field,
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_CANCELLATIONCOUNT
.SEARCHBP_SEARCH_TASK_STATS_CANCELLATION_COUNT
.toString());
double searchShardTaskCompletionCount =
getMetric(
this.searchbp_Stats,
searchbp_stats_type_field,
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_TASK_STATS_COMPLETION_COUNT
.toString());
double searchTaskCompletionCount =
getMetric(
this.searchbp_Stats,
searchbp_stats_type_field,
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SEARCH_TASK_STATS_COMPLETION_COUNT
.toString());
double searchbpJVMShardCancellationCount =
getMetric(
this.searchbp_Stats,
searchbp_stats_type_field,
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.SEARCHBP_SHARD_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATION_COUNT
.toString());
double searchbpJVMTaskCancellationCount =
getMetric(
this.searchbp_Stats,
searchbp_stats_type_field,
AllMetrics.SearchBackPressureStatsValue
.SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT
.SEARCHBP_SEARCH_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATION_COUNT
.toString());

return new SearchBackPressureRCAMetric(
prevHeapUsage,
maxHeapSize,
searchbpShardCancellationCount,
searchbpTaskCancellationCount,
searchShardTaskCompletionCount,
searchTaskCompletionCount,
searchbpJVMShardCancellationCount,
searchbpJVMTaskCancellationCount);
}
Expand Down Expand Up @@ -521,12 +542,12 @@ public void readRcaConf(RcaConf conf) {
*/
private void updateAllSlidingWindows(
SearchBackPressureRCAMetric searchBackPressureRCAMetric, long currentTimeMillis) {
double prevheapUsagePercentage = searchBackPressureRCAMetric.getHeapUsagePercent();
if (!Double.isNaN(prevheapUsagePercentage)) {
double prevHeapUsagePercentage = searchBackPressureRCAMetric.getHeapUsagePercent();
if (!Double.isNaN(prevHeapUsagePercentage)) {
minHeapUsageSlidingWindow.next(
new SlidingWindowData(currentTimeMillis, prevheapUsagePercentage));
new SlidingWindowData(currentTimeMillis, prevHeapUsagePercentage));
maxHeapUsageSlidingWindow.next(
new SlidingWindowData(currentTimeMillis, prevheapUsagePercentage));
new SlidingWindowData(currentTimeMillis, prevHeapUsagePercentage));
}

double shardJVMCancellationPercentage =
Expand Down
Loading
Loading