Skip to content

Commit

Permalink
use only desired replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Sep 23, 2024
1 parent aac2462 commit d05437d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 159 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (mv MonoVertex) getReplicas() int {

func (mv MonoVertex) CalculateReplicas() int {
desiredReplicas := mv.getReplicas()
// If we are pausing the MonoVertex or in a paused state then we should have the desired replicas as 0
if mv.Spec.Lifecycle.GetDesiredPhase() == MonoVertexPhasePaused || mv.Status.Phase == MonoVertexPhasePaused {
// If we are pausing the MonoVertex then we should have the desired replicas as 0
if mv.Spec.Lifecycle.GetDesiredPhase() == MonoVertexPhasePaused {
return 0
}
// Don't allow replicas to be out of the range of min and max when auto scaling is enabled
Expand Down
128 changes: 8 additions & 120 deletions pkg/reconciler/monovertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,83 +130,24 @@ func (mr *monoVertexReconciler) reconcile(ctx context.Context, monoVtx *dfv1.Mon

monoVtx.Status.MarkDeployed()

// If the MonoVertex has a lifecycle change, then do not update the phase as
// this should happen only after the required configs for the lifecycle changes
// have been applied.
if !isLifecycleChange(monoVtx) {
monoVtx.Status.MarkPhase(monoVtx.Spec.Lifecycle.GetDesiredPhase(), "", "")
// Update the phase based on the DesiredPhase from the lifecycle, this should encompass
// the Paused and running states.
currentPhase := monoVtx.Status.Phase
monoVtx.Status.MarkPhase(monoVtx.Spec.Lifecycle.GetDesiredPhase(), "", "")
// If the phase has changed, log the event
if monoVtx.Status.Phase != currentPhase {
log.Infow("Updated MonoVertex phase", zap.String("originalPhase", string(currentPhase)), zap.String("currentPhase", string(monoVtx.Status.Phase)))
mr.recorder.Eventf(monoVtx, corev1.EventTypeNormal, "UpdateMonoVertexPhase", "Updated MonoVertex phase from %s to %s", string(currentPhase), string(monoVtx.Status.Phase))
}

// Check children resource status
if err := mr.checkChildrenResourceStatus(ctx, monoVtx); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to check mono vertex children resource status, %w", err)
}

// check if any changes related to pause/resume lifecycle for the pipeline
if isLifecycleChange(monoVtx) {
currentPhase := monoVtx.Status.Phase
needRequeue, err := mr.updateLifecycleDesiredState(ctx, monoVtx)
if err != nil {
logMsg := fmt.Sprintf("Updated desired MonoVertex phase failed: %v", zap.Error(err))
log.Error(logMsg)
mr.recorder.Eventf(monoVtx, corev1.EventTypeWarning, "ReconcileMonoVertexFailed", logMsg)
return ctrl.Result{}, err
}
if monoVtx.Status.Phase != currentPhase {
log.Infow("Updated MonoVertex phase", zap.String("originalPhase", string(currentPhase)), zap.String("currentPhase", string(monoVtx.Status.Phase)))
mr.recorder.Eventf(monoVtx, corev1.EventTypeNormal, "UpdateMonoVertexPhase", "Updated MonoVertex phase from %s to %s", string(currentPhase), string(monoVtx.Status.Phase))
}
if needRequeue {
return ctrl.Result{RequeueAfter: dfv1.DefaultRequeueAfter}, nil
}
}
return ctrl.Result{}, nil
}

// updateLifecycleDesiredState evaluates the desired state of the MonoVertex's lifecycle
// and updates its state accordingly. It handles transitions between paused and running states.
func (mr *monoVertexReconciler) updateLifecycleDesiredState(ctx context.Context, mvtx *dfv1.MonoVertex) (bool, error) {
switch mvtx.Spec.Lifecycle.GetDesiredPhase() {
case dfv1.MonoVertexPhasePaused:
// // Check if the desired phase is paused then pause the MonoVertex
return mr.pauseMonoVertex(ctx, mvtx)
case dfv1.MonoVertexPhaseRunning, dfv1.MonoVertexPhaseUnknown:
// Call to resume the MonoVertex
return mr.resumeMonoVertex(ctx, mvtx)
default:
// Any other phases are considered invalid
return false, fmt.Errorf("invalid desired phase")
}
}

// resumeMonoVertex resumes the MonoVertex from a paused state.
func (mr *monoVertexReconciler) resumeMonoVertex(ctx context.Context, mvtx *dfv1.MonoVertex) (bool, error) {
// Attempt to scale up the MonoVertex to its minimum replicas.
_, err := mr.scaleUpMonoVertex(ctx, mvtx)
if err != nil {
// If an error occurs, return true and the error. Indicating to requeue.
return true, err
}
// Mark the status of the MonoVertex as running
mvtx.Status.MarkPhaseRunning()
// Return false indicating no further updates are needed.
return false, nil
}

// pauseMonoVertex pauses the MonoVertex, scaling it down to zero replicas as part of the process.
func (mr *monoVertexReconciler) pauseMonoVertex(ctx context.Context, mvtx *dfv1.MonoVertex) (bool, error) {
// Attempt to scale down the MonoVertex to zero replicas.
updated, err := mr.scaleDownMonoVertex(ctx, mvtx)
if err != nil || updated {
// If there's an error or the scaling action has occurred, indicate to requeue the request.
return updated, err
}
// If successful, Mark the status of the MonoVertex as paused
mvtx.Status.MarkPhasePaused()
// Return false indicating no further updates are needed.
return false, nil
}

// orchestrateFixedResources orchestrates fixed resources such as daemon service related objects for a mono vertex.
func (mr *monoVertexReconciler) orchestrateFixedResources(ctx context.Context, monoVtx *dfv1.MonoVertex) error {
// Create or update mono vtx services
Expand Down Expand Up @@ -665,56 +606,3 @@ func (mr *monoVertexReconciler) checkChildrenResourceStatus(ctx context.Context,

return nil
}

// isLifecycleChange determines whether there has been a change requested in the lifecycle
// of a MonoVertex object, specifically relating to the paused and pausing states.
func isLifecycleChange(mvtx *dfv1.MonoVertex) bool {
// Extract the current phase from the status of the MonoVertex.
// Check if the desired phase or the current phase of the MonoVertex is 'Paused'.
// This indicates a transition into or out of a paused state
// which is a lifecycle phase change
if mvtx.Status.Phase == dfv1.MonoVertexPhasePaused || mvtx.Spec.Lifecycle.GetDesiredPhase() == dfv1.MonoVertexPhasePaused {
return true
}
// If none of the conditions are met, return false
return false
}

// scaleDownMonoVertex scales down the number of replicas of the specified MonoVertex to zero.
func (mr *monoVertexReconciler) scaleDownMonoVertex(ctx context.Context, mvtx *dfv1.MonoVertex) (bool, error) {
return mr.scaleMonoVertex(ctx, mvtx, 0)
}

// scaleUpMonoVertex scales up the number of replicas of the specified MonoVertex to its minimum replicas defined in Spec.
func (mr *monoVertexReconciler) scaleUpMonoVertex(ctx context.Context, mvtx *dfv1.MonoVertex) (bool, error) {
return mr.scaleMonoVertex(ctx, mvtx, mvtx.Spec.Scale.GetMinReplicas())
}

// scaleMonoVertex scales the specified MonoVertex to the desired number of replicas.
// It takes the current context, the MonoVertex to scale, and the desired number of replicas as arguments.
func (mr *monoVertexReconciler) scaleMonoVertex(ctx context.Context, mvtx *dfv1.MonoVertex, desired int32) (bool, error) {
// Check if the current number of replicas (origin) differs from the desired number.
if origin := *mvtx.Spec.Replicas; origin != desired {
// Create a JSON patch to update the replicas in the spec.
patchJson := fmt.Sprintf(`{"spec":{"replicas":%d}}`, desired)

// Apply the patch to the MonoVertex using the client interface.
err := mr.client.Patch(ctx, mvtx, client.RawPatch(types.MergePatchType, []byte(patchJson)))

// Handle errors from the patch operation, excluding a not found error.
if err != nil && !apierrors.IsNotFound(err) {
return false, err // Return false and the error if patch fails.
}

// Log the scaling operation details.
mr.logger.Infow("Scaled MonoVertex", zap.Int32("from", origin), zap.Int32("to", desired), zap.String("MonoVertex", mvtx.Name))

// Record an event in the event tracker indicating that scaling occurred.
mr.recorder.Eventf(mvtx, corev1.EventTypeNormal, "ScalingMonoVertex", "Scaled MonoVertex %s from %d to %d replicas", mvtx.Name, origin, desired)

// Return true indicating that scaling operation was performed
return true, nil
}
// Return false if the desired number of replicas is the same as the current.
return false, nil
}
37 changes: 0 additions & 37 deletions pkg/reconciler/monovertex/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,40 +392,3 @@ func Test_reconcile(t *testing.T) {
assert.Equal(t, uint32(5), testObj.Status.UpdatedReplicas)
})
}

func Test_pauseAndResumePipeline(t *testing.T) {

t.Run("test pause monovertex", func(t *testing.T) {
cl := fake.NewClientBuilder().Build()
r := fakeReconciler(t, cl)
ctx := context.TODO()
testObj := testMonoVtx.DeepCopy()
testObj.Spec.Replicas = ptr.To[int32](0)
_, err := r.reconcile(ctx, testObj)
assert.NoError(t, err)

_, err = r.pauseMonoVertex(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, dfv1.MonoVertexPhasePaused, testObj.Status.Phase)
_, err = r.resumeMonoVertex(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, dfv1.MonoVertexPhaseRunning, testObj.Status.Phase)
})
t.Run("test pause monovertex - start with Paused", func(t *testing.T) {
cl := fake.NewClientBuilder().Build()
r := fakeReconciler(t, cl)
ctx := context.TODO()
testObj := testMonoVtx.DeepCopy()
testObj.Spec.Replicas = ptr.To[int32](2)
_, err := r.reconcile(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, uint32(2), testObj.Status.Replicas)

testObj = testMonoVtx.DeepCopy()
testObj.Spec.Replicas = ptr.To[int32](2)
testObj.Spec.Lifecycle.DesiredPhase = dfv1.MonoVertexPhasePaused
_, err = r.reconcile(ctx, testObj)
assert.NoError(t, err)
assert.Equal(t, uint32(0), testObj.Status.Replicas)
})
}

0 comments on commit d05437d

Please sign in to comment.