-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[OSPRH-10386] Update logic for workflows #231
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ package controllers | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"strconv" | ||
"time" | ||
|
||
|
@@ -37,7 +39,6 @@ import ( | |
corev1 "k8s.io/api/core/v1" | ||
k8s_errors "k8s.io/apimachinery/pkg/api/errors" | ||
ctrl "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/log" | ||
) | ||
|
||
|
@@ -68,9 +69,6 @@ func (r *AnsibleTestReconciler) GetLogger(ctx context.Context) logr.Logger { | |
func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, _err error) { | ||
Log := r.GetLogger(ctx) | ||
|
||
// How much time should we wait before calling Reconcile loop when there is a failure | ||
requeueAfter := time.Second * 60 | ||
|
||
// Fetch the ansible instance | ||
instance := &testv1beta1.AnsibleTest{} | ||
err := r.Client.Get(ctx, req.NamespacedName, instance) | ||
|
@@ -81,11 +79,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) | |
return ctrl.Result{}, err | ||
} | ||
|
||
workflowActive := false | ||
if len(instance.Spec.Workflow) > 0 { | ||
workflowActive = true | ||
} | ||
|
||
// Create a helper | ||
helper, err := helper.NewHelper( | ||
instance, | ||
|
@@ -142,40 +135,51 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) | |
|
||
} | ||
|
||
// Ensure that there is an external counter and read its value | ||
// We use the external counter to keep track of the workflow steps | ||
r.WorkflowStepCounterCreate(ctx, instance, helper) | ||
externalWorkflowCounter := r.WorkflowStepCounterRead(ctx, instance) | ||
if externalWorkflowCounter == -1 { | ||
return ctrl.Result{RequeueAfter: requeueAfter}, nil | ||
} | ||
workflowLength := len(instance.Spec.Workflow) | ||
nextAction, nextWorkflowStep, err := r.NextAction(ctx, instance, workflowLength) | ||
|
||
// Each job that is being executed by the test operator has | ||
currentWorkflowStep := 0 | ||
runningAnsibleJob := &batchv1.Job{} | ||
runningJobName := r.GetJobName(instance, externalWorkflowCounter-1) | ||
err = r.Client.Get(ctx, client.ObjectKey{Namespace: instance.GetNamespace(), Name: runningJobName}, runningAnsibleJob) | ||
if err != nil && !k8s_errors.IsNotFound(err) { | ||
switch nextAction { | ||
case Failure: | ||
return ctrl.Result{}, err | ||
} else if err == nil { | ||
currentWorkflowStep, _ = strconv.Atoi(runningAnsibleJob.Labels["workflowStep"]) | ||
} | ||
|
||
if r.CompletedJobExists(ctx, instance, currentWorkflowStep) { | ||
// The job created by the instance was completed. Release the lock | ||
// so that other instances can spawn a job. | ||
instance.Status.Conditions.MarkTrue(condition.DeploymentReadyCondition, condition.DeploymentReadyMessage) | ||
Log.Info("Job completed") | ||
if lockReleased, err := r.ReleaseLock(ctx, instance); !lockReleased { | ||
return ctrl.Result{}, err | ||
case Wait: | ||
Log.Info(InfoWaitingOnJob) | ||
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil | ||
|
||
case EndTesting: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (non-blocking) suggestion: I'd keep the comment about releasing the lock so that other instances can spawn a job. I see the point of detailing that EndTesting might not be related to any error and simply means that the job was completed. |
||
if lockReleased, _ := r.ReleaseLock(ctx, instance); !lockReleased { | ||
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) | ||
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (blocking) Question: If we reach this point, it's because there was an error releasing the lock and/or there's no lock, right? |
||
} | ||
|
||
instance.Status.Conditions.MarkTrue( | ||
condition.DeploymentReadyCondition, | ||
condition.DeploymentReadyMessage) | ||
|
||
Log.Info(InfoTestingCompleted) | ||
return ctrl.Result{}, nil | ||
|
||
case CreateFirstJob: | ||
lockAcquired, _ := r.AcquireLock(ctx, instance, helper, false) | ||
if !lockAcquired { | ||
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName)) | ||
return ctrl.Result{RequeueAfter: RequeueAfterValue}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (blocking) suggestion: I'd keep the error information returned by AcquireLock here. |
||
} | ||
|
||
Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep)) | ||
|
||
case CreateNextJob: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (non-blocking) question: Do we want to check here whether a lock has already been created? If there's no lock, I guess this step should not be performed, right? I see the point that if we get here we've already taken the earlier CreateFirstJob branch at L162. But from my side, I think it'd be important to check here again. The previous version would have checked that regardless of whether it was the first or a middle workflow step. Praise: The new format is much easier to understand. I like the way the current status is checked before a new job is triggered. I'm unsure whether a comment at L178 would be good, to explain that if we reach this point it's because it's either a first job or a next job, or whether it would be better to wrap that section (from L178 to L302) in a new method to be executed from the switch clause. I guess this might be good for a follow-up task, as I'm unsure how the defer func() at L106 would behave (if this makes sense, take this as non-blocking). |
||
Log.Info(fmt.Sprintf(InfoCreatingNextPod, nextWorkflowStep)) | ||
|
||
default: | ||
return ctrl.Result{}, errors.New(ErrReceivedUnexpectedAction) | ||
} | ||
|
||
serviceLabels := map[string]string{ | ||
common.AppSelector: ansibletest.ServiceName, | ||
"workflowStep": strconv.Itoa(externalWorkflowCounter), | ||
"instanceName": instance.Name, | ||
"operator": "test-operator", | ||
workflowStepLabel: strconv.Itoa(nextWorkflowStep), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. praise: I like using nextWorkflowStep here instead of the previous externalWorkflowCounter! |
||
instanceNameLabel: instance.Name, | ||
operatorNameLabel: "test-operator", | ||
} | ||
|
||
// Create PersistentVolumeClaim | ||
|
@@ -194,51 +198,30 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) | |
} | ||
// Create PersistentVolumeClaim - end | ||
|
||
// If the current job is executing the last workflow step -> do not create another job | ||
if workflowActive && externalWorkflowCounter >= len(instance.Spec.Workflow) { | ||
return ctrl.Result{}, nil | ||
} else if !workflowActive && r.JobExists(ctx, instance, currentWorkflowStep) { | ||
return ctrl.Result{}, nil | ||
} | ||
|
||
// We are about to start job that spawns the pod with tests. | ||
// This lock ensures that there is always only one pod running. | ||
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false) | ||
if !lockAcquired { | ||
Log.Info("Can not acquire lock") | ||
requeueAfter := time.Second * 60 | ||
return ctrl.Result{RequeueAfter: requeueAfter}, err | ||
} | ||
Log.Info("Lock acquired") | ||
|
||
if workflowActive { | ||
r.WorkflowStepCounterIncrease(ctx, instance, helper) | ||
} | ||
|
||
instance.Status.Conditions.MarkTrue(condition.ServiceConfigReadyCondition, condition.ServiceConfigReadyMessage) | ||
|
||
// Create a new job | ||
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle") | ||
jobName := r.GetJobName(instance, externalWorkflowCounter) | ||
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, externalWorkflowCounter) | ||
jobName := r.GetJobName(instance, nextWorkflowStep) | ||
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep) | ||
logsPVCName := r.GetPVCLogsName(instance, 0) | ||
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance) | ||
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", externalWorkflowCounter).(bool) | ||
privileged := r.OverwriteAnsibleWithWorkflow(instance.Spec, "Privileged", "pbool", nextWorkflowStep).(bool) | ||
if err != nil { | ||
return ctrl.Result{}, err | ||
} | ||
|
||
if externalWorkflowCounter < len(instance.Spec.Workflow) { | ||
if instance.Spec.Workflow[externalWorkflowCounter].NodeSelector != nil { | ||
instance.Spec.NodeSelector = *instance.Spec.Workflow[externalWorkflowCounter].NodeSelector | ||
if nextWorkflowStep < len(instance.Spec.Workflow) { | ||
if instance.Spec.Workflow[nextWorkflowStep].NodeSelector != nil { | ||
instance.Spec.NodeSelector = *instance.Spec.Workflow[nextWorkflowStep].NodeSelector | ||
} | ||
|
||
if instance.Spec.Workflow[externalWorkflowCounter].Tolerations != nil { | ||
instance.Spec.Tolerations = *instance.Spec.Workflow[externalWorkflowCounter].Tolerations | ||
if instance.Spec.Workflow[nextWorkflowStep].Tolerations != nil { | ||
instance.Spec.Tolerations = *instance.Spec.Workflow[nextWorkflowStep].Tolerations | ||
} | ||
|
||
if instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel != nil { | ||
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[externalWorkflowCounter].SELinuxLevel | ||
if instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel != nil { | ||
instance.Spec.SELinuxLevel = *instance.Spec.Workflow[nextWorkflowStep].SELinuxLevel | ||
} | ||
} | ||
|
||
|
@@ -260,7 +243,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) | |
mountCerts, | ||
envVars, | ||
workflowOverrideParams, | ||
externalWorkflowCounter, | ||
nextWorkflowStep, | ||
containerImage, | ||
privileged, | ||
) | ||
|
@@ -297,7 +280,6 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request) | |
return ctrlResult, nil | ||
} | ||
// Create a new job - end | ||
|
||
Log.Info("Reconciled Service successfully") | ||
return ctrl.Result{}, nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
praise: Good refactor and enhancement, much clearer this way!