Skip to content

Commit

Permalink
Wrap AdmissionChecksSingleInstanceInClusterQueue and FlavorIndependen…
Browse files Browse the repository at this point in the history
…tAdmissionCheck status conditions with a feature gate.

Preserve the logic without the status conditions.
  • Loading branch information
mszadkow committed Oct 18, 2024
1 parent 9332a5a commit c8c32ad
Show file tree
Hide file tree
Showing 9 changed files with 281 additions and 68 deletions.
13 changes: 11 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
Expand Down Expand Up @@ -240,11 +241,19 @@ func (c *Cache) DeleteResourceFlavor(rf *kueue.ResourceFlavor) sets.Set[string]
func (c *Cache) AddOrUpdateAdmissionCheck(ac *kueue.AdmissionCheck) sets.Set[string] {
c.Lock()
defer c.Unlock()

// TBD: This might not be needed, but we could keep it for readability
// because SingleInstanceInClusterQueue and FlavorIndependent will be false if AdmissionCheckValidationRules is off
singleInstanceInClusterQueue, flavorIndependent := false, false
if features.Enabled(features.AdmissionCheckValidationRules) {
singleInstanceInClusterQueue = apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue)
flavorIndependent = apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck)
}
c.admissionChecks[ac.Name] = AdmissionCheck{
Active: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionCheckActive),
Controller: ac.Spec.ControllerName,
SingleInstanceInClusterQueue: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue),
FlavorIndependent: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck),
SingleInstanceInClusterQueue: singleInstanceInClusterQueue,
FlavorIndependent: flavorIndependent,
}

return c.updateClusterQueues()
Expand Down
48 changes: 36 additions & 12 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
Expand Down Expand Up @@ -265,16 +266,28 @@ func (c *clusterQueue) inactiveReason() (string, string) {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckInactive)
messages = append(messages, fmt.Sprintf("references inactive AdmissionCheck(s): %v", c.inactiveAdmissionChecks))
}
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
if features.Enabled(features.AdmissionCheckValidationRules) {
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
}
}

if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
}
} else {
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
messages = append(messages, "Cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue")
}
}

if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, "MultiKueue AdmissionCheck cannot be specified per flavor")
}
}

if len(reasons) == 0 {
Expand Down Expand Up @@ -333,11 +346,22 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
inactive = append(inactive, acName)
}
checksPerController[ac.Controller] = append(checksPerController[ac.Controller], acName)
if ac.SingleInstanceInClusterQueue {

if features.Enabled(features.AdmissionCheckValidationRules) {
if ac.SingleInstanceInClusterQueue {
singleInstanceControllers.Insert(ac.Controller)
}
if ac.FlavorIndependent && flavors.Len() != 0 {
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}
} else if ac.Controller == kueue.MultiKueueControllerName {
// MultiKueue Admission Checks have extra constraints
// disallow to use multiple MultiKueue AdmissionChecks on the same ClusterQueue
// MultiKueue AdmissionCheck cannot be specified per flavor
singleInstanceControllers.Insert(ac.Controller)
}
if ac.FlavorIndependent && flavors.Len() != 0 {
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
if flavors.Len() != 0 {
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}
}
}
}
Expand Down
84 changes: 65 additions & 19 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,14 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
Obj()

testcases := []struct {
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
acValidationRulesEnabled bool
}{
{
name: "Pending clusterQueue updated valid AC list",
Expand Down Expand Up @@ -629,7 +630,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantMessage: "Can't admit new workloads: references inactive AdmissionCheck(s): [check3].",
},
{
name: "Active clusterQueue updated with duplicate single instance AC Controller",
name: "Active clusterQueue updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithAC,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -648,12 +649,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -672,12 +674,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor - AdmissionCheckValidationRules enabled",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -687,9 +690,10 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
acValidationRulesEnabled: true,
},
{
name: "Terminating clusterQueue updated with valid AC list",
Expand Down Expand Up @@ -771,10 +775,52 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantReason: "Terminating",
wantMessage: "Can't admit new workloads; clusterQueue is terminating",
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
SingleInstanceInClusterQueue: true,
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller2",
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: "Can't admit new workloads: Cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue.",
},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: MultiKueue AdmissionCheck cannot be specified per flavor.",
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if tc.acValidationRulesEnabled {
features.SetFeatureGateDuringTest(t, features.AdmissionCheckValidationRules, true)
}
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(tc.cq)
if err != nil {
Expand Down
43 changes: 23 additions & 20 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
)

Expand Down Expand Up @@ -126,28 +127,30 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
apimeta.SetStatusCondition(&ac.Status.Conditions, newCondition)
needsUpdate = true
}
if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.AdmissionChecksSingleInstanceInClusterQueue,
Status: metav1.ConditionTrue,
Reason: SingleInstanceReason,
Message: SingleInstanceMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}

if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.FlavorIndependentAdmissionCheck,
Status: metav1.ConditionTrue,
Reason: FlavorIndependentCheckReason,
Message: FlavorIndependentCheckMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}
if features.Enabled(features.AdmissionCheckValidationRules) {
if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.AdmissionChecksSingleInstanceInClusterQueue,
Status: metav1.ConditionTrue,
Reason: SingleInstanceReason,
Message: SingleInstanceMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}

if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.FlavorIndependentAdmissionCheck,
Status: metav1.ConditionTrue,
Reason: FlavorIndependentCheckReason,
Message: FlavorIndependentCheckMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}
}
if needsUpdate {
err := a.client.Status().Update(ctx, ac)
if err != nil {
Expand Down
Loading

0 comments on commit c8c32ad

Please sign in to comment.