Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MultiKueue] Introduce Admission Check Validation feature gate #3254

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilindexer "sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
Expand Down Expand Up @@ -240,11 +241,19 @@ func (c *Cache) DeleteResourceFlavor(rf *kueue.ResourceFlavor) sets.Set[string]
func (c *Cache) AddOrUpdateAdmissionCheck(ac *kueue.AdmissionCheck) sets.Set[string] {
c.Lock()
defer c.Unlock()

// TBD: This might not be needed, but we could keep it for readability
// because SingleInstanceInClusterQueue and FlavorIndependent will be false if AdmissionCheckValidationRules is off
singleInstanceInClusterQueue, flavorIndependent := false, false
if features.Enabled(features.AdmissionCheckValidationRules) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gating this code makes sense, but I think for reasability we can do:

newAC :=  AdmissionCheck{
  Active
  Controller 
  // by default SingleInstanceInClusterQueue and FlavorIndependent to false
}
if features.Enabled(features.AdmissionCheckValidationRules) {
  newAC. SingleInstanceInClusterQueue =apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue)
  newAC.FlavorIndependent = apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck)
}
c.admissionChecks[ac.Name] = newAC

singleInstanceInClusterQueue = apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue)
flavorIndependent = apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck)
}
c.admissionChecks[ac.Name] = AdmissionCheck{
Active: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionCheckActive),
Controller: ac.Spec.ControllerName,
SingleInstanceInClusterQueue: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue),
FlavorIndependent: apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck),
SingleInstanceInClusterQueue: singleInstanceInClusterQueue,
FlavorIndependent: flavorIndependent,
}

return c.updateClusterQueues()
Expand Down
48 changes: 36 additions & 12 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
Expand Down Expand Up @@ -265,16 +266,28 @@ func (c *clusterQueue) inactiveReason() (string, string) {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckInactive)
messages = append(messages, fmt.Sprintf("references inactive AdmissionCheck(s): %v", c.inactiveAdmissionChecks))
}
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
if features.Enabled(features.AdmissionCheckValidationRules) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC we don't need any diff here if we used a dedicated cache field.
Then we can also easier drop the code in 0.10. For the hard-coded validation we can use
multipleMultiKueueChecks and perFlavorMultiKueueChecks

if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
}
}

if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
}
} else {
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
messages = append(messages, "Cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue")
}
}

if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, "MultiKueue AdmissionCheck cannot be specified per flavor")
}
}

if len(reasons) == 0 {
Expand Down Expand Up @@ -333,11 +346,22 @@ func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionChec
inactive = append(inactive, acName)
}
checksPerController[ac.Controller] = append(checksPerController[ac.Controller], acName)
if ac.SingleInstanceInClusterQueue {

if features.Enabled(features.AdmissionCheckValidationRules) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly here, avoid the diff by using dedicated cache fields for the new checks

if ac.SingleInstanceInClusterQueue {
singleInstanceControllers.Insert(ac.Controller)
}
if ac.FlavorIndependent && flavors.Len() != 0 {
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}
} else if ac.Controller == kueue.MultiKueueControllerName {
// MultiKueue Admission Checks have extra constraints
// disallow to use multiple MultiKueue AdmissionChecks on the same ClusterQueue
// MultiKueue AdmissionCheck cannot be specified per flavor
singleInstanceControllers.Insert(ac.Controller)
}
if ac.FlavorIndependent && flavors.Len() != 0 {
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
if flavors.Len() != 0 {
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}
}
}
}
Expand Down
84 changes: 65 additions & 19 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,14 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
Obj()

testcases := []struct {
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
acValidationRulesEnabled bool
}{
{
name: "Pending clusterQueue updated valid AC list",
Expand Down Expand Up @@ -629,7 +630,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantMessage: "Can't admit new workloads: references inactive AdmissionCheck(s): [check3].",
},
{
name: "Active clusterQueue updated with duplicate single instance AC Controller",
name: "Active clusterQueue updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithAC,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -648,12 +649,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -672,12 +674,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor - AdmissionCheckValidationRules enabled",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -687,9 +690,10 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
acValidationRulesEnabled: true,
},
{
name: "Terminating clusterQueue updated with valid AC list",
Expand Down Expand Up @@ -771,10 +775,52 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantReason: "Terminating",
wantMessage: "Can't admit new workloads; clusterQueue is terminating",
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
SingleInstanceInClusterQueue: true,
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller2",
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: "Can't admit new workloads: Cannot use multiple MultiKueue AdmissionChecks on the same ClusterQueue.",
Comment on lines +799 to +800
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the feature is disabled set reason as MultipleMultiKueueAdmissionChecks

},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: MultiKueue AdmissionCheck cannot be specified per flavor.",
Comment on lines +814 to +815
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the gate is disabled use reason MutliKueueAdmissionCheckAppliedPerFlavor

},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if tc.acValidationRulesEnabled {
features.SetFeatureGateDuringTest(t, features.AdmissionCheckValidationRules, true)
}
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(tc.cq)
if err != nil {
Expand Down
43 changes: 23 additions & 20 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
)

Expand Down Expand Up @@ -126,28 +127,30 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
apimeta.SetStatusCondition(&ac.Status.Conditions, newCondition)
needsUpdate = true
}
if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.AdmissionChecksSingleInstanceInClusterQueue,
Status: metav1.ConditionTrue,
Reason: SingleInstanceReason,
Message: SingleInstanceMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}

if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.FlavorIndependentAdmissionCheck,
Status: metav1.ConditionTrue,
Reason: FlavorIndependentCheckReason,
Message: FlavorIndependentCheckMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}
if features.Enabled(features.AdmissionCheckValidationRules) {
if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.AdmissionChecksSingleInstanceInClusterQueue,
Status: metav1.ConditionTrue,
Reason: SingleInstanceReason,
Message: SingleInstanceMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}

if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.FlavorIndependentAdmissionCheck,
Status: metav1.ConditionTrue,
Reason: FlavorIndependentCheckReason,
Message: FlavorIndependentCheckMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}
}
if needsUpdate {
err := a.client.Status().Update(ctx, ac)
if err != nil {
Expand Down
Loading