Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31215] [autoscaler] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators #847

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/layouts/shortcodes/generated/auto_scaler_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@
<td>Double</td>
<td>Percentage threshold for switching to observed from busy time based true processing rate if the measurement is off by at least the configured fraction. For example 0.15 means we switch to observed if the busy time based computation is at least 15% higher during catchup.</td>
</tr>
<tr>
<td><h5>job.autoscaler.processing.rate.backpropagation.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable backpropagation of processing rate during autoscaling to reduce resources usage.</td>
</tr>
<tr>
<td><h5>job.autoscaler.processing.rate.backpropagation.impact</h5></td>
<td style="word-wrap: break-word;">0.0</td>
<td>Double</td>
<td>How strongly backpropagated values should affect scaling. 0 means no effect, 1 means backpropagated values are used fully. It is not recommended to set this factor greater than 0.8.</td>
</tr>
<tr>
<td><h5>job.autoscaler.quota.cpu</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -210,7 +222,7 @@
<td><h5>job.autoscaler.vertex.exclude.ids</h5></td>
<td style="word-wrap: break-word;"></td>
<td>List&lt;String&gt;</td>
<td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.</td>
<td>A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.</td>
</tr>
<tr>
<td><h5>job.autoscaler.vertex.max-parallelism</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
Expand All @@ -37,6 +39,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
Expand All @@ -52,8 +55,10 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.NUM_SOURCE_PARTITIONS;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
import static org.apache.flink.autoscaler.utils.AutoScalerUtils.getTargetDataRateFromUpstream;
import static org.apache.flink.util.Preconditions.checkArgument;

/** Component responsible for computing vertex parallelism based on the scaling metrics. */
Expand Down Expand Up @@ -148,6 +153,67 @@ public static ParallelismChange noChange() {
}
}

/**
 * Backpropagates the processing rate limit of the given vertex to its direct upstream
 * vertices by updating {@code backpropagationRate}.
 *
 * <p>If the vertex cannot keep up with its (already adjusted) target data rate even after
 * scaling up as far as allowed, upstream multipliers are reduced proportionally so upstream
 * operators do not produce more data than this vertex can consume.
 *
 * @param conf configuration, used to read the max scale-up factor
 * @param vertex the vertex whose capacity limit is propagated upstream
 * @param topology job topology providing the upstream edges of the vertex
 * @param evaluatedMetrics evaluated scaling metrics of all vertices
 * @param backpropagationRate per-vertex rate multipliers in (0, 1]; read for this vertex and
 *     updated for its direct upstream vertices
 * @param excludedVertices hexstring ids of vertices excluded from scaling; excluded vertices
 *     propagate no limits
 */
public void backpropagateRate(
        Configuration conf,
        JobVertexID vertex,
        JobTopology topology,
        EvaluatedMetrics evaluatedMetrics,
        Map<JobVertexID, Double> backpropagationRate,
        List<String> excludedVertices) {

    // excluded vertices are not scaled, so they impose no limit on upstream operators
    if (excludedVertices.contains(vertex.toHexString())) {
        return;
    }

    var vertexMetrics = evaluatedMetrics.getVertexMetrics().get(vertex);

    // vertex scale factor is limited by max scale factor
    double scaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);

    // vertex scale factor is limited by max parallelism scale factor
    scaleFactor =
            Math.min(
                    scaleFactor,
                    vertexMetrics.get(MAX_PARALLELISM).getCurrent()
                            / vertexMetrics.get(PARALLELISM).getCurrent());

    // best processing rate achievable after scaling: the (already reduced) target rate,
    // capped by the true processing rate scaled up by the maximum allowed factor
    double maxProcessingRateAfterScale =
            Math.min(
                    vertexMetrics.get(TARGET_DATA_RATE).getAverage()
                            * backpropagationRate.getOrDefault(vertex, 1.0),
                    vertexMetrics.get(TRUE_PROCESSING_RATE).getAverage() * scaleFactor);

    // evaluating partially updated target data rate from upstream
    double targetDataRate =
            getTargetDataRateFromUpstream(
                    evaluatedMetrics, topology, vertex, backpropagationRate);

    // if the target data rate cannot be derived as a finite value, assume full processing
    if (Double.isNaN(targetDataRate) || Double.isInfinite(targetDataRate)) {
        return;
    }

    // if the achievable processing rate cannot be derived as a finite value, skip as well
    if (Double.isNaN(maxProcessingRateAfterScale)
            || Double.isInfinite(maxProcessingRateAfterScale)) {
        return;
    }

    // if all input stream can be processed, skip propagation
    if (targetDataRate < maxProcessingRateAfterScale) {
        return;
    }

    // propagation coefficient
    double adjustmentRate = maxProcessingRateAfterScale / targetDataRate;

    // update rate of direct upstream vertices
    for (var v : topology.getVertexInfos().get(vertex).getInputs().keySet()) {
        double vertexRate = backpropagationRate.getOrDefault(v, 1.0);
        backpropagationRate.put(v, vertexRate * adjustmentRate);
    }
}

public ParallelismChange computeScaleTargetParallelism(
Context context,
JobVertexID vertex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.apache.flink.shaded.curator5.com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,6 +55,7 @@
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON;
Expand All @@ -61,6 +64,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

/** Class responsible for executing scaling decisions. */
Expand Down Expand Up @@ -238,6 +242,11 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(

var excludeVertexIdList =
context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);

if (context.getConfiguration().get(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) {
backpropagateProcessingRate(context, evaluatedMetrics, jobTopology);
}

evaluatedMetrics
.getVertexMetrics()
.forEach(
Expand Down Expand Up @@ -284,6 +293,73 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
return out;
}

/**
 * Backpropagates processing rate limits from bottleneck vertices to their upstream vertices
 * and rewrites the TARGET_DATA_RATE metric accordingly.
 *
 * <p>Per-vertex rate multipliers are first computed by walking the topology in reverse
 * (strong) topological order. Target data rates are then re-evaluated in forward topological
 * order: source rates are linearly interpolated between their original and backpropagated
 * values using the configured impact factor, and non-source rates are derived from the
 * adjusted upstream rates via the edge input ratios.
 *
 * @param context autoscaler context carrying the configuration
 * @param evaluatedMetrics evaluated metrics; TARGET_DATA_RATE entries are overwritten
 * @param jobTopology topology of the job being scaled
 */
private void backpropagateProcessingRate(
        Context context, EvaluatedMetrics evaluatedMetrics, JobTopology jobTopology) {
    var conf = context.getConfiguration();
    double impact = conf.get(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_IMPACT);
    Preconditions.checkState(
            0 <= impact && impact <= 1.0, "Backpropagation impact should be in range [0, 1]");
    var propagationRate = new HashMap<JobVertexID, Double>();
    var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
    var vertexIterator =
            jobTopology
                    .getStrongTopologicalOrder()
                    .listIterator(jobTopology.getStrongTopologicalOrder().size());

    // backpropagate scale factors, traversing the topology from sinks to sources
    while (vertexIterator.hasPrevious()) {
        var vertex = vertexIterator.previous();
        jobVertexScaler.backpropagateRate(
                conf,
                vertex,
                jobTopology,
                evaluatedMetrics,
                propagationRate,
                excludeVertexIdList);
    }

    // use an extra map to not lose precision
    Map<JobVertexID, Double> adjustedDataRate = new HashMap<>();

    // re-evaluate the vertices' capacity in forward topological order; the target data rate
    // metric is rewritten for the subsequent parallelism evaluation
    for (var vertex : jobTopology.getVerticesInTopologicalOrder()) {
        double newTargetDataRate = 0.0;

        if (jobTopology.isSource(vertex)) {
            double targetDataRate =
                    evaluatedMetrics
                            .getVertexMetrics()
                            .get(vertex)
                            .get(TARGET_DATA_RATE)
                            .getAverage();

            // linear interpolation between the backpropagated and the initial value
            newTargetDataRate =
                    targetDataRate
                            * (impact * propagationRate.getOrDefault(vertex, 1.0)
                                    + 1.0
                                    - impact);
        } else {
            // non-source rate is fully determined by the adjusted upstream rates
            var vertexInfo = jobTopology.getVertexInfos().get(vertex);
            for (var input : vertexInfo.getInputs().keySet()) {
                newTargetDataRate +=
                        adjustedDataRate.get(input) * vertexInfo.getInputRatios().get(input);
            }
        }
        adjustedDataRate.put(vertex, newTargetDataRate);
        evaluatedMetrics
                .getVertexMetrics()
                .get(vertex)
                .put(TARGET_DATA_RATE, EvaluatedScalingMetric.avg(newTargetDataRate));
    }
}

private boolean isJobUnderMemoryPressure(
Context ctx, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ private void computeTargetDataRate(
var inputTargetRate = inputEvaluatedMetrics.get(TARGET_DATA_RATE);
var outputRatio =
computeEdgeOutputRatio(inputVertex, vertex, topology, metricsHistory);
topology.get(vertex).getInputRatios().put(inputVertex, outputRatio);
LOG.debug(
"Computed output ratio for edge ({} -> {}) : {}",
inputVertex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,24 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.");

public static final ConfigOption<Boolean> PROCESSING_RATE_BACKPROPAGATION_ENABLED =
autoScalerConfig("processing.rate.backpropagation.enabled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could call this simply bottleneck-propagation.enabled and to control the scaling bottleneck-propagation.allow-scale-down

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2 config names are not in sync now

.booleanType()
.defaultValue(false)
.withFallbackKeys(
oldOperatorConfigKey("processing.rate.backpropagation.enabled"))
.withDescription(
"Enable backpropagation of processing rate during autoscaling to reduce resources usage.");

/**
 * Impact factor of the backpropagated processing rates on scaling decisions; validated to be
 * within [0, 1] before use.
 */
public static final ConfigOption<Double> PROCESSING_RATE_BACKPROPAGATION_IMPACT =
        autoScalerConfig("processing.rate.backpropagation.impact")
                .doubleType()
                .defaultValue(0.0)
                .withFallbackKeys(
                        oldOperatorConfigKey("processing.rate.backpropagation.impact"))
                .withDescription(
                        "How strongly backpropagated values should affect scaling. 0 means no effect, 1 means backpropagated values are used fully. It is not recommended to set this factor greater than 0.8.");

public static final ConfigOption<Duration> METRICS_WINDOW =
autoScalerConfig("metrics.window")
.durationType()
Expand Down Expand Up @@ -320,7 +338,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.defaultValues()
.withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids"))
.withDescription(
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling.");

public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
autoScalerConfig("scaling.event.interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Structure representing information about the jobgraph that is relevant for scaling. */
Expand All @@ -52,6 +53,7 @@ public class JobTopology {
@Getter private final Map<SlotSharingGroupId, Set<JobVertexID>> slotSharingGroupMapping;
@Getter private final Set<JobVertexID> finishedVertices;
@Getter private final List<JobVertexID> verticesInTopologicalOrder;
@Getter private final List<JobVertexID> strongTopologicalOrder;

public JobTopology(Collection<VertexInfo> vertexInfo) {
this(new HashSet<>(vertexInfo));
Expand Down Expand Up @@ -99,6 +101,7 @@ public JobTopology(Set<VertexInfo> vertexInfo) {
this.slotSharingGroupMapping = ImmutableMap.copyOf(vertexSlotSharingGroupMapping);
this.finishedVertices = finishedVertices.build();
this.verticesInTopologicalOrder = returnVerticesInTopologicalOrder();
this.strongTopologicalOrder = returnStrongTopologicalOrder();
}

public VertexInfo get(JobVertexID jvi) {
Expand All @@ -112,9 +115,9 @@ public boolean isSource(JobVertexID jobVertexID) {
private List<JobVertexID> returnVerticesInTopologicalOrder() {
List<JobVertexID> sorted = new ArrayList<>(vertexInfos.size());

Map<JobVertexID, List<JobVertexID>> remainingInputs = new HashMap<>(vertexInfos.size());
Map<JobVertexID, Set<JobVertexID>> remainingInputs = new HashMap<>(vertexInfos.size());
vertexInfos.forEach(
(id, v) -> remainingInputs.put(id, new ArrayList<>(v.getInputs().keySet())));
(id, v) -> remainingInputs.put(id, new HashSet<>(v.getInputs().keySet())));

while (!remainingInputs.isEmpty()) {
List<JobVertexID> verticesWithZeroIndegree = new ArrayList<>();
Expand All @@ -140,6 +143,71 @@ private List<JobVertexID> returnVerticesInTopologicalOrder() {
return sorted;
}

/**
 * Returns the vertices in "strong" topological order: a topological order in which vertices
 * are additionally sorted by their distance from the most distant source.
 *
 * <p>The graph is processed in rounds: every vertex whose inputs have all been processed is
 * queued under its currently known distance; in each round the bucket with the smallest
 * distance is emitted and its distances are propagated to the downstream vertices.
 *
 * @return vertices in the enhanced topological order
 */
public List<JobVertexID> returnStrongTopologicalOrder() {
    List<JobVertexID> sorted = new ArrayList<>(vertexInfos.size());

    // not-yet-processed inputs of every remaining vertex
    Map<JobVertexID, Set<JobVertexID>> remainingInputs = new HashMap<>(vertexInfos.size());
    vertexInfos.forEach(
            (id, v) -> remainingInputs.put(id, new HashSet<>(v.getInputs().keySet())));

    // longest known distance from a source; absent means 0
    Map<JobVertexID, Integer> distances = new HashMap<>(vertexInfos.size());
    // vertices that are ready to be emitted, bucketed by distance
    TreeMap<Integer, List<JobVertexID>> order = new TreeMap<>();

    while (!remainingInputs.isEmpty()) {
        List<JobVertexID> verticesWithZeroIndegree = new ArrayList<>();

        // queue every vertex with no remaining inputs under its current distance
        remainingInputs.forEach(
                (v, inputs) -> {
                    if (inputs.isEmpty()) {
                        int dist = distances.getOrDefault(v, 0);
                        order.computeIfAbsent(dist, ignored -> new ArrayList<>()).add(v);
                        verticesWithZeroIndegree.add(v);
                    }
                });

        verticesWithZeroIndegree.forEach(
                v -> {
                    remainingInputs.remove(v);
                    vertexInfos
                            .get(v)
                            .getOutputs()
                            .keySet()
                            .forEach(o -> remainingInputs.get(o).remove(v));
                });

        // emit the closest (smallest-distance) bucket and propagate distances downstream
        List<JobVertexID> layer = order.pollFirstEntry().getValue();

        layer.forEach(
                v -> {
                    final int dist = distances.getOrDefault(v, 0);
                    vertexInfos
                            .get(v)
                            .getOutputs()
                            .keySet()
                            .forEach(
                                    o -> {
                                        remainingInputs.get(o).remove(v);
                                        int dist1 = distances.getOrDefault(o, 0);
                                        distances.put(o, Math.max(dist1, dist + 1));
                                    });
                });

        sorted.addAll(layer);
    }
    return sorted;
}

public static JobTopology fromJsonPlan(
String jsonPlan,
Map<JobVertexID, SlotSharingGroupId> slotSharingGroupIdMap,
Expand Down
Loading