Skip to content

Commit

Permalink
complete kruise daemonset controller
Browse files Browse the repository at this point in the history
  • Loading branch information
daimaxiaxie committed Aug 4, 2024
1 parent 7fd2c76 commit 49067a0
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 25 deletions.
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,6 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE=
github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -244,8 +242,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0=
github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/onsi/gomega v1.34.0 h1:eSSPsPNp6ZpsG8X1OVmOTxig+CblTc4AxpPBykhe2Os=
github.com/onsi/gomega v1.34.0/go.mod h1:MIKI8c+f+QLWk+hxbePD4i0LMJSExPaZOVfkoex4cAo=
github.com/openkruise/kruise v1.6.3 h1:JkFu4/7adekciEjFwHW8qZiA06iXGnPSF0EA4+Zi04k=
github.com/openkruise/kruise v1.6.3/go.mod h1:XcFJ7Dx+320AHl6G9SEoJNsUk1HsCZTW7rUClf5VQ+0=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
Expand Down Expand Up @@ -472,6 +470,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down Expand Up @@ -634,6 +635,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
32 changes: 26 additions & 6 deletions pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"errors"
"fmt"
kruise "github.com/openkruise/kruise/apis/apps/v1alpha1"
"sigs.k8s.io/karpenter/pkg/operator/options"
"strings"
"time"

Expand Down Expand Up @@ -417,25 +419,43 @@ func (p *Provisioner) getDaemonSetPods(ctx context.Context) ([]*corev1.Pod, erro
return nil, fmt.Errorf("listing daemonsets, %w", err)
}

return lo.Map(daemonSetList.Items, func(d appsv1.DaemonSet, _ int) *corev1.Pod {
pod := p.cluster.GetDaemonSetPod(&d)
handler := func(pod *corev1.Pod, template corev1.PodTemplateSpec) *corev1.Pod {
if pod == nil {
pod = &corev1.Pod{Spec: d.Spec.Template.Spec}
pod = &corev1.Pod{Spec: template.Spec}
}
// Replacing retrieved pod affinity with daemonset pod template required node affinity since this is overridden
// by the daemonset controller during pod creation
// https://github.com/kubernetes/kubernetes/blob/c5cf0ac1889f55ab51749798bec684aed876709d/pkg/controller/daemon/util/daemonset_util.go#L176
if d.Spec.Template.Spec.Affinity != nil && d.Spec.Template.Spec.Affinity.NodeAffinity != nil && d.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
if template.Spec.Affinity != nil && template.Spec.Affinity.NodeAffinity != nil && template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
if pod.Spec.Affinity == nil {
pod.Spec.Affinity = &corev1.Affinity{}
}
if pod.Spec.Affinity.NodeAffinity == nil {
pod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
}
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = d.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
}
return pod
}), nil
}

pods := lo.Map(daemonSetList.Items, func(d appsv1.DaemonSet, _ int) *corev1.Pod {
pod := p.cluster.GetDaemonSetPod(&d)
return handler(pod, d.Spec.Template)
})

if options.FromContext(ctx).SupportKruise {
kruiseDaemonSetList := &kruise.DaemonSetList{}
if err := p.kubeClient.List(ctx, kruiseDaemonSetList); err != nil {
return nil, fmt.Errorf("listing kruise daemonsets, %w", err)
}

pods = append(pods, lo.Map(kruiseDaemonSetList.Items, func(d kruise.DaemonSet, _ int) *corev1.Pod {
pod := p.cluster.GetDaemonSetPod(&d)
return handler(pod, d.Spec.Template)
})...)
}

return pods, nil
}

func (p *Provisioner) Validate(ctx context.Context, pod *corev1.Pod) error {
Expand Down
32 changes: 21 additions & 11 deletions pkg/controllers/state/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ package state
import (
"context"
"fmt"
kruise "github.com/openkruise/kruise/apis/apps/v1alpha1"
"sort"
"sync"
"time"

"github.com/samber/lo"
"go.uber.org/multierr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -44,6 +42,13 @@ import (
podutils "sigs.k8s.io/karpenter/pkg/utils/pod"
)

type ObjectKey struct {
Group string
Kind string
Namespace string
Name string
}

// Cluster maintains cluster state that is often needed but expensive to compute.
type Cluster struct {
kubeClient client.Client
Expand Down Expand Up @@ -348,17 +353,17 @@ func (c *Cluster) Reset() {
c.daemonSetPods = sync.Map{}
}

func (c *Cluster) GetDaemonSetPod(daemonset *appsv1.DaemonSet) *corev1.Pod {
if pod, ok := c.daemonSetPods.Load(client.ObjectKeyFromObject(daemonset)); ok {
func (c *Cluster) GetDaemonSetPod(daemonset client.Object) *corev1.Pod {
if pod, ok := c.daemonSetPods.Load(c.ObjectKeyFromObject(daemonset)); ok {
return pod.(*corev1.Pod).DeepCopy()
}

return nil
}

func (c *Cluster) UpdateDaemonSet(ctx context.Context, daemonset *appsv1.DaemonSet) error {
func (c *Cluster) UpdateDaemonSet(ctx context.Context, daemonset client.Object) error {
pods := &corev1.PodList{}
err := c.kubeClient.List(ctx, pods, client.InNamespace(daemonset.Namespace))
err := c.kubeClient.List(ctx, pods, client.InNamespace(daemonset.GetNamespace()))
if err != nil {
return err
}
Expand All @@ -369,20 +374,25 @@ func (c *Cluster) UpdateDaemonSet(ctx context.Context, daemonset *appsv1.DaemonS

for i := range pods.Items {
if metav1.IsControlledBy(&pods.Items[i], daemonset) {
c.daemonSetPods.Store(client.ObjectKeyFromObject(daemonset), &pods.Items[i])
c.daemonSetPods.Store(c.ObjectKeyFromObject(daemonset), &pods.Items[i])
break
}
}

return nil
}

func (c *Cluster) UpdateKruiseDaemonSet(ctx context.Context, daemonset *kruise.DaemonSet) error {
return nil
func (c *Cluster) DeleteDaemonSet(key ObjectKey) {
c.daemonSetPods.Delete(key)
}

func (c *Cluster) DeleteDaemonSet(key types.NamespacedName) {
c.daemonSetPods.Delete(key)
func (c *Cluster) ObjectKeyFromObject(obj client.Object) ObjectKey {
return ObjectKey{
Group: obj.GetObjectKind().GroupVersionKind().Group,
Kind: obj.GetObjectKind().GroupVersionKind().Kind,
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
}
}

// WARNING
Expand Down
7 changes: 6 additions & 1 deletion pkg/controllers/state/informer/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ func (c *DaemonSetController) Reconcile(ctx context.Context, req reconcile.Reque
if err := c.kubeClient.Get(ctx, req.NamespacedName, &daemonSet); err != nil {
if errors.IsNotFound(err) {
// notify cluster state of the daemonset deletion
c.cluster.DeleteDaemonSet(req.NamespacedName)
c.cluster.DeleteDaemonSet(state.ObjectKey{
Group: "apps",
Kind: "DaemonSet",
Namespace: req.Namespace,
Name: req.Name,
})
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
Expand Down
25 changes: 22 additions & 3 deletions pkg/controllers/state/informer/kruisedaemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package informer

import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/karpenter/pkg/operator/options"
"time"

kruise "github.com/openkruise/kruise/apis/apps/v1alpha1"
Expand All @@ -33,6 +37,12 @@ import (
"sigs.k8s.io/karpenter/pkg/controllers/state"
)

func init() {
gv := schema.GroupVersion{Group: "apps.kruise.io", Version: "v1alpha"}
v1.AddToGroupVersion(scheme.Scheme, gv)
scheme.Scheme.AddKnownTypes(gv, &kruise.DaemonSet{})
}

type KruiseDaemonSetController struct {
kubeClient client.Client
cluster *state.Cluster
Expand All @@ -52,17 +62,26 @@ func (c *KruiseDaemonSetController) Reconcile(ctx context.Context, req reconcile
if err := c.kubeClient.Get(ctx, req.NamespacedName, &daemonSet); err != nil {
if errors.IsNotFound(err) {
// notify cluster state of the daemonset deletion
c.cluster.DeleteDaemonSet(req.NamespacedName)
c.cluster.DeleteDaemonSet(state.ObjectKey{
Group: "apps.kruise.io",
Kind: "DaemonSet",
Namespace: req.Namespace,
Name: req.Name,
})
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if err := c.cluster.UpdateKruiseDaemonSet(ctx, &daemonSet); err != nil {
if err := c.cluster.UpdateDaemonSet(ctx, &daemonSet); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{RequeueAfter: time.Minute}, nil
}

func (c *KruiseDaemonSetController) Register(_ context.Context, m manager.Manager) error {
func (c *KruiseDaemonSetController) Register(ctx context.Context, m manager.Manager) error {
if !options.FromContext(ctx).SupportKruise {
return nil
}

return controllerruntime.NewControllerManagedBy(m).
Named("state.kruise-daemonset").
For(&kruise.DaemonSet{}).
Expand Down
2 changes: 2 additions & 0 deletions pkg/operator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Options struct {
BatchMaxDuration time.Duration
BatchIdleDuration time.Duration
FeatureGates FeatureGates
SupportKruise bool
}

type FlagSet struct {
Expand Down Expand Up @@ -100,6 +101,7 @@ func (o *Options) AddFlags(fs *FlagSet) {
fs.DurationVar(&o.BatchMaxDuration, "batch-max-duration", env.WithDefaultDuration("BATCH_MAX_DURATION", 10*time.Second), "The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one time which usually results in fewer but larger nodes.")
fs.DurationVar(&o.BatchIdleDuration, "batch-idle-duration", env.WithDefaultDuration("BATCH_IDLE_DURATION", time.Second), "The maximum amount of time with no new pending pods that if exceeded ends the current batching window. If pods arrive faster than this time, the batching window will be extended up to the maxDuration. If they arrive slower, the pods will be batched separately.")
fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: SpotToSpotConsolidation")
fs.BoolVarWithEnv(&o.SupportKruise, "support-kruise", "SUPPORT_KRUISE", false, "Enable the support kruise")
}

func (o *Options) Parse(fs *FlagSet, args ...string) error {
Expand Down
9 changes: 9 additions & 0 deletions pkg/operator/options/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var _ = Describe("Options", func() {
"BATCH_MAX_DURATION",
"BATCH_IDLE_DURATION",
"FEATURE_GATES",
"SUPPORT_KRUISE",
}

BeforeEach(func() {
Expand Down Expand Up @@ -117,6 +118,7 @@ var _ = Describe("Options", func() {
FeatureGates: test.FeatureGates{
SpotToSpotConsolidation: lo.ToPtr(false),
},
SupportKruise: lo.ToPtr(false),
}))
})

Expand All @@ -142,6 +144,7 @@ var _ = Describe("Options", func() {
"--batch-max-duration", "5s",
"--batch-idle-duration", "5s",
"--feature-gates", "SpotToSpotConsolidation=true",
"--support-kruise=true",
)
Expect(err).To(BeNil())
expectOptionsMatch(opts, test.Options(test.OptionsFields{
Expand All @@ -164,6 +167,7 @@ var _ = Describe("Options", func() {
FeatureGates: test.FeatureGates{
SpotToSpotConsolidation: lo.ToPtr(true),
},
SupportKruise: lo.ToPtr(true),
}))
})

Expand All @@ -185,6 +189,7 @@ var _ = Describe("Options", func() {
os.Setenv("BATCH_MAX_DURATION", "5s")
os.Setenv("BATCH_IDLE_DURATION", "5s")
os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true")
os.Setenv("SUPPORT_KRUISE", "true")
fs = &options.FlagSet{
FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError),
}
Expand All @@ -211,6 +216,7 @@ var _ = Describe("Options", func() {
FeatureGates: test.FeatureGates{
SpotToSpotConsolidation: lo.ToPtr(true),
},
SupportKruise: lo.ToPtr(true),
}))
})

Expand All @@ -228,6 +234,7 @@ var _ = Describe("Options", func() {
os.Setenv("BATCH_MAX_DURATION", "5s")
os.Setenv("BATCH_IDLE_DURATION", "5s")
os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true")
os.Setenv("SUPPORT_KRUISE", "true")
fs = &options.FlagSet{
FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError),
}
Expand Down Expand Up @@ -259,6 +266,7 @@ var _ = Describe("Options", func() {
FeatureGates: test.FeatureGates{
SpotToSpotConsolidation: lo.ToPtr(true),
},
SupportKruise: lo.ToPtr(true),
}))
})
})
Expand Down Expand Up @@ -319,4 +327,5 @@ func expectOptionsMatch(optsA, optsB *options.Options) {
Expect(optsA.BatchMaxDuration).To(Equal(optsB.BatchMaxDuration))
Expect(optsA.BatchIdleDuration).To(Equal(optsB.BatchIdleDuration))
Expect(optsA.FeatureGates.SpotToSpotConsolidation).To(Equal(optsB.FeatureGates.SpotToSpotConsolidation))
Expect(optsA.SupportKruise).To(Equal(optsB.SupportKruise))
}
2 changes: 2 additions & 0 deletions pkg/test/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type OptionsFields struct {
BatchMaxDuration *time.Duration
BatchIdleDuration *time.Duration
FeatureGates FeatureGates
SupportKruise *bool
}

type FeatureGates struct {
Expand Down Expand Up @@ -79,5 +80,6 @@ func Options(overrides ...OptionsFields) *options.Options {
FeatureGates: options.FeatureGates{
SpotToSpotConsolidation: lo.FromPtrOr(opts.FeatureGates.SpotToSpotConsolidation, false),
},
SupportKruise: lo.FromPtrOr(opts.SupportKruise, false),
}
}

0 comments on commit 49067a0

Please sign in to comment.