Skip to content

Commit

Permalink
chore: check for pause timeout after errors
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Sep 24, 2024
1 parent b8c61de commit f86408c
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,37 +845,49 @@ func (r *pipelineReconciler) pausePipeline(ctx context.Context, pl *dfv1.Pipelin
return updated, err
}

daemonClient, err := daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
var daemonError error = nil

Check failure on line 848 in pkg/reconciler/pipeline/controller.go

View workflow job for this annotation

GitHub Actions / Lint

ineffectual assignment to daemonError (ineffassign)
var drainCompleted = false

// Query the daemon for the buffer draining status. If that lookup fails, we do not return early:
// the error is recorded and we continue, so that the pause timeout is still honored when the error persists.
// - If the timeout has not yet elapsed, we trigger a requeue.
// - If the timeout has elapsed, we pause the pipeline even though the drain status could not be fetched.
daemonClient, daemonError := daemonclient.NewGRPCDaemonServiceClient(pl.GetDaemonServiceURL())
r.logger.Info(daemonClient.ListPipelineBuffers(ctx, pl.Name))
if err != nil {
return true, err
daemonError = err
}
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err := daemonClient.IsDrained(ctx, pl.Name)
if err != nil {
return true, err
if daemonClient != nil {
defer func() {
_ = daemonClient.Close()
}()
drainCompleted, err = daemonClient.IsDrained(ctx, pl.Name)
if err != nil {
daemonError = err
}
}

pauseTimestamp, err := time.Parse(time.RFC3339, pl.GetAnnotations()[dfv1.KeyPauseTimestamp])
if err != nil {
return false, err
}

// if drain is completed, or we have exceeded the pause deadline, mark pl as paused and scale down
if time.Now().After(pauseTimestamp.Add(time.Duration(pl.Spec.Lifecycle.GetPauseGracePeriodSeconds())*time.Second)) || drainCompleted {
_, err := r.scaleDownAllVertices(ctx, pl)
_, err = r.scaleDownAllVertices(ctx, pl)
if err != nil {
return true, err
}
// if the drain completed succesfully, then set the DrainedOnPause field to true
if daemonError != nil {
r.logger.Errorf("error in fetching Drained status, Pausing due to timeout: %v", zap.Error(err))
}
// if the drain completed successfully, then set the DrainedOnPause field to true
if drainCompleted {
pl.Status.MarkDrainedOnPauseTrue()
}
pl.Status.MarkPhasePaused()
return false, nil
}
return true, nil
return true, daemonError
}

func (r *pipelineReconciler) scaleDownSourceVertices(ctx context.Context, pl *dfv1.Pipeline) (bool, error) {
Expand Down

0 comments on commit f86408c

Please sign in to comment.