Skip to content

Commit

Permalink
feat: change the Test Workflow orchestration mechanism to atomic inst…
Browse files Browse the repository at this point in the history
…ructions (#5676)

* fix(testworkflows): avoid unnecessary empty data in ContainerConfig
* feat(testworkflows): add basic implementation to build list of operations / grouping them, based on the TestWorkflowProcessor's Stages
* feat(testworkflows): migrate basics of container execution for handling multiple steps in single container
* feat: support conditions with new Init Process kind
* feat: encapsulate init process' container state
* chore: rename container to setup
* feat: support dynamic environment variables
* feat: add mock for pause in the Init Process
* feat: discriminate TestWorkflow instructions with ActionType
* chore: rename Paused to PausedOnStart
* chore: change StepData mutations
* chore: extract action.Action to separate package
* feat: move *Stage to separate package
* chore: reorganize action files
* chore: delete unused
* chore: extract lightweight version of actions to avoid increase in Init Process size
* feat: abstract the executions in the Init Process
* fix: aborting Test Workflow processes
* fix: support statuses properly in the Init Process
* fix: parsing container logs
* chore: delete debug information
* feat: make the root operation constant
* feat: support timeout for Test Workflow steps
* feat: add back retry functionality for the Test Workflows
* feat: support pause/resume for Test Workflows
* chore: delete unused code
* fix: watching services
* fix: some unit tests
* chore: add instructions check in unit test
* chore: refactor action unit tests a bit
* fix: build correct conditions when there are grouped steps
* chore: adjust processor tests
* chore: delete unused function
* feat: streamline the output in the Init Process, to allow obfuscating sensitive words
* feat: mask secret environment variables
* chore: extract function to get last step
* chore: unify internal machine for Init Process
* chore: move step timeout logic to the Init Process main switch
* fix: use default ContainerConfig for setup step container
* feat: use /bin/sh instead of /.tktw/bin/sh in case the Init/Toolkit image is used
* chore: fail property accessor in case of nullish value to access
* feat: ensure that aborted step is considered aborted, not skipped
* fix: machine for retry in the Init Process
* chore: rename `instructions` related to Actions to `actions`
* fix: send again toolkit/init image data along with Pod information from Test Workflow
* chore: simplify StepStatusFromCode
* fix: clean up printing in the Init Process
* chore: clean up getting actions in Processor tests
* chore: rename variable in Control Server
* chore: extract control server options to separate file
* chore: expose a bit configuration of sensitive words minimum length
* chore: delete obsolete todos
* fix: avoid copying binaries from the Init image when it's not necessary
* fix: unit tests
* fix: group conditions
* chore: wrap errors
* chore: reduce the size of Init Process
* fix: detect the Toolkit properly
* chore: rename SensitiveReadWriter to Obfuscator
* feat: show last characters in the obfuscated log output
* feat: make some environment variables sensitive by default
* chore: first set of code review fixes
* chore: make the 00 and 01 special groups constants
* chore: log error for mkdir of working directory
* chore: delete commented out code
* fix: handle errors in the setup step
  • Loading branch information
rangoo94 authored Jul 26, 2024
1 parent 506bb9b commit 220ac74
Show file tree
Hide file tree
Showing 84 changed files with 5,214 additions and 2,512 deletions.
4 changes: 2 additions & 2 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common"
"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common/render"
"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/testworkflows/renderer"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
apiclientv1 "github.com/kubeshop/testkube/pkg/api/v1/client"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
Expand Down Expand Up @@ -304,7 +304,7 @@ func printRawLogLines(logs []byte, steps []testkube.TestWorkflowSignature, resul
line = line[getTimestampLength(line)+1:]
}

start := data.StartHintRe.FindStringSubmatch(line)
start := instructions.StartHintRe.FindStringSubmatch(line)
if len(start) == 0 {
line += "\x07"
fmt.Println(line)
Expand Down
13 changes: 7 additions & 6 deletions cmd/tcl/testworkflow-toolkit/commands/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
commontcl "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common"
"github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/spawn"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/transfer"
"github.com/kubeshop/testkube/internal/common"
Expand Down Expand Up @@ -92,7 +93,7 @@ func buildTestExecution(test testworkflowsv1.StepExecuteTest, async bool) (func(
return
}

data.PrintOutput(env.Ref(), "test-start", &testExecutionDetails{
instructions.PrintOutput(env.Ref(), "test-start", &testExecutionDetails{
Id: exec.Id,
Name: exec.Name,
TestName: exec.TestName,
Expand Down Expand Up @@ -126,7 +127,7 @@ func buildTestExecution(test testworkflowsv1.StepExecuteTest, async bool) (func(
break loop
}
if prevStatus != status {
data.PrintOutput(env.Ref(), "test-status", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(env.Ref(), "test-status", &executionResult{Id: exec.Id, Status: string(status)})
}
prevStatus = status
}
Expand All @@ -140,7 +141,7 @@ func buildTestExecution(test testworkflowsv1.StepExecuteTest, async bool) (func(
color = ui.Red
}

data.PrintOutput(env.Ref(), "test-end", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(env.Ref(), "test-end", &executionResult{Id: exec.Id, Status: string(status)})
fmt.Printf("%s • %s\n", color(execName), string(status))
return
}, nil
Expand All @@ -161,7 +162,7 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
return
}

data.PrintOutput(env.Ref(), "testworkflow-start", &testWorkflowExecutionDetails{
instructions.PrintOutput(env.Ref(), "testworkflow-start", &testWorkflowExecutionDetails{
Id: exec.Id,
Name: exec.Name,
TestWorkflowName: exec.Workflow.Name,
Expand Down Expand Up @@ -195,7 +196,7 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
break loop
}
if prevStatus != status {
data.PrintOutput(env.Ref(), "testworkflow-status", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(env.Ref(), "testworkflow-status", &executionResult{Id: exec.Id, Status: string(status)})
}
prevStatus = status
}
Expand All @@ -209,7 +210,7 @@ func buildWorkflowExecution(workflow testworkflowsv1.StepExecuteWorkflow, async
color = ui.Red
}

data.PrintOutput(env.Ref(), "testworkflow-end", &executionResult{Id: exec.Id, Status: string(status)})
instructions.PrintOutput(env.Ref(), "testworkflow-end", &executionResult{Id: exec.Id, Status: string(status)})
fmt.Printf("%s • %s\n", color(execName), string(status))
return
}, nil
Expand Down
5 changes: 3 additions & 2 deletions cmd/tcl/testworkflow-toolkit/commands/kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
commontcl "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common"
"github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/spawn"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/artifacts"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/pkg/expressions"
Expand Down Expand Up @@ -73,7 +74,7 @@ func NewKillCmd() *cobra.Command {
Register("index", index).
RegisterAccessorExt(func(name string) (interface{}, bool, error) {
if name == "count" {
expr, err := expressions.CompileAndResolve(fmt.Sprintf("len(services.%s)", service))
expr, err := expressions.CompileAndResolve(fmt.Sprintf("len(%s)", data.ServicesPrefix+service))
return expr, true, err
}
return nil, false, nil
Expand Down Expand Up @@ -104,7 +105,7 @@ func NewKillCmd() *cobra.Command {

logsFilePath, err := spawn.SaveLogs(context.Background(), clientSet, storage, env.Namespace(), id, service+"/", index)
if err == nil {
data.PrintOutput(env.Ref(), "service", ServiceInfo{Group: groupRef, Name: service, Index: index, Logs: storage.FullPath(logsFilePath)})
instructions.PrintOutput(env.Ref(), "service", ServiceInfo{Group: groupRef, Name: service, Index: index, Logs: storage.FullPath(logsFilePath)})
log("saved logs")
} else {
log("warning", "problem saving the logs", err.Error())
Expand Down
16 changes: 8 additions & 8 deletions cmd/tcl/testworkflow-toolkit/commands/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@ import (
testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1"
commontcl "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common"
"github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/spawn"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/artifacts"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/transfer"
"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/expressions"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
"github.com/kubeshop/testkube/pkg/ui"
)

Expand Down Expand Up @@ -163,7 +163,7 @@ func NewParallelCmd() *cobra.Command {

// Send initial output
for index := range specs {
data.PrintOutput(env.Ref(), "parallel", ParallelStatus{
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{
Index: index,
Description: descriptions[index],
})
Expand Down Expand Up @@ -200,7 +200,7 @@ func NewParallelCmd() *cobra.Command {
}

// Compute the bundle instructions
sig := testworkflowprocessor.MapSignatureListToInternal(bundle.Signature)
sig := stage.MapSignatureListToInternal(bundle.Signature)
namespace := bundle.Job.Namespace
if namespace == "" {
namespace = env.Namespace()
Expand Down Expand Up @@ -228,7 +228,7 @@ func NewParallelCmd() *cobra.Command {
if shouldSaveLogs {
logsFilePath, err := spawn.SaveLogs(context.Background(), clientSet, storage, namespace, id, "", index)
if err == nil {
data.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Logs: storage.FullPath(logsFilePath)})
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Logs: storage.FullPath(logsFilePath)})
log("saved logs")
} else {
log("warning", "problem saving the logs", err.Error())
Expand All @@ -246,7 +246,7 @@ func NewParallelCmd() *cobra.Command {
}()

// Inform about the step structure
data.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Signature: sig})
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Signature: sig})

// Control the execution
// TODO: Consider aggregated controller to limit number of watchers
Expand Down Expand Up @@ -291,11 +291,11 @@ func NewParallelCmd() *cobra.Command {
prevStep = v.Current
prevStatus = v.Status
if v.Result.IsFinished() {
data.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Result: v.Result})
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Result: v.Result})
ctxCancel()
return v.Result.IsPassed()
} else {
data.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Current: v.Current})
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Current: v.Current})
}
}
}
Expand Down
18 changes: 10 additions & 8 deletions cmd/tcl/testworkflow-toolkit/commands/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
commontcl "github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/common"
"github.com/kubeshop/testkube/cmd/tcl/testworkflow-toolkit/spawn"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/instructions"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/env"
"github.com/kubeshop/testkube/cmd/testworkflow-toolkit/transfer"
"github.com/kubeshop/testkube/internal/common"
Expand Down Expand Up @@ -111,7 +112,7 @@ func NewServicesCmd() *cobra.Command {
}

// Initialize empty array of details for each of the services
data.PrintHintDetails(env.Ref(), fmt.Sprintf("services.%s", name), []ServiceState{})
instructions.PrintHintDetails(env.Ref(), data.ServicesPrefix+name, []ServiceState{})
}

// Analyze instances to run
Expand Down Expand Up @@ -189,12 +190,12 @@ func NewServicesCmd() *cobra.Command {
for i := range svcInstances {
state[name][i].Description = svcInstances[i].Description
}
data.PrintHintDetails(env.Ref(), fmt.Sprintf("services.%s", name), state)
instructions.PrintHintDetails(env.Ref(), data.ServicesPrefix+name, state)
}

// Inform about each service instance
for _, instance := range instances {
data.PrintOutput(env.Ref(), "service", ServiceInfo{
instructions.PrintOutput(env.Ref(), "service", ServiceInfo{
Group: groupRef,
Index: instance.Index,
Name: instance.Name,
Expand Down Expand Up @@ -274,7 +275,8 @@ func NewServicesCmd() *cobra.Command {
if namespace == "" {
namespace = env.Namespace()
}
mainRef := bundle.Job.Spec.Template.Spec.Containers[0].Name

mainRef := bundle.Actions().GetLastRef()

// Deploy the resources
// TODO: Avoid using Job
Expand Down Expand Up @@ -329,7 +331,7 @@ func NewServicesCmd() *cobra.Command {
state[instance.Name][index].Ip = v.PodIP
log(fmt.Sprintf("assigned to %s IP", ui.LightBlue(v.PodIP)))
info.Status = ServiceStatusRunning
data.PrintOutput(env.Ref(), "service", info)
instructions.PrintOutput(env.Ref(), "service", info)
}

if v.Current == mainRef {
Expand All @@ -349,7 +351,7 @@ func NewServicesCmd() *cobra.Command {
if !started {
info.Status = ServiceStatusFailed
log("container failed")
data.PrintOutput(env.Ref(), "service", info)
instructions.PrintOutput(env.Ref(), "service", info)
return false
}

Expand Down Expand Up @@ -377,7 +379,7 @@ func NewServicesCmd() *cobra.Command {
log("container ready")
info.Status = ServiceStatusReady
}
data.PrintOutput(env.Ref(), "service", info)
instructions.PrintOutput(env.Ref(), "service", info)

return ready
}
Expand All @@ -387,7 +389,7 @@ func NewServicesCmd() *cobra.Command {

// Inform about the services state
for k := range state {
data.PrintHintDetails(env.Ref(), fmt.Sprintf("services.%s", k), state[k])
instructions.PrintHintDetails(env.Ref(), data.ServicesPrefix+k, state[k])
}

// Notify the results
Expand Down
4 changes: 2 additions & 2 deletions cmd/tcl/testworkflow-toolkit/spawn/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/expressions"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowcontroller"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/stage"
)

func MapDynamicListToStringList(list []interface{}) []string {
Expand Down Expand Up @@ -174,7 +174,7 @@ func ProcessFetch(transferSrv transfer.Server, fetch []testworkflowsv1.StepParal
Env: []corev1.EnvVar{
{Name: "TK_NS", Value: env.Namespace()},
{Name: "TK_REF", Value: env.Ref()},
testworkflowprocessor.BypassToolkitCheck,
stage.BypassToolkitCheck,
},
Args: &result,
},
Expand Down
79 changes: 79 additions & 0 deletions cmd/testworkflow-init/commands/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package commands

import (
"slices"

"github.com/kubeshop/testkube/cmd/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/cmd/testworkflow-init/orchestration"
"github.com/kubeshop/testkube/cmd/testworkflow-init/output"
"github.com/kubeshop/testkube/pkg/expressions"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite"
)

func Run(run lite.ActionExecute, container lite.LiteActionContainer) {
machine := data.GetInternalTestWorkflowMachine()
state := data.GetState()
step := state.GetStep(run.Ref)

// Abandon executing if the step was finished before
if step.IsFinished() {
return
}

// Obtain command to run
command := make([]string, 0)
if container.Config.Command != nil {
command = slices.Clone(*container.Config.Command)
}
if container.Config.Args != nil {
command = append(command, *container.Config.Args...)
}

// Ensure the command is not empty
if len(command) == 0 {
output.ExitErrorf(data.CodeInputError, "command is required")
}

// Resolve the command to run
for i := range command {
value, err := expressions.CompileAndResolveTemplate(command[i], machine, expressions.FinalizerFail)
if err != nil {
output.ExitErrorf(data.CodeInternal, "failed to compute argument '%d': %s", i, err.Error())
}
command[i], _ = value.Static().StringValue()
}

// Run the operation
execution := orchestration.Executions.Create(command[0], command[1:])
result, err := execution.Run()
if err != nil {
output.ExitErrorf(data.CodeInternal, "failed to execute: %v", err)
}

// Initialize local state
var status data.StepStatus

success := result.ExitCode == 0

// Compute the result
if run.Negative {
success = !success
}
if result.Aborted {
status = data.StepStatusAborted
} else if success {
status = data.StepStatusPassed
} else {
status = data.StepStatusFailed
}

// Abandon saving execution data if the step has been finished before
if step.IsFinished() {
return
}

// Notify about the status
step.SetStatus(status).SetExitCode(result.ExitCode)
orchestration.FinishExecution(step, constants.ExecutionResult{ExitCode: result.ExitCode, Iteration: int(step.Iteration)})
}
Loading

0 comments on commit 220ac74

Please sign in to comment.