Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add completion count into shard level and task level SBP RCA #517

Merged
merged 9 commits into from
Dec 22, 2023
8 changes: 8 additions & 0 deletions config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
"shard-request-cache-rca": {
"shard-request-cache-threshold" : 0.9
},
"search-back-pressure-rca": {
"max-heap-usage-increase": 80,
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
"max-shard-heap-cancellation-percentage": 5,
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
"max-task-heap-cancellation-percentage": 5,
"max-heap-usage-decrease": 90,
"min-shard-heap-cancellation-percentage": 3,
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
"min-task-heap-cancellation-percentage": 3
},
"admission-control-rca": {
"request-size": {
"heap-range": [
Expand Down
13 changes: 13 additions & 0 deletions config/rca_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@
// shard request cache rca
"shard-request-cache-rca": {
"shard-request-cache-threshold" : 0.9
},
"search-back-pressure-rca": {
"max-heap-usage-increase": 80,
"max-shard-heap-cancellation-percentage": 5,
"max-task-heap-cancellation-percentage": 5,
"max-heap-usage-decrease": 90,
"min-shard-heap-cancellation-percentage": 3,
"min-task-heap-cancellation-percentage": 3
}
},

Expand All @@ -98,6 +106,11 @@
"old-gen-threshold-level-one" : 0.6,
"old-gen-threshold-level-two" : 0.75,
"old-gen-threshold-level-three" : 0.9
},
"search-back-pressure-policy-config": {
"enabled": true,
"hour-breach-threshold": 1,
"searchbp-heap-stepsize-in-percentage": 5
}
},
// Action Configurations
Expand Down
2 changes: 1 addition & 1 deletion docker/opensearch.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cluster.name: "docker-cluster"
network.host: 0.0.0.0
network.host: 0.0.0.0
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@


import io.netty.handler.codec.http.HttpMethod;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.core.Util;
Expand Down Expand Up @@ -54,6 +62,62 @@
throw new RuntimeException("Failed to disable PA after 5 attempts");
}

public static class ClusterSettings {

Check warning on line 65 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L65

Added line #L65 was not covered by tests
static List<String> clusterSettings = new ArrayList<>();

public final static String SETTING_NOT_FOUND = "NULL";

static final String CLUSTER_SETTINGS_URL =
"/_cluster/settings?flat_settings=true&include_defaults=true&pretty";

/** Refreshes the Cluster Settings */
private static void refreshClusterSettings() {
final HttpURLConnection urlConnection =
LocalhostConnectionUtil.createHTTPConnection(
CLUSTER_SETTINGS_URL, HttpMethod.GET);
try (final BufferedReader br =
new BufferedReader(new InputStreamReader(urlConnection.getInputStream()))) {

Check warning on line 79 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L79

Added line #L79 was not covered by tests
String line;
clusterSettings.clear();

Check warning on line 81 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L81

Added line #L81 was not covered by tests
while ((line = br.readLine()) != null) {
clusterSettings.add(line);

Check warning on line 83 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L83

Added line #L83 was not covered by tests
}
} catch (IOException e) {
LOG.warn("could not refresh the cluster settings");
}

Check warning on line 87 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L87

Added line #L87 was not covered by tests
}

/**
* @param settingName a string representing the setting name in cluster settings, expected
* values are flat settings, e,g; <code>search_backpressure.node_duress.heap_threshold
* </code>
* @param settingValRegex a regex value representing valid regex match for the setting val
* and should encapsulate the value in a group inside the string settingValRegex, e,g;
* "\"([0-9].[0-9]+)\"" to match any floating value with one leading digit
* @returns the value for the setting settingName if present e,g; "0.7" or else NULL
*/
public static String getClusterSettingValue(String settingName, String settingValRegex) {
refreshClusterSettings();
Pattern settingValPattern = Pattern.compile(settingValRegex);
Optional<String> setting =
clusterSettings.stream()
.filter(settingLine -> settingLine.contains(settingName))
.findFirst();
final String settingVal =
setting.map(
settingLine -> {
Matcher settingValMatcher =
settingValPattern.matcher(settingLine);

Check warning on line 110 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L109-L110

Added lines #L109 - L110 were not covered by tests
if (settingValMatcher.find()) {
return settingValMatcher.group(1);

Check warning on line 112 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L112

Added line #L112 was not covered by tests
}
return null;

Check warning on line 114 in src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/LocalhostConnectionUtil.java#L114

Added line #L114 was not covered by tests
})
.orElseGet(() -> SETTING_NOT_FOUND);
return settingVal;
}
}

private static HttpURLConnection createHTTPConnection(String path, HttpMethod httpMethod) {
try {
String endPoint = "http://localhost:9200" + path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@
return summary.toJson();
}

@Override
public String toString() {
return summary();

Check warning on line 135 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/actions/SearchBackPressureAction.java#L135

Added line #L135 was not covered by tests
}

public static final class Builder {
public static final boolean DEFAULT_CAN_UPDATE = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,18 @@
* @param issue an issue with the application
*/
private void record(HotResourceSummary summary) {
LOG.trace("SearchBackPressurePolicy capturing resource summary: {}", summary);

Check warning on line 106 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java#L106

Added line #L106 was not covered by tests

if (HEAP_SEARCHBP_SHARD_SIGNALS.contains(summary.getResource())) {
LOG.debug("Shard signal in SBP RCA summary...");

Check warning on line 109 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java#L109

Added line #L109 was not covered by tests
searchBackPressureIssue =
new SearchBackPressureShardIssue(
summary, searchBackPressureShardAlarmMonitorMap);
searchBackPressureIssue.recordIssueBySummaryType(summary);
}

if (HEAP_SEARCHBP_TASK_SIGNALS.contains(summary.getResource())) {
LOG.debug("Task signal in SBP RCA summary...");

Check warning on line 117 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java#L117

Added line #L117 was not covered by tests
searchBackPressureIssue =
new SearchBackPressureSearchTaskIssue(
summary, searchBackPressureTaskAlarmMonitorMap);
Expand All @@ -126,7 +130,9 @@
LOG.debug("No flow units in searchBackPressureClusterRCA");
return;
}

LOG.debug(

Check warning on line 133 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java#L133

Added line #L133 was not covered by tests
"SearchBackPressurePolicy flow units: {}",
searchBackPressureClusterRCA.getFlowUnits());

Check warning on line 135 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java#L135

Added line #L135 was not covered by tests
for (ResourceFlowUnit<HotClusterSummary> flowUnit :
searchBackPressureClusterRCA.getFlowUnits()) {
if (!flowUnit.hasResourceSummary()) {
Expand Down Expand Up @@ -248,7 +254,10 @@
checkTaskAlarms(actions);

// print current size of the actions
LOG.debug("SearchBackPressurePolicy#evaluate() action size: {}", actions.size());
LOG.debug(

Check warning on line 257 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java#L257

Added line #L257 was not covered by tests
"SearchBackPressurePolicy#evaluate() action size: {} actions: {}",
actions.size(),

Check warning on line 259 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/SearchBackPressurePolicy.java#L259

Added line #L259 was not covered by tests
actions);

return actions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

@Override
public void recordIssueBySummaryType(HotResourceSummary summary) {
if (summary.getMetaData() == SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR) {
LOG.debug("Recording issue by summary type..... summary: {}", summary);
if (summary.getMetaData()

Check warning on line 28 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java#L27-L28

Added lines #L27 - L28 were not covered by tests
.equalsIgnoreCase(SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR)) {
LOG.debug("recording increase-level issue for shard");
LOG.debug("size of the HashMap: {}", actionsAlarmMonitorMap.size());
actionsAlarmMonitorMap
Expand All @@ -33,7 +35,8 @@
}

// decrease alarm for heap-related threshold
if (summary.getMetaData() == SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR) {
if (summary.getMetaData()

Check warning on line 38 in src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/decisionmaker/deciders/searchbackpressure/model/SearchBackPressureShardIssue.java#L38

Added line #L38 was not covered by tests
.equalsIgnoreCase(SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR)) {
LOG.debug("recording decrease-level issue for shard");
actionsAlarmMonitorMap
.get(SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_DECREASE_ALARM.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public class MetricsModel {

// Search Back Pressure Metrics
allMetricsInitializer.put(
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_TASK_STATS_CANCELLATION_COUNT
.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values()));
ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;

public class SearchBackPressureRcaConfig {
public static final String CONFIG_NAME = "search-back-pressure-rca-policy";
public static final String CONFIG_NAME = "search-back-pressure-rca";

/* Metadata fields for thresholds */
public static final String INCREASE_THRESHOLD_BY_JVM_STR = "increase_jvm";
Expand All @@ -25,28 +25,41 @@

/* Increase Threshold */
// node max heap usage in last 60 secs is less than 70%
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD = 70;
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD_PERCENT = 80;
private Integer maxHeapIncreasePercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
// cancellation percent due to heap is more than 5% of all task completions at shard level
// (Taking 3 because we don't cancel more than 10% of all completions at any time)
// Basically this threshold tell that we are overcancelling the shard level tasks given max heap
// from last rca eval period is still
// below or equal to DEFAULT_MAX_HEAP_INCREASE_THRESHOLD
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT = 5;
private Integer maxShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in task level
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
// cancellation percent due to heap is more than 5% of all task completions in
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
// SearchTask(co-ordinator) level (Taking 3 because we don't cancel more than 10% of all
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
// completions at any time)
// Basically this threshold tell that we are overcancelling the co-ordinator level tasks
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT = 5;
private Integer maxTaskHeapCancellationPercentageThreshold;

/* Decrease Threshold */
// node min heap usage in last 60 secs is more than 80%
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD = 80;
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD_PERCENT = 90;
private Integer minHeapDecreasePercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
// cancellationCount due to heap is less than 3% of all task completions in shard level
// Basically this threshold tell that we are under cancelling the shard level tasks given min
// heap from last rca eval period is still
// above or equal to DEFAULT_MIN_HEAP_DECREASE_THRESHOLD
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT = 3;
private Integer minShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in task level
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
// cancellationCount due to heap is less than 3% of all task completions in task level
kaushalmahi12 marked this conversation as resolved.
Show resolved Hide resolved
// Basically this threshold tell that we are under cancelling the coordinator level tasks given
// min heap from last rca eval period is still
// above or equal to DEFAULT_MIN_HEAP_DECREASE_THRESHOLD
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT = 3;
private Integer minTaskHeapCancellationPercentageThreshold;

public SearchBackPressureRcaConfig(final RcaConf conf) {
Expand All @@ -56,46 +69,46 @@
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_INCREASE_FIELD.toString(),
DEFAULT_MAX_HEAP_INCREASE_THRESHOLD,
DEFAULT_MAX_HEAP_INCREASE_THRESHOLD_PERCENT,

Check warning on line 72 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L72

Added line #L72 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
maxShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 80 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L80

Added line #L80 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
maxTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 88 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L88

Added line #L88 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
minHeapDecreasePercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_DECREASE_FIELD.toString(),
DEFAULT_MIN_HEAP_DECREASE_THRESHOLD,
DEFAULT_MIN_HEAP_DECREASE_THRESHOLD_PERCENT,

Check warning on line 95 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L95

Added line #L95 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
minShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 103 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L103

Added line #L103 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
minTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT,

Check warning on line 111 in src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/performanceanalyzer/rca/configs/SearchBackPressureRcaConfig.java#L111

Added line #L111 was not covered by tests
(s) -> s >= 0 && s <= 100,
Integer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ public ImmutableList<FlowUnitMessage> drainNode(final String graphNode) {
final List<FlowUnitMessage> tempList = new ArrayList<>();
BlockingQueue<FlowUnitMessage> existing = flowUnitMap.get(graphNode);
if (existing == null) {
LOG.debug("Nothing in the FlowUnitStore for vertex: {}", graphNode);
return ImmutableList.of();
}

existing.drainTo(tempList);
LOG.debug("Available flow units for vertex: {}, flowUnits: {}", graphNode, tempList);

return ImmutableList.copyOf(tempList);
}
Expand Down
Loading
Loading