Commit

fix comment
huyuanfeng committed Sep 13, 2024
1 parent 745f499 commit 16b5d4a
Showing 2 changed files with 101 additions and 68 deletions.
@@ -40,7 +40,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;
import java.util.function.Consumer;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
@@ -209,14 +208,8 @@ public ParallelismChange computeScaleTargetParallelism(
scaleFactor,
Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
message ->
autoScalerEventHandler.handleEvent(
context,
AutoScalerEventHandler.Type.Warning,
SCALING_LIMITED,
message,
SCALING_LIMITED + vertex + cappedTargetCapacity,
conf.get(SCALING_EVENT_INTERVAL)));
autoScalerEventHandler,
context);

if (newParallelism == currentParallelism) {
// Clear delayed scale down request if the new parallelism is equal to
@@ -368,7 +361,7 @@ private boolean detectIneffectiveScaleUp(
* number of partitions if a vertex has a known partition count.
*/
@VisibleForTesting
protected static int scale(
protected static <KEY, Context extends JobAutoScalerContext<KEY>> int scale(
JobVertexID vertex,
int currentParallelism,
Collection<ShipStrategy> inputShipStrategies,
@@ -377,7 +370,8 @@ protected static int scale(
double scaleFactor,
int parallelismLowerLimit,
int parallelismUpperLimit,
Consumer<String> consumer) {
AutoScalerEventHandler<KEY, Context> eventHandler,
Context context) {
checkArgument(
parallelismLowerLimit <= parallelismUpperLimit,
"The parallelism lower limitation must not be greater than the parallelism upper limitation.");
@@ -402,7 +396,7 @@ protected static int scale(

// Cap parallelism at either maxParallelism(number of key groups or source partitions) or
// parallelism upper limit
int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);

// Apply min/max parallelism
newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
@@ -413,21 +407,20 @@ protected static int scale(
return newParallelism;
}

int adjustableMaxParallelism = maxParallelism;
int adjustableUpperBound;
int numKeyGroupsOrPartitions = maxParallelism;
int upperBoundForAlignment;
if (numPartitions <= 0) {
adjustableUpperBound = Math.min(maxParallelism / 2, upperBound);
upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
} else {
adjustableUpperBound = Math.min(numPartitions, upperBound);
adjustableMaxParallelism = numPartitions;
upperBoundForAlignment = Math.min(numPartitions, upperBound);
numKeyGroupsOrPartitions = numPartitions;
}

// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides the adjustableMaxParallelism
// without a
// remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= adjustableUpperBound; p++) {
if (adjustableMaxParallelism % p == 0) {
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
if (numKeyGroupsOrPartitions % p == 0) {
return p;
}
}
@@ -437,32 +430,36 @@ protected static int scale(
// When the parallelism after rounding up cannot evenly divide the source
// numPartitions, try to find the smallest parallelism that can satisfy the current
// consumption rate.
for (int p = newParallelism; p > parallelismLowerLimit; p--) {
int p = newParallelism;
for (; p > 0; p--) {
if (numPartitions / p > numPartitions / newParallelism) {
if (numPartitions % p != 0) {
p++;
}
consumer.accept(
String.format(
SCALE_LIMITED_MESSAGE_FORMAT,
vertex,
newParallelism,
p,
String.format(
"numPartitions : %s,upperBound(maxParallelism or parallelismUpperLimit): %s",
numPartitions, upperBound)));
return p;
break;
}
}

consumer.accept(
p = Math.max(p, parallelismLowerLimit);

var message =
String.format(
SCALE_LIMITED_MESSAGE_FORMAT,
vertex,
newParallelism,
parallelismLowerLimit,
String.format("parallelismLowerLimit : %s", parallelismLowerLimit)));
return parallelismLowerLimit;
p,
String.format(
"numPartitions : %s,upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, "
+ "parallelismLowerLimit: %s.",
numPartitions, upperBound, parallelismLowerLimit));
eventHandler.handleEvent(
context,
AutoScalerEventHandler.Type.Warning,
SCALING_LIMITED,
message,
SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
context.getConfiguration().get(SCALING_EVENT_INTERVAL));
return p;
}

// If parallelism adjustment fails, use originally computed parallelism
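To follow the alignment behavior that this diff touches, below is a minimal, self-contained Java sketch of the logic in scale(). It is not the committed method: the class name ParallelismAlignmentSketch, the method name align, and the simplified signature are made up here, the event-handler reporting is left out, and the guard that restricts the downward fallback to source vertices with a known partition count is assumed from the surrounding comments. For example, a keyed vertex with 128 key groups and a requested parallelism of 18 rounds up to 32, while a source with 100 partitions capped at parallelism 30 rounds down to 25 so every subtask reads exactly four partitions.

// Simplified sketch of the parallelism alignment in JobVertexScaler.scale();
// names and signature are illustrative only.
public class ParallelismAlignmentSketch {

    static int align(
            int newParallelism,        // parallelism after applying the scale factor and min/max bounds
            int maxParallelism,        // number of key groups of the vertex
            int numPartitions,         // source partition count, <= 0 when unknown
            int parallelismLowerLimit,
            int upperBound) {          // min(maxParallelism, parallelismUpperLimit)

        int numKeyGroupsOrPartitions = maxParallelism;
        int upperBoundForAlignment;
        if (numPartitions <= 0) {
            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
        } else {
            upperBoundForAlignment = Math.min(numPartitions, upperBound);
            numKeyGroupsOrPartitions = numPartitions;
        }

        // Round up to a divisor of the key-group / partition count => data is spread
        // evenly across subtasks.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p;
            }
        }

        if (numPartitions > 0) {
            // No divisor below the bound: find the smallest parallelism whose
            // partitions-per-subtask count does not exceed that of the requested parallelism.
            int p = newParallelism;
            for (; p > 0; p--) {
                if (numPartitions / p > numPartitions / newParallelism) {
                    if (numPartitions % p != 0) {
                        p++;
                    }
                    break;
                }
            }
            return Math.max(p, parallelismLowerLimit);
        }

        // If alignment fails, keep the originally computed parallelism.
        return newParallelism;
    }

    public static void main(String[] args) {
        // Keyed vertex: 128 key groups, requested 18 -> rounded up to 32 (128 % 32 == 0).
        System.out.println(align(18, 128, -1, 1, 64));
        // Source: 100 partitions, parallelism capped at 30, requested 28 -> no divisor in
        // [28, 30], so round down to 25 (an even 4 partitions per subtask).
        System.out.println(align(28, 128, 100, 1, 30));
    }
}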
