
[FLINK-36192][autoscaler] Autoscaler supports adjusting the parallelism of source vertex based on the number of partitions in Kafka or Pulsar #879

Open
wants to merge 8 commits into base: main

Conversation

huyuanfeng2018
Contributor

What is the purpose of the change

The autoscaler adjusts the parallelism of the corresponding source vertex according to the number of partitions in Kafka or Pulsar, so that the parallelism is a divisor of the partition count.
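For illustration, here is a minimal, hypothetical sketch of this alignment idea (illustrative names only, not the actual JobVertexScaler implementation):

// Pick the smallest parallelism >= the computed target that divides the partition
// count evenly within the allowed bound; otherwise keep the computed target.
public class DivisorAlignmentSketch {
    static int align(int target, int numPartitions, int upperBound) {
        for (int p = target; p <= Math.min(numPartitions, upperBound); p++) {
            if (numPartitions % p == 0) {
                return p; // each subtask then reads numPartitions / p partitions
            }
        }
        return target;
    }

    public static void main(String[] args) {
        System.out.println(align(12, 32, 24)); // prints 16: every subtask reads exactly 2 partitions
    }
}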

Brief change log

  • Add a new ScalingMetric.NUM_PARTITIONS to record the partition count of Kafka or Pulsar sources
  • Adjust org.apache.flink.autoscaler.JobVertexScaler.scale: this method now also attempts to adjust the parallelism so that it aligns well with the number of partitions if a vertex has a known partition count
  • Adjust org.apache.flink.autoscaler.JobVertexScaler.scale to return information about issues that occur during the adjustment process
  • The event handler will handle events where the final parallelism does not meet expectations due to the number of partitions or the maxParallelism limitation.

Verifying this change

  • Added test cases org.apache.flink.autoscaler.JobVertexScalerTest#testNumPartitionsAdjustment and org.apache.flink.autoscaler.JobVertexScalerTest#testSendingScalingLimitedEvents

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., are there any changes to the CustomResourceDescriptors: (no)
  • Core observer or reconciler logic that is regularly executed: (no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

huyuanfeng added 2 commits September 10, 2024 11:36
…of the Source to the number of partitions in kafka or pulsar
Contributor

@mxm mxm left a comment

Thanks @huyuanfeng2018 for this PR! A couple of suggestions and a comment for my understanding.

Comment on lines 419 to 439

if (numPartitions <= 0) {
    // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
    // we try to adjust the parallelism such that it divides the maxParallelism without a
    // remainder => data is evenly spread across subtasks
    for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
        if (maxParallelism % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
    // If parallelism adjustment fails, use originally computed parallelism
    return Tuple2.of(newParallelism, Optional.empty());
} else {
    // When we know the numPartitions at a vertex,
    // adjust the parallelism such that it divides the numPartitions without a remainder
    // => Data is evenly distributed among subtasks
    for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
        if (numPartitions % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
Contributor

This looks like code duplication to me. Only the for loop termination condition changed. We should be able to pass an argument to the for loop which we set based on the number of partitions.

We may even completely simplify like this:

Suggested change

if (numPartitions <= 0) {
    // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
    // we try to adjust the parallelism such that it divides the maxParallelism without a
    // remainder => data is evenly spread across subtasks
    for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
        if (maxParallelism % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
    // If parallelism adjustment fails, use originally computed parallelism
    return Tuple2.of(newParallelism, Optional.empty());
} else {
    // When we know the numPartitions at a vertex,
    // adjust the parallelism such that it divides the numPartitions without a remainder
    // => Data is evenly distributed among subtasks
    for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
        if (numPartitions % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }

if (numPartitions <= 0) {
    // No partition information is available, assume numPartitions equals the number of key groups
    numPartitions = maxParallelism;
}
for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
    if (maxParallelism % p == 0) {
        return Tuple2.of(p, Optional.empty());
    }
}
// If parallelism adjustment fails, use originally computed parallelism
return Tuple2.of(newParallelism, Optional.empty());

Contributor Author

With respect to p <= maxParallelism / 2:

When dealing with inputShipStrategies = hash, maxParallelism = 128, newParallelism = 78, I think newParallelism = 78 is acceptable, because not all tasks have large state after keyBy.

But for a vertex consuming Kafka, this becomes unacceptable.
Imagine Kafka with 128 partitions being consumed concurrently by 78 tasks :)
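To make the arithmetic concrete, here is a quick, illustrative check of this example (assumed even partition assignment; not code from the PR):

// 128 Kafka partitions read by 78 subtasks: 50 subtasks get 2 partitions and 28 get 1,
// so throughput is bounded by the 2-partition subtasks, much like at parallelism 64,
// while resources for 78 subtasks are allocated.
public class UnevenSourceExample {
    public static void main(String[] args) {
        int partitions = 128, parallelism = 78;
        int heavy = partitions % parallelism; // 50 subtasks read 2 partitions each
        int light = parallelism - heavy;      // 28 subtasks read 1 partition each
        System.out.printf("%d subtasks x 2 partitions, %d subtasks x 1 partition%n", heavy, light);
    }
}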

Contributor

Right, I missed that. I was trying to generalize the two code blocks. How about the following?

Suggested change

if (numPartitions <= 0) {
    // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
    // we try to adjust the parallelism such that it divides the maxParallelism without a
    // remainder => data is evenly spread across subtasks
    for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
        if (maxParallelism % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }
    // If parallelism adjustment fails, use originally computed parallelism
    return Tuple2.of(newParallelism, Optional.empty());
} else {
    // When we know the numPartitions at a vertex,
    // adjust the parallelism such that it divides the numPartitions without a remainder
    // => Data is evenly distributed among subtasks
    for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
        if (numPartitions % p == 0) {
            return Tuple2.of(p, Optional.empty());
        }
    }

if (numPartitions <= 0) {
    upperBound = Math.min(maxParallelism / 2, upperBound);
} else {
    upperBound = Math.min(numPartitions, upperBound);
    maxParallelism = numPartitions;
}
for (int p = newParallelism; p <= upperBound; p++) {
    if (maxParallelism % p == 0) {
        return Tuple2.of(p, Optional.empty());
    }
}
...
// Resource optimization logic follows (if we can't achieve optimal partitioning)
// (See review comment below)
...
// If parallelism adjustment fails, use originally computed parallelism
return Tuple2.of(newParallelism, Optional.empty());

Contributor Author

@huyuanfeng2018 huyuanfeng2018 Sep 12, 2024

Right, I missed that. I was trying to generalize the two code blocks. How about the following?

Thanks, fine with me. However, I do not recommend overriding the values of maxParallelism and upperBound, so I added two new variables instead:

  1. adjustableMaxParallelism (indicates the maxParallelism used during the adjustment process)
  2. adjustableUpperBound (indicates the upperBound used during the adjustment process)

Contributor

Can we call them like this?

  1. adjustableMaxParallelism => numKeyGroupsOrPartitions
  2. adjustableUpperBound => upperBoundForAlignment

Adjustable just doesn't tell someone who is unfamiliar with the code very much.

Comment on lines 462 to 470

// If a suitable degree of parallelism cannot be found, return parallelismLowerLimit
var message =
        String.format(
                SCALE_LIMITED_MESSAGE_FORMAT,
                vertex,
                newParallelism,
                parallelismLowerLimit,
                String.format("parallelismLowerLimit : %s", parallelismLowerLimit));
return Tuple2.of(parallelismLowerLimit, Optional.of(message));
Contributor

I'm not sure about this behavior. Why return the lower limit, instead of the originally computed target parallelism? I think we should retain this logic:

           // If parallelism adjustment fails, use originally computed parallelism
           return newParallelism;

Contributor Author

I responded with the specific logic, but I think it's still worth discussing

if (numPartitions / p > numPartitions / newParallelism) {
    if (numPartitions % p != 0) {
        p += 1;
    }
Contributor

Why p++ here? Maybe I'm overlooking something but p already divides numPartitions without a remainder.

Contributor Author

@huyuanfeng2018 huyuanfeng2018 Sep 11, 2024

Let me expand on my thoughts on this point:

  1. The first thing we consider is a p that divides the number of partitions evenly while p <= upperBound is satisfied.

  2. If step 1 fails to find such a p, it means we cannot guarantee even consumption of the partitions when rounding up (due to uneven consumption, some tasks will become bottlenecks, so even with the largest upperBound the processing rate will theoretically not increase). In that case we take the smallest value whose rate is comparable to the current one, i.e. one that keeps the partitions-per-task the same as with newParallelism.

Here is an example:
numPartitions = 35, newParallelism = 20, upperBound = 30

step 1: We start from 20 and go up. Since upperBound = 30, we cannot find a parallelism that consumes the partitions evenly; p = 30 and p = 20 give the same consumption rate.

step 2:
Since 35 / 20 = 1 remainder 15, a parallelism of 20 cannot consume Kafka evenly: 15 tasks consume two partitions and 5 tasks consume one partition. So the final result only requires that tasks consume at most two partitions; then our processing rate won't slow down.

That is: (numPartitions / p == 2 && numPartitions % p == 0) || (numPartitions / (p-1) == 2 && numPartitions % (p-1) != 0).

So the p += 1 here is caused by an indivisible partition count when searching downwards; we need +1 to meet the condition. 35 / 17 = 2, but there would eventually be a task consuming three partitions, so we add 1 to 17, and 18 is our final result (17 tasks consume 2 partitions each, 1 task consumes one partition).

step 3:
If p is already less than parallelismLowerLimit during the downward search, we should directly use parallelismLowerLimit as the final parallelism.

However, the above is all theoretical logic. I am not sure whether it will cause some negative effects. For example, if the data distribution across partitions is itself uneven, step 2 may aggravate that, so I have reservations about this part of the logic; another approach is to directly return newParallelism.
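To make the three steps above concrete, here is a small, self-contained sketch of the search (hypothetical method and numbers for illustration; not the PR's actual JobVertexScaler code):

public class PartitionAlignmentExample {
    static int align(int numPartitions, int newParallelism, int upperBound, int parallelismLowerLimit) {
        // Step 1: search upwards for a parallelism that divides the partition count evenly.
        for (int p = newParallelism; p <= Math.min(upperBound, numPartitions); p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        // Step 2: search downwards for the smallest parallelism that keeps the maximum
        // partitions-per-task unchanged; add 1 if it does not divide evenly.
        int p = newParallelism;
        for (; p > 0; p--) {
            if (numPartitions / p > numPartitions / newParallelism) {
                if (numPartitions % p != 0) {
                    p++;
                }
                break;
            }
        }
        // Step 3: never go below the configured lower limit.
        return Math.max(p, parallelismLowerLimit);
    }

    public static void main(String[] args) {
        System.out.println(align(35, 20, 30, 1));  // 18: 17 tasks read 2 partitions, 1 task reads 1
        System.out.println(align(35, 24, 30, 19)); // 19: step 2 yields 18, but the lower limit is 19
    }
}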

Contributor

Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 when we have found a parallelism which yields a greater value for num_partitions / p than the initial num_partitions / new_parallelism because we have found the tipping point where we achieve the most utilization in terms of partitions per task.

I think we should return new_parallelism if all adaptation logic fails because using a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.

Contributor Author

Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 when we have found a parallelism which yields a greater value for num_partitions / p than the initial num_partitions / new_parallelism because we have found the tipping point where we achieve the most utilization in terms of partitions per task.

I think we should return new_parallelism if all adaptation logic fails because using a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.

I want to explain the reason for using parallelismLowerLimit with an example:

numPartitions = 35, newParallelism = 24, upperBound = 30, parallelismLowerLimit = 19

Step 1 cannot produce a result, so it goes to step 2, but step 2 still cannot produce a result because parallelismLowerLimit = 19 while the value expected from step 2 is 18, so it will eventually return 19.

Contributor

On second thought, it's probably ok to use the lower limit. As you said, we would already be approaching the limit. Most users will never run into this because they haven't configured a minimum parallelism.

@1996fanrui 1996fanrui self-requested a review September 11, 2024 01:43
Contributor Author

@huyuanfeng2018 huyuanfeng2018 left a comment

@mxm I explained some of the logic. You can review again when you have time. Thank you very much!

Member

@1996fanrui 1996fanrui left a comment


Thanks @huyuanfeng2018 for the contribution, and @mxm for the review!

I left some comments, please take a look in your free time, thanks~

@@ -345,15 +356,22 @@ private boolean detectIneffectiveScaleUp(
* <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
* parallelism for source and keyed vertex such that it divides the maxParallelism without a
* remainder.
*
* <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
* number of partitions if a vertex has a known partition count.
Member

Suggested change
* number of partitions if a vertex has a known partition count.
* number of source partitions if a source vertex has a known partition count.

Contributor Author

agree

int numKeyGroupsOrPartitions = maxParallelism;
int upperBoundForAlignment;
if (numPartitions <= 0) {
    upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
Member

I'm curious why maxParallelism / 2?

Assuming the upstream edge of the vertex is keyBy (hash):

  • The maxParallelism is 100
  • newParallelism is 80

We will use 80 as the result, right? If so, it will hit the same issue as the source partition case.

Contributor Author

Yes, but I think not all tasks behind a hash have large Flink keyed state. In this scenario, misalignment has almost no impact on the task. I think this is the trade-off between performance and resources that the previous logic took into consideration.

But this becomes less acceptable for a vertex consuming Kafka or Pulsar.

@mxm @1996fanrui Maybe we can discuss this logic further.

Member

As I understand it, a key group is essentially the same as a source partition (Kafka or Pulsar). They determine how many partitions or groups each Flink parallel instance consumes.

The performance is unbalanced even without large state. For example, the maxParallelism (number of keyGroups) is 100 and the actual parallelism is 70.

  • It means that 30 instances process 2 keyGroups each, and the remaining 40 instances process 1 keyGroup each.
  • Assuming the data of each keyGroup is balanced, the 30 instances processing 2 keyGroups will become the bottleneck of the job.

For this scenario, there is no difference when the parallelism is set to 50 and 99.

IIUC, this is exactly the source partition problem you want to solve, and it works exactly the same for keyGroups as well.

Please correct me if anything is wrong.
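For reference, a small self-contained check of this distribution (plain integer arithmetic for this example; not Flink's actual key-group assignment code):

public class KeyGroupSpreadExample {
    public static void main(String[] args) {
        int keyGroups = 100, parallelism = 70;
        int heavy = keyGroups % parallelism;  // 30 subtasks own 2 key groups each
        int light = parallelism - heavy;      // 40 subtasks own 1 key group each
        System.out.printf("%d subtasks x 2 key groups, %d subtasks x 1 key group%n", heavy, light);
        // With balanced key groups the job is bounded by the subtasks owning 2 key groups,
        // so parallelism 50 (2 each) and 99 (at most 2) give the same per-subtask maximum.
    }
}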

Contributor Author

Well, you are right. The specific entry point is KeyGroupStreamPartitioner. In this case, we can unify our logic.

Contributor

@1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80.

Are you asking to expand the source logic introduced here to hash keyed state?

For this scenario, there is no difference when the parallelism is set to 50 and 99.

That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.

In this scenario though, we should not be going down to 50; we should probably be going up to 100. I think the maxParallelism / 2 bound stems from the idea that the maximum parallelism won't be reached because the parallelism is usually set to a number <= maxParallelism / 2, which would mean it doesn't make sense to continue beyond maxParallelism / 2 because there aren't any more possible divisors. However, when parallelism > maxParallelism / 2, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to maxParallelism for an initial parallelism > maxParallelism / 2. We could also just skip this (premature) optimization entirely.

I agree that we should replace the current key alignment logic with the generalized source logic introduced here. Something like this:

final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment = Math.min(
        // Optimize the case where newParallelism <= maxParallelism / 2
        newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2, 
        upperBound
    );
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}

// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
    if (numKeyGroupsOrPartitions % p == 0) {
        return p;
    }
}

// When the parallelism after rounding up cannot evenly divide numSourcePartitions,
// try to find the smallest parallelism that can satisfy the current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
    if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
        if (numKeyGroupsOrPartitions % p != 0) {
            p++;
        }
        break;
    }
}

p = Math.max(p, parallelismLowerLimit);
return p;

Member

Hey @mxm, sorry for the late reply. Because there are too many comments, I missed this one.

@1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80.

Are you asking to expand the source logic introduced here to hash keyed state?

I'm asking about hash keyed state; I didn't know why we would recommend 80 instead of 100 for the hash keyed state case. But I think you have answered my question in this comment.

For this scenario, there is no difference when the parallelism is set to 50 and 99.

That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.

Good point! Hot keys may happen in a Flink job. In general, source partitions don't have data skew.

However, when parallelism > maxParallelism / 2, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to maxParallelism for the initial parallelism > maxParallelism / 2. We could just skip this (premature) optimization entirely.

This is exactly my question. I think we should use maxParallelism as the final parallelism (for the above example, 100 instead of 80).

    upperBoundForAlignment = Math.min(
        // Optimize the case where newParallelism <= maxParallelism / 2
        newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2, 
        upperBound
    );

Great, this part solves my concern. Thank you~

if (numPartitions <= 0) {
    upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
} else {
    upperBoundForAlignment = Math.min(numPartitions, upperBound);
Member

Should the source vertex respect maxParallelism?

If maxParallelism is 100, the source has 1000 partitions, and upperBound is 200, the new parallelism may be greater than 100, right?

IIUC, the Flink job won't run when parallelism > maxParallelism, even without key groups.

Contributor Author

upperBound is already the smaller of parallelismUpperLimit and maxParallelism.

Contributor Author

@huyuanfeng2018 huyuanfeng2018 left a comment

Thanks @1996fanrui for your review. I have made some changes to the code, PTAL.


Contributor

@mxm mxm left a comment

Great discussion @huyuanfeng2018 @1996fanrui! I think this is a good opportunity to unify the logic further and to address a gap with the current key alignment logic for hash partitioning.


@huyuanfeng2018
Contributor Author

Great discussion @huyuanfeng2018 @1996fanrui! I think this is a good opportunity to unify the logic further and to address a gap with the current key alignment logic for hash partitioning.

Thanks @mxm for the review. LGTM on these suggestions; I fixed the code.

Comment on lines +412 to +426

final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment =
            Math.min(
                    // Optimize the case where newParallelism <= maxParallelism / 2
                    newParallelism > maxParallelism / 2
                            ? maxParallelism
                            : maxParallelism / 2,
                    upperBound);
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}
Member

Optimize the case where newParallelism <= maxParallelism / 2

Why do we need this optimization? To reduce the number of loop iterations?

I'm curious why the source partition case doesn't use this optimization. If both the source and key-group cases could use it, does the following code work?

Suggested change

final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment =
            Math.min(
                    // Optimize the case where newParallelism <= maxParallelism / 2
                    newParallelism > maxParallelism / 2
                            ? maxParallelism
                            : maxParallelism / 2,
                    upperBound);
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}

var numKeyGroupsOrPartitions = numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
var upperBoundForAlignment =
        Math.min(
                // Optimize the case where newParallelism <= maxParallelism / 2
                newParallelism > numKeyGroupsOrPartitions / 2
                        ? numKeyGroupsOrPartitions
                        : numKeyGroupsOrPartitions / 2,
                upperBound);


// When the shuffle type of vertex inputs contains keyBy or vertex is a source,
// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
Member

Suggested change
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
// the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks

Comment on lines +432 to +448

    if (numKeyGroupsOrPartitions % p == 0) {
        return p;
    }
}

// If parallelism adjustment fails, use originally computed parallelism
return newParallelism;
// When the parallelism after rounding up cannot be evenly divided by
// numKeyGroupsOrPartitions, try to find the smallest parallelism that can satisfy the
// current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
    if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
        if (numKeyGroupsOrPartitions % p != 0) {
            p++;
        }
        break;
    }
}
Member

I found that our discussion doesn't cover all cases while reviewing this part in detail.

For example: sourcePartition is 199 and the new parallelism is 99. IIUC, the final parallelism is 67 (every subtask consumes 3 source partitions, except for the last subtask), right?

But 100 as the final parallelism makes sense to me (every subtask consumes 2 source partitions, except for the last subtask).
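For concreteness, a small illustrative check of this example (assumed numbers, plain arithmetic; not code from the PR):

// 199 source partitions: parallelism 67 and 99 both leave some subtask reading 3
// partitions, while parallelism 100 caps every subtask at 2 partitions.
public class SourcePartitionCeilingExample {
    public static void main(String[] args) {
        int partitions = 199;
        for (int p : new int[] {99, 67, 100}) {
            int maxPerSubtask = (partitions + p - 1) / p; // ceiling division
            System.out.printf("parallelism %d -> at most %d partitions per subtask%n", p, maxPerSubtask);
        }
    }
}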

Member

Following up on #879 (comment): I found the current logic isn't perfect even if sourcePartitionNumber is 200.

// we try to adjust the parallelism such that it divides
// the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
    if (numKeyGroupsOrPartitions % p == 0) {
Member

Regarding this comment (#879 (comment)), I'm wondering whether the following change is more reasonable.

Note: numKeyGroupsOrPartitions / p means how many source partitions or key groups every subtask consumes.

Suggested change

if (numKeyGroupsOrPartitions % p == 0) {
if (numKeyGroupsOrPartitions % p == 0 || numKeyGroupsOrPartitions / p < numKeyGroupsOrPartitions / newParallelism) {

For example: maxParallelism is 200 and the new parallelism is 60 (some subtasks consume 4 keyGroups, the rest consume 3 keyGroups).

  • The final parallelism is 100 based on the main-branch code because we only return p when maxParallelism % p == 0.
  • But I think 67 is more reasonable here (one subtask consumes 2 key groups; each of the remaining 66 subtasks consumes 3 key groups).

@mxm @gyfora , WDYT?

Also, it's a bit beyond the scope of this PR. I could file a separate PR if you think it makes sense; of course, it's also acceptable to do it in this PR.
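For illustration, a minimal check of the two outcomes discussed above (assumed upperBound and names; not the PR's code):

// 200 key groups, computed parallelism 60: the divisor-only rule picks 100, while the
// relaxed rule (divisor OR fewer key groups per subtask than at the computed parallelism)
// accepts 67 first.
public class AlignmentRuleComparison {
    public static void main(String[] args) {
        int keyGroups = 200, newParallelism = 60, upperBound = 200;
        int divisorOnly = -1, relaxed = -1;
        for (int p = newParallelism; p <= upperBound; p++) {
            if (divisorOnly < 0 && keyGroups % p == 0) {
                divisorOnly = p;
            }
            if (relaxed < 0
                    && (keyGroups % p == 0 || keyGroups / p < keyGroups / newParallelism)) {
                relaxed = p;
            }
        }
        System.out.println("divisor-only rule: " + divisorOnly); // 100
        System.out.println("relaxed rule: " + relaxed);          // 67
    }
}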
