Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OSPRH-10386] Update logic for workflows #231

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 50 additions & 68 deletions controllers/ansibletest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controllers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

Expand All @@ -37,7 +39,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger {
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) {
Log := r.GetLogger(ctx)

// How much time should we wait before calling Reconcile loop when there is a failure
requeueAfter := time.Second * 60

// Fetch the ansible instance
instance := &testv1beta1.AnsibleTest{}
err := r.Client.Get(ctx, req.NamespacedName, instance)
Expand All @@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

workflowActive := false
if len(instance.Spec.Workflow) > 0 {
workflowActive = true
}

// Create a helper
helper, err := helper.NewHelper(
instance,
Expand Down Expand Up @@ -142,40 +135,51 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

}

// Ensure that there is an external counter and read its value
// We use the external counter to keep track of the workflow steps
r.WorkflowStepCounterCreate(ctx, instance, helper)
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance)
if externalWorkflowCounter == -1 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
workflowLength := len(instance.Spec.Workflow)
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength)

// Each job that is being executed by the test operator has
currentWorkflowStep := 0
runningAnsibleJob := &batchv1.Job{}
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1)
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob)
if err != nil && !k8s_errors.IsNotFound(err) {
switch nextAction {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Good refactor and enhancement, much more clear this way!

case Failure:
return ctrl.Result{}, err
} else if err == nil {
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"])
}

if r.CompletedJobExists(ctx, instance, currentWorkflowStep) {
// The job created by the instance was completed. Release the lock
// so that other instances can spawn a job.
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage)
Log.Info("Job completed")
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased {
return ctrl.Result{}, err
case Wait:
Log.Info(InfoWaitingOnJob)
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil

case EndTesting:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(non-blocking) suggestion: I'd keep the comment about releasing the lock so that other instances can spawn a job. I see the point of detailing that EndTesting might not be related to any error and that the job simply completed.

if lockReleased, _ := r.ReleaseLock(ctx, instance); !lockReleased {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(blocking) Question: If we get to this point, it's because there was an error releasing the lock and/or there's no lock, right?
Are we losing some context here by returning nil as the error info?

}

instance.Status.Conditions.MarkTrue(
condition.DeploymentReadyCondition,
condition.DeploymentReadyMessage)

Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
lockAcquired, _ := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(blocking) suggestion: I'd keep the error information from AcquireLock here.

}

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(non-blocking) question: Do we want to check here if there's an already created lock? If there's no lock I guess this step should not be performed right?

I see the point that if we get here, we've already taken a previous code branch at L162 CreateFirstJob. But from my side, I think it'd be important to check here again. The previous version would have checked that regardless of whether it was the first or a middle workflow step.

Praise: The new format is much easier to understand. I like the way the current status is checked before triggering a new job. I'm unsure whether a comment at L178 would be good to explain that if we reach this point it's because it's a first job or a NextJob, or whether it would be better to wrap that section (from L178 to L302) in a new method to be executed from the switch clause. I guess this might be good for a follow-up task, as I'm unsure how the defer func() at L106 would behave (if this makes sense, take this as non-blocking).

Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep))

default:
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction)
}

serviceLabels := map[string]string{
common.AppSelector: ansibletest.ServiceName,
"workflowStep": strconv.Itoa(externalWorkflowCounter),
"instanceName": instance.Name,
"operator": "test-operator",
workflowStepLabel: strconv.Itoa(nextWorkflowStep),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: Like this nextWorkflowStep instead of previous externalWorkflowCounter!

instanceNameLabel: instance.Name,
operatorNameLabel: "test-operator",
}

// Create PersistentVolumeClaim
Expand All @@ -194,51 +198,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// Create PersistentVolumeClaim - end

// If the current job is executing the last workflow step -> do not create another job
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) {
return ctrl.Result{}, nil
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) {
return ctrl.Result{}, nil
}

// We are about to start job that spawns the pod with tests.
// This lock ensures that there is always only one pod running.
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info("Can not acquire lock")
requeueAfter := time.Second * 60
return ctrl.Result{RequeueAfter: requeueAfter}, err
}
Log.Info("Lock acquired")

if workflowActive {
r.WorkflowStepCounterIncrease(ctx, instance, helper)
}

instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage)

// Create a new job
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
jobName := r.GetJobName(instance, externalWorkflowCounter)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter)
jobName := r.GetJobName(instance, nextWorkflowStep)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool)
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool)
if err != nil {
return ctrl.Result{}, err
}

if externalWorkflowCounter < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector
if nextWorkflowStep < len(instance.Spec.Workflow) {
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil {
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector
}

if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil {
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations
}

if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil {
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel
}
}

Expand All @@ -260,7 +243,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
mountCerts,
envVars,
workflowOverrideParams,
externalWorkflowCounter,
nextWorkflowStep,
containerImage,
privileged,
)
Expand Down Expand Up @@ -297,7 +280,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrlResult, nil
}
// Create a new job - end

Log.Info("Reconciled Service successfully")
return ctrl.Result{}, nil
}
Expand Down
Loading
Loading