support fast-fail for PipelineRun
allow task to be cancelled if a parallel task fails #7880

Signed-off-by: chengjoey <[email protected]>
chengjoey committed May 26, 2024
1 parent e6d9154 commit b2c60db
Showing 16 changed files with 374 additions and 10 deletions.
48 changes: 48 additions & 0 deletions docs/pipeline-api.md
@@ -822,6 +822,18 @@ with those declared in the pipeline.</p>
<p>TaskRunSpecs holds a set of runtime specs</p>
</td>
</tr>
<tr>
<td>
<code>failFast</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.</p>
</td>
</tr>
</table>
</td>
</tr>
@@ -2438,6 +2450,18 @@ with those declared in the pipeline.</p>
<p>TaskRunSpecs holds a set of runtime specs</p>
</td>
</tr>
<tr>
<td>
<code>failFast</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="tekton.dev/v1.PipelineRunSpecStatus">PipelineRunSpecStatus
@@ -9158,6 +9182,18 @@ with those declared in the pipeline.</p>
<p>TaskRunSpecs holds a set of runtime specs</p>
</td>
</tr>
<tr>
<td>
<code>failFast</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.</p>
</td>
</tr>
</table>
</td>
</tr>
@@ -11329,6 +11365,18 @@ with those declared in the pipeline.</p>
<p>TaskRunSpecs holds a set of runtime specs</p>
</td>
</tr>
<tr>
<td>
<code>failFast</code><br/>
<em>
bool
</em>
</td>
<td>
<em>(Optional)</em>
<p>FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="tekton.dev/v1beta1.PipelineRunSpecStatus">PipelineRunSpecStatus
46 changes: 46 additions & 0 deletions docs/pipelineruns.md
@@ -36,6 +36,7 @@ weight: 204
- [Cancelling a <code>PipelineRun</code>](#cancelling-a-pipelinerun)
- [Gracefully cancelling a <code>PipelineRun</code>](#gracefully-cancelling-a-pipelinerun)
- [Gracefully stopping a <code>PipelineRun</code>](#gracefully-stopping-a-pipelinerun)
- [Fast-fail a <code>PipelineRun</code>](#fast-fail-a-pipelinerun)
- [Pending <code>PipelineRuns</code>](#pending-pipelineruns)
<!-- /toc -->

@@ -78,6 +79,7 @@ A `PipelineRun` definition supports the following fields:
- [`timeouts`](#configuring-a-failure-timeout) - Specifies the timeout before the `PipelineRun` fails. `timeouts` allows more granular timeout configuration, at the pipeline, tasks, and finally levels
- [`podTemplate`](#specifying-a-pod-template) - Specifies a [`Pod` template](./podtemplates.md) to use as the basis for the configuration of the `Pod` that executes each `Task`.
- [`workspaces`](#specifying-workspaces) - Specifies a set of workspace bindings which must match the names of workspaces declared in the pipeline being used.
- [`failFast`](#fast-fail-a-pipelinerun) - Specifies whether to cancel the other running `Tasks` and fail the `PipelineRun` as soon as a `Task` fails.

[kubernetes-overview]:
https://kubernetes.io/docs/concepts/overview/working-with-objects/kubernetes-objects/#required-fields
@@ -1622,6 +1624,50 @@ spec:
status: "StoppedRunFinally"
```

## Fast-fail a `PipelineRun`

A pipeline often has several tasks running concurrently. When one of them fails,
you may want to stop the entire pipeline immediately and cancel the other tasks that are still running in parallel.
Set `failFast: true` in the `PipelineRun` spec to achieve this.

For example:
```yaml
apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
  name: pipeline-run
spec:
  failFast: true
  pipelineSpec:
    tasks:
      - name: fail-task
        taskSpec:
          steps:
            - name: fail-task
              image: busybox
              command: ["/bin/sh", "-c"]
              args:
                - exit 1
      - name: success1
        taskSpec:
          steps:
            - name: success1
              image: busybox
              command: ["/bin/sh", "-c"]
              args:
                - sleep 360
      - name: success2
        taskSpec:
          steps:
            - name: success2
              image: busybox
              command: ["/bin/sh", "-c"]
              args:
                - sleep 360
```
The `PipelineRun` above cancels `success1` and `success2` as soon as `fail-task` fails.
For details on the resulting status of the cancelled tasks, see [Cancelling a `TaskRun`](taskruns.md#cancelling-a-taskrun).


## Pending `PipelineRuns`

A `PipelineRun` can be created as a "pending" `PipelineRun` meaning that it will not actually be started until the pending status is cleared.
32 changes: 32 additions & 0 deletions examples/v1/pipelineruns/pipelinerun-fail-fast.yaml
@@ -0,0 +1,32 @@
apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
  generateName: pr-fail-fast-
spec:
  failFast: true
  pipelineSpec:
    tasks:
      - name: fail-task
        taskSpec:
          steps:
            - name: fail-task
              image: busybox
              command: ["/bin/sh", "-c"]
              args:
                - exit 1
      - name: success1
        taskSpec:
          steps:
            - name: success1
              image: busybox
              command: ["/bin/sh", "-c"]
              args:
                - sleep 360
      - name: success2
        taskSpec:
          steps:
            - name: success2
              image: busybox
              command: ["/bin/sh", "-c"]
              args:
                - sleep 360
7 changes: 7 additions & 0 deletions pkg/apis/pipeline/v1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1/pipelinerun_types.go
@@ -278,6 +278,10 @@ type PipelineRunSpec struct {
	// +optional
	// +listType=atomic
	TaskRunSpecs []PipelineTaskRunSpec `json:"taskRunSpecs,omitempty"`

	// FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.
	// +optional
	FailFast bool `json:"failFast,omitempty"`
}
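
Because the new field carries `omitempty`, an unset `failFast` should be absent from the serialized spec, and an absent field decodes to `false`, so fail-fast stays off unless it is requested. The sketch below illustrates this with a hypothetical `runSpec` struct that copies only the JSON tag from the hunk above; it is not the real `PipelineRunSpec`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// runSpec is a hypothetical, trimmed-down stand-in for PipelineRunSpec.
// Only the JSON tag is taken from the diff; everything else is illustrative.
type runSpec struct {
	FailFast bool `json:"failFast,omitempty"`
}

// encode marshals a runSpec to its JSON string form.
func encode(s runSpec) string {
	b, err := json.Marshal(s)
	if err != nil {
		panic(err) // not expected for a struct with a single bool field
	}
	return string(b)
}

// decode parses a JSON document into a runSpec.
func decode(doc string) runSpec {
	var s runSpec
	if err := json.Unmarshal([]byte(doc), &s); err != nil {
		panic(err)
	}
	return s
}

func main() {
	fmt.Println(encode(runSpec{}))               // {}
	fmt.Println(encode(runSpec{FailFast: true})) // {"failFast":true}
	fmt.Println(decode(`{}`).FailFast)           // false
}
```

One consequence of `omitempty` on a plain `bool` is that an explicit `failFast: false` and an omitted field are indistinguishable after a round trip.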

// TimeoutFields allows granular specification of pipeline, task, and finally timeouts
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1/swagger.json
@@ -656,6 +656,10 @@
      "description": "PipelineRunSpec defines the desired state of PipelineRun",
      "type": "object",
      "properties": {
        "failFast": {
          "description": "FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.",
          "type": "boolean"
        },
        "params": {
          "description": "Params is a list of parameter names and values.",
          "type": "array",
7 changes: 7 additions & 0 deletions pkg/apis/pipeline/v1beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_conversion.go
@@ -81,6 +81,7 @@ func (prs PipelineRunSpec) ConvertTo(ctx context.Context, sink *v1.PipelineRunSp
	sink.TaskRunTemplate.PodTemplate = prs.PodTemplate
	sink.TaskRunTemplate.ServiceAccountName = prs.ServiceAccountName
	sink.Workspaces = nil
	sink.FailFast = prs.FailFast
	for _, w := range prs.Workspaces {
		new := v1.WorkspaceBinding{}
		w.convertTo(ctx, &new)
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
@@ -296,6 +296,10 @@ type PipelineRunSpec struct {
	// +optional
	// +listType=atomic
	TaskRunSpecs []PipelineTaskRunSpec `json:"taskRunSpecs,omitempty"`

	// FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.
	// +optional
	FailFast bool `json:"failFast,omitempty"`
}

// TimeoutFields allows granular specification of pipeline, task, and finally timeouts
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1beta1/swagger.json
@@ -997,6 +997,10 @@
      "description": "PipelineRunSpec defines the desired state of PipelineRun",
      "type": "object",
      "properties": {
        "failFast": {
          "description": "FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.",
          "type": "boolean"
        },
        "params": {
          "description": "Params is a list of parameter names and values.",
          "type": "array",
12 changes: 12 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -606,6 +606,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
	default:
	}

	// find first failed task and cancel PipelineRun if FailFast is set
	if pr.Spec.FailFast && !pr.IsCancelled() {
		for _, resolvedTask := range pipelineRunState {
			if resolvedTask.IsFailure() {
				if err := cancelPipelineRun(ctx, logger, pr, c.PipelineClientSet); err != nil {
					return err
				}
				break
			}
		}
	}
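
The check added above can be read in isolation as: if fail-fast is on and the run is not already cancelled, cancel it at the first failed task. The following is a minimal sketch of that control flow only. The `taskState` and `run` types and the `applyFailFast` helper are hypothetical stand-ins, not Tekton types, and setting a `Cancelled` flag stands in for the real `cancelPipelineRun` call.

```go
package main

import "fmt"

// taskState is a hypothetical stand-in for a resolved pipeline task.
type taskState struct {
	Name   string
	Failed bool
}

// run is a hypothetical stand-in for a PipelineRun.
type run struct {
	FailFast  bool
	Cancelled bool
}

// applyFailFast cancels the run when fail-fast is enabled, the run is not
// already cancelled, and at least one task has failed. It returns the name
// of the first failed task, or "" if nothing was cancelled.
func applyFailFast(r *run, tasks []taskState) string {
	if !r.FailFast || r.Cancelled {
		return ""
	}
	for _, t := range tasks {
		if t.Failed {
			r.Cancelled = true // the real reconciler calls cancelPipelineRun here
			return t.Name
		}
	}
	return ""
}

func main() {
	tasks := []taskState{{Name: "success1"}, {Name: "fail-task", Failed: true}}

	on := &run{FailFast: true}
	trigger := applyFailFast(on, tasks)
	fmt.Printf("%q %t\n", trigger, on.Cancelled) // "fail-task" true

	off := &run{}
	trigger = applyFailFast(off, tasks)
	fmt.Printf("%q %t\n", trigger, off.Cancelled) // "" false
}
```

The guard on the already-cancelled state matters because the reconciler runs repeatedly: without it, each later pass would try to cancel the same run again.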

	// Second iteration
	pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState)
	switch {
28 changes: 23 additions & 5 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
@@ -118,7 +118,7 @@ func (t *ResolvedPipelineTask) EvaluateCEL() error {

// isDone returns true only if the task is skipped, succeeded or failed
func (t ResolvedPipelineTask) isDone(facts *PipelineRunFacts) bool {
-	return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure()
+	return t.Skip(facts).IsSkipped || t.isSuccessful() || t.IsFailure()
}

// IsRunning returns true only if the task is neither succeeded, cancelled nor failed
@@ -129,7 +129,7 @@ func (t ResolvedPipelineTask) IsRunning() bool {
	if !t.IsCustomTask() && len(t.TaskRuns) == 0 {
		return false
	}
-	return !t.isSuccessful() && !t.isFailure()
+	return !t.isSuccessful() && !t.IsFailure()
}

// IsCustomTask returns true if the PipelineTask references a Custom Task.
@@ -162,9 +162,9 @@ func (t ResolvedPipelineTask) isSuccessful() bool {
	return true
}

-// isFailure returns true only if the run has failed (if it has ConditionSucceeded = False).
-// If the PipelineTask has a Matrix, isFailure returns true if any run has failed and all other runs are done.
-func (t ResolvedPipelineTask) isFailure() bool {
+// IsFailure returns true only if the run has failed (if it has ConditionSucceeded = False).
+// If the PipelineTask has a Matrix, IsFailure returns true if any run has failed and all other runs are done.
+func (t ResolvedPipelineTask) IsFailure() bool {
	var isDone bool
	if t.IsCustomTask() {
		if len(t.CustomRuns) == 0 {
@@ -186,6 +186,24 @@ func (t ResolvedPipelineTask) isFailure() bool {
	return t.haveAnyTaskRunsFailed() && isDone
}
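
The doc comment on `IsFailure` states that a matrixed task only counts as failed once some run has failed and all other runs are done. Read literally, that suggests a matrixed task would not trip the fail-fast check while any of its runs is still in progress. The sketch below models just that rule; `runStatus` and `matrixFailed` are hypothetical names, and the code is an illustration of the comment, not the Tekton implementation.

```go
package main

import "fmt"

// runStatus is a hypothetical, simplified view of one run in a matrix fan-out.
type runStatus struct {
	Done   bool // the run has finished, successfully or not
	Failed bool // the run finished with ConditionSucceeded = False
}

// matrixFailed reports whether the fanned-out task counts as failed under the
// rule in the IsFailure comment: at least one run failed and every run is done.
func matrixFailed(runs []runStatus) bool {
	if len(runs) == 0 {
		return false
	}
	anyFailed, allDone := false, true
	for _, r := range runs {
		anyFailed = anyFailed || r.Failed
		allDone = allDone && r.Done
	}
	return anyFailed && allDone
}

func main() {
	stillRunning := []runStatus{{Done: true, Failed: true}, {Done: false}}
	finished := []runStatus{{Done: true, Failed: true}, {Done: true}}

	fmt.Println(matrixFailed(stillRunning)) // false
	fmt.Println(matrixFailed(finished))     // true
}
```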

// GetFailedTasks returns the names of the failed CustomRuns (for a Custom Task)
// or TaskRuns (otherwise) that belong to this PipelineTask.
func (t ResolvedPipelineTask) GetFailedTasks() []string {
	var failedTasks []string
	if t.IsCustomTask() {
		for _, run := range t.CustomRuns {
			if run.IsFailure() {
				failedTasks = append(failedTasks, run.Name)
			}
		}
	} else {
		for _, taskRun := range t.TaskRuns {
			if taskRun.IsFailure() {
				failedTasks = append(failedTasks, taskRun.Name)
			}
		}
	}
	return failedTasks
}

// isCancelledForTimeOut returns true only if the run is cancelled due to PipelineRun-controlled timeout
// If the PipelineTask has a Matrix, isCancelled returns true if any run is cancelled due to PipelineRun-controlled timeout and all other runs are done.
func (t ResolvedPipelineTask) isCancelledForTimeOut() bool {
@@ -1662,8 +1662,8 @@ func TestIsFailure(t *testing.T) {
		want: false,
	}} {
		t.Run(tc.name, func(t *testing.T) {
-			if got := tc.rpt.isFailure(); got != tc.want {
-				t.Errorf("expected isFailure: %t but got %t", tc.want, got)
+			if got := tc.rpt.IsFailure(); got != tc.want {
+				t.Errorf("expected IsFailure: %t but got %t", tc.want, got)
			}
		})
	}
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
@@ -333,7 +333,7 @@ func (state PipelineRunState) getNextTasks(candidateTasks sets.String) []*Resolv
func (facts *PipelineRunFacts) IsStopping() bool {
	for _, t := range facts.State {
		if facts.isDAGTask(t.PipelineTask.Name) {
-			if t.isFailure() && t.PipelineTask.OnError != v1.PipelineTaskContinue {
+			if t.IsFailure() && t.PipelineTask.OnError != v1.PipelineTaskContinue {
				return true
			}
		}
@@ -700,7 +700,7 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount {
		case t.isCancelled():
			s.Cancelled++
		// increment failure counter based on Task OnError type since the task has failed
-		case t.isFailure():
+		case t.IsFailure():
			if t.PipelineTask.OnError == v1.PipelineTaskContinue {
				s.IgnoredFailed++
			} else {
@@ -121,7 +121,7 @@ func convertToResultRefs(pipelineRunState PipelineRunState, target *ResolvedPipe
		if referencedPipelineTask == nil {
			return nil, resultRef.PipelineTask, fmt.Errorf("could not find task %q referenced by result", resultRef.PipelineTask)
		}
-		if !referencedPipelineTask.isSuccessful() && !referencedPipelineTask.isFailure() {
+		if !referencedPipelineTask.isSuccessful() && !referencedPipelineTask.IsFailure() {
			return nil, resultRef.PipelineTask, fmt.Errorf("task %q referenced by result was not finished", referencedPipelineTask.PipelineTask.Name)
		}
		// Custom Task