diff --git a/controllers/ansibletest_controller.go b/controllers/ansibletest_controller.go index 1293369..cc39249 100644 --- a/controllers/ansibletest_controller.go +++ b/controllers/ansibletest_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "fmt" "strconv" "time" @@ -29,6 +30,7 @@ import ( "github.com/openstack-k8s-operators/lib-common/modules/common/env" "github.com/openstack-k8s-operators/lib-common/modules/common/helper" "github.com/openstack-k8s-operators/lib-common/modules/common/job" + common_rbac "github.com/openstack-k8s-operators/lib-common/modules/common/rbac" "github.com/openstack-k8s-operators/test-operator/api/v1beta1" testv1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1" "github.com/openstack-k8s-operators/test-operator/pkg/ansibletest" @@ -36,7 +38,6 @@ import ( corev1 "k8s.io/api/core/v1" k8s_errors "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -80,11 +81,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, err } - workflowActive := false - if len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - // Create a helper helper, err := helper.NewHelper( instance, @@ -141,38 +137,61 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + nextAction, err := r.NextAction( + ctx, + instance, + helper, + len(instance.Spec.Workflow), + false, + ) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningAnsibleJob := &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob) - if err != nil && !k8s_errors.IsNotFound(err) { + nextWorkflowStep := 0 + switch nextAction { + case Failure: return ctrl.Result{}, err - } else if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"]) - } - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - Log.Info("Job completed") + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: requeueAfter}, nil + + case EndTesting: if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { return ctrl.Result{}, err } + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) + if !lockAcquired { + return ctrl.Result{}, err + } + + nextWorkflowStep = 0 + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + lastJob, err := r.GetLastJob(ctx, instance) + if err != nil { + return ctrl.Result{}, err + } + + lastJobworkflowStep, err := strconv.Atoi(lastJob.Labels[workflowStepLabel]) + if err != nil { + return ctrl.Result{}, err + } + + nextWorkflowStep = lastJobworkflowStep + 1 + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, fmt.Errorf(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: ansibletest.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), + "workflowStep": strconv.Itoa(nextWorkflowStep), "instanceName": instance.Name, "operator": "test-operator", } @@ -193,51 +212,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // Create PersistentVolumeClaim - end - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. - lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) - if !lockAcquired { - Log.Info("Can not acquire lock") - requeueAfter := time.Second * 60 - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage) // Create a new job mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle") - jobName := r.GetJobName(instance, externalWorkflowCounter) - envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter) + jobName := r.GetJobName(instance, nextWorkflowStep) + envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, 0) containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance) - privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool) + privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool) if err != nil { return ctrl.Result{}, err } - if externalWorkflowCounter < len(instance.Spec.Workflow) { - if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil { - instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector + if nextWorkflowStep < len(instance.Spec.Workflow) { + if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil { + instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector } - if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil { - instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations + if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil { + instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations } - if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil { - instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel + if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil { + instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel } } @@ -259,7 +257,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) mountCerts, envVars, workflowOverrideParams, - externalWorkflowCounter, + nextWorkflowStep, containerImage, privileged, ) diff --git a/controllers/common.go b/controllers/common.go index 706025d..b4166c0 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -409,7 +409,7 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b err = r.Client.Delete(ctx, cm) if err != nil && k8s_errors.IsNotFound(err) { - return true, nil + return false, nil } // Check whether the lock was successfully deleted deleted @@ -563,6 +563,15 @@ func isLastJob(createdJobs int, workflowLength int) bool { } } +func isLastIndex(workflowLength int, lastJobIndex int) bool { + switch workflowLength { + case 0: + return lastJobIndex == workflowLength + default: + return lastJobIndex == (workflowLength - 1) + } +} + func (r *Reconciler) NextAction( ctx context.Context, instance client.Object, @@ -581,6 +590,14 @@ func (r *Reconciler) NextAction( return Failure, err } + lastJobIndex := -2 + if lastJob != nil { + lastJobIndex, err = strconv.Atoi(lastJob.Labels[workflowStepLabel]) + if err != nil { + return Failure, err + } + } + if lastJob != nil { lastJobFinished := (lastJob.Status.Failed + lastJob.Status.Succeeded) > 0 @@ -589,13 +606,18 @@ func (r *Reconciler) NextAction( } } - if isLastJob(len(jobs), workflowLength) { + if isLastJob(len(jobs), workflowLength) && isLastIndex(workflowLength, lastJobIndex) { if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { - return EndTesting, err + // TODO(lpiwowar): No failure when releasing non-existing lock + return Failure, err } return EndTesting, nil } + if isLastJob(len(jobs), workflowLength) && !isLastIndex(workflowLength, lastJobIndex) { + return Wait, nil + } + if len(jobs) == 0 { lockAcquired, err := r.AcquireLock(ctx, instance, helper, parallel) if !lockAcquired { diff --git a/controllers/tempest_controller.go b/controllers/tempest_controller.go index 0a2bc8b..4fb9580 100644 --- a/controllers/tempest_controller.go +++ b/controllers/tempest_controller.go @@ -333,8 +333,8 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations } - if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil { - instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel + if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil { + instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel } } diff --git a/controllers/tobiko_controller.go b/controllers/tobiko_controller.go index a04e3b6..67eb37b 100644 --- a/controllers/tobiko_controller.go +++ b/controllers/tobiko_controller.go @@ -29,6 +29,8 @@ import ( "github.com/openstack-k8s-operators/lib-common/modules/common/env" "github.com/openstack-k8s-operators/lib-common/modules/common/helper" "github.com/openstack-k8s-operators/lib-common/modules/common/job" + nad "github.com/openstack-k8s-operators/lib-common/modules/common/networkattachment" + common_rbac "github.com/openstack-k8s-operators/lib-common/modules/common/rbac" "github.com/openstack-k8s-operators/lib-common/modules/common/util" testv1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1" "github.com/openstack-k8s-operators/test-operator/pkg/tobiko" @@ -81,12 +83,6 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return ctrl.Result{}, err } - // Check whether the user wants to execute workflow - workflowActive := false - if len(instance.Spec.Workflow) > 0 { - workflowActive = true - } - helper, err := helper.NewHelper( instance, r.Client, @@ -146,36 +142,61 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res instance.Status.NetworkAttachments = map[string][]string{} } - // Ensure that there is an external counter and read its value - // We use the external counter to keep track of the workflow steps - r.WorkflowStepCounterCreate(ctx, instance, helper) - externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) - if externalWorkflowCounter == -1 { - return ctrl.Result{RequeueAfter: requeueAfter}, nil - } + nextAction, err := r.NextAction( + ctx, + instance, + helper, + len(instance.Spec.Workflow), + instance.Spec.Parallel, + ) - // Each job that is being executed by the test operator has - currentWorkflowStep := 0 - runningTobikoJob := &batchv1.Job{} - runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) - err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningTobikoJob) - if err == nil { - currentWorkflowStep, _ = strconv.Atoi(runningTobikoJob.Labels["workflowStep"]) - } + nextWorkflowStep := 0 + switch nextAction { + case Failure: + return ctrl.Result{}, err + + case Wait: + Log.Info(InfoWaitingOnJob) + return ctrl.Result{RequeueAfter: requeueAfter}, nil - if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { - instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) - // The job created by the instance was completed. Release the lock - // so that other instances can spawn a job. - Log.Info("Job completed") + case EndTesting: if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { return ctrl.Result{}, err } + + Log.Info(InfoTestingCompleted) + return ctrl.Result{}, nil + + case CreateFirstJob: + lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) + if !lockAcquired { + return ctrl.Result{}, err + } + + nextWorkflowStep = 0 + Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) + + case CreateNextJob: + lastJob, err := r.GetLastJob(ctx, instance) + if err != nil { + return ctrl.Result{}, err + } + + lastJobworkflowStep, err := strconv.Atoi(lastJob.Labels[workflowStepLabel]) + if err != nil { + return ctrl.Result{}, err + } + + nextWorkflowStep = lastJobworkflowStep + 1 + Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) + + default: + return ctrl.Result{}, fmt.Errorf(ErrReceivedUnexpectedAction) } serviceLabels := map[string]string{ common.AppSelector: tobiko.ServiceName, - "workflowStep": strconv.Itoa(externalWorkflowCounter), + "workflowStep": strconv.Itoa(nextWorkflowStep), "instanceName": instance.Name, "operator": "test-operator", } @@ -189,8 +210,8 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res workflowStepNum := 0 // Create multiple PVCs for parallel execution - if instance.Spec.Parallel && externalWorkflowCounter < len(instance.Spec.Workflow) { - workflowStepNum = externalWorkflowCounter + if instance.Spec.Parallel && nextWorkflowStep < len(instance.Spec.Workflow) { + workflowStepNum = nextWorkflowStep } // Create PersistentVolumeClaim @@ -216,7 +237,7 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res } // NetworkAttachments - if r.JobExists(ctx, instance, externalWorkflowCounter) { + if r.JobExists(ctx, instance, nextWorkflowStep) { networkReady, networkAttachmentStatus, err := nad.VerifyNetworkStatusFromAnnotation( ctx, helper, @@ -263,32 +284,12 @@ func (r *TobikoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res mountKubeconfig = true } - // If the current job is executing the last workflow step -> do not create another job - if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { - return ctrl.Result{}, nil - } else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { - return ctrl.Result{}, nil - } - - // We are about to start job that spawns the pod with tests. - // This lock ensures that there is always only one pod running. - lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel) - if !lockAcquired { - Log.Info("Can not acquire lock") - return ctrl.Result{RequeueAfter: requeueAfter}, err - } - Log.Info("Lock acquired") - - if workflowActive { - r.WorkflowStepCounterIncrease(ctx, instance, helper) - } - // Prepare Tobiko env vars - envVars := r.PrepareTobikoEnvVars(ctx, serviceLabels, instance, helper, externalWorkflowCounter) - jobName := r.GetJobName(instance, externalWorkflowCounter) + envVars := r.PrepareTobikoEnvVars(ctx, serviceLabels, instance, helper, nextWorkflowStep) + jobName := r.GetJobName(instance, nextWorkflowStep) logsPVCName := r.GetPVCLogsName(instance, workflowStepNum) containerImage, err := r.GetContainerImage(ctx, instance.Spec.ContainerImage, instance) - privileged := r.OverwriteValueWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool) + privileged := r.OverwriteValueWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool) if err != nil { return ctrl.Result{}, err }