From d1060b9db433d8ddab51f1ad1427fe539c0aa68a Mon Sep 17 00:00:00 2001 From: LiuLiqi <837397251@qq.com> Date: Wed, 25 Oct 2023 09:23:10 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20event=E6=96=B0?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=EF=BC=8C=E6=B7=BB=E5=8A=A0=20=E8=AE=A1?= =?UTF-8?q?=E7=AE=97=20=E5=9F=BA=E5=BA=A7=20Pod=E4=B8=8A=20=E6=9C=80?= =?UTF-8?q?=E5=A4=A7/=E6=9C=80=E5=B0=8F/=E5=B9=B3=E5=9D=87=20=E5=B7=B2?= =?UTF-8?q?=E8=A2=AB=E8=B0=83=E5=BA=A6=E6=A8=A1=E5=9D=97=E5=AE=9E=E4=BE=8B?= =?UTF-8?q?=E4=B8=AA=E6=95=B0=20=E4=BF=AE=E5=A4=8DModuleDeployment?= =?UTF-8?q?=E5=8D=95=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constants/label/well_known_labels.go | 8 ++ .../controller/moduledeployment_controller.go | 11 +- .../moduledeployment_controller_suit_test.go | 23 ++++ .../controller/modulereplicaset_controller.go | 1 + module-controller/internal/event/event.go | 13 +- .../internal/event/event_test.go | 6 + ...modulereplicaset_replicas_changed_event.go | 22 +++ ...dulereplicaset_replicas_changed_handler.go | 96 ++++++++++++++ ...eplicaset_replicas_changed_handler_test.go | 16 +++ .../internal/utils/count_on_base_utils.go | 125 ++++++++++++++++++ .../utils/count_on_base_utils_test.go | 32 +++++ 11 files changed, 341 insertions(+), 12 deletions(-) create mode 100644 module-controller/internal/event/modulereplicaset_replicas_changed_event.go create mode 100644 module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go create mode 100644 module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go create mode 100644 module-controller/internal/utils/count_on_base_utils.go create mode 100644 module-controller/internal/utils/count_on_base_utils_test.go diff --git a/module-controller/internal/constants/label/well_known_labels.go b/module-controller/internal/constants/label/well_known_labels.go index 7e8abfe61..53c7fcf82 100644 --- a/module-controller/internal/constants/label/well_known_labels.go +++ b/module-controller/internal/constants/label/well_known_labels.go @@ -1,6 +1,8 @@ package label const ( + DeploymentNameLabel = "serverless.alipay.com/deployment-name" + ModuleNameLabel = "serverless.alipay.com/module-name" ModuleVersionLabel = "serverless.alipay.com/module-version" @@ -21,6 +23,12 @@ const ( ModuleInstanceCount = "serverless.alipay.com/module-instance-count" + MaxModuleInstanceCount = "serverless.alipay.com/max-module-instance-count-on-base" + + MinModuleInstanceCount = "serverless.alipay.com/min-module-instance-count-on-base" + + AverageModuleInstanceCount = "serverless.alipay.com/average-module-instance-count-on-base" + ModuleSchedulingStrategy = "serverless.alipay.com/module-scheduling-strategy" MaxModuleCount = "serverless.alipay.com/max-module-count" diff --git a/module-controller/internal/controller/moduledeployment_controller.go b/module-controller/internal/controller/moduledeployment_controller.go index f0649bafc..a01d75e9c 100644 --- a/module-controller/internal/controller/moduledeployment_controller.go +++ b/module-controller/internal/controller/moduledeployment_controller.go @@ -113,9 +113,8 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req switch releaseStatus.Progress { case v1alpha1.ModuleDeploymentReleaseProgressInit: handleInitModuleDeployment(moduleDeployment, newRS) - if err := r.Status().Update(ctx, moduleDeployment); err != nil { - return ctrl.Result{}, err - } + err := r.Status().Update(ctx, moduleDeployment) + return ctrl.Result{}, err case v1alpha1.ModuleDeploymentReleaseProgressExecuting: return r.updateModuleReplicaSet(ctx, moduleDeployment, newRS) case v1alpha1.ModuleDeploymentReleaseProgressCompleted: @@ -141,12 +140,12 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req if err := r.Status().Update(ctx, moduleDeployment); err != nil { return ctrl.Result{}, err } + return ctrl.Result{}, nil case v1alpha1.ModuleDeploymentReleaseProgressPaused: if !moduleDeployment.Spec.Pause && time.Since(moduleDeployment.Status.ReleaseStatus.NextReconcileTime.Time) >= 0 { moduleDeployment.Status.ReleaseStatus.Progress = v1alpha1.ModuleDeploymentReleaseProgressExecuting - if err := r.Status().Update(ctx, moduleDeployment); err != nil { - return ctrl.Result{}, err - } + err := r.Status().Update(ctx, moduleDeployment) + return ctrl.Result{}, err } } diff --git a/module-controller/internal/controller/moduledeployment_controller_suit_test.go b/module-controller/internal/controller/moduledeployment_controller_suit_test.go index 51259a1c0..c4bab9cfb 100644 --- a/module-controller/internal/controller/moduledeployment_controller_suit_test.go +++ b/module-controller/internal/controller/moduledeployment_controller_suit_test.go @@ -104,6 +104,12 @@ var _ = Describe("ModuleDeployment Controller", func() { }) }) + Context("wait moduleDeployment Completed", func() { + It("wait moduleDeployment Completed", func() { + waitModuleDeploymentCompleted(moduleDeploymentName, namespace) + }) + }) + Context("update replicas for module deployment", func() { It("update module replicas", func() { key := types.NamespacedName{ @@ -123,6 +129,8 @@ var _ = Describe("ModuleDeployment Controller", func() { } }, timeout, interval).Should(BeTrue()) + waitModuleDeploymentCompleted(moduleDeploymentName, namespace) + Eventually(func() bool { set := map[string]string{ label.ModuleDeploymentLabel: moduleDeployment.Name, @@ -330,3 +338,18 @@ func checkModuleDeploymentReplicas(nn types.NamespacedName, replicas int32) bool newRS.Status.Replicas == newRS.Spec.Replicas && newRS.Status.Replicas == replicas } + +func waitModuleDeploymentCompleted(moduleDeploymentName string, namespace string) { + key := types.NamespacedName{ + Name: moduleDeploymentName, + Namespace: namespace, + } + newModuleDeployment := &v1alpha1.ModuleDeployment{} + Expect(k8sClient.Get(context.TODO(), key, newModuleDeployment)).Should(Succeed()) + progress := newModuleDeployment.Status.ReleaseStatus.Progress + if progress == v1alpha1.ModuleDeploymentReleaseProgressCompleted { + return + } + time.Sleep(5 * time.Second) + waitModuleDeploymentCompleted(moduleDeploymentName, namespace) +} diff --git a/module-controller/internal/controller/modulereplicaset_controller.go b/module-controller/internal/controller/modulereplicaset_controller.go index a168658ed..97cf68651 100644 --- a/module-controller/internal/controller/modulereplicaset_controller.go +++ b/module-controller/internal/controller/modulereplicaset_controller.go @@ -170,6 +170,7 @@ func (r *ModuleReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Req return reconcile.Result{}, err } } + event.PublishModuleReplicaSetReplicasChangedEvent(r.Client, ctx, moduleReplicaSet) } else { // replicas not change, directly update module err = r.compareAndUpdateModule(ctx, sameReplicaSetModules, moduleReplicaSet) diff --git a/module-controller/internal/event/event.go b/module-controller/internal/event/event.go index bac90a49f..d8b6e4cf7 100644 --- a/module-controller/internal/event/event.go +++ b/module-controller/internal/event/event.go @@ -8,12 +8,13 @@ import ( type EventType string const ( - ModuleDeploymentCreate EventType = "moduledeployment_create" - ModuleDeploymentDelete EventType = "moduledeployment_delete" - ModuleReplicaSetCreate EventType = "modulereplicaset_create" - ModuleReplicaSetDelete EventType = "modulereplicaset_delete" - ModuleCreate EventType = "module_create" - ModuleDelete EventType = "module_delete" + ModuleDeploymentCreate EventType = "moduledeployment_create" + ModuleDeploymentDelete EventType = "moduledeployment_delete" + ModuleReplicaSetCreate EventType = "modulereplicaset_create" + ModuleReplicaSetDelete EventType = "modulereplicaset_delete" + ModuleReplicaSetReplicasChanged EventType = "modulereplicaset_replicas_changed" + ModuleCreate EventType = "module_create" + ModuleDelete EventType = "module_delete" ) type Event interface { diff --git a/module-controller/internal/event/event_test.go b/module-controller/internal/event/event_test.go index 52967d4f1..23aaf4711 100644 --- a/module-controller/internal/event/event_test.go +++ b/module-controller/internal/event/event_test.go @@ -73,3 +73,9 @@ func TestPublishModuleReplicaSetDeleteEvent(t *testing.T) { moduleReplicaSet := v1alpha1.ModuleReplicaSet{} PublishModuleReplicaSetDeleteEvent(nil, nil, &moduleReplicaSet) } + +func TestPublishModuleReplicaSetReplicasChangedEvent(t *testing.T) { + assert.Equal(t, ModuleReplicaSetReplicasChanged, ModuleReplicaSetReplicasChangedEvent{}.GetEventType()) + moduleReplicaSet := v1alpha1.ModuleReplicaSet{} + PublishModuleReplicaSetReplicasChangedEvent(nil, nil, &moduleReplicaSet) +} diff --git a/module-controller/internal/event/modulereplicaset_replicas_changed_event.go b/module-controller/internal/event/modulereplicaset_replicas_changed_event.go new file mode 100644 index 000000000..30725fb5b --- /dev/null +++ b/module-controller/internal/event/modulereplicaset_replicas_changed_event.go @@ -0,0 +1,22 @@ +package event + +import ( + "context" + + "github.com/sofastack/sofa-serverless/api/v1alpha1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ModuleReplicaSetReplicasChangedEvent struct { + client.Client + Context context.Context + ModuleReplicaSet *v1alpha1.ModuleReplicaSet +} + +func PublishModuleReplicaSetReplicasChangedEvent(client client.Client, ctx context.Context, moduleReplicaSet *v1alpha1.ModuleReplicaSet) error { + return PublishEvent(ModuleReplicaSetReplicasChangedEvent{ModuleReplicaSet: moduleReplicaSet, Client: client, Context: ctx}) +} + +func (e ModuleReplicaSetReplicasChangedEvent) GetEventType() EventType { + return ModuleReplicaSetReplicasChanged +} diff --git a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go new file mode 100644 index 000000000..375f65875 --- /dev/null +++ b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go @@ -0,0 +1,96 @@ +package handler + +import ( + "fmt" + "github.com/sofastack/sofa-serverless/internal/constants/label" + "github.com/sofastack/sofa-serverless/internal/event" + "github.com/sofastack/sofa-serverless/internal/utils" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "strconv" +) + +type ModuleReplicaSetReplicasChangedHandler struct { +} + +func (h ModuleReplicaSetReplicasChangedHandler) Async() bool { + return false +} + +func (h ModuleReplicaSetReplicasChangedHandler) Handle(e event.Event) error { + moduleReplicaSetReplicasChangedEvent := e.(event.ModuleReplicaSetReplicasChangedEvent) + moduleReplicaSet := moduleReplicaSetReplicasChangedEvent.ModuleReplicaSet + ctx := moduleReplicaSetReplicasChangedEvent.Context + k8sClient := moduleReplicaSetReplicasChangedEvent.Client + baseDeploymentName := moduleReplicaSet.Labels[label.DeploymentNameLabel] + if baseDeploymentName == "" { + return nil + } + deployment := &v1.Deployment{} + err := k8sClient.Get(ctx, + types.NamespacedName{Namespace: moduleReplicaSet.Namespace, Name: baseDeploymentName}, deployment) + if err != nil { + return utils.Error(err, "Failed to get deployment", "deploymentName", baseDeploymentName) + } + allPodSelector, err := metav1.LabelSelectorAsSelector(&moduleReplicaSet.Spec.Selector) + allPods := &corev1.PodList{} + if err = k8sClient.List(ctx, allPods, &client.ListOptions{Namespace: moduleReplicaSet.Namespace, LabelSelector: allPodSelector}); err != nil { + return utils.Error(err, "Failed to list pod", "moduleReplicaSetName", moduleReplicaSet.Name) + } + + if len(allPods.Items) <= 0 { + return nil + } + var minInstanceCount int + var maxInstanceCount int + var totalInstanceCount int + for index, item := range allPods.Items { + var moduleInstanceCount int + if cntStr, ok := item.Labels[label.ModuleInstanceCount]; ok { + moduleInstanceCount, err = strconv.Atoi(cntStr) + if err != nil { + log.Log.Error(err, fmt.Sprintf("invalid ModuleInstanceCount in pod %v", item.Name)) + continue + } + } + // 赋值第一个pod的安装数量为最小值 + if index == 0 { + minInstanceCount = moduleInstanceCount + } + // 对比获取最小值 + if minInstanceCount > moduleInstanceCount { + minInstanceCount = moduleInstanceCount + } + // 对比获取最大值 + if moduleInstanceCount > maxInstanceCount { + maxInstanceCount = moduleInstanceCount + } + // 获取全部安装数量 + totalInstanceCount += moduleInstanceCount + } + + if deployment.Labels == nil { + deployment.Labels = map[string]string{} + } + deployment.Labels[label.MaxModuleInstanceCount] = strconv.Itoa(maxInstanceCount) + deployment.Labels[label.MinModuleInstanceCount] = strconv.Itoa(minInstanceCount) + avgInstanceCount := float64(totalInstanceCount) / float64(len(allPods.Items)) + deployment.Labels[label.AverageModuleInstanceCount] = fmt.Sprintf("%.2f", avgInstanceCount) + + if err = k8sClient.Update(ctx, deployment); err != nil { + return utils.Error(err, "Failed to update Deployment", "Deployment", baseDeploymentName) + } + return nil +} + +func (h ModuleReplicaSetReplicasChangedHandler) InterestIn(e event.Event) bool { + return e.GetEventType() == event.ModuleReplicaSetReplicasChanged +} + +func init() { + event.Handlers = append(event.Handlers, ModuleReplicaSetReplicasChangedHandler{}) +} diff --git a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go new file mode 100644 index 000000000..cd9b552be --- /dev/null +++ b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go @@ -0,0 +1,16 @@ +package handler + +import ( + "github.com/sofastack/sofa-serverless/internal/event" + "github.com/sofastack/sofa-serverless/internal/utils" + "testing" +) + +func TestModuleReplicaSetReplicasChangedHandler(t *testing.T) { + moduleReplicaSet := utils.PrepareModuleReplicaSet("default", "test-module-replica-set-name") + event.PublishModuleReplicaSetReplicasChangedEvent(TestModuleReplicaSetReplicasChangedClient{}, nil, &moduleReplicaSet) +} + +type TestModuleReplicaSetReplicasChangedClient struct { + utils.MockOnBaseClient +} diff --git a/module-controller/internal/utils/count_on_base_utils.go b/module-controller/internal/utils/count_on_base_utils.go new file mode 100644 index 000000000..e7c4c3b9f --- /dev/null +++ b/module-controller/internal/utils/count_on_base_utils.go @@ -0,0 +1,125 @@ +package utils + +import ( + "github.com/sofastack/sofa-serverless/api/v1alpha1" + "github.com/sofastack/sofa-serverless/internal/constants/label" + "golang.org/x/net/context" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" +) + +func PrepareModuleReplicaSet(namespace, moduleReplicaSetName string) v1alpha1.ModuleReplicaSet { + + moduleReplicaSet := v1alpha1.ModuleReplicaSet{ + Spec: v1alpha1.ModuleReplicaSetSpec{ + Replicas: 1, + Template: v1alpha1.ModuleTemplateSpec{ + Spec: v1alpha1.ModuleSpec{ + Module: v1alpha1.ModuleInfo{ + Name: "dynamic-provider", + Version: "1.0.0", + Url: "http://serverless-opensource.oss-cn-shanghai.aliyuncs.com/module-packages/stable/dynamic-provider-1.0.0-ark-biz.jar", + }, + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: moduleReplicaSetName, + Namespace: namespace, + Labels: map[string]string{ + "app": "dynamic-stock", + label.MaxModuleCount: "10", + label.ModuleSchedulingStrategy: string(v1alpha1.Scatter), + label.DeploymentNameLabel: "test-deployment-name", + }, + Annotations: map[string]string{}, + }, + } + return moduleReplicaSet +} + +type MockOnBaseClient struct { +} + +func (m MockOnBaseClient) Scheme() *runtime.Scheme { + return nil +} + +func (m MockOnBaseClient) RESTMapper() meta.RESTMapper { + return nil +} + +func (m MockOnBaseClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { + return schema.GroupVersionKind{}, nil +} + +func (m MockOnBaseClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { + return true, nil +} + +func (m MockOnBaseClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + return nil +} + +func (m MockOnBaseClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + var mockPodList []corev1.Pod + for i := 3; i > 0; i-- { + podName := "mock-pod-" + strconv.Itoa(i) + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Labels: map[string]string{ + label.ModuleInstanceCount: strconv.Itoa(i), + }, + }, + Spec: corev1.PodSpec{}, + Status: corev1.PodStatus{}, + } + mockPodList = append(mockPodList, *pod) + } + + // 使用反射设置 list 的 Items 字段 + listValue := reflect.ValueOf(list) + if listValue.Kind() == reflect.Ptr && listValue.Elem().Kind() == reflect.Struct { + itemsField := listValue.Elem().FieldByName("Items") + if itemsField.IsValid() && itemsField.CanSet() { + itemsField.Set(reflect.ValueOf(mockPodList)) + } + } + return nil +} + +func (m MockOnBaseClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { + return nil +} + +func (m MockOnBaseClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { + return nil +} + +func (m MockOnBaseClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { + return nil +} + +func (m MockOnBaseClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { + return nil +} + +func (m MockOnBaseClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { + return nil +} + +func (m MockOnBaseClient) Status() client.SubResourceWriter { + return nil +} + +func (m MockOnBaseClient) SubResource(subResource string) client.SubResourceClient { + return nil +} diff --git a/module-controller/internal/utils/count_on_base_utils_test.go b/module-controller/internal/utils/count_on_base_utils_test.go new file mode 100644 index 000000000..8db657ed3 --- /dev/null +++ b/module-controller/internal/utils/count_on_base_utils_test.go @@ -0,0 +1,32 @@ +package utils + +import ( + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/types" + "testing" +) + +func TestPrepareModuleReplicaSet(t *testing.T) { + moduleReplicaSetName := "testModuleReplicaSet" + moduleReplicaSet := PrepareModuleReplicaSet("default", moduleReplicaSetName) + assert.Equal(t, moduleReplicaSetName, moduleReplicaSet.Name) +} + +func TestMockOnBaseClient(t *testing.T) { + mockClient := MockOnBaseClient{} + assert.True(t, mockClient.Scheme() == nil) + assert.Equal(t, nil, mockClient.RESTMapper()) + _, err := mockClient.GroupVersionKindFor(nil) + assert.Equal(t, nil, err) + _, err = mockClient.IsObjectNamespaced(nil) + assert.Equal(t, nil, err) + assert.Equal(t, nil, mockClient.Get(nil, types.NamespacedName{}, nil)) + assert.Equal(t, nil, mockClient.List(nil, nil)) + assert.Equal(t, nil, mockClient.Create(nil, nil)) + assert.Equal(t, nil, mockClient.Delete(nil, nil)) + assert.Equal(t, nil, mockClient.Update(nil, nil)) + assert.Equal(t, nil, mockClient.Patch(nil, nil, nil)) + assert.Equal(t, nil, mockClient.DeleteAllOf(nil, nil)) + assert.Equal(t, nil, mockClient.Status()) + assert.Equal(t, nil, mockClient.SubResource("")) +} From 7ebc6801ddddd93b5bdfc4bf6b7c7116ae16fcfc Mon Sep 17 00:00:00 2001 From: LiuLiqi <837397251@qq.com> Date: Wed, 25 Oct 2023 10:00:18 +0800 Subject: [PATCH 2/5] =?UTF-8?q?replicaset=E6=B7=BB=E5=8A=A0=E5=9F=BA?= =?UTF-8?q?=E5=BA=A7name=E7=9A=84labels?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/controller/moduledeployment_controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/module-controller/internal/controller/moduledeployment_controller.go b/module-controller/internal/controller/moduledeployment_controller.go index a01d75e9c..8e0162e82 100644 --- a/module-controller/internal/controller/moduledeployment_controller.go +++ b/module-controller/internal/controller/moduledeployment_controller.go @@ -430,6 +430,7 @@ func (r *ModuleDeploymentReconciler) generateModuleReplicas(moduleDeployment *v1 newLabels[label.ModuleNameLabel] = moduleDeployment.Spec.Template.Spec.Module.Name newLabels[label.ModuleDeploymentLabel] = moduleDeployment.Name newLabels[label.ModuleSchedulingStrategy] = string(moduleDeployment.Spec.SchedulingStrategy.SchedulingPolicy) + newLabels[label.DeploymentNameLabel] = moduleDeployment.Spec.BaseDeploymentName newLabels[label.ModuleReplicasetRevisionLabel] = strconv.Itoa(revision) moduleReplicaSet := &v1alpha1.ModuleReplicaSet{ ObjectMeta: metav1.ObjectMeta{ From dcadbd1e127698f38df00f335a4682de5e7d162f Mon Sep 17 00:00:00 2001 From: LiuLiqi <837397251@qq.com> Date: Wed, 25 Oct 2023 11:18:43 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E8=8E=B7=E5=8F=96=E6=A8=A1=E5=9D=97?= =?UTF-8?q?=E5=AE=89=E8=A3=85=E6=95=B0=E9=87=8F=E4=BF=AE=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E4=BB=8E=E8=8E=B7=E5=8F=96=E5=88=B0=E7=9A=84module=E5=AE=9E?= =?UTF-8?q?=E4=BE=8B=E8=AE=A1=E7=AE=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...dulereplicaset_replicas_changed_handler.go | 16 ++- ...eplicaset_replicas_changed_handler_test.go | 2 +- .../internal/utils/count_on_base_utils.go | 125 ------------------ .../utils/count_on_base_utils_test.go | 32 ----- .../internal/utils/test_utils.go | 93 +++++++++++++ .../internal/utils/test_utils_test.go | 12 ++ 6 files changed, 116 insertions(+), 164 deletions(-) delete mode 100644 module-controller/internal/utils/count_on_base_utils.go delete mode 100644 module-controller/internal/utils/count_on_base_utils_test.go diff --git a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go index 375f65875..9832d86b8 100644 --- a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go +++ b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go @@ -2,12 +2,14 @@ package handler import ( "fmt" + "github.com/sofastack/sofa-serverless/api/v1alpha1" "github.com/sofastack/sofa-serverless/internal/constants/label" "github.com/sofastack/sofa-serverless/internal/event" "github.com/sofastack/sofa-serverless/internal/utils" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -50,13 +52,15 @@ func (h ModuleReplicaSetReplicasChangedHandler) Handle(e event.Event) error { var totalInstanceCount int for index, item := range allPods.Items { var moduleInstanceCount int - if cntStr, ok := item.Labels[label.ModuleInstanceCount]; ok { - moduleInstanceCount, err = strconv.Atoi(cntStr) - if err != nil { - log.Log.Error(err, fmt.Sprintf("invalid ModuleInstanceCount in pod %v", item.Name)) - continue - } + moduleList := &v1alpha1.ModuleList{} + err := k8sClient.List(ctx, moduleList, &client.ListOptions{LabelSelector: labels.SelectorFromSet(map[string]string{ + label.BaseInstanceIpLabel: item.Status.PodIP, + })}, client.InNamespace(moduleReplicaSet.Namespace)) + if err != nil { + log.Log.Error(err, fmt.Sprintf("can't find any module in pod %v", item.Name)) + continue } + moduleInstanceCount = len(moduleList.Items) // 赋值第一个pod的安装数量为最小值 if index == 0 { minInstanceCount = moduleInstanceCount diff --git a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go index cd9b552be..97f432afb 100644 --- a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go +++ b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler_test.go @@ -12,5 +12,5 @@ func TestModuleReplicaSetReplicasChangedHandler(t *testing.T) { } type TestModuleReplicaSetReplicasChangedClient struct { - utils.MockOnBaseClient + utils.MockClient } diff --git a/module-controller/internal/utils/count_on_base_utils.go b/module-controller/internal/utils/count_on_base_utils.go deleted file mode 100644 index e7c4c3b9f..000000000 --- a/module-controller/internal/utils/count_on_base_utils.go +++ /dev/null @@ -1,125 +0,0 @@ -package utils - -import ( - "github.com/sofastack/sofa-serverless/api/v1alpha1" - "github.com/sofastack/sofa-serverless/internal/constants/label" - "golang.org/x/net/context" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "reflect" - "sigs.k8s.io/controller-runtime/pkg/client" - "strconv" -) - -func PrepareModuleReplicaSet(namespace, moduleReplicaSetName string) v1alpha1.ModuleReplicaSet { - - moduleReplicaSet := v1alpha1.ModuleReplicaSet{ - Spec: v1alpha1.ModuleReplicaSetSpec{ - Replicas: 1, - Template: v1alpha1.ModuleTemplateSpec{ - Spec: v1alpha1.ModuleSpec{ - Module: v1alpha1.ModuleInfo{ - Name: "dynamic-provider", - Version: "1.0.0", - Url: "http://serverless-opensource.oss-cn-shanghai.aliyuncs.com/module-packages/stable/dynamic-provider-1.0.0-ark-biz.jar", - }, - }, - }, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: moduleReplicaSetName, - Namespace: namespace, - Labels: map[string]string{ - "app": "dynamic-stock", - label.MaxModuleCount: "10", - label.ModuleSchedulingStrategy: string(v1alpha1.Scatter), - label.DeploymentNameLabel: "test-deployment-name", - }, - Annotations: map[string]string{}, - }, - } - return moduleReplicaSet -} - -type MockOnBaseClient struct { -} - -func (m MockOnBaseClient) Scheme() *runtime.Scheme { - return nil -} - -func (m MockOnBaseClient) RESTMapper() meta.RESTMapper { - return nil -} - -func (m MockOnBaseClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { - return schema.GroupVersionKind{}, nil -} - -func (m MockOnBaseClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { - return true, nil -} - -func (m MockOnBaseClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { - return nil -} - -func (m MockOnBaseClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { - var mockPodList []corev1.Pod - for i := 3; i > 0; i-- { - podName := "mock-pod-" + strconv.Itoa(i) - pod := &corev1.Pod{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Labels: map[string]string{ - label.ModuleInstanceCount: strconv.Itoa(i), - }, - }, - Spec: corev1.PodSpec{}, - Status: corev1.PodStatus{}, - } - mockPodList = append(mockPodList, *pod) - } - - // 使用反射设置 list 的 Items 字段 - listValue := reflect.ValueOf(list) - if listValue.Kind() == reflect.Ptr && listValue.Elem().Kind() == reflect.Struct { - itemsField := listValue.Elem().FieldByName("Items") - if itemsField.IsValid() && itemsField.CanSet() { - itemsField.Set(reflect.ValueOf(mockPodList)) - } - } - return nil -} - -func (m MockOnBaseClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { - return nil -} - -func (m MockOnBaseClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { - return nil -} - -func (m MockOnBaseClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { - return nil -} - -func (m MockOnBaseClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { - return nil -} - -func (m MockOnBaseClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { - return nil -} - -func (m MockOnBaseClient) Status() client.SubResourceWriter { - return nil -} - -func (m MockOnBaseClient) SubResource(subResource string) client.SubResourceClient { - return nil -} diff --git a/module-controller/internal/utils/count_on_base_utils_test.go b/module-controller/internal/utils/count_on_base_utils_test.go deleted file mode 100644 index 8db657ed3..000000000 --- a/module-controller/internal/utils/count_on_base_utils_test.go +++ /dev/null @@ -1,32 +0,0 @@ -package utils - -import ( - "github.com/stretchr/testify/assert" - "k8s.io/apimachinery/pkg/types" - "testing" -) - -func TestPrepareModuleReplicaSet(t *testing.T) { - moduleReplicaSetName := "testModuleReplicaSet" - moduleReplicaSet := PrepareModuleReplicaSet("default", moduleReplicaSetName) - assert.Equal(t, moduleReplicaSetName, moduleReplicaSet.Name) -} - -func TestMockOnBaseClient(t *testing.T) { - mockClient := MockOnBaseClient{} - assert.True(t, mockClient.Scheme() == nil) - assert.Equal(t, nil, mockClient.RESTMapper()) - _, err := mockClient.GroupVersionKindFor(nil) - assert.Equal(t, nil, err) - _, err = mockClient.IsObjectNamespaced(nil) - assert.Equal(t, nil, err) - assert.Equal(t, nil, mockClient.Get(nil, types.NamespacedName{}, nil)) - assert.Equal(t, nil, mockClient.List(nil, nil)) - assert.Equal(t, nil, mockClient.Create(nil, nil)) - assert.Equal(t, nil, mockClient.Delete(nil, nil)) - assert.Equal(t, nil, mockClient.Update(nil, nil)) - assert.Equal(t, nil, mockClient.Patch(nil, nil, nil)) - assert.Equal(t, nil, mockClient.DeleteAllOf(nil, nil)) - assert.Equal(t, nil, mockClient.Status()) - assert.Equal(t, nil, mockClient.SubResource("")) -} diff --git a/module-controller/internal/utils/test_utils.go b/module-controller/internal/utils/test_utils.go index affc15c17..8d5e5c97d 100644 --- a/module-controller/internal/utils/test_utils.go +++ b/module-controller/internal/utils/test_utils.go @@ -2,12 +2,16 @@ package utils import ( "github.com/sofastack/sofa-serverless/api/v1alpha1" + "github.com/sofastack/sofa-serverless/internal/constants/label" "golang.org/x/net/context" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "reflect" "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" ) func PrepareModuleDeployment(namespace, moduleDeploymentName string) v1alpha1.ModuleDeployment { @@ -41,6 +45,36 @@ func PrepareModuleDeployment(namespace, moduleDeploymentName string) v1alpha1.Mo return moduleDeployment } +func PrepareModuleReplicaSet(namespace, moduleReplicaSetName string) v1alpha1.ModuleReplicaSet { + + moduleReplicaSet := v1alpha1.ModuleReplicaSet{ + Spec: v1alpha1.ModuleReplicaSetSpec{ + Replicas: 1, + Template: v1alpha1.ModuleTemplateSpec{ + Spec: v1alpha1.ModuleSpec{ + Module: v1alpha1.ModuleInfo{ + Name: "dynamic-provider", + Version: "1.0.0", + Url: "http://serverless-opensource.oss-cn-shanghai.aliyuncs.com/module-packages/stable/dynamic-provider-1.0.0-ark-biz.jar", + }, + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: moduleReplicaSetName, + Namespace: namespace, + Labels: map[string]string{ + "app": "dynamic-stock", + label.MaxModuleCount: "10", + label.ModuleSchedulingStrategy: string(v1alpha1.Scatter), + label.DeploymentNameLabel: "test-deployment-name", + }, + Annotations: map[string]string{}, + }, + } + return moduleReplicaSet +} + type MockClient struct { } @@ -65,9 +99,68 @@ func (m MockClient) Get(ctx context.Context, key client.ObjectKey, obj client.Ob } func (m MockClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { + if list == nil { + return nil + } + + listValue := reflect.ValueOf(list) + + itemsType, itemsField := getListType(listValue) + if itemsType == nil { + return nil + } + + // 检查切片中的元素类型 + switch itemsType { + case reflect.TypeOf(corev1.Pod{}): + var mockPodList []corev1.Pod + for i := 3; i > 0; i-- { + podName := "mock-pod-" + strconv.Itoa(i) + podIp := "127.0.0." + strconv.Itoa(i) + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Labels: map[string]string{ + label.ModuleInstanceCount: strconv.Itoa(i), + }, + }, + Spec: corev1.PodSpec{}, + Status: corev1.PodStatus{ + PodIP: podIp, + }, + } + mockPodList = append(mockPodList, *pod) + } + itemsField.Set(reflect.ValueOf(mockPodList)) + case reflect.TypeOf(v1alpha1.Module{}): + var mockModuleList []v1alpha1.Module + moduleName := "mock-module-name" + module := &v1alpha1.Module{ + ObjectMeta: metav1.ObjectMeta{ + Name: moduleName, + }, + } + mockModuleList = append(mockModuleList, *module) + itemsField.Set(reflect.ValueOf(mockModuleList)) + } return nil } +func getListType(listValue reflect.Value) (reflect.Type, reflect.Value) { + + itemsField := listValue.Elem().FieldByName("Items") + if !itemsField.IsValid() { + return nil, reflect.Value{} + } + itemsType := itemsField.Type() + // 列表的类型是切片 + if itemsType.Kind() != reflect.Slice { + return nil, reflect.Value{} + } + return itemsType.Elem(), itemsField +} + func (m MockClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { return nil } diff --git a/module-controller/internal/utils/test_utils_test.go b/module-controller/internal/utils/test_utils_test.go index b3741fc28..54aee63b7 100644 --- a/module-controller/internal/utils/test_utils_test.go +++ b/module-controller/internal/utils/test_utils_test.go @@ -1,7 +1,9 @@ package utils import ( + "github.com/sofastack/sofa-serverless/api/v1alpha1" "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "testing" ) @@ -12,6 +14,12 @@ func TestPrepareModuleDeployment(t *testing.T) { assert.Equal(t, moduleDeploymentName, moduleDeployment.Name) } +func TestPrepareModuleReplicaSet(t *testing.T) { + moduleReplicaSetName := "testModuleReplicaSet" + moduleReplicaSet := PrepareModuleReplicaSet("default", moduleReplicaSetName) + assert.Equal(t, moduleReplicaSetName, moduleReplicaSet.Name) +} + func TestMockClient(t *testing.T) { mockClient := MockClient{} assert.True(t, mockClient.Scheme() == nil) @@ -29,4 +37,8 @@ func TestMockClient(t *testing.T) { assert.Equal(t, nil, mockClient.DeleteAllOf(nil, nil)) assert.Equal(t, nil, mockClient.Status()) assert.Equal(t, nil, mockClient.SubResource("")) + + assert.Equal(t, nil, mockClient.List(nil, &v1alpha1.ModuleList{})) + assert.Equal(t, nil, mockClient.List(nil, &corev1.PodList{})) + } From 497c2860ee0ce47ea05620a4781907bd270ac557 Mon Sep 17 00:00:00 2001 From: LiuLiqi <837397251@qq.com> Date: Wed, 1 Nov 2023 14:32:12 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=AE=A1=E7=AE=97pod?= =?UTF-8?q?=E4=B8=8A=E5=B7=B2=E5=AE=89=E8=A3=85=E5=AE=9E=E4=BE=8B=E7=9A=84?= =?UTF-8?q?=E8=AE=A1=E7=AE=97=E6=96=B9=E5=BC=8F=EF=BC=8C=E5=87=8F=E5=B0=91?= =?UTF-8?q?=E6=80=A7=E8=83=BD=E6=B6=88=E8=80=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...dulereplicaset_replicas_changed_handler.go | 25 +++++++++++++------ .../internal/utils/test_utils.go | 13 +++++++--- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go index 9832d86b8..ff35174e3 100644 --- a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go +++ b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go @@ -52,15 +52,24 @@ func (h ModuleReplicaSetReplicasChangedHandler) Handle(e event.Event) error { var totalInstanceCount int for index, item := range allPods.Items { var moduleInstanceCount int - moduleList := &v1alpha1.ModuleList{} - err := k8sClient.List(ctx, moduleList, &client.ListOptions{LabelSelector: labels.SelectorFromSet(map[string]string{ - label.BaseInstanceIpLabel: item.Status.PodIP, - })}, client.InNamespace(moduleReplicaSet.Namespace)) - if err != nil { - log.Log.Error(err, fmt.Sprintf("can't find any module in pod %v", item.Name)) - continue + if cntStr, ok := item.Labels[label.ModuleInstanceCount]; ok { + moduleInstanceCount, err = strconv.Atoi(cntStr) + if err != nil { + log.Log.Error(err, fmt.Sprintf("invalid ModuleInstanceCount in pod %v", item.Name)) + continue + } + } else { + moduleList := &v1alpha1.ModuleList{} + err := k8sClient.List(ctx, moduleList, &client.ListOptions{LabelSelector: labels.SelectorFromSet(map[string]string{ + label.BaseInstanceIpLabel: item.Status.PodIP, + })}, client.InNamespace(moduleReplicaSet.Namespace)) + if err != nil { + log.Log.Error(err, fmt.Sprintf("can't find any module in pod %v", item.Name)) + continue + } + moduleInstanceCount = len(moduleList.Items) } - moduleInstanceCount = len(moduleList.Items) + // 赋值第一个pod的安装数量为最小值 if index == 0 { minInstanceCount = moduleInstanceCount diff --git a/module-controller/internal/utils/test_utils.go b/module-controller/internal/utils/test_utils.go index 8d5e5c97d..cc2945728 100644 --- a/module-controller/internal/utils/test_utils.go +++ b/module-controller/internal/utils/test_utils.go @@ -114,16 +114,21 @@ func (m MockClient) List(ctx context.Context, list client.ObjectList, opts ...cl switch itemsType { case reflect.TypeOf(corev1.Pod{}): var mockPodList []corev1.Pod + for i := 3; i > 0; i-- { + mockLabel := map[string]string{} + if i == 3 { + mockLabel = map[string]string{ + label.ModuleInstanceCount: strconv.Itoa(i), + } + } podName := "mock-pod-" + strconv.Itoa(i) podIp := "127.0.0." + strconv.Itoa(i) pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{ - Name: podName, - Labels: map[string]string{ - label.ModuleInstanceCount: strconv.Itoa(i), - }, + Name: podName, + Labels: mockLabel, }, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{ From aab79e565d94917f9feaafeabcbb463a1a4538ea Mon Sep 17 00:00:00 2001 From: LiuLiqi <837397251@qq.com> Date: Tue, 14 Nov 2023 13:43:12 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=AE=A1=E7=AE=97?= =?UTF-8?q?=E5=AE=89=E8=A3=85=E4=B8=AA=E6=95=B0handler=E4=B8=BA=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../handler/modulereplicaset_replicas_changed_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go index ff35174e3..6eaaa0042 100644 --- a/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go +++ b/module-controller/internal/handler/modulereplicaset_replicas_changed_handler.go @@ -20,7 +20,7 @@ type ModuleReplicaSetReplicasChangedHandler struct { } func (h ModuleReplicaSetReplicasChangedHandler) Async() bool { - return false + return true } func (h ModuleReplicaSetReplicasChangedHandler) Handle(e event.Event) error {