Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mszadkow committed Oct 17, 2024
1 parent 72ea4cf commit 705820a
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 62 deletions.
30 changes: 22 additions & 8 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
Expand Down Expand Up @@ -265,16 +266,29 @@ func (c *clusterQueue) inactiveReason() (string, string) {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckInactive)
messages = append(messages, fmt.Sprintf("references inactive AdmissionCheck(s): %v", c.inactiveAdmissionChecks))
}
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
if features.Enabled(features.AdmissionCheckValidationRules) {
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
}
}

if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
}
} else {
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
messages = append(messages, "Cannot use mutliple MultiKueue AdmissionChecks on the same ClusterQueue")

}
}

if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, "MultiKueue AdmissionCheck cannot be specified per flavor")
}
}

if len(reasons) == 0 {
Expand Down
84 changes: 65 additions & 19 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,14 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
Obj()

testcases := []struct {
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
wantMessage string
acValidationRulesEnabled bool
}{
{
name: "Pending clusterQueue updated valid AC list",
Expand Down Expand Up @@ -629,7 +630,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantMessage: "Can't admit new workloads: references inactive AdmissionCheck(s): [check3].",
},
{
name: "Active clusterQueue updated with duplicate single instance AC Controller",
name: "Active clusterQueue updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithAC,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -648,12 +649,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller - AdmissionCheckValidationRules enabled",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -672,12 +674,13 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: `Can't admit new workloads: only one AdmissionCheck of [check2 check3] can be referenced for controller "controller2".`,
acValidationRulesEnabled: true,
},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor - AdmissionCheckValidationRules enabled",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
Expand All @@ -687,9 +690,10 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: AdmissionCheck(s): [check1] cannot be set at flavor level.",
acValidationRulesEnabled: true,
},
{
name: "Terminating clusterQueue updated with valid AC list",
Expand Down Expand Up @@ -771,10 +775,52 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
wantReason: "Terminating",
wantMessage: "Can't admit new workloads; clusterQueue is terminating",
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
SingleInstanceInClusterQueue: true,
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller2",
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerAdmissionChecks",
wantMessage: "Can't admit new workloads: Cannot use mutliple MultiKueue AdmissionChecks on the same ClusterQueue.",
},
{
name: "Active clusterQueue with a FlavorIndependent AC applied per ResourceFlavor",
cq: cqWithACPerFlavor,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
FlavorIndependent: true,
},
},
wantStatus: pending,
wantReason: "FlavorIndependentAdmissionCheckAppliedPerFlavor",
wantMessage: "Can't admit new workloads: MultiKueue AdmissionCheck cannot be specified per flavor.",
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if tc.acValidationRulesEnabled {
features.SetFeatureGateDuringTest(t, features.AdmissionCheckValidationRules, true)
}
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(tc.cq)
if err != nil {
Expand Down
43 changes: 23 additions & 20 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
)

Expand Down Expand Up @@ -126,28 +127,30 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
apimeta.SetStatusCondition(&ac.Status.Conditions, newCondition)
needsUpdate = true
}
if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.AdmissionChecksSingleInstanceInClusterQueue,
Status: metav1.ConditionTrue,
Reason: SingleInstanceReason,
Message: SingleInstanceMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}

if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.FlavorIndependentAdmissionCheck,
Status: metav1.ConditionTrue,
Reason: FlavorIndependentCheckReason,
Message: FlavorIndependentCheckMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}
if features.Enabled(features.AdmissionCheckValidationRules) {
if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.AdmissionChecksSingleInstanceInClusterQueue) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.AdmissionChecksSingleInstanceInClusterQueue,
Status: metav1.ConditionTrue,
Reason: SingleInstanceReason,
Message: SingleInstanceMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}

if !apimeta.IsStatusConditionTrue(ac.Status.Conditions, kueue.FlavorIndependentAdmissionCheck) {
apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
Type: kueue.FlavorIndependentAdmissionCheck,
Status: metav1.ConditionTrue,
Reason: FlavorIndependentCheckReason,
Message: FlavorIndependentCheckMessage,
ObservedGeneration: ac.Generation,
})
needsUpdate = true
}
}
if needsUpdate {
err := a.client.Status().Update(ctx, ac)
if err != nil {
Expand Down
46 changes: 44 additions & 2 deletions pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

Expand All @@ -36,8 +37,9 @@ func TestReconcile(t *testing.T) {
reconcileFor string
configs []kueue.MultiKueueConfig

wantChecks []kueue.AdmissionCheck
wantError error
wantChecks []kueue.AdmissionCheck
wantError error
acValidationRulesEnabled bool
}{
"missing admissioncheck": {
reconcileFor: "missing-ac",
Expand Down Expand Up @@ -66,6 +68,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
acValidationRulesEnabled: true,
},
"unmanaged": {
reconcileFor: "ac1",
Expand Down Expand Up @@ -107,6 +110,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
acValidationRulesEnabled: true,
},
"inactive cluster": {
reconcileFor: "ac1",
Expand Down Expand Up @@ -140,6 +144,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
acValidationRulesEnabled: true,
},
"all clusters missing or inactive": {
reconcileFor: "ac1",
Expand Down Expand Up @@ -176,6 +181,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
acValidationRulesEnabled: true,
},
"partially active": {
reconcileFor: "ac1",
Expand Down Expand Up @@ -212,6 +218,7 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
acValidationRulesEnabled: true,
},
"active": {
reconcileFor: "ac1",
Expand Down Expand Up @@ -245,11 +252,46 @@ func TestReconcile(t *testing.T) {
}).
Obj(),
},
acValidationRulesEnabled: true,
},
"active AdmissionCheckValidationRules disabled": {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Generation(1).
Obj(),
},
configs: []kueuealpha.MultiKueueConfig{
*utiltesting.MakeMultiKueueConfig("config1").Clusters("worker1").Obj(),
},
clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").
Active(metav1.ConditionTrue, "ByTest", "by test", 1).
Obj(),
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(kueuealpha.MultiKueueControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "Active",
Message: `The admission check is active`,
ObservedGeneration: 1,
}).
Obj(),
},
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
if tc.acValidationRulesEnabled {
features.SetFeatureGateDuringTest(t, features.AdmissionCheckValidationRules, true)
}
builder, ctx := getClientBuilder()

builder = builder.WithLists(
Expand Down
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ const (
// Enable Topology Aware Scheduling allowing to optimize placement of Pods
// to put them on closely located nodes (e.g. within the same rack or block).
TopologyAwareScheduling featuregate.Feature = "TopologyAwareScheduling"

// owner: @mszadkow
// alpha: v0.9
// Deprecated: v0.10
//
// Enable additional AdmissionCheck validation rules that will appear in status conditions.
AdmissionCheckValidationRules featuregate.Feature = "AdmissionCheckValidationRules"
)

func init() {
Expand All @@ -133,6 +140,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
MultiKueueBatchJobWithManagedBy: {Default: false, PreRelease: featuregate.Alpha},
MultiplePreemptions: {Default: true, PreRelease: featuregate.Beta},
TopologyAwareScheduling: {Default: false, PreRelease: featuregate.Alpha},
AdmissionCheckValidationRules: {Default: false, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) {
Expand Down
1 change: 1 addition & 0 deletions site/content/en/docs/installation/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ The currently supported features are:
| `MultiplePreemptions` | `false` | Alpha | 0.8 | 0.8 |
| `MultiplePreemptions` | `true` | Beta | 0.9 | |
| `TopologyAwareScheduling` | `false` | Alpha | 0.9 | |
| `AdmissionCheckValidationRules` | `false` | Alpha | 0.9 | |

## What's next

Expand Down
Loading

0 comments on commit 705820a

Please sign in to comment.