From 49067a0142563aaa33415fd5c0cf794d7eef4fd4 Mon Sep 17 00:00:00 2001 From: daimaxiaxie Date: Sun, 4 Aug 2024 18:04:52 +0800 Subject: [PATCH] complete kruise daemonset controller --- go.sum | 10 +++--- pkg/controllers/provisioning/provisioner.go | 32 +++++++++++++++---- pkg/controllers/state/cluster.go | 32 ++++++++++++------- pkg/controllers/state/informer/daemonset.go | 7 +++- .../state/informer/kruisedaemonset.go | 25 +++++++++++++-- pkg/operator/options/options.go | 2 ++ pkg/operator/options/suite_test.go | 9 ++++++ pkg/test/options.go | 2 ++ 8 files changed, 94 insertions(+), 25 deletions(-) diff --git a/go.sum b/go.sum index 4afdb05ff5..187b9cd1ea 100644 --- a/go.sum +++ b/go.sum @@ -118,8 +118,6 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= -github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -244,8 +242,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/onsi/ginkgo/v2 v2.19.1 h1:QXgq3Z8Crl5EL1WBAC98A5sEBHARrAJNzAmMxzLcRF0= github.com/onsi/ginkgo/v2 v2.19.1/go.mod h1:O3DtEWQkPa/F7fBMgmZQKKsluAy8pd3rEQdrjkPb9zA= 
-github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= -github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= +github.com/onsi/gomega v1.34.0 h1:eSSPsPNp6ZpsG8X1OVmOTxig+CblTc4AxpPBykhe2Os= +github.com/onsi/gomega v1.34.0/go.mod h1:MIKI8c+f+QLWk+hxbePD4i0LMJSExPaZOVfkoex4cAo= github.com/openkruise/kruise v1.6.3 h1:JkFu4/7adekciEjFwHW8qZiA06iXGnPSF0EA4+Zi04k= github.com/openkruise/kruise v1.6.3/go.mod h1:XcFJ7Dx+320AHl6G9SEoJNsUk1HsCZTW7rUClf5VQ+0= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= @@ -472,6 +470,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -634,6 +635,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= 
+google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 59e2fe0b97..d3a5fd4c85 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -20,6 +20,8 @@ import ( "context" "errors" "fmt" + kruise "github.com/openkruise/kruise/apis/apps/v1alpha1" + "sigs.k8s.io/karpenter/pkg/operator/options" "strings" "time" @@ -417,25 +419,43 @@ func (p *Provisioner) getDaemonSetPods(ctx context.Context) ([]*corev1.Pod, erro return nil, fmt.Errorf("listing daemonsets, %w", err) } - return lo.Map(daemonSetList.Items, func(d appsv1.DaemonSet, _ int) *corev1.Pod { - pod := p.cluster.GetDaemonSetPod(&d) + handler := func(pod *corev1.Pod, template corev1.PodTemplateSpec) *corev1.Pod { if pod == nil { - pod = &corev1.Pod{Spec: d.Spec.Template.Spec} + pod = &corev1.Pod{Spec: template.Spec} } // Replacing retrieved pod affinity with daemonset pod template required node affinity since this is overridden // by the daemonset controller during pod creation // https://github.com/kubernetes/kubernetes/blob/c5cf0ac1889f55ab51749798bec684aed876709d/pkg/controller/daemon/util/daemonset_util.go#L176 - if d.Spec.Template.Spec.Affinity != nil && d.Spec.Template.Spec.Affinity.NodeAffinity != nil && d.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { + if template.Spec.Affinity != nil && template.Spec.Affinity.NodeAffinity != nil && template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil { if pod.Spec.Affinity == nil { pod.Spec.Affinity = &corev1.Affinity{} } if 
pod.Spec.Affinity.NodeAffinity == nil { pod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{} } - pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = d.Spec.Template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution + pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = template.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution } return pod - }), nil + } + + pods := lo.Map(daemonSetList.Items, func(d appsv1.DaemonSet, _ int) *corev1.Pod { + pod := p.cluster.GetDaemonSetPod(&d) + return handler(pod, d.Spec.Template) + }) + + if options.FromContext(ctx).SupportKruise { + kruiseDaemonSetList := &kruise.DaemonSetList{} + if err := p.kubeClient.List(ctx, kruiseDaemonSetList); err != nil { + return nil, fmt.Errorf("listing kruise daemonsets, %w", err) + } + + pods = append(pods, lo.Map(kruiseDaemonSetList.Items, func(d kruise.DaemonSet, _ int) *corev1.Pod { + pod := p.cluster.GetDaemonSetPod(&d) + return handler(pod, d.Spec.Template) + })...) + } + + return pods, nil } func (p *Provisioner) Validate(ctx context.Context, pod *corev1.Pod) error { diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index 6f842e0e95..418c8eab1b 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -19,14 +19,12 @@ package state import ( "context" "fmt" - kruise "github.com/openkruise/kruise/apis/apps/v1alpha1" "sort" "sync" "time" "github.com/samber/lo" "go.uber.org/multierr" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -44,6 +42,13 @@ import ( podutils "sigs.k8s.io/karpenter/pkg/utils/pod" ) +type ObjectKey struct { + Group string + Kind string + Namespace string + Name string +} + // Cluster maintains cluster state that is often needed but expensive to compute. 
type Cluster struct { kubeClient client.Client @@ -348,17 +353,17 @@ func (c *Cluster) Reset() { c.daemonSetPods = sync.Map{} } -func (c *Cluster) GetDaemonSetPod(daemonset *appsv1.DaemonSet) *corev1.Pod { - if pod, ok := c.daemonSetPods.Load(client.ObjectKeyFromObject(daemonset)); ok { +func (c *Cluster) GetDaemonSetPod(daemonset client.Object) *corev1.Pod { + if pod, ok := c.daemonSetPods.Load(c.ObjectKeyFromObject(daemonset)); ok { return pod.(*corev1.Pod).DeepCopy() } return nil } -func (c *Cluster) UpdateDaemonSet(ctx context.Context, daemonset *appsv1.DaemonSet) error { +func (c *Cluster) UpdateDaemonSet(ctx context.Context, daemonset client.Object) error { pods := &corev1.PodList{} - err := c.kubeClient.List(ctx, pods, client.InNamespace(daemonset.Namespace)) + err := c.kubeClient.List(ctx, pods, client.InNamespace(daemonset.GetNamespace())) if err != nil { return err } @@ -369,7 +374,7 @@ func (c *Cluster) UpdateDaemonSet(ctx context.Context, daemonset *appsv1.DaemonS for i := range pods.Items { if metav1.IsControlledBy(&pods.Items[i], daemonset) { - c.daemonSetPods.Store(client.ObjectKeyFromObject(daemonset), &pods.Items[i]) + c.daemonSetPods.Store(c.ObjectKeyFromObject(daemonset), &pods.Items[i]) break } } @@ -377,12 +382,17 @@ func (c *Cluster) UpdateDaemonSet(ctx context.Context, daemonset *appsv1.DaemonS return nil } -func (c *Cluster) UpdateKruiseDaemonSet(ctx context.Context, daemonset *kruise.DaemonSet) error { - return nil +func (c *Cluster) DeleteDaemonSet(key ObjectKey) { + c.daemonSetPods.Delete(key) } -func (c *Cluster) DeleteDaemonSet(key types.NamespacedName) { - c.daemonSetPods.Delete(key) +func (c *Cluster) ObjectKeyFromObject(obj client.Object) ObjectKey { + return ObjectKey{ + Group: obj.GetObjectKind().GroupVersionKind().Group, + Kind: obj.GetObjectKind().GroupVersionKind().Kind, + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } } // WARNING diff --git a/pkg/controllers/state/informer/daemonset.go 
b/pkg/controllers/state/informer/daemonset.go index 6c3ca9ba7f..c6f7d773ee 100644 --- a/pkg/controllers/state/informer/daemonset.go +++ b/pkg/controllers/state/informer/daemonset.go @@ -52,7 +52,12 @@ func (c *DaemonSetController) Reconcile(ctx context.Context, req reconcile.Reque if err := c.kubeClient.Get(ctx, req.NamespacedName, &daemonSet); err != nil { if errors.IsNotFound(err) { // notify cluster state of the daemonset deletion - c.cluster.DeleteDaemonSet(req.NamespacedName) + c.cluster.DeleteDaemonSet(state.ObjectKey{ + Group: "apps", + Kind: "DaemonSet", + Namespace: req.Namespace, + Name: req.Name, + }) } return reconcile.Result{}, client.IgnoreNotFound(err) } diff --git a/pkg/controllers/state/informer/kruisedaemonset.go b/pkg/controllers/state/informer/kruisedaemonset.go index e7ae3bac7f..b986654cf9 100644 --- a/pkg/controllers/state/informer/kruisedaemonset.go +++ b/pkg/controllers/state/informer/kruisedaemonset.go @@ -18,6 +18,10 @@ package informer import ( "context" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/karpenter/pkg/operator/options" "time" kruise "github.com/openkruise/kruise/apis/apps/v1alpha1" @@ -33,6 +37,12 @@ import ( "sigs.k8s.io/karpenter/pkg/controllers/state" ) +func init() { + gv := schema.GroupVersion{Group: "apps.kruise.io", Version: "v1alpha1"} + v1.AddToGroupVersion(scheme.Scheme, gv) + scheme.Scheme.AddKnownTypes(gv, &kruise.DaemonSet{}, &kruise.DaemonSetList{}) +} + type KruiseDaemonSetController struct { kubeClient client.Client cluster *state.Cluster @@ -52,17 +62,26 @@ func (c *KruiseDaemonSetController) Reconcile(ctx context.Context, req reconcile if err := c.kubeClient.Get(ctx, req.NamespacedName, &daemonSet); err != nil { if errors.IsNotFound(err) { // notify cluster state of the daemonset deletion - c.cluster.DeleteDaemonSet(req.NamespacedName) + c.cluster.DeleteDaemonSet(state.ObjectKey{ + Group: "apps.kruise.io", + Kind: "DaemonSet", + Namespace: 
req.Namespace, + Name: req.Name, + }) } return reconcile.Result{}, client.IgnoreNotFound(err) } - if err := c.cluster.UpdateKruiseDaemonSet(ctx, &daemonSet); err != nil { + if err := c.cluster.UpdateDaemonSet(ctx, &daemonSet); err != nil { return reconcile.Result{}, err } return reconcile.Result{RequeueAfter: time.Minute}, nil } -func (c *KruiseDaemonSetController) Register(_ context.Context, m manager.Manager) error { +func (c *KruiseDaemonSetController) Register(ctx context.Context, m manager.Manager) error { + if !options.FromContext(ctx).SupportKruise { + return nil + } + return controllerruntime.NewControllerManagedBy(m). Named("state.kruise-daemonset"). For(&kruise.DaemonSet{}). diff --git a/pkg/operator/options/options.go b/pkg/operator/options/options.go index 262733b439..624400594b 100644 --- a/pkg/operator/options/options.go +++ b/pkg/operator/options/options.go @@ -63,6 +63,7 @@ type Options struct { BatchMaxDuration time.Duration BatchIdleDuration time.Duration FeatureGates FeatureGates + SupportKruise bool } type FlagSet struct { @@ -100,6 +101,7 @@ func (o *Options) AddFlags(fs *FlagSet) { fs.DurationVar(&o.BatchMaxDuration, "batch-max-duration", env.WithDefaultDuration("BATCH_MAX_DURATION", 10*time.Second), "The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one time which usually results in fewer but larger nodes.") fs.DurationVar(&o.BatchIdleDuration, "batch-idle-duration", env.WithDefaultDuration("BATCH_IDLE_DURATION", time.Second), "The maximum amount of time with no new pending pods that if exceeded ends the current batching window. If pods arrive faster than this time, the batching window will be extended up to the maxDuration. If they arrive slower, the pods will be batched separately.") fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. 
Current options are: SpotToSpotConsolidation") + fs.BoolVarWithEnv(&o.SupportKruise, "support-kruise", "SUPPORT_KRUISE", false, "Enable the support kruise") } func (o *Options) Parse(fs *FlagSet, args ...string) error { diff --git a/pkg/operator/options/suite_test.go b/pkg/operator/options/suite_test.go index 5dca0ba426..9660357603 100644 --- a/pkg/operator/options/suite_test.go +++ b/pkg/operator/options/suite_test.go @@ -62,6 +62,7 @@ var _ = Describe("Options", func() { "BATCH_MAX_DURATION", "BATCH_IDLE_DURATION", "FEATURE_GATES", + "SUPPORT_KRUISE", } BeforeEach(func() { @@ -117,6 +118,7 @@ var _ = Describe("Options", func() { FeatureGates: test.FeatureGates{ SpotToSpotConsolidation: lo.ToPtr(false), }, + SupportKruise: lo.ToPtr(false), })) }) @@ -142,6 +144,7 @@ var _ = Describe("Options", func() { "--batch-max-duration", "5s", "--batch-idle-duration", "5s", "--feature-gates", "SpotToSpotConsolidation=true", + "--support-kruise=true", ) Expect(err).To(BeNil()) expectOptionsMatch(opts, test.Options(test.OptionsFields{ @@ -164,6 +167,7 @@ var _ = Describe("Options", func() { FeatureGates: test.FeatureGates{ SpotToSpotConsolidation: lo.ToPtr(true), }, + SupportKruise: lo.ToPtr(true), })) }) @@ -185,6 +189,7 @@ var _ = Describe("Options", func() { os.Setenv("BATCH_MAX_DURATION", "5s") os.Setenv("BATCH_IDLE_DURATION", "5s") os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true") + os.Setenv("SUPPORT_KRUISE", "true") fs = &options.FlagSet{ FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError), } @@ -211,6 +216,7 @@ var _ = Describe("Options", func() { FeatureGates: test.FeatureGates{ SpotToSpotConsolidation: lo.ToPtr(true), }, + SupportKruise: lo.ToPtr(true), })) }) @@ -228,6 +234,7 @@ var _ = Describe("Options", func() { os.Setenv("BATCH_MAX_DURATION", "5s") os.Setenv("BATCH_IDLE_DURATION", "5s") os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true") + os.Setenv("SUPPORT_KRUISE", "true") fs = &options.FlagSet{ FlagSet: flag.NewFlagSet("karpenter", 
flag.ContinueOnError), } @@ -259,6 +266,7 @@ var _ = Describe("Options", func() { FeatureGates: test.FeatureGates{ SpotToSpotConsolidation: lo.ToPtr(true), }, + SupportKruise: lo.ToPtr(true), })) }) }) @@ -319,4 +327,5 @@ func expectOptionsMatch(optsA, optsB *options.Options) { Expect(optsA.BatchMaxDuration).To(Equal(optsB.BatchMaxDuration)) Expect(optsA.BatchIdleDuration).To(Equal(optsB.BatchIdleDuration)) Expect(optsA.FeatureGates.SpotToSpotConsolidation).To(Equal(optsB.FeatureGates.SpotToSpotConsolidation)) + Expect(optsA.SupportKruise).To(Equal(optsB.SupportKruise)) } diff --git a/pkg/test/options.go b/pkg/test/options.go index 0c0a14f8f9..8af6bb5da9 100644 --- a/pkg/test/options.go +++ b/pkg/test/options.go @@ -45,6 +45,7 @@ type OptionsFields struct { BatchMaxDuration *time.Duration BatchIdleDuration *time.Duration FeatureGates FeatureGates + SupportKruise *bool } type FeatureGates struct { @@ -79,5 +80,6 @@ func Options(overrides ...OptionsFields) *options.Options { FeatureGates: options.FeatureGates{ SpotToSpotConsolidation: lo.FromPtrOr(opts.FeatureGates.SpotToSpotConsolidation, false), }, + SupportKruise: lo.FromPtrOr(opts.SupportKruise, false), } }