Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #208 from sofastack/feat.count_on_base
Browse files Browse the repository at this point in the history
添加 event新类型,添加 计算 基座 Pod上 最大/最小/平均 已被调度模块实例个数 修复ModuleDeployment单测
  • Loading branch information
gold300jin authored Nov 15, 2023
2 parents 10d65b4 + aab79e5 commit 1fd9d3a
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package label

const (
DeploymentNameLabel = "serverless.alipay.com/deployment-name"

ModuleNameLabel = "serverless.alipay.com/module-name"

ModuleVersionLabel = "serverless.alipay.com/module-version"
Expand All @@ -21,6 +23,12 @@ const (

ModuleInstanceCount = "serverless.alipay.com/module-instance-count"

MaxModuleInstanceCount = "serverless.alipay.com/max-module-instance-count-on-base"

MinModuleInstanceCount = "serverless.alipay.com/min-module-instance-count-on-base"

AverageModuleInstanceCount = "serverless.alipay.com/average-module-instance-count-on-base"

ModuleSchedulingStrategy = "serverless.alipay.com/module-scheduling-strategy"

MaxModuleCount = "serverless.alipay.com/max-module-count"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ func (r *ModuleDeploymentReconciler) generateModuleReplicas(moduleDeployment *v1
newLabels[label.ModuleNameLabel] = moduleDeployment.Spec.Template.Spec.Module.Name
newLabels[label.ModuleDeploymentLabel] = moduleDeployment.Name
newLabels[label.ModuleSchedulingStrategy] = string(moduleDeployment.Spec.SchedulingStrategy.SchedulingPolicy)
newLabels[label.DeploymentNameLabel] = moduleDeployment.Spec.BaseDeploymentName
newLabels[label.ModuleReplicasetRevisionLabel] = strconv.Itoa(revision)
moduleReplicaSet := &v1alpha1.ModuleReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() {
})
})

Context("wait moduleDeployment Completed", func() {
It("wait moduleDeployment Completed", func() {
waitModuleDeploymentCompleted(moduleDeploymentName, namespace)
})
})

Context("update replicas for module deployment", func() {
It("update module replicas", func() {
key := types.NamespacedName{
Expand All @@ -124,6 +130,8 @@ var _ = Describe("ModuleDeployment Controller OperationStrategy Test", func() {
}
}, timeout, interval).Should(BeTrue())

waitModuleDeploymentCompleted(moduleDeploymentName, namespace)

Eventually(func() bool {
set := map[string]string{
label.ModuleDeploymentLabel: moduleDeployment.Name,
Expand Down Expand Up @@ -331,3 +339,18 @@ func checkModuleDeploymentReplicas(nn types.NamespacedName, replicas int32) bool
newRS.Status.Replicas == newRS.Spec.Replicas &&
newRS.Status.Replicas == replicas
}

func waitModuleDeploymentCompleted(moduleDeploymentName string, namespace string) {
key := types.NamespacedName{
Name: moduleDeploymentName,
Namespace: namespace,
}
newModuleDeployment := &v1alpha1.ModuleDeployment{}
Expect(k8sClient.Get(context.TODO(), key, newModuleDeployment)).Should(Succeed())
progress := newModuleDeployment.Status.ReleaseStatus.Progress
if progress == v1alpha1.ModuleDeploymentReleaseProgressCompleted {
return
}
time.Sleep(5 * time.Second)
waitModuleDeploymentCompleted(moduleDeploymentName, namespace)
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (r *ModuleReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
return reconcile.Result{}, err
}
}
event.PublishModuleReplicaSetReplicasChangedEvent(r.Client, ctx, moduleReplicaSet)
} else {
// replicas not change, directly update module
err = r.compareAndUpdateModule(ctx, sameReplicaSetModules, moduleReplicaSet)
Expand Down
13 changes: 7 additions & 6 deletions module-controller/internal/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
type EventType string

const (
ModuleDeploymentCreate EventType = "moduledeployment_create"
ModuleDeploymentDelete EventType = "moduledeployment_delete"
ModuleReplicaSetCreate EventType = "modulereplicaset_create"
ModuleReplicaSetDelete EventType = "modulereplicaset_delete"
ModuleCreate EventType = "module_create"
ModuleDelete EventType = "module_delete"
ModuleDeploymentCreate EventType = "moduledeployment_create"
ModuleDeploymentDelete EventType = "moduledeployment_delete"
ModuleReplicaSetCreate EventType = "modulereplicaset_create"
ModuleReplicaSetDelete EventType = "modulereplicaset_delete"
ModuleReplicaSetReplicasChanged EventType = "modulereplicaset_replicas_changed"
ModuleCreate EventType = "module_create"
ModuleDelete EventType = "module_delete"
)

type Event interface {
Expand Down
6 changes: 6 additions & 0 deletions module-controller/internal/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ func TestPublishModuleReplicaSetDeleteEvent(t *testing.T) {
moduleReplicaSet := v1alpha1.ModuleReplicaSet{}
PublishModuleReplicaSetDeleteEvent(nil, nil, &moduleReplicaSet)
}

func TestPublishModuleReplicaSetReplicasChangedEvent(t *testing.T) {
assert.Equal(t, ModuleReplicaSetReplicasChanged, ModuleReplicaSetReplicasChangedEvent{}.GetEventType())
moduleReplicaSet := v1alpha1.ModuleReplicaSet{}
PublishModuleReplicaSetReplicasChangedEvent(nil, nil, &moduleReplicaSet)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package event

import (
"context"

"github.com/sofastack/sofa-serverless/api/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ModuleReplicaSetReplicasChangedEvent struct {
client.Client
Context context.Context
ModuleReplicaSet *v1alpha1.ModuleReplicaSet
}

func PublishModuleReplicaSetReplicasChangedEvent(client client.Client, ctx context.Context, moduleReplicaSet *v1alpha1.ModuleReplicaSet) error {
return PublishEvent(ModuleReplicaSetReplicasChangedEvent{ModuleReplicaSet: moduleReplicaSet, Client: client, Context: ctx})
}

func (e ModuleReplicaSetReplicasChangedEvent) GetEventType() EventType {
return ModuleReplicaSetReplicasChanged
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package handler

import (
"fmt"
"github.com/sofastack/sofa-serverless/api/v1alpha1"
"github.com/sofastack/sofa-serverless/internal/constants/label"
"github.com/sofastack/sofa-serverless/internal/event"
"github.com/sofastack/sofa-serverless/internal/utils"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"strconv"
)

type ModuleReplicaSetReplicasChangedHandler struct {
}

func (h ModuleReplicaSetReplicasChangedHandler) Async() bool {
return true
}

func (h ModuleReplicaSetReplicasChangedHandler) Handle(e event.Event) error {
moduleReplicaSetReplicasChangedEvent := e.(event.ModuleReplicaSetReplicasChangedEvent)
moduleReplicaSet := moduleReplicaSetReplicasChangedEvent.ModuleReplicaSet
ctx := moduleReplicaSetReplicasChangedEvent.Context
k8sClient := moduleReplicaSetReplicasChangedEvent.Client
baseDeploymentName := moduleReplicaSet.Labels[label.DeploymentNameLabel]
if baseDeploymentName == "" {
return nil
}
deployment := &v1.Deployment{}
err := k8sClient.Get(ctx,
types.NamespacedName{Namespace: moduleReplicaSet.Namespace, Name: baseDeploymentName}, deployment)
if err != nil {
return utils.Error(err, "Failed to get deployment", "deploymentName", baseDeploymentName)
}
allPodSelector, err := metav1.LabelSelectorAsSelector(&moduleReplicaSet.Spec.Selector)
allPods := &corev1.PodList{}
if err = k8sClient.List(ctx, allPods, &client.ListOptions{Namespace: moduleReplicaSet.Namespace, LabelSelector: allPodSelector}); err != nil {
return utils.Error(err, "Failed to list pod", "moduleReplicaSetName", moduleReplicaSet.Name)
}

if len(allPods.Items) <= 0 {
return nil
}
var minInstanceCount int
var maxInstanceCount int
var totalInstanceCount int
for index, item := range allPods.Items {
var moduleInstanceCount int
if cntStr, ok := item.Labels[label.ModuleInstanceCount]; ok {
moduleInstanceCount, err = strconv.Atoi(cntStr)
if err != nil {
log.Log.Error(err, fmt.Sprintf("invalid ModuleInstanceCount in pod %v", item.Name))
continue
}
} else {
moduleList := &v1alpha1.ModuleList{}
err := k8sClient.List(ctx, moduleList, &client.ListOptions{LabelSelector: labels.SelectorFromSet(map[string]string{
label.BaseInstanceIpLabel: item.Status.PodIP,
})}, client.InNamespace(moduleReplicaSet.Namespace))
if err != nil {
log.Log.Error(err, fmt.Sprintf("can't find any module in pod %v", item.Name))
continue
}
moduleInstanceCount = len(moduleList.Items)
}

// 赋值第一个pod的安装数量为最小值
if index == 0 {
minInstanceCount = moduleInstanceCount
}
// 对比获取最小值
if minInstanceCount > moduleInstanceCount {
minInstanceCount = moduleInstanceCount
}
// 对比获取最大值
if moduleInstanceCount > maxInstanceCount {
maxInstanceCount = moduleInstanceCount
}
// 获取全部安装数量
totalInstanceCount += moduleInstanceCount
}

if deployment.Labels == nil {
deployment.Labels = map[string]string{}
}
deployment.Labels[label.MaxModuleInstanceCount] = strconv.Itoa(maxInstanceCount)
deployment.Labels[label.MinModuleInstanceCount] = strconv.Itoa(minInstanceCount)
avgInstanceCount := float64(totalInstanceCount) / float64(len(allPods.Items))
deployment.Labels[label.AverageModuleInstanceCount] = fmt.Sprintf("%.2f", avgInstanceCount)

if err = k8sClient.Update(ctx, deployment); err != nil {
return utils.Error(err, "Failed to update Deployment", "Deployment", baseDeploymentName)
}
return nil
}

func (h ModuleReplicaSetReplicasChangedHandler) InterestIn(e event.Event) bool {
return e.GetEventType() == event.ModuleReplicaSetReplicasChanged
}

func init() {
event.Handlers = append(event.Handlers, ModuleReplicaSetReplicasChangedHandler{})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package handler

import (
"github.com/sofastack/sofa-serverless/internal/event"
"github.com/sofastack/sofa-serverless/internal/utils"
"testing"
)

func TestModuleReplicaSetReplicasChangedHandler(t *testing.T) {
moduleReplicaSet := utils.PrepareModuleReplicaSet("default", "test-module-replica-set-name")
event.PublishModuleReplicaSetReplicasChangedEvent(TestModuleReplicaSetReplicasChangedClient{}, nil, &moduleReplicaSet)
}

type TestModuleReplicaSetReplicasChangedClient struct {
utils.MockClient
}
98 changes: 98 additions & 0 deletions module-controller/internal/utils/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package utils

import (
"github.com/sofastack/sofa-serverless/api/v1alpha1"
"github.com/sofastack/sofa-serverless/internal/constants/label"
"golang.org/x/net/context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
)

func PrepareModuleDeployment(namespace, moduleDeploymentName string) v1alpha1.ModuleDeployment {
Expand Down Expand Up @@ -41,6 +45,36 @@ func PrepareModuleDeployment(namespace, moduleDeploymentName string) v1alpha1.Mo
return moduleDeployment
}

func PrepareModuleReplicaSet(namespace, moduleReplicaSetName string) v1alpha1.ModuleReplicaSet {

moduleReplicaSet := v1alpha1.ModuleReplicaSet{
Spec: v1alpha1.ModuleReplicaSetSpec{
Replicas: 1,
Template: v1alpha1.ModuleTemplateSpec{
Spec: v1alpha1.ModuleSpec{
Module: v1alpha1.ModuleInfo{
Name: "dynamic-provider",
Version: "1.0.0",
Url: "http://serverless-opensource.oss-cn-shanghai.aliyuncs.com/module-packages/stable/dynamic-provider-1.0.0-ark-biz.jar",
},
},
},
},
ObjectMeta: metav1.ObjectMeta{
Name: moduleReplicaSetName,
Namespace: namespace,
Labels: map[string]string{
"app": "dynamic-stock",
label.MaxModuleCount: "10",
label.ModuleSchedulingStrategy: string(v1alpha1.Scatter),
label.DeploymentNameLabel: "test-deployment-name",
},
Annotations: map[string]string{},
},
}
return moduleReplicaSet
}

type MockClient struct {
}

Expand All @@ -65,9 +99,73 @@ func (m MockClient) Get(ctx context.Context, key client.ObjectKey, obj client.Ob
}

func (m MockClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
if list == nil {
return nil
}

listValue := reflect.ValueOf(list)

itemsType, itemsField := getListType(listValue)
if itemsType == nil {
return nil
}

// 检查切片中的元素类型
switch itemsType {
case reflect.TypeOf(corev1.Pod{}):
var mockPodList []corev1.Pod

for i := 3; i > 0; i-- {
mockLabel := map[string]string{}
if i == 3 {
mockLabel = map[string]string{
label.ModuleInstanceCount: strconv.Itoa(i),
}
}
podName := "mock-pod-" + strconv.Itoa(i)
podIp := "127.0.0." + strconv.Itoa(i)
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Labels: mockLabel,
},
Spec: corev1.PodSpec{},
Status: corev1.PodStatus{
PodIP: podIp,
},
}
mockPodList = append(mockPodList, *pod)
}
itemsField.Set(reflect.ValueOf(mockPodList))
case reflect.TypeOf(v1alpha1.Module{}):
var mockModuleList []v1alpha1.Module
moduleName := "mock-module-name"
module := &v1alpha1.Module{
ObjectMeta: metav1.ObjectMeta{
Name: moduleName,
},
}
mockModuleList = append(mockModuleList, *module)
itemsField.Set(reflect.ValueOf(mockModuleList))
}
return nil
}

func getListType(listValue reflect.Value) (reflect.Type, reflect.Value) {

itemsField := listValue.Elem().FieldByName("Items")
if !itemsField.IsValid() {
return nil, reflect.Value{}
}
itemsType := itemsField.Type()
// 列表的类型是切片
if itemsType.Kind() != reflect.Slice {
return nil, reflect.Value{}
}
return itemsType.Elem(), itemsField
}

func (m MockClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
return nil
}
Expand Down
Loading

0 comments on commit 1fd9d3a

Please sign in to comment.