[FLINK-36018][autoscaler] Support lazy scale down to avoid frequent rescaling #875
Conversation
Looks good, my only concern is related to the behaviour after a scale up. I feel like we should reset the DelayedScaleDown.firstTriggerTimes to the scale up time to avoid scaling down right after a scale up.
var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
if (firstTriggerTime.isEmpty()) {
    LOG.info("The scale down request is delayed for {}", vertex);
should we maybe log how long it is delayed?
if (lastScalingTs.plus(gracePeriod).isAfter(clock.instant())) {
    LOG.info(
            "Skipping immediate scale down after scale up within grace period for {}",
            vertex);
    return true;

if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
    LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
    return ParallelismResult.optional(newParallelism);
I think it may make sense to keep the previous logic. If you just had a scale up (you need to catch up) it's not good to scale down even if the scale down request was triggered a long time ago.
I think we can simply use the old logic here and basically a scale up would "reset" the scale down first trigger time. By actually resetting it we may even be able to get rid of this method completely?
// Clear delayed scale down request if the new parallelism is equal to
// currentParallelism.
delayedScaleDown.clearVertex(vertex);
return ParallelismResult.optional(currentParallelism);
Maybe we should have something like ParallelismResult.noChange() because this looks a bit weird :D
 * that if all vertices' ParallelismResult is optional, rescaling will be ignored.
 */
@Getter
public static class ParallelismResult {
Should we call this ParallelismChange instead?
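For illustration, here is a minimal sketch of how the two suggestions above (a noChange() factory and the ParallelismChange name) could fit together. The field names and the -1 sentinel are assumptions for the sketch, not the code in this PR:

```java
import lombok.Getter;

/** Hypothetical sketch combining the review suggestions; not the actual PR code. */
@Getter
public class ParallelismChange {

    private final int newParallelism;
    private final boolean required;
    private final boolean noChange;

    private ParallelismChange(int newParallelism, boolean required, boolean noChange) {
        this.newParallelism = newParallelism;
        this.required = required;
        this.noChange = noChange;
    }

    /** A change that must be applied, e.g. a scale up. */
    public static ParallelismChange required(int newParallelism) {
        return new ParallelismChange(newParallelism, true, false);
    }

    /** A change that is only applied if some other vertex requires a rescale. */
    public static ParallelismChange optional(int newParallelism) {
        return new ParallelismChange(newParallelism, false, false);
    }

    /** Explicitly keep the current parallelism, instead of optional(currentParallelism). */
    public static ParallelismChange noChange() {
        return new ParallelismChange(-1, false, true); // -1 is an assumed "unused" sentinel
    }
}
```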
@@ -156,6 +159,9 @@ public boolean scaleResource(

        autoScalerStateStore.storeConfigChanges(context, configOverrides);

        // Try to clear all delayed scale down requests after scaling.
        delayedScaleDown.clearAll();
Hi @gyfora, replying to your comment [1] here.
IIUC, the previous logic still exists: if a rescale happens, delayedScaleDown.clearAll will be called (and a rescale includes a scale up). That means the first trigger time is reset.
Not sure whether that answers your question.
[1] #875 (comment)
This logic isn't the same as the previous logic.
A demo of the previous logic:
- 9:00 scale up (executed)
- 9:30 scale down (ignored)
- 9:59 scale down (ignored)
- 10:00 scale down (can be executed)
New logic:
- 9:00 scale up (executed, clears all first trigger times)
- 9:30 scale down (updates the first trigger time to 9:30)
- 10:00 scale down (within the interval from 9:30 to 10:30)
- 10:29 scale down (within the interval from 9:30 to 10:30)
- 10:30 scale down (can be executed)
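To make the new behaviour concrete, here is a small self-contained sketch of the timeline above. The class and method names are illustrative, not the actual code in this PR:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.jobgraph.JobVertexID;

/**
 * Illustrative sketch of the timeline above: a scale down is only allowed once the
 * first pending request for the vertex is older than scale-down.interval, and any
 * executed rescale clears the pending requests.
 */
class LazyScaleDownSketch {

    private final Map<JobVertexID, Instant> firstTriggerTimes = new HashMap<>();
    private final Duration scaleDownInterval = Duration.ofHours(1);

    /** Returns true if the scale down may be executed now, false if it must wait. */
    boolean canScaleDown(JobVertexID vertex, Instant now) {
        Instant first = firstTriggerTimes.putIfAbsent(vertex, now);
        if (first == null) {
            // 9:30 in the example: first request after the last rescale, start waiting.
            return false;
        }
        // 10:00 / 10:29 are still within the interval; 10:30 and later may scale down.
        return !now.isBefore(first.plus(scaleDownInterval));
    }

    /** 9:00 in the example: any executed rescale resets the pending requests. */
    void onRescale() {
        firstTriggerTimes.clear();
    }
}
```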
So if there was a scale down before 9:00 and that set the first trigger time, that doesn't matter as the scale up resets it subsequently.
In that case we should probably rename the method detectImmediateScaleDown to something like applyScaleDownInterval.
Hey @gyfora, thank you for the quick review, all comments are addressed. ❤️
Looks pretty good, let's give @mxm 1-2 days to comment on this :)
@1996fanrui I think we can go ahead with this :)
Thank you, let me merge it.
Thanks @1996fanrui and @gyfora for this improvement! The changes look good to me. Thank you also for waiting for any comments from my side.
        .durationType()
        .defaultValue(Duration.ofHours(1))
        .withFallbackKeys(oldOperatorConfigKey("scale-up.grace-period"))
        .withDeprecatedKeys(autoScalerConfigKey("scale-up.grace-period"))
Thanks for adding this as a deprecated key for existing users!
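For context, here is a sketch of how the full option declaration might look. The option key, the concrete old-key strings, and the description are assumptions based on the quoted diff and the PR title, not copied from the PR:

```java
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Sketch of the new option declaration; key and description are assumed. */
class ScaleDownOptionSketch {

    // Existing users keep working: the former scale-up.grace-period keys are wired
    // in as fallback/deprecated keys, as in the quoted diff above.
    static final ConfigOption<Duration> SCALE_DOWN_INTERVAL =
            ConfigOptions.key("job.autoscaler.scale-down.interval")
                    .durationType()
                    .defaultValue(Duration.ofHours(1))
                    .withFallbackKeys("kubernetes.operator.job.autoscaler.scale-up.grace-period")
                    .withDeprecatedKeys("job.autoscaler.scale-up.grace-period")
                    .withDescription(
                            "How long a scale down is delayed after it is first triggered.");
}
```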
What is the purpose of the change
When the source traffic decreases, the autoscaler scales the job down promptly to save resources. This causes each job to rescale more than 20 times a day.
Brief change log
- Introduced DelayedScaleDown to store the firstTriggerTime for all vertices.
- Changed the return type of JobVertexScaler#computeScaleTargetParallelism from int to ParallelismResult.
- ParallelismResult introduces the required field, which indicates whether the parallelism change of the current vertex is required or optional.
- Introduced the scale-down.interval option.
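As a reading aid, here is a minimal sketch of what DelayedScaleDown could look like, based only on the methods referenced in this PR (getFirstTriggerTimeForVertex, clearVertex, clearAll); updateTriggerTime and the field layout are assumptions, not the actual class:

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.flink.runtime.jobgraph.JobVertexID;

/** Minimal sketch: tracks when a scale down was first requested for each vertex. */
public class DelayedScaleDown {

    // First time a scale down was requested for each vertex since the last rescale.
    private final Map<JobVertexID, Instant> firstTriggerTimes = new HashMap<>();

    /** Returns the first trigger time for the vertex, if a scale down is already pending. */
    public Optional<Instant> getFirstTriggerTimeForVertex(JobVertexID vertex) {
        return Optional.ofNullable(firstTriggerTimes.get(vertex));
    }

    /** Records the first time a scale down was requested for the vertex. */
    public void updateTriggerTime(JobVertexID vertex, Instant instant) {
        firstTriggerTimes.putIfAbsent(vertex, instant);
    }

    /** Clears the pending scale down request for a single vertex. */
    public void clearVertex(JobVertexID vertex) {
        firstTriggerTimes.remove(vertex);
    }

    /** Clears all pending scale down requests, e.g. right after any rescale. */
    public void clearAll() {
        firstTriggerTimes.clear();
    }
}
```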
Verifying this change
- JobVertexScalerTest and ScalingExecutorTest.
- AbstractAutoScalerStateStoreTest
Does this pull request potentially affect one of the following parts:
- CustomResourceDescriptors: yes, update the autoscaler option.
- Documentation