Skip to content

Commit

Permalink
Unify controller loops - workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
lpiwowar committed Oct 22, 2024
1 parent 9a5a14b commit 7f6cddb
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 118 deletions.
118 changes: 58 additions & 60 deletions controllers/ansibletest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"
"strconv"
"time"

Expand All @@ -29,14 +30,14 @@ import (
"github.com/openstack-k8s-operators/lib-common/modules/common/env"
"github.com/openstack-k8s-operators/lib-common/modules/common/helper"
"github.com/openstack-k8s-operators/lib-common/modules/common/job"
common_rbac "github.com/openstack-k8s-operators/lib-common/modules/common/rbac"
"github.com/openstack-k8s-operators/test-operator/api/v1beta1"
testv1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
"github.com/openstack-k8s-operators/test-operator/pkg/ansibletest"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -80,11 +81,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

workflowActive := false
if len(instance.Spec.Workflow) > 0 {
workflowActive = true
}

// Create a helper
helper, err := helper.NewHelper(
instance,
Expand Down Expand Up @@ -141,38 +137,61 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

}

// Ensure that there is an external counter and read its value
// We use the external counter to keep track of the workflow steps
r.WorkflowStepCounterCreate(ctx, instance, helper)
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
if externalWorkflowCounter == -1 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
nextAction, err := r.NextAction(
ctx,
instance,
helper,
len(instance.Spec.Workflow),
false,
)

// Each job that is being executed by the test operator has
currentWorkflowStep := 0
runningAnsibleJob := &batchv1.Job{}
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
if err != nil && !k8s_errors.IsNotFound(err) {
nextWorkflowStep := 0
switch nextAction {
case Failure:
return ctrl.Result{}, err
} else if err == nil {
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
}

if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
// The job created by the instance was completed. Release the lock
// so that other instances can spawn a job.
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
Log.Info("Job completed")
case Wait:
Log.Info(InfoWaitingOnJob)
return ctrl.Result{RequeueAfter: requeueAfter}, nil

case EndTesting:
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
return ctrl.Result{}, err
}

Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
return ctrl.Result{}, err
}

nextWorkflowStep = 0
Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
lastJob, err := r.GetLastJob(ctx, instance)
if err != nil {
return ctrl.Result{}, err
}

lastJobworkflowStep, err := strconv.Atoi(lastJob.Labels[workflowStepLabel])
if err != nil {
return ctrl.Result{}, err
}

nextWorkflowStep = lastJobworkflowStep + 1
Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))

default:
return ctrl.Result{}, fmt.Errorf(ErrReceivedUnexpectedAction)
}

serviceLabels := map[string]string{
common.AppSelector: ansibletest.ServiceName,
"workflowStep": strconv.Itoa(externalWorkflowCounter),
"workflowStep": strconv.Itoa(nextWorkflowStep),
"instanceName": instance.Name,
"operator": "test-operator",
}
Expand All @@ -193,51 +212,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// Create PersistentVolumeClaim - end

// If the current job is executing the last workflow step -> do not create another job
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
return ctrl.Result{}, nil
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
return ctrl.Result{}, nil
}

// We are about to start job that spawns the pod with tests.
// This lock ensures that there is always only one pod running.
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info("Can not acquire lock")
requeueAfter := time.Second * 60
return ctrl.Result{RequeueAfter: requeueAfter}, err
}
Log.Info("Lock acquired")

if workflowActive {
r.WorkflowStepCounterIncrease(ctx, instance, helper)
}

instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)

// Create a new job
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
jobName := r.GetJobName(instance, externalWorkflowCounter)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
jobName := r.GetJobName(instance, nextWorkflowStep)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
if err != nil {
return ctrl.Result{}, err
}

if externalWorkflowCounter < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
if nextWorkflowStep < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
}

if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
}

if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
}
}

Expand All @@ -259,7 +257,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mountCerts,
envVars,
workflowOverrideParams,
externalWorkflowCounter,
nextWorkflowStep,
containerImage,
privileged,
)
Expand Down
28 changes: 25 additions & 3 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b

err = r.Client.Delete(ctx, cm)
if err != nil && k8s_errors.IsNotFound(err) {
return true, nil
return false, nil
}

// Check whether the lock was successfully deleted deleted
Expand Down Expand Up @@ -563,6 +563,15 @@ func isLastJob(createdJobs int, workflowLength int) bool {
}
}

func isLastIndex(workflowLength int, lastJobIndex int) bool {
switch workflowLength {
case 0:
return lastJobIndex == workflowLength
default:
return lastJobIndex == (workflowLength - 1)
}
}

func (r *Reconciler) NextAction(
ctx context.Context,
instance client.Object,
Expand All @@ -581,6 +590,14 @@ func (r *Reconciler) NextAction(
return Failure, err
}

lastJobIndex := -2
if lastJob != nil {
lastJobIndex, err = strconv.Atoi(lastJob.Labels[workflowStepLabel])
if err != nil {
return Failure, err
}
}

if lastJob != nil {
lastJobFinished := (lastJob.Status.Failed + lastJob.Status.Succeeded) > 0

Expand All @@ -589,13 +606,18 @@ func (r *Reconciler) NextAction(
}
}

if isLastJob(len(jobs), workflowLength) {
if isLastJob(len(jobs), workflowLength) && isLastIndex(workflowLength, lastJobIndex) {
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
return EndTesting, err
// TODO(lpiwowar): No failure when releasing non-existing lock
return Failure, err
}
return EndTesting, nil
}

if isLastJob(len(jobs), workflowLength) && !isLastIndex(workflowLength, lastJobIndex) {
return Wait, nil
}

if len(jobs) == 0 {
lockAcquired, err := r.AcquireLock(ctx, instance, helper, parallel)
if !lockAcquired {
Expand Down
4 changes: 2 additions & 2 deletions controllers/tempest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ func (r *TempestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (re
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
}

if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
}
}

Expand Down
Loading

0 comments on commit 7f6cddb

Please sign in to comment.