Skip to content

Commit

Permalink
add condition for pods and event for sidecarset when detecting not up…
Browse files Browse the repository at this point in the history
…gradable pod (openkruise#1272)

Signed-off-by: MarkLux <[email protected]>
  • Loading branch information
MarkLux committed Jun 27, 2023
1 parent 6d25366 commit a7b0453
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 25 deletions.
68 changes: 65 additions & 3 deletions pkg/controller/sidecarset/sidecarset_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
utilclient "github.com/openkruise/kruise/pkg/util/client"
historyutil "github.com/openkruise/kruise/pkg/util/history"
webhookutil "github.com/openkruise/kruise/pkg/webhook/util"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"

apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -48,6 +49,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
SidecarSetUpgradable corev1.PodConditionType = "SidecarSetUpgradable"
)

type Processor struct {
Client client.Client
recorder record.EventRecorder
Expand Down Expand Up @@ -155,7 +160,18 @@ func (p *Processor) UpdateSidecarSet(sidecarSet *appsv1alpha1.SidecarSet) (recon
func (p *Processor) updatePods(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) error {
sidecarset := control.GetSidecarset()
// compute next updated pods based on the sidecarset upgrade strategy
upgradePods := NewStrategy().GetNextUpgradePods(control, pods)
upgradePods, notUpgradablePods := NewStrategy().GetNextUpgradePods(control, pods)
for _, pod := range notUpgradablePods {
if err := p.updateNotUpgradablePodCondition(sidecarset, pod); err != nil {
klog.Errorf("update NotUpgradable PodCondition error, s:%s, pod:%s, err:%v", sidecarset.Name, pod.Name, err)
return err
}
sidecarcontrol.UpdateExpectations.ExpectUpdated(sidecarset.Name, sidecarcontrol.GetSidecarSetRevision(sidecarset), pod)
}
if len(notUpgradablePods) > 0 {
p.recorder.Eventf(sidecarset, corev1.EventTypeNormal, "NotUpgradablePods", "SidecarSet in-place update detected %d not upgradable pod(s) in this round, will skip them.", len(notUpgradablePods))
}

if len(upgradePods) == 0 {
klog.V(3).Infof("sidecarSet next update is nil, skip this round, name: %s", sidecarset.Name)
return nil
Expand Down Expand Up @@ -197,10 +213,26 @@ func (p *Processor) updatePodSidecarAndHash(control sidecarcontrol.SidecarContro
klog.Errorf("sidecarSet(%s) patch pod(%s/%s) metadata failed: %s", sidecarSet.Name, podClone.Namespace, podClone.Name, err.Error())
return err
}
//update pod in store
// update pod in store
return p.Client.Update(context.TODO(), podClone)
})
return err

if err != nil {
return err
}

// patch SidecarSetUpgradable condition
if conditionChanged := podutil.UpdatePodCondition(&podClone.Status, &corev1.PodCondition{
Type: SidecarSetUpgradable,
Status: corev1.ConditionTrue,
}); !conditionChanged {
// reduce unnecessary patch.
return nil
}

_, condition := podutil.GetPodCondition(&podClone.Status, SidecarSetUpgradable)
body := fmt.Sprintf(`{"status":%s}`, util.DumpJSON(condition))
return p.Client.Status().Patch(context.TODO(), podClone, client.RawPatch(types.MergePatchType, []byte(body)))
}

func (p *Processor) listMatchedSidecarSets(pod *corev1.Pod) string {
Expand Down Expand Up @@ -602,3 +634,33 @@ func inconsistentStatus(sidecarSet *appsv1alpha1.SidecarSet, status *appsv1alpha
func isSidecarSetUpdateFinish(status *appsv1alpha1.SidecarSetStatus) bool {
return status.UpdatedPods >= status.MatchedPods
}

func (p *Processor) updateNotUpgradablePodCondition(sidecarset *appsv1alpha1.SidecarSet, notUpgradablePod *corev1.Pod) error {

podClone := &corev1.Pod{}

err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := p.Client.Get(context.TODO(), types.NamespacedName{Namespace: notUpgradablePod.Namespace, Name: notUpgradablePod.Name}, podClone); err != nil {
klog.Errorf("error getting pod %s/%s from client", notUpgradablePod.Namespace, notUpgradablePod.Name)
return err
}
// update pod condition
conditionUpdateResult := podutil.UpdatePodCondition(&podClone.Status, &corev1.PodCondition{
Type: SidecarSetUpgradable,
Status: corev1.ConditionFalse,
Reason: "UpdateImmutableField",
Message: "Pod's sidecar set is not upgradable due to changes out of image field",
})
if !conditionUpdateResult {
return nil
}
err := p.Client.Status().Update(context.TODO(), podClone)
if err != nil {
return err
}
klog.V(3).Infof("sidecarSet(%s) updated pods(%s/%s) condition(%s=%s) success", sidecarset.Name, notUpgradablePod.Namespace, notUpgradablePod.Name, SidecarSetUpgradable, corev1.ConditionFalse)
return nil
})

return err
}
22 changes: 17 additions & 5 deletions pkg/controller/sidecarset/sidecarset_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type Strategy interface {
//2. Sort Pods with default sequence
//3. sort waitUpdateIndexes based on the scatter rules
//4. calculate max count of pods can update with maxUnavailable
GetNextUpgradePods(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) []*corev1.Pod
//5. also return the pods that are not upgradable
GetNextUpgradePods(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) (upgradePods []*corev1.Pod, notUpgradablePods []*corev1.Pod)
}

type spreadingStrategy struct{}
Expand All @@ -35,10 +36,12 @@ func NewStrategy() Strategy {
return globalSpreadingStrategy
}

func (p *spreadingStrategy) GetNextUpgradePods(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) (upgradePods []*corev1.Pod) {
func (p *spreadingStrategy) GetNextUpgradePods(control sidecarcontrol.SidecarControl, pods []*corev1.Pod) (upgradePods []*corev1.Pod, notUpgradablePods []*corev1.Pod) {
sidecarset := control.GetSidecarset()
// wait to upgrade pod index
var waitUpgradedIndexes []int
// because SidecarSet in-place update only support upgrading Image, if other fields are changed they will not be upgraded.
var notUpgradableIndexes []int
strategy := sidecarset.Spec.UpdateStrategy

// If selector is not nil, check whether the pods is selected to upgrade
Expand Down Expand Up @@ -68,12 +71,17 @@ func (p *spreadingStrategy) GetNextUpgradePods(control sidecarcontrol.SidecarCon
// * It is to determine whether there are other fields that have been modified for pod.
for index, pod := range pods {
isUpdated := sidecarcontrol.IsPodSidecarUpdated(sidecarset, pod)
if !isUpdated && isSelected(pod) && control.IsSidecarSetUpgradable(pod) {
waitUpgradedIndexes = append(waitUpgradedIndexes, index)
if !isUpdated && isSelected(pod) {
if control.IsSidecarSetUpgradable(pod) {
waitUpgradedIndexes = append(waitUpgradedIndexes, index)
} else if sidecarcontrol.GetPodSidecarSetWithoutImageRevision(sidecarset.Name, pod) != sidecarcontrol.GetSidecarSetWithoutImageRevision(sidecarset) {
// only image field can be in-place updated, if other fields changed, mark pod as not upgradable
notUpgradableIndexes = append(notUpgradableIndexes, index)
}
}
}

klog.V(3).Infof("sidecarSet(%s) matchedPods(%d) waitUpdated(%d)", sidecarset.Name, len(pods), len(waitUpgradedIndexes))
klog.V(3).Infof("sidecarSet(%s) matchedPods(%d) waitUpdated(%d) notUpgradable(%d)", sidecarset.Name, len(pods), len(waitUpgradedIndexes), len(notUpgradableIndexes))
//2. sort Pods with default sequence and scatter
waitUpgradedIndexes = SortUpdateIndexes(strategy, pods, waitUpgradedIndexes)

Expand All @@ -87,6 +95,10 @@ func (p *spreadingStrategy) GetNextUpgradePods(control sidecarcontrol.SidecarCon
for _, idx := range waitUpgradedIndexes {
upgradePods = append(upgradePods, pods[idx])
}
// 5. pods that are not upgradable will not be skipped in the following process
for _, idx := range notUpgradableIndexes {
notUpgradablePods = append(notUpgradablePods, pods[idx])
}
return
}

Expand Down
73 changes: 56 additions & 17 deletions pkg/controller/sidecarset/sidecarset_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,19 @@ func factoryPods(count, upgraded, upgradedAndReady int) []*corev1.Pod {
}

func factorySidecarSet() *appsv1alpha1.SidecarSet {
return createFactorySidecarSet("bbb", "without-aaa")
}

func factorySidecarSetNotUpgradable() *appsv1alpha1.SidecarSet {
return createFactorySidecarSet("bbb", "without-bbb")
}

func createFactorySidecarSet(sidecarsetHash string, sidecarsetHashWithoutImage string) *appsv1alpha1.SidecarSet {
sidecarSet := &appsv1alpha1.SidecarSet{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
sidecarcontrol.SidecarSetHashAnnotation: "bbb",
sidecarcontrol.SidecarSetHashWithoutImageAnnotation: "without-aaa",
sidecarcontrol.SidecarSetHashAnnotation: sidecarsetHash,
sidecarcontrol.SidecarSetHashWithoutImageAnnotation: sidecarsetHashWithoutImage,
},
Name: "test-sidecarset",
Labels: map[string]string{},
Expand Down Expand Up @@ -147,10 +155,11 @@ func TestGetNextUpgradePods(t *testing.T) {

func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySidecar FactorySidecarSet) {
cases := []struct {
name string
getPods func() []*corev1.Pod
getSidecarset func() *appsv1alpha1.SidecarSet
exceptNeedUpgradeCount int
name string
getPods func() []*corev1.Pod
getSidecarset func() *appsv1alpha1.SidecarSet
exceptNeedUpgradeCount int
exceptNotUpgradableCount int
}{
{
name: "only maxUnavailable(int=10), and pods(count=100, upgraded=30, upgradedAndReady=26)",
Expand All @@ -166,7 +175,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 6,
exceptNeedUpgradeCount: 6,
exceptNotUpgradableCount: 0,
},
{
name: "only maxUnavailable(string=10%), and pods(count=1000, upgraded=300, upgradedAndReady=260)",
Expand All @@ -182,7 +192,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 60,
exceptNeedUpgradeCount: 60,
exceptNotUpgradableCount: 0,
},
{
name: "only maxUnavailable(string=5%), and pods(count=1000, upgraded=300, upgradedAndReady=250)",
Expand All @@ -198,7 +209,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 0,
exceptNeedUpgradeCount: 0,
exceptNotUpgradableCount: 0,
},
{
name: "only maxUnavailable(int=100), and pods(count=100, upgraded=30, upgradedAndReady=27)",
Expand All @@ -214,7 +226,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 70,
exceptNeedUpgradeCount: 70,
exceptNotUpgradableCount: 0,
},
{
name: "partition(int=180) maxUnavailable(int=100), and pods(count=1000, upgraded=800, upgradedAndReady=760)",
Expand All @@ -234,7 +247,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 20,
exceptNeedUpgradeCount: 20,
exceptNotUpgradableCount: 0,
},
{
name: "partition(int=100) maxUnavailable(int=100), and pods(count=1000, upgraded=800, upgradedAndReady=760)",
Expand All @@ -254,7 +268,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 60,
exceptNeedUpgradeCount: 60,
exceptNotUpgradableCount: 0,
},
{
name: "partition(string=18%) maxUnavailable(int=100), and pods(count=1000, upgraded=800, upgradedAndReady=760)",
Expand All @@ -274,7 +289,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 20,
exceptNeedUpgradeCount: 20,
exceptNotUpgradableCount: 0,
},
{
name: "partition(string=10%) maxUnavailable(int=100), and pods(count=1000, upgraded=800, upgradedAndReady=760)",
Expand All @@ -294,7 +310,8 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 60,
exceptNeedUpgradeCount: 60,
exceptNotUpgradableCount: 0,
},
{
name: "selector(app=test, count=30) maxUnavailable(int=100), and pods(count=1000, upgraded=0, upgradedAndReady=0)",
Expand All @@ -316,18 +333,40 @@ func testGetNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySideca
}
return sidecarSet
},
exceptNeedUpgradeCount: 30,
exceptNeedUpgradeCount: 30,
exceptNotUpgradableCount: 0,
},
{
name: "not upgradable sidecarset, maxUnavailable(int=100), and pods(count=100, upgraded=0, upgradedAndReady=0)",
getPods: func() []*corev1.Pod {
pods := factoryPods(100, 0, 0)
return Random(pods)
},
getSidecarset: func() *appsv1alpha1.SidecarSet {
sidecarSet := factorySidecarSetNotUpgradable()
sidecarSet.Spec.UpdateStrategy.MaxUnavailable = &intstr.IntOrString{
Type: intstr.Int,
IntVal: 100,
}

return sidecarSet
},
exceptNeedUpgradeCount: 0,
exceptNotUpgradableCount: 100,
},
}
strategy := NewStrategy()
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
control := sidecarcontrol.New(cs.getSidecarset())
pods := cs.getPods()
upgradePods := strategy.GetNextUpgradePods(control, pods)
upgradePods, notUpgradablePods := strategy.GetNextUpgradePods(control, pods)
if cs.exceptNeedUpgradeCount != len(upgradePods) {
t.Fatalf("except NeedUpgradeCount(%d), but get value(%d)", cs.exceptNeedUpgradeCount, len(upgradePods))
}
if cs.exceptNotUpgradableCount != len(notUpgradablePods) {
t.Fatalf("except NotUpgradableCount(%d), but get value(%d)", cs.exceptNotUpgradableCount, len(notUpgradablePods))
}
})
}
}
Expand Down Expand Up @@ -524,7 +563,7 @@ func testSortNextUpgradePods(t *testing.T, factoryPods FactoryPods, factorySidec
t.Run(cs.name, func(t *testing.T) {
control := sidecarcontrol.New(cs.getSidecarset())
pods := cs.getPods()
injectedPods := strategy.GetNextUpgradePods(control, pods)
injectedPods, _ := strategy.GetNextUpgradePods(control, pods)
if len(cs.exceptNextUpgradePods) != len(injectedPods) {
t.Fatalf("except NeedUpgradeCount(%d), but get value(%d)", len(cs.exceptNextUpgradePods), len(injectedPods))
}
Expand Down
4 changes: 4 additions & 0 deletions test/e2e/apps/sidecarset.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/controller/sidecarset"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/configuration"
"github.com/openkruise/kruise/test/e2e/framework"
Expand All @@ -39,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/history"
utilpointer "k8s.io/utils/pointer"
)
Expand Down Expand Up @@ -739,6 +741,8 @@ var _ = SIGDescribe("SidecarSet", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
for _, pod := range pods {
gomega.Expect(pod.Spec.Containers[0].Image).Should(gomega.Equal(BusyboxImage))
_, sidecarSetUpgradable := podutil.GetPodCondition(&pod.Status, sidecarset.SidecarSetUpgradable)
gomega.Expect(sidecarSetUpgradable.Status).Should(gomega.Equal(corev1.ConditionTrue))
}
ginkgo.By(fmt.Sprintf("sidecarSet upgrade cold sidecar container image done"))
})
Expand Down

0 comments on commit a7b0453

Please sign in to comment.