From b2c60db677200707da664147f95aff3fd92136b5 Mon Sep 17 00:00:00 2001 From: joey Date: Sun, 26 May 2024 18:40:00 +0800 Subject: [PATCH] support `fast-fail` for PipelineRun allow task to be cancelled if a parallel task fails #7880 Signed-off-by: chengjoey --- docs/pipeline-api.md | 48 +++++ docs/pipelineruns.md | 46 +++++ .../pipelineruns/pipelinerun-fail-fast.yaml | 32 ++++ pkg/apis/pipeline/v1/openapi_generated.go | 7 + pkg/apis/pipeline/v1/pipelinerun_types.go | 4 + pkg/apis/pipeline/v1/swagger.json | 4 + .../pipeline/v1beta1/openapi_generated.go | 7 + .../v1beta1/pipelinerun_conversion.go | 1 + .../pipeline/v1beta1/pipelinerun_types.go | 4 + pkg/apis/pipeline/v1beta1/swagger.json | 4 + pkg/reconciler/pipelinerun/pipelinerun.go | 12 ++ .../resources/pipelinerunresolution.go | 28 ++- .../resources/pipelinerunresolution_test.go | 4 +- .../pipelinerun/resources/pipelinerunstate.go | 4 +- .../resources/resultrefresolution.go | 2 +- test/fast_fail_test.go | 177 ++++++++++++++++++ 16 files changed, 374 insertions(+), 10 deletions(-) create mode 100644 examples/v1/pipelineruns/pipelinerun-fail-fast.yaml create mode 100644 test/fast_fail_test.go diff --git a/docs/pipeline-api.md b/docs/pipeline-api.md index e0b52e34073..93cca3470d9 100644 --- a/docs/pipeline-api.md +++ b/docs/pipeline-api.md @@ -822,6 +822,18 @@ with those declared in the pipeline.

TaskRunSpecs holds a set of runtime specs

+ + +failFast
+ +bool + + + +(Optional) +

FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.

+ + @@ -2438,6 +2450,18 @@ with those declared in the pipeline.

TaskRunSpecs holds a set of runtime specs

+ + +failFast
+ +bool + + + +(Optional) +

FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.

+ +

PipelineRunSpecStatus @@ -9158,6 +9182,18 @@ with those declared in the pipeline.

TaskRunSpecs holds a set of runtime specs

+ + +failFast
+ +bool + + + +(Optional) +

FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.

+ + @@ -11329,6 +11365,18 @@ with those declared in the pipeline.

TaskRunSpecs holds a set of runtime specs

+ + +failFast
+ +bool + + + +(Optional) +

FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.

+ +

PipelineRunSpecStatus diff --git a/docs/pipelineruns.md b/docs/pipelineruns.md index 716bbb040b3..6c99f85398a 100644 --- a/docs/pipelineruns.md +++ b/docs/pipelineruns.md @@ -36,6 +36,7 @@ weight: 204 - [Cancelling a PipelineRun](#cancelling-a-pipelinerun) - [Gracefully cancelling a PipelineRun](#gracefully-cancelling-a-pipelinerun) - [Gracefully stopping a PipelineRun](#gracefully-stopping-a-pipelinerun) + - [Fast-fail a PipelineRun](#fast-fail-a-pipelinerun) - [Pending PipelineRuns](#pending-pipelineruns) @@ -78,6 +79,7 @@ A `PipelineRun` definition supports the following fields: - [`timeouts`](#configuring-a-failure-timeout) - Specifies the timeout before the `PipelineRun` fails. `timeouts` allows more granular timeout configuration, at the pipeline, tasks, and finally levels - [`podTemplate`](#specifying-a-pod-template) - Specifies a [`Pod` template](./podtemplates.md) to use as the basis for the configuration of the `Pod` that executes each `Task`. - [`workspaces`](#specifying-workspaces) - Specifies a set of workspace bindings which must match the names of workspaces declared in the pipeline being used. + - [`fail-fast`](#fast-fail-a-pipelinerun) - Specifies whether to fail the `PipelineRun` as soon as a `Task` fails. [kubernetes-overview]: https://kubernetes.io/docs/concepts/overview/working-with-objects/kubernetes-objects/#required-fields @@ -1622,6 +1624,50 @@ spec: status: "StoppedRunFinally" ``` +## Fast fail a `PipelineRun` +Usually a pipeline may have several tasks running concurrently. When one of the tasks fails, +you may want to stop the entire pipeline immediately and quickly cancel other parallel tasks. +you can use `fastFail` to achieve this goal. + +For example: +```yaml +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + name: pipeline-run +spec: + failFast: true + pipelineSpec: + tasks: + - name: fail-task + taskSpec: + steps: + - name: fail-task + image: busybox + command: ["/bin/sh", "-c"] + args: + - exit 1 + - name: success1 + taskSpec: + steps: + - name: success1 + image: busybox + command: ["/bin/sh", "-c"] + args: + - sleep 360 + - name: success2 + taskSpec: + steps: + - name: success2 + image: busybox + command: ["/bin/sh", "-c"] + args: + - sleep 360 +``` +The above `PipelineRun` will fast cancel the execution of `success1` and `success2` immediately when `fail-task` failed. +For specific execution of cancel task status, please refer to[cancelling-a-taskrun](taskruns.md#cancelling-a-taskrun). + + ## Pending `PipelineRuns` A `PipelineRun` can be created as a "pending" `PipelineRun` meaning that it will not actually be started until the pending status is cleared. diff --git a/examples/v1/pipelineruns/pipelinerun-fail-fast.yaml b/examples/v1/pipelineruns/pipelinerun-fail-fast.yaml new file mode 100644 index 00000000000..a09cf566f1f --- /dev/null +++ b/examples/v1/pipelineruns/pipelinerun-fail-fast.yaml @@ -0,0 +1,32 @@ +apiVersion: tekton.dev/v1 +kind: PipelineRun +metadata: + generateName: pr-fail-fast- +spec: + failFast: true + pipelineSpec: + tasks: + - name: fail-task + taskSpec: + steps: + - name: fail-task + image: busybox + command: ["/bin/sh", "-c"] + args: + - exit 1 + - name: success1 + taskSpec: + steps: + - name: success1 + image: busybox + command: ["/bin/sh", "-c"] + args: + - sleep 360 + - name: success2 + taskSpec: + steps: + - name: success2 + image: busybox + command: ["/bin/sh", "-c"] + args: + - sleep 360 \ No newline at end of file diff --git a/pkg/apis/pipeline/v1/openapi_generated.go b/pkg/apis/pipeline/v1/openapi_generated.go index 67a9508011b..c9f4c610968 100644 --- a/pkg/apis/pipeline/v1/openapi_generated.go +++ b/pkg/apis/pipeline/v1/openapi_generated.go @@ -1429,6 +1429,13 @@ func schema_pkg_apis_pipeline_v1_PipelineRunSpec(ref common.ReferenceCallback) c }, }, }, + "failFast": { + SchemaProps: spec.SchemaProps{ + Description: "FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/pipeline/v1/pipelinerun_types.go b/pkg/apis/pipeline/v1/pipelinerun_types.go index 9c9bcd85566..0dbe91db3db 100644 --- a/pkg/apis/pipeline/v1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1/pipelinerun_types.go @@ -278,6 +278,10 @@ type PipelineRunSpec struct { // +optional // +listType=atomic TaskRunSpecs []PipelineTaskRunSpec `json:"taskRunSpecs,omitempty"` + + // FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled. + // +optional + FailFast bool `json:"failFast,omitempty"` } // TimeoutFields allows granular specification of pipeline, task, and finally timeouts diff --git a/pkg/apis/pipeline/v1/swagger.json b/pkg/apis/pipeline/v1/swagger.json index e014703014c..eac3f9c409a 100644 --- a/pkg/apis/pipeline/v1/swagger.json +++ b/pkg/apis/pipeline/v1/swagger.json @@ -656,6 +656,10 @@ "description": "PipelineRunSpec defines the desired state of PipelineRun", "type": "object", "properties": { + "failFast": { + "description": "FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.", + "type": "boolean" + }, "params": { "description": "Params is a list of parameter names and values.", "type": "array", diff --git a/pkg/apis/pipeline/v1beta1/openapi_generated.go b/pkg/apis/pipeline/v1beta1/openapi_generated.go index 72277eb914e..960b6245f64 100644 --- a/pkg/apis/pipeline/v1beta1/openapi_generated.go +++ b/pkg/apis/pipeline/v1beta1/openapi_generated.go @@ -2104,6 +2104,13 @@ func schema_pkg_apis_pipeline_v1beta1_PipelineRunSpec(ref common.ReferenceCallba }, }, }, + "failFast": { + SchemaProps: spec.SchemaProps{ + Description: "FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_conversion.go b/pkg/apis/pipeline/v1beta1/pipelinerun_conversion.go index 4e9ebf2ac90..01cc9982bcd 100644 --- a/pkg/apis/pipeline/v1beta1/pipelinerun_conversion.go +++ b/pkg/apis/pipeline/v1beta1/pipelinerun_conversion.go @@ -81,6 +81,7 @@ func (prs PipelineRunSpec) ConvertTo(ctx context.Context, sink *v1.PipelineRunSp sink.TaskRunTemplate.PodTemplate = prs.PodTemplate sink.TaskRunTemplate.ServiceAccountName = prs.ServiceAccountName sink.Workspaces = nil + sink.FailFast = prs.FailFast for _, w := range prs.Workspaces { new := v1.WorkspaceBinding{} w.convertTo(ctx, &new) diff --git a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go index c3a111a978b..c59d956e87f 100644 --- a/pkg/apis/pipeline/v1beta1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1beta1/pipelinerun_types.go @@ -296,6 +296,10 @@ type PipelineRunSpec struct { // +optional // +listType=atomic TaskRunSpecs []PipelineTaskRunSpec `json:"taskRunSpecs,omitempty"` + + // FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled. + // +optional + FailFast bool `json:"failFast,omitempty"` } // TimeoutFields allows granular specification of pipeline, task, and finally timeouts diff --git a/pkg/apis/pipeline/v1beta1/swagger.json b/pkg/apis/pipeline/v1beta1/swagger.json index 4fd2b41fd3f..ab4fe8aa4ae 100644 --- a/pkg/apis/pipeline/v1beta1/swagger.json +++ b/pkg/apis/pipeline/v1beta1/swagger.json @@ -997,6 +997,10 @@ "description": "PipelineRunSpec defines the desired state of PipelineRun", "type": "object", "properties": { + "failFast": { + "description": "FailFast is an option. When a failed task is found, other parallel tasks can be quickly canceled.", + "type": "boolean" + }, "params": { "description": "Params is a list of parameter names and values.", "type": "array", diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 8756c1282f4..15b39361e50 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -606,6 +606,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel default: } + // find first failed task and cancel PipelineRun if FailFast is set + if pr.Spec.FailFast && !pr.IsCancelled() { + for _, resolvedTask := range pipelineRunState { + if resolvedTask.IsFailure() { + if err := cancelPipelineRun(ctx, logger, pr, c.PipelineClientSet); err != nil { + return err + } + break + } + } + } + // Second iteration pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState) switch { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go index a06b50d156e..91ec9a8d913 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go @@ -118,7 +118,7 @@ func (t *ResolvedPipelineTask) EvaluateCEL() error { // isDone returns true only if the task is skipped, succeeded or failed func (t ResolvedPipelineTask) isDone(facts *PipelineRunFacts) bool { - return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure() + return t.Skip(facts).IsSkipped || t.isSuccessful() || t.IsFailure() } // IsRunning returns true only if the task is neither succeeded, cancelled nor failed @@ -129,7 +129,7 @@ func (t ResolvedPipelineTask) IsRunning() bool { if !t.IsCustomTask() && len(t.TaskRuns) == 0 { return false } - return !t.isSuccessful() && !t.isFailure() + return !t.isSuccessful() && !t.IsFailure() } // IsCustomTask returns true if the PipelineTask references a Custom Task. @@ -162,9 +162,9 @@ func (t ResolvedPipelineTask) isSuccessful() bool { return true } -// isFailure returns true only if the run has failed (if it has ConditionSucceeded = False). -// If the PipelineTask has a Matrix, isFailure returns true if any run has failed and all other runs are done. -func (t ResolvedPipelineTask) isFailure() bool { +// IsFailure returns true only if the run has failed (if it has ConditionSucceeded = False). +// If the PipelineTask has a Matrix, IsFailure returns true if any run has failed and all other runs are done. +func (t ResolvedPipelineTask) IsFailure() bool { var isDone bool if t.IsCustomTask() { if len(t.CustomRuns) == 0 { @@ -186,6 +186,24 @@ func (t ResolvedPipelineTask) isFailure() bool { return t.haveAnyTaskRunsFailed() && isDone } +func (t ResolvedPipelineTask) GetFailedTasks() []string { + var failedTasks []string + if t.IsCustomTask() { + for _, run := range t.CustomRuns { + if run.IsFailure() { + failedTasks = append(failedTasks, run.Name) + } + } + } else { + for _, taskRun := range t.TaskRuns { + if taskRun.IsFailure() { + failedTasks = append(failedTasks, taskRun.Name) + } + } + } + return failedTasks +} + // isCancelledForTimeOut returns true only if the run is cancelled due to PipelineRun-controlled timeout // If the PipelineTask has a Matrix, isCancelled returns true if any run is cancelled due to PipelineRun-controlled timeout and all other runs are done. func (t ResolvedPipelineTask) isCancelledForTimeOut() bool { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go index fe6deee7c78..e8920217ef6 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go @@ -1662,8 +1662,8 @@ func TestIsFailure(t *testing.T) { want: false, }} { t.Run(tc.name, func(t *testing.T) { - if got := tc.rpt.isFailure(); got != tc.want { - t.Errorf("expected isFailure: %t but got %t", tc.want, got) + if got := tc.rpt.IsFailure(); got != tc.want { + t.Errorf("expected IsFailure: %t but got %t", tc.want, got) } }) } diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index 71537671cae..06342bef058 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -333,7 +333,7 @@ func (state PipelineRunState) getNextTasks(candidateTasks sets.String) []*Resolv func (facts *PipelineRunFacts) IsStopping() bool { for _, t := range facts.State { if facts.isDAGTask(t.PipelineTask.Name) { - if t.isFailure() && t.PipelineTask.OnError != v1.PipelineTaskContinue { + if t.IsFailure() && t.PipelineTask.OnError != v1.PipelineTaskContinue { return true } } @@ -700,7 +700,7 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount { case t.isCancelled(): s.Cancelled++ // increment failure counter based on Task OnError type since the task has failed - case t.isFailure(): + case t.IsFailure(): if t.PipelineTask.OnError == v1.PipelineTaskContinue { s.IgnoredFailed++ } else { diff --git a/pkg/reconciler/pipelinerun/resources/resultrefresolution.go b/pkg/reconciler/pipelinerun/resources/resultrefresolution.go index 73d7f9cf29a..4b546ff2edb 100644 --- a/pkg/reconciler/pipelinerun/resources/resultrefresolution.go +++ b/pkg/reconciler/pipelinerun/resources/resultrefresolution.go @@ -121,7 +121,7 @@ func convertToResultRefs(pipelineRunState PipelineRunState, target *ResolvedPipe if referencedPipelineTask == nil { return nil, resultRef.PipelineTask, fmt.Errorf("could not find task %q referenced by result", resultRef.PipelineTask) } - if !referencedPipelineTask.isSuccessful() && !referencedPipelineTask.isFailure() { + if !referencedPipelineTask.isSuccessful() && !referencedPipelineTask.IsFailure() { return nil, resultRef.PipelineTask, fmt.Errorf("task %q referenced by result was not finished", referencedPipelineTask.PipelineTask.Name) } // Custom Task diff --git a/test/fast_fail_test.go b/test/fast_fail_test.go new file mode 100644 index 00000000000..cbbcb5eda7b --- /dev/null +++ b/test/fast_fail_test.go @@ -0,0 +1,177 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2023 The Tekton Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "context" + "fmt" + "github.com/tektoncd/pipeline/test/parse" + knativetest "knative.dev/pkg/test" + "testing" +) + +func TestFastFail(t *testing.T) { + t.Parallel() + + requireAlphaFeatureFlag = requireAnyGate(map[string]string{ + "enable-api-fields": "alpha"}) + type tests struct { + name string + pipelineName string + pipelineRunFunc func(*testing.T, string) *v1.PipelineRun + } + + tds := []tests{{ + name: "fast fail pipeline run", + pipelineName: "fast-fail-pipeline", + pipelineRunFunc: getFastFailPipelineRun, + }} + for _, td := range tds { + t.Run(td.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + c, namespace := setup(ctx, t, requireAlphaFeatureFlag) + knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf) + defer tearDown(ctx, t, c, namespace) + + t.Logf("Setting up test resources for %q test in namespace %s", td.name, namespace) + pipelineRun := td.pipelineRunFunc(t, namespace) + + prName := pipelineRun.Name + _, err := c.V1PipelineRunClient.Create(ctx, pipelineRun, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create PipelineRun `%s`: %s", prName, err) + } + + t.Logf("Waiting for PipelineRun %s in namespace %s to complete", prName, namespace) + if err := WaitForPipelineRunState(ctx, c, prName, timeout, PipelineRunFailed(prName), "PipelineRunFailed", v1beta1Version); err != nil { + t.Fatalf("Error waiting for PipelineRun %s to finish: %s", prName, err) + } + cl, _ := c.V1PipelineRunClient.Get(ctx, prName, metav1.GetOptions{}) + if cl.Status.GetCondition(apis.ConditionSucceeded).IsTrue() { + t.Errorf("Expected PipelineRun to fail but found condition: %s", cl.Status.GetCondition(apis.ConditionSucceeded)) + } + expectedMessage := "Tasks Completed: 3 (Failed: 1, Cancelled 2), Skipped: 0" + if cl.Status.GetCondition(apis.ConditionSucceeded).Message != expectedMessage { + t.Errorf("Expected PipelineRun to fail with condition message: %s but got: %s", expectedMessage, cl.Status.GetCondition(apis.ConditionSucceeded).Message) + } + }) + } + +} + +func getFastFailPipelineRun(t *testing.T, namespace string) *v1.PipelineRun { + t.Helper() + pipelineRun := parse.MustParseV1beta1PipelineRun(t, fmt.Sprintf(` +metadata: + name: fast-fail-pipeline-run + namespace: %s +spec: + failFast: true + pipelineSpec: + tasks: + - name: fail-task + taskSpec: + steps: + - name: fail-task + image: busybox + command: ["/bin/sh", "-c"] + args: + - exit 1 + - name: success1 + taskSpec: + steps: + - name: success1 + image: busybox + command: ["/bin/sh", "-c"] + args: + - sleep 360 + - name: success2 + taskSpec: + steps: + - name: success2 + image: busybox + command: ["/bin/sh", "-c"] + args: + - sleep 360 +`)) + expectedPipelineRun := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: fast-fail-pipeline-run + namespace: %s +spec: + failFast: true + pipelineSpec: + tasks: + - name: fail-task + taskSpec: + metadata: {} + spec: null + steps: + - args: + - exit 1 + command: + - /bin/sh + - -c + computeResources: {} + image: busybox + name: fail-task + - name: success1 + taskSpec: + metadata: {} + spec: null + steps: + - args: + - sleep 360 + command: + - /bin/sh + - -c + computeResources: {} + image: busybox + name: success1 + - name: success2 + taskSpec: + metadata: {} + spec: null + steps: + - args: + - sleep 360 + command: + - /bin/sh + - -c + computeResources: {} + image: busybox + name: success2 + timeouts: + pipeline: 1h0m0s +status: + conditions: + - lastTransitionTime: "2024-05-26T09:59:24Z" + message: 'Tasks Completed: 3 (Failed: 1, Cancelled 2), Skipped: 0' + reason: Failed + status: "False" + type: Succeeded +`, namespace)) + + return pipelineRun +}