Commit 745f499
fix comment
huyuanfeng committed Sep 12, 2024
1 parent 214eb82 commit 745f499
Showing 2 changed files with 285 additions and 238 deletions.
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

@@ -18,7 +18,6 @@
 package org.apache.flink.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
@@ -40,8 +39,8 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.SortedMap;
+import java.util.function.Consumer;
 
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
 import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
@@ -69,7 +68,7 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
     protected static final String INEFFECTIVE_MESSAGE_FORMAT =
             "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s";
 
-    @VisibleForTesting protected static final String SCALE_LIMITED = "ScalingLimited";
+    @VisibleForTesting protected static final String SCALING_LIMITED = "ScalingLimited";
 
     @VisibleForTesting
     protected static final String SCALE_LIMITED_MESSAGE_FORMAT =
@@ -200,7 +199,7 @@ public ParallelismChange computeScaleTargetParallelism(
         double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
         LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
 
-        Tuple2<Integer, Optional<String>> newParallelism =
+        int newParallelism =
                 scale(
                         vertex,
                         currentParallelism,
@@ -209,20 +208,17 @@
                         (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
-                        Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
-
-        newParallelism.f1.ifPresent(
-                message -> {
-                    autoScalerEventHandler.handleEvent(
-                            context,
-                            AutoScalerEventHandler.Type.Warning,
-                            SCALE_LIMITED,
-                            message,
-                            SCALE_LIMITED + vertex + cappedTargetCapacity,
-                            conf.get(SCALING_EVENT_INTERVAL));
-                });
-
-        if (newParallelism.f0 == currentParallelism) {
+                        Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
+                        message ->
+                                autoScalerEventHandler.handleEvent(
+                                        context,
+                                        AutoScalerEventHandler.Type.Warning,
+                                        SCALING_LIMITED,
+                                        message,
+                                        SCALING_LIMITED + vertex + cappedTargetCapacity,
+                                        conf.get(SCALING_EVENT_INTERVAL)));
+
+        if (newParallelism == currentParallelism) {
             // Clear delayed scale down request if the new parallelism is equal to
             // currentParallelism.
             delayedScaleDown.clearVertex(vertex);
@@ -241,7 +237,7 @@
                 evaluatedMetrics,
                 history,
                 currentParallelism,
-                newParallelism.f0,
+                newParallelism,
                 delayedScaleDown);
     }
 
@@ -372,15 +368,16 @@ private boolean detectIneffectiveScaleUp(
      * number of partitions if a vertex has a known partition count.
      */
     @VisibleForTesting
-    protected static Tuple2<Integer, Optional<String>> scale(
+    protected static int scale(
             JobVertexID vertex,
             int currentParallelism,
             Collection<ShipStrategy> inputShipStrategies,
             int numPartitions,
             int maxParallelism,
             double scaleFactor,
             int parallelismLowerLimit,
-            int parallelismUpperLimit) {
+            int parallelismUpperLimit,
+            Consumer<String> consumer) {
         checkArgument(
                 parallelismLowerLimit <= parallelismUpperLimit,
                 "The parallelism lower limitation must not be greater than the parallelism upper limitation.");
@@ -413,62 +410,63 @@ protected static Tuple2<Integer, Optional<String>> scale(
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return Tuple2.of(newParallelism, Optional.empty());
+            return newParallelism;
         }
 
+        int adjustableMaxParallelism = maxParallelism;
+        int adjustableUpperBound;
         if (numPartitions <= 0) {
-            // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
-            // we try to adjust the parallelism such that it divides the maxParallelism without a
-            // remainder => data is evenly spread across subtasks
-            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-                if (maxParallelism % p == 0) {
-                    return Tuple2.of(p, Optional.empty());
-                }
-            }
-            // If parallelism adjustment fails, use originally computed parallelism
-            return Tuple2.of(newParallelism, Optional.empty());
+            adjustableUpperBound = Math.min(maxParallelism / 2, upperBound);
         } else {
+            adjustableUpperBound = Math.min(numPartitions, upperBound);
+            adjustableMaxParallelism = numPartitions;
         }
 
-        // When we know the numPartitions at a vertex,
-        // adjust the parallelism such that it divides the numPartitions without a remainder
-        // => Data is evenly distributed among subtasks
-        for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
-            if (numPartitions % p == 0) {
-                return Tuple2.of(p, Optional.empty());
-            }
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides the adjustableMaxParallelism
+        // without a
+        // remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= adjustableUpperBound; p++) {
+            if (adjustableMaxParallelism % p == 0) {
+                return p;
+            }
         }
 
         if (numPartitions > 0) {
 
-            // When the degree of parallelism after rounding up cannot be evenly divided by source
-            // PartitionCount, Try to find the smallest parallelism that can satisfy the current
+            // When adjust the parallelism after rounding up cannot be evenly divided by source
+            // numPartitions, Try to find the smallest parallelism that can satisfy the current
             // consumption rate.
             for (int p = newParallelism; p > parallelismLowerLimit; p--) {
                 if (numPartitions / p > numPartitions / newParallelism) {
                     if (numPartitions % p != 0) {
-                        p += 1;
+                        p++;
                     }
-                    var message =
+                    consumer.accept(
                             String.format(
                                     SCALE_LIMITED_MESSAGE_FORMAT,
                                     vertex,
                                     newParallelism,
                                     p,
                                     String.format(
-                                            "numPartitions : %s,upperBound(maxParallelism or "
-                                                    + "parallelismUpperLimit): %s",
-                                            numPartitions, upperBound));
-                    return Tuple2.of(p, Optional.of(message));
+                                            "numPartitions : %s,upperBound(maxParallelism or parallelismUpperLimit): %s",
+                                            numPartitions, upperBound)));
+                    return p;
                 }
             }
             // If a suitable degree of parallelism cannot be found, return parallelismLowerLimit
-            var message =
+
+            consumer.accept(
                     String.format(
                             SCALE_LIMITED_MESSAGE_FORMAT,
                             vertex,
                             newParallelism,
                             parallelismLowerLimit,
-                            String.format("parallelismLowerLimit : %s", parallelismLowerLimit));
-            return Tuple2.of(parallelismLowerLimit, Optional.of(message));
+                            String.format("parallelismLowerLimit : %s", parallelismLowerLimit)));
+            return parallelismLowerLimit;
         }
 
+        // If parallelism adjustment fails, use originally computed parallelism
+        return newParallelism;
     }
 
     @VisibleForTesting
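For readers skimming the diff, here is a minimal, self-contained sketch of the adjustment logic the reworked scale(...) performs. This is an illustrative re-implementation, not the Flink code: the class name, parameter comments, and messages below are made up, and the real method also computes newParallelism, upperBound, and the ship-strategy check shown above.

import java.util.function.Consumer;

// Illustrative sketch only -- not the actual Flink implementation.
public class ParallelismAdjustmentSketch {

    static int adjust(
            int newParallelism, // parallelism computed from the scale factor
            int numPartitions, // <= 0 when the partition count is unknown
            int maxParallelism,
            int upperBound, // effective upper bound for the search
            int lowerLimit, // configured parallelism lower limit
            Consumer<String> onScalingLimited) { // replaces the old Tuple2/Optional message

        // Divide against numPartitions when it is known, otherwise maxParallelism,
        // mirroring adjustableMaxParallelism / adjustableUpperBound in the diff.
        int divisorTarget = numPartitions > 0 ? numPartitions : maxParallelism;
        int searchBound =
                numPartitions > 0
                        ? Math.min(numPartitions, upperBound)
                        : Math.min(maxParallelism / 2, upperBound);

        // Prefer the first parallelism >= newParallelism that divides the target
        // without a remainder, so data is spread evenly across subtasks.
        for (int p = newParallelism; p <= searchBound; p++) {
            if (divisorTarget % p == 0) {
                return p;
            }
        }

        if (numPartitions > 0) {
            // No even divisor found: walk down to the smallest parallelism that
            // still keeps partitions-per-subtask at the level newParallelism gave.
            for (int p = newParallelism; p > lowerLimit; p--) {
                if (numPartitions / p > numPartitions / newParallelism) {
                    if (numPartitions % p != 0) {
                        p++; // step back up so per-subtask load matches the target
                    }
                    onScalingLimited.accept(
                            "parallelism capped to " + p + " by numPartitions=" + numPartitions);
                    return p;
                }
            }
            // Nothing suitable above the lower limit: settle for it and report.
            onScalingLimited.accept("falling back to lower limit " + lowerLimit);
            return lowerLimit;
        }

        // Adjustment failed; keep the originally computed parallelism.
        return newParallelism;
    }
}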
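And a hypothetical caller, showing the shape of the new contract: the scaling-limited message is pushed into a Consumer<String> (wired to autoScalerEventHandler.handleEvent in the diff) instead of being returned inside a Tuple2<Integer, Optional<String>>. The numbers are made up for illustration.

public class AdjustDemo {
    public static void main(String[] args) {
        // Stand-in for the autoScalerEventHandler.handleEvent(...) call in the diff.
        java.util.function.Consumer<String> onScalingLimited =
                message -> System.out.println("WARN ScalingLimited: " + message);

        int parallelism =
                ParallelismAdjustmentSketch.adjust(7, 32, 128, 24, 1, onScalingLimited);

        // Prints 8: the first value >= 7 that divides the 32 partitions evenly,
        // so no warning event is emitted for this input.
        System.out.println(parallelism);
    }
}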
