
Commit

add no cluster schedule condition reason
Signed-off-by: whitewindmills <[email protected]>
whitewindmills committed Jul 6, 2023
1 parent 9bac51d commit 247f0f0
Showing 6 changed files with 269 additions and 59 deletions.
15 changes: 10 additions & 5 deletions pkg/scheduler/core/generic_scheduler.go
@@ -2,6 +2,7 @@ package core
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -60,19 +61,23 @@ func (g *genericScheduler) Schedule(
 ) (result ScheduleResult, err error) {
 	clusterInfoSnapshot := g.schedulerCache.Snapshot()
 	if clusterInfoSnapshot.NumOfClusters() == 0 {
-		return result, fmt.Errorf("no clusters available to schedule")
+		return result, &NoClusterError{
+			err: errors.New("no cluster available to schedule"),
+		}
 	}
 
 	feasibleClusters, diagnosis, err := g.findClustersThatFit(ctx, spec, status, &clusterInfoSnapshot)
 	if err != nil {
-		return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
+		return result, fmt.Errorf("failed to find fit clusters: %w", err)
 	}
 
 	// Short path for case no cluster fit.
 	if len(feasibleClusters) == 0 {
-		return result, &framework.FitError{
-			NumAllClusters: clusterInfoSnapshot.NumOfClusters(),
-			Diagnosis:      diagnosis,
+		return result, &NoClusterError{
+			err: &framework.FitError{
+				NumAllClusters: clusterInfoSnapshot.NumOfClusters(),
+				Diagnosis:      diagnosis,
+			},
 		}
 	}
 	klog.V(4).Infof("Feasible clusters found: %v", feasibleClusters)
15 changes: 15 additions & 0 deletions pkg/scheduler/core/util.go
@@ -17,6 +17,21 @@ import (
 
 type calculator func([]*clusterv1alpha1.Cluster, *workv1alpha2.ResourceBindingSpec) []workv1alpha2.TargetCluster
 
+type NoClusterError struct {
+	err error
+}
+
+func (e *NoClusterError) Error() string {
+	if e.err == nil {
+		return ""
+	}
+	return e.err.Error()
+}
+
+func (e *NoClusterError) Unwrap() error {
+	return e.err
+}
+
 func getDefaultWeightPreference(clusters []*clusterv1alpha1.Cluster) *policyv1alpha1.ClusterPreferences {
 	staticWeightLists := make([]policyv1alpha1.StaticClusterWeight, 0)
 	for _, cluster := range clusters {
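For context on why Unwrap matters here: errors.As walks the Unwrap chain, so a NoClusterError built around a framework.FitError still matches a *framework.FitError target, while a bare NoClusterError does not. A minimal, self-contained sketch of the same pattern using only the standard library (the wrapErr and fitErr names are illustrative stand-ins, not part of this patch):

package main

import (
	"errors"
	"fmt"
)

// wrapErr mirrors NoClusterError: it hides an inner error and
// exposes it only through Unwrap.
type wrapErr struct{ err error }

func (e *wrapErr) Error() string {
	if e.err == nil {
		return ""
	}
	return e.err.Error()
}

func (e *wrapErr) Unwrap() error { return e.err }

// fitErr stands in for framework.FitError.
type fitErr struct{ clusters int }

func (e *fitErr) Error() string {
	return fmt.Sprintf("0/%d clusters are available", e.clusters)
}

func main() {
	err := error(&wrapErr{err: &fitErr{clusters: 3}})

	var inner *fitErr
	fmt.Println(errors.As(err, &inner)) // true: errors.As follows Unwrap to the inner error

	var wrapper *wrapErr
	fmt.Println(errors.As(err, &wrapper)) // true: the wrapper itself also matches when targeted
}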
22 changes: 22 additions & 0 deletions pkg/scheduler/helper.go
@@ -2,11 +2,17 @@ package scheduler
 
 import (
 	"encoding/json"
+	"errors"
 	"reflect"
 
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog/v2"
 
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+	"github.com/karmada-io/karmada/pkg/scheduler/core"
+	"github.com/karmada-io/karmada/pkg/scheduler/framework"
+	"github.com/karmada-io/karmada/pkg/util"
 )
 
 func placementChanged(
@@ -84,3 +90,19 @@ func getAffinityIndex(affinities []policyv1alpha1.ClusterAffinityTerm, observedN
 	}
 	return 0
 }
+
+func createConditionByError(err error) metav1.Condition {
+	if err == nil {
+		return util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
+	}
+
+	var fitErr *framework.FitError
+	if errors.As(err, &fitErr) {
+		return util.NewCondition(workv1alpha2.Scheduled, noClusterFitReason, err.Error(), metav1.ConditionTrue)
+	}
+	var noClusterErr *core.NoClusterError
+	if errors.As(err, &noClusterErr) {
+		return util.NewCondition(workv1alpha2.Scheduled, noClusterAvailableReason, err.Error(), metav1.ConditionTrue)
+	}
+	return util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
+}
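Note the branch order in createConditionByError: because errors.As follows Unwrap, a NoClusterError that wraps a framework.FitError (which is what Schedule now returns when no cluster fits) matches the FitError branch first and is reported as NoClusterFit; the NoClusterAvailable branch is reached only when the NoClusterError carries no FitError, i.e. the scheduler cache held no clusters at all. Both of these conditions carry ConditionTrue, which the callers in scheduler.go treat as a finished schedule that should not be retried, while any other error yields ConditionFalse and is requeued.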
45 changes: 45 additions & 0 deletions pkg/scheduler/helper_test.go
@@ -2,11 +2,16 @@ package scheduler
 
 import (
 	"encoding/json"
+	"errors"
 	"testing"
 
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
+	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
+	"github.com/karmada-io/karmada/pkg/scheduler/core"
+	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 )
 
 func Test_needConsideredPlacementChanged(t *testing.T) {
@@ -428,3 +433,43 @@ func Test_getAffinityIndex(t *testing.T) {
 		})
 	}
 }
+
+func Test_createConditionByError(t *testing.T) {
+	tests := []struct {
+		name              string
+		err               error
+		expectedCondition metav1.Condition
+	}{
+		{
+			name:              "no error",
+			err:               nil,
+			expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: scheduleSuccessReason, Status: metav1.ConditionTrue},
+		},
+		{
+			name:              "failed to schedule",
+			err:               errors.New(""),
+			expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: scheduleFailedReason, Status: metav1.ConditionFalse},
+		},
+		{
+			name:              "no cluster available",
+			err:               &core.NoClusterError{},
+			expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: noClusterAvailableReason, Status: metav1.ConditionTrue},
+		},
+		{
+			name:              "no cluster fit",
+			err:               &framework.FitError{},
+			expectedCondition: metav1.Condition{Type: workv1alpha2.Scheduled, Reason: noClusterFitReason, Status: metav1.ConditionTrue},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			condition := createConditionByError(tt.err)
+			if condition.Type != tt.expectedCondition.Type ||
+				condition.Reason != tt.expectedCondition.Reason ||
+				condition.Status != tt.expectedCondition.Status {
+				t.Errorf("expected condition: (%s, %s, %s), but got (%s, %s, %s)",
+					tt.expectedCondition.Type, tt.expectedCondition.Reason, tt.expectedCondition.Status, condition.Type, condition.Reason, condition.Status)
+			}
+		})
+	}
+}
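The new table covers all four branches of createConditionByError. To run just this test:

go test ./pkg/scheduler/ -run Test_createConditionByError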
95 changes: 49 additions & 46 deletions pkg/scheduler/scheduler.go
@@ -33,7 +33,6 @@ import (
 	worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
 	schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
 	"github.com/karmada-io/karmada/pkg/scheduler/core"
-	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 	frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
@@ -54,9 +53,11 @@
 )
 
 const (
-	scheduleSuccessReason  = "BindingScheduled"
-	scheduleFailedReason   = "BindingFailedScheduling"
-	scheduleSuccessMessage = "Binding has been scheduled"
+	scheduleSuccessReason    = "BindingScheduled"
+	scheduleFailedReason     = "BindingFailedScheduling"
+	noClusterAvailableReason = "NoClusterAvailable"
+	noClusterFitReason       = "NoClusterFit"
+	scheduleSuccessMessage   = "Binding has been scheduled"
 )
 
 const (
@@ -415,18 +416,14 @@ func (s *Scheduler) doScheduleClusterBinding(name string) (err error) {
 
 func (s *Scheduler) scheduleResourceBinding(rb *workv1alpha2.ResourceBinding) (err error) {
 	defer func() {
-		var condition metav1.Condition
-		if err == nil {
-			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
-		} else {
-			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
-		}
+		condition := createConditionByError(err)
 		if updateErr := patchBindingStatusCondition(s.KarmadaClient, rb, condition); updateErr != nil {
+			// if patch error occurs, just return patch error to reconcile again.
+			err = updateErr
 			klog.Errorf("Failed to patch schedule status to ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
-			if err == nil {
-				// schedule succeed but update status failed, return err in order to retry in next loop.
-				err = updateErr
-			}
+		} else if err != nil && condition.Status == metav1.ConditionTrue {
+			// for finished schedule, we won't retry.
+			err = nil
 		}
 	}()
 
@@ -447,9 +444,9 @@ func (s *Scheduler) scheduleResourceBindingWithClusterAffinity(rb *workv1alpha2.
 	}
 
 	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &rb.Spec, &rb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-	var noClusterFit *framework.FitError
-	// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
-	if err != nil && !errors.As(err, &noClusterFit) {
+	var noClusterErr *core.NoClusterError
+	// in case of no cluster error, cannot return immediately but continue to patch (clean up) the result.
+	if err != nil && !errors.As(err, &noClusterErr) {
 		s.recordScheduleResultEventForResourceBinding(rb, err)
 		klog.Errorf("Failed scheduling ResourceBinding(%s/%s): %v", rb.Namespace, rb.Name, err)
 		return err
@@ -458,10 +455,11 @@ func (s *Scheduler) scheduleResourceBindingWithClusterAffinity(rb *workv1alpha2.
 	klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, scheduleResult.SuggestedClusters)
 	patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), scheduleResult.SuggestedClusters)
 	s.recordScheduleResultEventForResourceBinding(rb, utilerrors.NewAggregate([]error{err, patchErr}))
-
-	// only care about the patch result,
-	// for FitError already recorded by event
-	return patchErr
+	if patchErr != nil {
+		// if patch error occurs, just return patch error to reconcile again.
+		return patchErr
+	}
+	return err
 }
 
 func (s *Scheduler) scheduleResourceBindingWithClusterAffinities(rb *workv1alpha2.ResourceBinding) error {
@@ -505,15 +503,19 @@ func (s *Scheduler) scheduleResourceBindingWithClusterAffinities(rb *workv1alpha
 
 		updatedStatus.SchedulerObservedAffinityName = rb.Status.SchedulerObservedAffinityName
 
-		var noClusterFit *framework.FitError
-		if !errors.As(firstErr, &noClusterFit) {
+		var noClusterErr *core.NoClusterError
+		if !errors.As(firstErr, &noClusterErr) {
 			return firstErr
 		}
 
 		klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, nil)
 		patchErr := s.patchScheduleResultForResourceBinding(rb, string(placementBytes), nil)
-		s.recordScheduleResultEventForResourceBinding(rb, patchErr)
-		return patchErr
+		s.recordScheduleResultEventForResourceBinding(rb, utilerrors.NewAggregate([]error{firstErr, patchErr}))
+		if patchErr != nil {
+			// if patch error occurs, just return patch error to reconcile again.
+			return patchErr
+		}
+		return firstErr
 	}
 
 	klog.V(4).Infof("ResourceBinding(%s/%s) scheduled to clusters %v", rb.Namespace, rb.Name, scheduleResult.SuggestedClusters)
@@ -552,18 +554,14 @@ func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alph
 
 func (s *Scheduler) scheduleClusterResourceBinding(crb *workv1alpha2.ClusterResourceBinding) (err error) {
 	defer func() {
-		var condition metav1.Condition
-		if err == nil {
-			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleSuccessReason, scheduleSuccessMessage, metav1.ConditionTrue)
-		} else {
-			condition = util.NewCondition(workv1alpha2.Scheduled, scheduleFailedReason, err.Error(), metav1.ConditionFalse)
-		}
+		condition := createConditionByError(err)
 		if updateErr := patchClusterBindingStatusCondition(s.KarmadaClient, crb, condition); updateErr != nil {
+			// if patch error occurs, just return patch error to reconcile again.
+			err = updateErr
 			klog.Errorf("Failed to patch schedule status to ClusterResourceBinding(%s): %v", crb.Name, err)
-			if err == nil {
-				// schedule succeed but update status failed, return err in order to retry in next loop.
-				err = updateErr
-			}
+		} else if err != nil && condition.Status == metav1.ConditionTrue {
+			// for finished schedule, we won't retry.
+			err = nil
 		}
 	}()
 
@@ -584,9 +582,9 @@ func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinity(crb *workv
 	}
 
 	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &crb.Spec, &crb.Status, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-	var noClusterFit *framework.FitError
-	// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
-	if err != nil && !errors.As(err, &noClusterFit) {
+	var noClusterErr *core.NoClusterError
+	// in case of no cluster error, cannot return immediately but continue to patch (clean up) the result.
+	if err != nil && !errors.As(err, &noClusterErr) {
 		s.recordScheduleResultEventForClusterResourceBinding(crb, err)
 		klog.Errorf("Failed scheduling clusterResourceBinding(%s): %v", crb.Name, err)
 		return err
@@ -595,10 +593,11 @@ func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinity(crb *workv
 	klog.V(4).Infof("clusterResourceBinding(%s) scheduled to clusters %v", crb.Name, scheduleResult.SuggestedClusters)
 	patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), scheduleResult.SuggestedClusters)
 	s.recordScheduleResultEventForClusterResourceBinding(crb, utilerrors.NewAggregate([]error{err, patchErr}))
-
-	// only care about the patch result,
-	// for FitError already recorded by event
-	return patchErr
+	if patchErr != nil {
+		// if patch error occurs, just return patch error to reconcile again.
+		return patchErr
+	}
+	return err
 }
 
 func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinities(crb *workv1alpha2.ClusterResourceBinding) error {
@@ -642,15 +641,19 @@ func (s *Scheduler) scheduleClusterResourceBindingWithClusterAffinities(crb *wor
 
 		updatedStatus.SchedulerObservedAffinityName = crb.Status.SchedulerObservedAffinityName
 
-		var noClusterFit *framework.FitError
-		if !errors.As(firstErr, &noClusterFit) {
+		var noClusterErr *core.NoClusterError
+		if !errors.As(firstErr, &noClusterErr) {
 			return firstErr
 		}
 
 		klog.V(4).Infof("ClusterResourceBinding(%s) scheduled to clusters %v", crb.Name, nil)
 		patchErr := s.patchScheduleResultForClusterResourceBinding(crb, string(placementBytes), nil)
-		s.recordScheduleResultEventForClusterResourceBinding(crb, patchErr)
-		return patchErr
+		s.recordScheduleResultEventForClusterResourceBinding(crb, utilerrors.NewAggregate([]error{firstErr, patchErr}))
+		if patchErr != nil {
+			// if patch error occurs, just return patch error to reconcile again.
+			return patchErr
+		}
+		return firstErr
 	}
 
 	klog.V(4).Infof("ClusterResourceBinding(%s) scheduled to clusters %v", crb.Name, scheduleResult.SuggestedClusters)
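Taken together, the reworked defer blocks and return paths give the scheduler these retry semantics: a patch failure always requeues the binding; a schedule error whose Scheduled condition was recorded with Status=True (NoClusterFit, NoClusterAvailable) is treated as final and not retried; any other schedule error still requeues. A small standalone sketch of that decision logic (shouldRequeue is an illustrative name, not part of the patch):

package main

import "fmt"

// shouldRequeue condenses the defer logic in scheduleResourceBinding
// and scheduleClusterResourceBinding after this change.
func shouldRequeue(scheduleErr, patchErr error, conditionTrue bool) bool {
	if patchErr != nil {
		// patch error: requeue to reconcile again
		return true
	}
	if scheduleErr != nil && conditionTrue {
		// recorded as a finished schedule (e.g. NoClusterFit): won't retry
		return false
	}
	return scheduleErr != nil
}

func main() {
	fmt.Println(shouldRequeue(nil, nil, true))                          // false: scheduled successfully
	fmt.Println(shouldRequeue(fmt.Errorf("no cluster fit"), nil, true)) // false: final, no retry
	fmt.Println(shouldRequeue(fmt.Errorf("plugin error"), nil, false))  // true: retry
	fmt.Println(shouldRequeue(nil, fmt.Errorf("patch failed"), false))  // true: retry
}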
(1 more changed file not shown)
