From caca0f36d1e891928a0e96a9231873c221aa0cc4 Mon Sep 17 00:00:00 2001 From: yunbo Date: Wed, 26 Jun 2024 17:59:24 +0800 Subject: [PATCH] traffic: do traffic routing improve Signed-off-by: yunbo traffic: init traffic check int-typed replicas Signed-off-by: yunbo others: add traffic setting check for partition step to webhook Signed-off-by: yunbo --- api/v1beta1/rollout_types.go | 30 ++++++++ pkg/controller/rollout/rollout_canary.go | 70 ++++++++++++++++++- pkg/controller/rollout/rollout_canary_test.go | 6 ++ pkg/trafficrouting/manager.go | 20 +++++- pkg/trafficrouting/manager_test.go | 18 ++++- .../rollout_create_update_handler.go | 30 +++++++- 6 files changed, 165 insertions(+), 9 deletions(-) diff --git a/api/v1beta1/rollout_types.go b/api/v1beta1/rollout_types.go index 7b8ff331..1c350df1 100644 --- a/api/v1beta1/rollout_types.go +++ b/api/v1beta1/rollout_types.go @@ -17,6 +17,9 @@ limitations under the License. package v1beta1 import ( + "reflect" + + apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -41,6 +44,14 @@ const ( // RollbackInBatchAnnotation is set to rollout annotations. // RollbackInBatchAnnotation allow use disable quick rollback, and will roll back in batch style. RollbackInBatchAnnotation = "rollouts.kruise.io/rollback-in-batch" + + // PartitionReplicasLimitWithTraffic is set to rollout annotations. + // PartitionReplicasLimitWithTraffic represents the maximum percentage of replicas + // allowed for a step of partition-style release, with traffic/matches specified. + // If a step is configured with a number of replicas exceeding this percentage, the traffic strategy for that step + // must not be specified. If this rule is violated, the Rollout webhook will block the creation or modification of the Rollout. + // The default limit is set to 30%. 
+	PartitionReplicasLimitWithTraffic = "rollouts.kruise.io/partition-replicas-limit"
 )
 
 // RolloutSpec defines the desired state of Rollout
@@ -92,6 +103,25 @@ func (r *RolloutStrategy) GetRollingStyle() RollingStyleType {
 	return PartitionRollingStyle
 }
 
+// using single field EnableExtraWorkloadForCanary to distinguish partition-style from canary-style
+// is not enough, for example, a v1alpha1 Rollout can be converted to v1beta1 Rollout
+// with EnableExtraWorkloadForCanary set as true, even if the objectRef is a cloneset (which doesn't support canary release)
+func IsRealPartition(rollout *Rollout) bool {
+	if rollout.Spec.Strategy.IsEmptyRelease() {
+		return false
+	}
+	estimation := rollout.Spec.Strategy.GetRollingStyle()
+	if estimation == BlueGreenRollingStyle {
+		return false
+	}
+	targetRef := rollout.Spec.WorkloadRef
+	if targetRef.APIVersion == apps.SchemeGroupVersion.String() && targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() &&
+		estimation == CanaryRollingStyle {
+		return false
+	}
+	return true
+}
+
 // r.GetRollingStyle() == BlueGreenRollingStyle
 func (r *RolloutStrategy) IsBlueGreenRelease() bool {
 	return r.GetRollingStyle() == BlueGreenRollingStyle
diff --git a/pkg/controller/rollout/rollout_canary.go b/pkg/controller/rollout/rollout_canary.go
index c814a750..c5cc187a 100644
--- a/pkg/controller/rollout/rollout_canary.go
+++ b/pkg/controller/rollout/rollout_canary.go
@@ -31,6 +31,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/klog/v2"
@@ -94,10 +95,61 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
 	switch canaryStatus.CurrentStepState {
 	// before CanaryStepStateUpgrade, handle some special cases, to prevent traffic loss
 	case v1beta1.CanaryStepStateInit:
-		// placeholder for the later traffic modification Pull Request
-		
canaryStatus.NextStepIndex = util.NextBatchIndex(c.Rollout, canaryStatus.CurrentStepIndex)
+		klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateInit)
+		tr := newTrafficRoutingContext(c)
+		if currentStep.Traffic == nil && len(currentStep.Matches) == 0 {
+			canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
+			klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
+				canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)
+			return nil
+		}
+
+		/*
+			The following check is used to bypass the bug in ingress-nginx controller https://github.com/kubernetes/ingress-nginx/issues/9635
+			for partition release, if expected replicas of currentStep isn't less than the workload spec.replicas,
+			we can assume that all traffic should be routed to canary pods
+		*/
+		expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
+		if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
+			klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name)
+			done, err := m.trafficRoutingManager.RestoreStableService(tr)
+			if err != nil {
+				return err
+			} else if !done {
+				expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
+				c.RecheckTime = &expectedTime
+				return nil
+			}
+		}
+
+		/*
+			The following check is used to solve scenario like this:
+			steps:
+			- replicas: 1 # first batch
+			  matches:
+			  - headers:
+			    - name: user-agent
+			      type: Exact
+			      value: pc
+			we should patch selector to stable Service before CanaryStepStateUpgrade when in the first batch
+			otherwise, some traffic will be lost between CanaryStepStateUpgrade and CanaryStepStateTrafficRouting
+		*/
+		if canaryStatus.CurrentStepIndex == 1 {
+			klog.Infof("special case detected: rollout(%s/%s) patch stable Service", 
c.Rollout.Namespace, c.Rollout.Name) + done, err := m.trafficRoutingManager.PatchStableService(tr) + if err != nil { + return err + } else if !done { + expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second) + c.RecheckTime = &expectedTime + return nil + } + } + + canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade - fallthrough + klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name, + canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState) case v1beta1.CanaryStepStateUpgrade: klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateUpgrade) @@ -106,6 +158,12 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { return err } else if done { canaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting + // if it is partition style and the last batch, we can skip the CanaryStepStateTrafficRouting step + // to bypass the bug mentioned above + expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true) + if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) { + canaryStatus.CurrentStepState = v1beta1.CanaryStepStateMetricsAnalysis + } canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState) @@ -217,6 +275,12 @@ func (m *canaryReleaseManager) doCanaryPaused(c *RolloutContext) (bool, error) { canaryStatus := c.NewStatus.CanaryStatus currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1] steps := len(c.Rollout.Spec.Strategy.Canary.Steps) + // If it is the last step, and 100% of pods, then 
return true + if int32(steps) == canaryStatus.CurrentStepIndex { + if currentStep.Replicas != nil && currentStep.Replicas.StrVal == "100%" { + return true, nil + } + } cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing) // need manual confirmation if currentStep.Pause.Duration == nil { diff --git a/pkg/controller/rollout/rollout_canary_test.go b/pkg/controller/rollout/rollout_canary_test.go index aebcfeef..cbefcdc1 100644 --- a/pkg/controller/rollout/rollout_canary_test.go +++ b/pkg/controller/rollout/rollout_canary_test.go @@ -63,6 +63,7 @@ func TestRunCanary(t *testing.T) { obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547" obj.Status.CanaryStatus.CurrentStepIndex = 1 + obj.Status.CanaryStatus.NextStepIndex = 2 obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -76,6 +77,7 @@ func TestRunCanary(t *testing.T) { s.CanaryStatus.StableRevision = "pod-template-hash-v1" s.CanaryStatus.CanaryRevision = "6f8cc56547" s.CanaryStatus.CurrentStepIndex = 1 + s.CanaryStatus.NextStepIndex = 2 s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -139,6 +141,7 @@ func TestRunCanary(t *testing.T) { obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547" obj.Status.CanaryStatus.CurrentStepIndex = 1 + obj.Status.CanaryStatus.NextStepIndex = 2 obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -185,6 +188,7 @@ func TestRunCanary(t *testing.T) { s.CanaryStatus.CanaryReplicas = 1 
s.CanaryStatus.CanaryReadyReplicas = 1 s.CanaryStatus.CurrentStepIndex = 1 + s.CanaryStatus.NextStepIndex = 2 s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -290,6 +294,7 @@ func TestRunCanaryPaused(t *testing.T) { obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547" obj.Status.CanaryStatus.CurrentStepIndex = 3 + obj.Status.CanaryStatus.NextStepIndex = 4 obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2" obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused return obj @@ -301,6 +306,7 @@ func TestRunCanaryPaused(t *testing.T) { obj.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.CanaryStatus.CanaryRevision = "6f8cc56547" obj.CanaryStatus.CurrentStepIndex = 3 + obj.CanaryStatus.NextStepIndex = 4 obj.CanaryStatus.PodTemplateHash = "pod-template-hash-v2" obj.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused return obj diff --git a/pkg/trafficrouting/manager.go b/pkg/trafficrouting/manager.go index 0907f732..58a0ec3a 100644 --- a/pkg/trafficrouting/manager.go +++ b/pkg/trafficrouting/manager.go @@ -126,7 +126,7 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { if c.LastUpdateTime != nil { // wait seconds for network providers to consume the modification about workload, service and so on. 
if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) { - klog.Infof("%s update workload or service selector, and wait 3 seconds", c.Key) + klog.Infof("%s update workload or service selector, and wait %d seconds", c.Key, trafficRouting.GracePeriodSeconds) return false, nil } } @@ -139,6 +139,15 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { klog.Warningf("%s stableRevision or podTemplateHash can not be empty, and wait a moment", c.Key) return false, nil } + /* + Why is the serviceModified flag moved here? + The rationale behind this is that when we create a canary Service, it is already instantiated with the appropriate selector. + If the stable Service also has had selector patched previously, the logic will proceed to the EnsureRoutes function uninterrupted. + This actually means: Creating a new Service and updating the gateway resource occurs within a single reconciliation loop, + which might introduce instability. + Therefore, by moving the serviceModified flag here, we introduce a grace period between these two operations to ensure stability. 
+ */ + serviceModified := false // fetch canary service err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: canaryServiceName}, canaryService) if err != nil && !errors.IsNotFound(err) { @@ -149,9 +158,9 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { if err != nil { return false, err } + serviceModified = true } - serviceModified := false // patch canary service to only select the canary pods if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision { body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.CanaryRevision) @@ -181,6 +190,13 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { if serviceModified { return false, nil } + } else if c.DisableGenerateCanaryService { + // if DisableGenerateCanaryService is on, selector is not needed, we should remove it + // it's necessary because selector probably has been patched in CanaryStepStateInit step within runCanary function + verify, err := m.restoreStableService(c) + if err != nil || !verify { + return false, err + } } // new network provider, ingress or gateway diff --git a/pkg/trafficrouting/manager_test.go b/pkg/trafficrouting/manager_test.go index 57546251..f8f6bf06 100644 --- a/pkg/trafficrouting/manager_test.go +++ b/pkg/trafficrouting/manager_test.go @@ -775,6 +775,7 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { getRollout: func() (*v1beta1.Rollout, *util.Workload) { obj := demoIstioRollout.DeepCopy() obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)} + obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1 return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, expectUnstructureds: func() []*unstructured.Unstructured { @@ -804,7 +805,6 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { objects = append(objects, u) return objects }, - // Rollout(/rollout-demo) is doing 
trafficRouting({"traffic":"5%"}), and wait a moment expectDone: true, }, { @@ -834,6 +834,7 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { obj := demoIstioRollout.DeepCopy() // set DisableGenerateCanaryService as true obj.Spec.Strategy.Canary.DisableGenerateCanaryService = true + obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1 obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, @@ -864,7 +865,6 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { objects = append(objects, u) return objects }, - // Rollout(/rollout-demo) is doing trafficRouting({"traffic":"5%"}), and wait a moment expectDone: true, }, } @@ -898,11 +898,22 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { if err != nil { t.Fatalf("InitializeTrafficRouting failed: %s", err) } + // now we need to wait at least 2x grace time to keep traffic stable: + // create the canary service -> grace time -> update the gateway -> grace time + // therefore, before both grace times are over, DoTrafficRouting should return false + // firstly, create the canary Service, before the grace time over, return false _, err = manager.DoTrafficRouting(c) if err != nil { t.Fatalf("DoTrafficRouting failed: %s", err) } - // may return false due to in the course of doing trafficRouting, let's do it again + time.Sleep(1 * time.Second) + // secondly, update the gateway, before the grace time over, return false + _, err = manager.DoTrafficRouting(c) + if err != nil { + t.Fatalf("DoTrafficRouting failed: %s", err) + } + time.Sleep(1 * time.Second) + // now, both grace times are over, it should be true done, err := manager.DoTrafficRouting(c) if err != nil { t.Fatalf("DoTrafficRouting failed: %s", err) @@ -986,6 +997,7 @@ func TestFinalisingTrafficRouting(t *testing.T) { obj := demoRollout.DeepCopy() obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted 
obj.Status.CanaryStatus.CurrentStepIndex = 4 + obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(time.Hour)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, onlyRestoreStableService: true, diff --git a/pkg/webhook/rollout/validating/rollout_create_update_handler.go b/pkg/webhook/rollout/validating/rollout_create_update_handler.go index 08bfacf2..7e5b883e 100644 --- a/pkg/webhook/rollout/validating/rollout_create_update_handler.go +++ b/pkg/webhook/rollout/validating/rollout_create_update_handler.go @@ -49,6 +49,12 @@ type RolloutCreateUpdateHandler struct { var _ admission.Handler = &RolloutCreateUpdateHandler{} +// default is 30% +const defaultReplicasLimitWithTraffic = 30 + +var replicasLimitWithTraffic = defaultReplicasLimitWithTraffic +var isPartitionStyle = false + // Handle handles admission requests. func (h *RolloutCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response { switch req.Operation { @@ -155,6 +161,14 @@ func (h *RolloutCreateUpdateHandler) validateRolloutUpdate(oldObj, newObj *appsv } func (h *RolloutCreateUpdateHandler) validateRollout(rollout *appsv1beta1.Rollout) field.ErrorList { + isPartitionStyle = appsv1beta1.IsRealPartition(rollout) + replicasLimitWithTraffic = defaultReplicasLimitWithTraffic // reset to default value + if limit, ok := rollout.Annotations[appsv1beta1.PartitionReplicasLimitWithTraffic]; ok { + strVal := intstr.FromString(limit) + if val, err := intstr.GetScaledValueFromIntOrPercent(&strVal, 100, true); err == nil && 0 < val && val <= 100 { + replicasLimitWithTraffic = val + } + } errList := validateRolloutSpec(rollout, field.NewPath("Spec")) errList = append(errList, h.validateRolloutConflict(rollout, field.NewPath("Conflict Checker"))...) 
return errList @@ -213,6 +227,7 @@ type TrafficRule string const ( TrafficRuleCanary TrafficRule = "Canary" + TrafficRulePartition TrafficRule = "Partition" TrafficRuleBlueGreen TrafficRule = "BlueGreen" NoTraffic TrafficRule = "NoTraffic" ) @@ -221,6 +236,9 @@ func validateRolloutSpecCanaryStrategy(canary *appsv1beta1.CanaryStrategy, fldPa trafficRule := NoTraffic if len(canary.TrafficRoutings) > 0 { trafficRule = TrafficRuleCanary + if isPartitionStyle { + trafficRule = TrafficRulePartition + } } errList := validateRolloutSpecCanarySteps(canary.Steps, fldPath.Child("Steps"), trafficRule) if len(canary.TrafficRoutings) > 1 { @@ -293,7 +311,17 @@ func validateRolloutSpecCanarySteps(steps []appsv1beta1.CanaryStep, fldPath *fie return field.ErrorList{field.Invalid(fldPath.Index(i).Child("Replicas"), s.Replicas, `replicas must be positive number, or a percentage with "0%" < canaryReplicas <= "100%"`)} } - if trafficRule == NoTraffic || s.Traffic == nil { + if trafficRule == NoTraffic || (s.Traffic == nil && len(s.Matches) == 0) { + continue + } + // traffic strategy is configured for current step && replicas is percentage + if trafficRule == TrafficRulePartition && IsPercentageCanaryReplicasType(s.Replicas) { + currCanaryReplicas, _ := intstr.GetScaledValueFromIntOrPercent(s.Replicas, 100, true) + if currCanaryReplicas > replicasLimitWithTraffic { + return field.ErrorList{field.Invalid(fldPath.Index(i).Child("steps"), steps, `For patition style rollout: step[x].replicas must not greater than replicasLimitWithTraffic if traffic or matches specified`)} + } + } + if s.Traffic == nil { continue } is := intstr.FromString(*s.Traffic)