diff --git a/api/base/v1alpha1/condition.go b/api/base/v1alpha1/condition.go
index f831d565c..1d3c739fc 100644
--- a/api/base/v1alpha1/condition.go
+++ b/api/base/v1alpha1/condition.go
@@ -215,6 +215,11 @@ func (s *ConditionedStatus) IsReady() bool {
 	return s.GetCondition(TypeReady).Status == corev1.ConditionTrue
 }
 
+func (s *ConditionedStatus) IsOffline() bool {
+	readyCond := s.GetCondition(TypeReady)
+	return readyCond.Status == corev1.ConditionFalse && readyCond.Reason == "Offline"
+}
+
 func (s *ConditionedStatus) WaitingCompleteCondition() []Condition {
 	return []Condition{{
 		Type: TypeReady,
diff --git a/api/base/v1alpha1/worker.go b/api/base/v1alpha1/worker.go
index 042958174..2bb2b8394 100644
--- a/api/base/v1alpha1/worker.go
+++ b/api/base/v1alpha1/worker.go
@@ -34,6 +34,10 @@ const (
 
 const (
 	LabelWorkerType = Group + "/worker-type"
+
+	// Labels for worker's Pod
+	WorkerPodSelectorLabel = "app.kubernetes.io/name"
+	WorkerPodLabel         = Group + "/worker"
 )
 
 func DefaultWorkerType() WorkerType {
@@ -105,6 +109,27 @@ func (worker Worker) ReadyCondition() Condition {
 	}
 }
 
+func (worker Worker) OfflineCondition() Condition {
+	currCon := worker.Status.GetCondition(TypeReady)
+	// return the current condition if it has not changed
+	if currCon.Status == corev1.ConditionFalse && currCon.Reason == "Offline" {
+		return currCon
+	}
+	// keep the original LastSuccessfulTime if there is one
+	lastSuccessfulTime := metav1.Now()
+	if !currCon.LastSuccessfulTime.IsZero() {
+		lastSuccessfulTime = currCon.LastSuccessfulTime
+	}
+	return Condition{
+		Type:               TypeReady,
+		Status:             corev1.ConditionFalse,
+		Reason:             "Offline",
+		Message:            "Worker is offline",
+		LastTransitionTime: metav1.Now(),
+		LastSuccessfulTime: lastSuccessfulTime,
+	}
+}
+
 func (worker Worker) ErrorCondition(msg string) Condition {
 	currCon := worker.Status.GetCondition(TypeReady)
 	// return current condition if condition not changed
diff --git a/api/base/v1alpha1/worker_types.go b/api/base/v1alpha1/worker_types.go
index f81475fdf..16711ac04 100644
--- a/api/base/v1alpha1/worker_types.go
+++ b/api/base/v1alpha1/worker_types.go
@@ -34,6 +34,11 @@ type WorkerSpec struct {
 	// Model this worker wants to use
 	Model *TypedObjectReference `json:"model"`
 
+	// Replicas of this worker instance (1 by default)
+	// +kubebuilder:default=1
+	// +kubebuilder:validation:Maximum=1
+	Replicas *int32 `json:"replicas,omitempty"`
+
 	// Resource request&limits including
 	// - CPU or GPU
 	// - Memory
diff --git a/api/base/v1alpha1/zz_generated.deepcopy.go b/api/base/v1alpha1/zz_generated.deepcopy.go
index 5150b2f7f..23a9a8f24 100644
--- a/api/base/v1alpha1/zz_generated.deepcopy.go
+++ b/api/base/v1alpha1/zz_generated.deepcopy.go
@@ -1368,6 +1368,11 @@ func (in *WorkerSpec) DeepCopyInto(out *WorkerSpec) {
 		*out = new(TypedObjectReference)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.Replicas != nil {
+		in, out := &in.Replicas, &out.Replicas
+		*out = new(int32)
+		**out = **in
+	}
 	in.Resources.DeepCopyInto(&out.Resources)
 	if in.Storage != nil {
 		in, out := &in.Storage, &out.Storage
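Note: IsOffline is deliberately a strict subset of !IsReady() — the Ready condition must be False and carry the "Offline" reason — so callers should test readiness first and fall through. A minimal sketch of the intended precedence (the workerAvailability wrapper is hypothetical, for illustration only):

    // workerAvailability is a hypothetical helper showing how the two
    // predicates above are meant to be combined by consumers.
    func workerAvailability(s *ConditionedStatus) string {
        switch {
        case s.IsReady():
            return "ready"
        case s.IsOffline(): // Ready is False with Reason "Offline" (replicas scaled to 0)
            return "offline"
        default:
            return "pending or errored"
        }
    }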
diff --git a/apiserver/graph/generated/generated.go b/apiserver/graph/generated/generated.go
index df4c72b19..8652eb16b 100644
--- a/apiserver/graph/generated/generated.go
+++ b/apiserver/graph/generated/generated.go
@@ -509,6 +509,7 @@ type ComplexityRoot struct {
 		ModelTypes        func(childComplexity int) int
 		Name              func(childComplexity int) int
 		Namespace         func(childComplexity int) int
+		Replicas          func(childComplexity int) int
 		Resources         func(childComplexity int) int
 		Status            func(childComplexity int) int
 		Type              func(childComplexity int) int
@@ -3005,6 +3006,13 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in
 		return e.complexity.Worker.Namespace(childComplexity), true
 
+	case "Worker.replicas":
+		if e.complexity.Worker.Replicas == nil {
+			break
+		}
+
+		return e.complexity.Worker.Replicas(childComplexity), true
+
 	case "Worker.resources":
 		if e.complexity.Worker.Resources == nil {
 			break
 		}
@@ -5175,6 +5183,13 @@ type Worker {
     """
     modelTypes: String!
 
+    """
+    Number of Pod replicas this worker runs
+    Rule: defaults to 1; maximum is 1
+    Rule: 0 takes the worker offline
+    """
+    replicas: String
+
     """
     worker运行所需的资源
     规则: 必填
@@ -5259,6 +5274,8 @@ input UpdateWorkerInput {
     """
     type: String
 
+    replicas: String
+
     """
     worker运行所需的资源
     """
@@ -20684,6 +20701,47 @@ func (ec *executionContext) fieldContext_Worker_modelTypes(ctx context.Context,
 	return fc, nil
 }
 
+func (ec *executionContext) _Worker_replicas(ctx context.Context, field graphql.CollectedField, obj *Worker) (ret graphql.Marshaler) {
+	fc, err := ec.fieldContext_Worker_replicas(ctx, field)
+	if err != nil {
+		return graphql.Null
+	}
+	ctx = graphql.WithFieldContext(ctx, fc)
+	defer func() {
+		if r := recover(); r != nil {
+			ec.Error(ctx, ec.Recover(ctx, r))
+			ret = graphql.Null
+		}
+	}()
+	resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) {
+		ctx = rctx // use context from middleware stack in children
+		return obj.Replicas, nil
+	})
+	if err != nil {
+		ec.Error(ctx, err)
+		return graphql.Null
+	}
+	if resTmp == nil {
+		return graphql.Null
+	}
+	res := resTmp.(*string)
+	fc.Result = res
+	return ec.marshalOString2ᚖstring(ctx, field.Selections, res)
+}
+
+func (ec *executionContext) fieldContext_Worker_replicas(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "Worker",
+		Field:      field,
+		IsMethod:   false,
+		IsResolver: false,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			return nil, errors.New("field of type String does not have child fields")
+		},
+	}
+	return fc, nil
+}
+
 func (ec *executionContext) _Worker_resources(ctx context.Context, field graphql.CollectedField, obj *Worker) (ret graphql.Marshaler) {
 	fc, err := ec.fieldContext_Worker_resources(ctx, field)
 	if err != nil {
@@ -20924,6 +20982,8 @@ func (ec *executionContext) fieldContext_WorkerMutation_createWorker(ctx context
 				return ec.fieldContext_Worker_model(ctx, field)
 			case "modelTypes":
 				return ec.fieldContext_Worker_modelTypes(ctx, field)
+			case "replicas":
+				return ec.fieldContext_Worker_replicas(ctx, field)
 			case "resources":
 				return ec.fieldContext_Worker_resources(ctx, field)
 			case "status":
@@ -21015,6 +21075,8 @@ func (ec *executionContext) fieldContext_WorkerMutation_updateWorker(ctx context
 				return ec.fieldContext_Worker_model(ctx, field)
 			case "modelTypes":
 				return ec.fieldContext_Worker_modelTypes(ctx, field)
+			case "replicas":
+				return ec.fieldContext_Worker_replicas(ctx, field)
 			case "resources":
 				return ec.fieldContext_Worker_resources(ctx, field)
 			case "status":
@@ -21158,6 +21220,8 @@ func (ec *executionContext) fieldContext_WorkerQuery_getWorker(ctx context.Conte
 				return ec.fieldContext_Worker_model(ctx, field)
 			case "modelTypes":
 				return ec.fieldContext_Worker_modelTypes(ctx, field)
+			case "replicas":
+				return ec.fieldContext_Worker_replicas(ctx, field)
 			case "resources":
 				return ec.fieldContext_Worker_resources(ctx, field)
 			case "status":
@@ -26432,7 +26496,7 @@ func (ec *executionContext) unmarshalInputUpdateWorkerInput(ctx context.Context,
 		asMap[k] = v
 	}
 
-	fieldsInOrder := [...]string{"name", "namespace", "labels", "annotations", "displayName", "description", "type", "resources"}
+	fieldsInOrder := [...]string{"name", "namespace", "labels", "annotations", "displayName", "description", "type", "replicas", "resources"}
 	for _, k := range fieldsInOrder {
 		v, ok := asMap[k]
 		if !ok {
@@ -26502,6 +26566,15 @@ func (ec *executionContext) unmarshalInputUpdateWorkerInput(ctx context.Context,
 				return it, err
 			}
 			it.Type = data
+		case "replicas":
+			var err error
+
+			ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("replicas"))
+			data, err := ec.unmarshalOString2ᚖstring(ctx, v)
+			if err != nil {
+				return it, err
+			}
+			it.Replicas = data
 		case "resources":
 			var err error
 
@@ -30935,6 +31008,8 @@ func (ec *executionContext) _Worker(ctx context.Context, sel ast.SelectionSet, o
 			if out.Values[i] == graphql.Null {
 				out.Invalids++
 			}
+		case "replicas":
+			out.Values[i] = ec._Worker_replicas(ctx, field, obj)
 		case "resources":
 			out.Values[i] = ec._Worker_resources(ctx, field, obj)
 			if out.Values[i] == graphql.Null {
[...]string{"name", "namespace", "labels", "annotations", "displayName", "description", "type", "resources"} + fieldsInOrder := [...]string{"name", "namespace", "labels", "annotations", "displayName", "description", "type", "replicas", "resources"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -26502,6 +26566,15 @@ func (ec *executionContext) unmarshalInputUpdateWorkerInput(ctx context.Context, return it, err } it.Type = data + case "replicas": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("replicas")) + data, err := ec.unmarshalOString2ᚖstring(ctx, v) + if err != nil { + return it, err + } + it.Replicas = data case "resources": var err error @@ -30935,6 +31008,8 @@ func (ec *executionContext) _Worker(ctx context.Context, sel ast.SelectionSet, o if out.Values[i] == graphql.Null { out.Invalids++ } + case "replicas": + out.Values[i] = ec._Worker_replicas(ctx, field, obj) case "resources": out.Values[i] = ec._Worker_resources(ctx, field, obj) if out.Values[i] == graphql.Null { diff --git a/apiserver/graph/generated/models_gen.go b/apiserver/graph/generated/models_gen.go index 2f53c6e30..68e6b14e7 100644 --- a/apiserver/graph/generated/models_gen.go +++ b/apiserver/graph/generated/models_gen.go @@ -1186,7 +1186,8 @@ type UpdateWorkerInput struct { // - "fastchat" : fastchat提供的通用的推理服务模式 // - "fastchat-vllm" : fastchat提供的采用VLLM推理加速的推理服务模式 // 规则: 如果为空,则不更新;如果type类型与当前类型相同,则不更新 - Type *string `json:"type,omitempty"` + Type *string `json:"type,omitempty"` + Replicas *string `json:"replicas,omitempty"` // worker运行所需的资源 Resources *ResourcesInput `json:"resources,omitempty"` } @@ -1279,6 +1280,10 @@ type Worker struct { Model TypedObjectReference `json:"model"` // worker对应的模型类型 ModelTypes string `json:"modelTypes"` + // worker运行的Pod副本数量 + // 规则: 默认为1,最大值为1 + // 规则: 为0时,即下线 + Replicas *string `json:"replicas,omitempty"` // worker运行所需的资源 // 规则: 必填 Resources Resources `json:"resources"` diff --git a/apiserver/graph/schema/worker.gql b/apiserver/graph/schema/worker.gql index 02d3562f8..74e8f50c5 100644 --- a/apiserver/graph/schema/worker.gql +++ b/apiserver/graph/schema/worker.gql @@ -28,6 +28,7 @@ query listWorkers($input: ListWorkerInput!){ } api modelTypes + replicas resources { cpu memory @@ -64,6 +65,7 @@ query getWorker($name: String!, $namespace: String!) { } api modelTypes + replicas resources { cpu memory @@ -98,6 +100,7 @@ mutation createWorker($input: CreateWorkerInput!) { } api modelTypes + replicas resources { cpu memory @@ -124,6 +127,7 @@ mutation updateWorker($input: UpdateWorkerInput) { status message updateTimestamp + replicas resources { cpu memory diff --git a/apiserver/graph/schema/worker.graphqls b/apiserver/graph/schema/worker.graphqls index d37f26644..625d9c85d 100644 --- a/apiserver/graph/schema/worker.graphqls +++ b/apiserver/graph/schema/worker.graphqls @@ -86,6 +86,13 @@ type Worker { """ modelTypes: String! 
+ """ + worker运行的Pod副本数量 + 规则: 默认为1,最大值为1 + 规则: 为0时,即下线 + """ + replicas: String + """ worker运行所需的资源 规则: 必填 @@ -170,6 +177,8 @@ input UpdateWorkerInput { """ type: String + replicas: String + """ worker运行所需的资源 """ diff --git a/apiserver/pkg/worker/worker.go b/apiserver/pkg/worker/worker.go index d9bc8252d..156f1aa69 100644 --- a/apiserver/pkg/worker/worker.go +++ b/apiserver/pkg/worker/worker.go @@ -18,7 +18,9 @@ package worker import ( "context" + "fmt" "sort" + "strconv" "strings" "github.com/pkg/errors" @@ -60,6 +62,12 @@ func worker2model(ctx context.Context, c dynamic.Interface, obj *unstructured.Un // Unknown,Pending ,Running ,Error status := common.GetObjStatus(worker) + // replicas + var replicas string + if worker.Spec.Replicas != nil { + replicas = fmt.Sprint(worker.Spec.Replicas) + } + // resources cpu := worker.Spec.Resources.Limits[v1.ResourceCPU] cpuStr := cpu.String() @@ -91,6 +99,7 @@ func worker2model(ctx context.Context, c dynamic.Interface, obj *unstructured.Un Status: &status, CreationTimestamp: &creationtimestamp, UpdateTimestamp: &updateTime, + Replicas: &replicas, Resources: resources, ModelTypes: "unknown", API: &api, @@ -216,6 +225,16 @@ func UpdateWorker(ctx context.Context, c dynamic.Interface, input *generated.Upd } } + // replicas + if input.Replicas != nil { + replicas, err := strconv.ParseInt(*input.Replicas, 10, 32) + if err != nil { + return nil, errors.Wrap(err, "Invalid replicas") + } + replicasInt32 := int32(replicas) + worker.Spec.Replicas = &replicasInt32 + } + // resources if input.Resources != nil { // cpu & memory diff --git a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml index 54d8f6a19..552299a25 100644 --- a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml +++ b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml @@ -73,6 +73,12 @@ spec: - kind - name type: object + replicas: + default: 1 + description: Replicas of this worker instance(1 by default) + format: int32 + maximum: 1 + type: integer resources: description: Resource request&limits including - CPU or GPU - Memory properties: diff --git a/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml b/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml index ca590f611..17c81fbc2 100644 --- a/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml +++ b/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml @@ -18,6 +18,7 @@ spec: model: kind: "Models" name: "baichuan2-7b-chat" + replicas: 1 resources: limits: nvidia.com/gpu: "1" # request 1 GPU diff --git a/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml b/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml index f78ca2306..e8ff3a3da 100644 --- a/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml +++ b/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml @@ -15,6 +15,7 @@ metadata: namespace: arcadia spec: type: "fastchat" + replicas: 1 model: kind: "Models" name: "bge-large-zh-v1.5" diff --git a/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml b/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml new file mode 100644 index 000000000..5f7905b60 --- /dev/null +++ b/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml @@ -0,0 +1,14 @@ +apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1 +kind: Worker +metadata: + name: qwen-7b-chat + namespace: kubeagi-system +spec: + type: "fastchat" + model: + kind: "Models" + name: "qwen-7b-chat" + replicas: 1 + resources: + limits: + nvidia.com/gpu: "1" # request 
diff --git a/controllers/embedder_controller.go b/controllers/embedder_controller.go
index 15891cb04..1794e45cd 100644
--- a/controllers/embedder_controller.go
+++ b/controllers/embedder_controller.go
@@ -32,8 +32,11 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
 	"github.com/kubeagi/arcadia/pkg/embeddings"
@@ -133,6 +136,26 @@ func (r *EmbedderReconciler) SetupWithManager(mgr ctrl.Manager) error {
 				return true
 			},
 		})).
+		Watches(&source.Kind{Type: &arcadiav1alpha1.Worker{}},
+			handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
+				worker := o.(*arcadiav1alpha1.Worker)
+				model := worker.Spec.Model.DeepCopy()
+				if model.Namespace == nil {
+					model.Namespace = &worker.Namespace
+				}
+				m := &arcadiav1alpha1.Model{}
+				if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: *model.Namespace, Name: model.Name}, m); err != nil {
+					return []ctrl.Request{}
+				}
+				if m.IsEmbeddingModel() {
+					return []ctrl.Request{
+						reconcile.Request{
+							NamespacedName: client.ObjectKeyFromObject(o),
+						},
+					}
+				}
+				return []ctrl.Request{}
+			})).
 		Complete(r)
 }
 
@@ -196,6 +219,9 @@ func (r *EmbedderReconciler) checkWorkerEmbedder(ctx context.Context, logger log
 		return r.UpdateStatus(ctx, instance, nil, err)
 	}
 	if !worker.Status.IsReady() {
+		if worker.Status.IsOffline() {
+			return r.UpdateStatus(ctx, instance, nil, errors.New("worker is offline"))
+		}
 		return r.UpdateStatus(ctx, instance, nil, errors.New("worker is not ready"))
 	}
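Note: the Worker→Model mapping closure above is duplicated verbatim in llm_controller.go below, differing only in the IsEmbeddingModel/IsLLMModel check. If it grows any further it could be factored into a shared helper along these lines (sketch only; mapWorkerToRequests does not exist in this patch):

    // mapWorkerToRequests enqueues a reconcile for a Worker event only when the
    // Worker's model matches the given predicate.
    func mapWorkerToRequests(c client.Client, match func(*arcadiav1alpha1.Model) bool) handler.MapFunc {
        return func(o client.Object) []reconcile.Request {
            worker := o.(*arcadiav1alpha1.Worker)
            model := worker.Spec.Model.DeepCopy()
            if model.Namespace == nil {
                model.Namespace = &worker.Namespace
            }
            m := &arcadiav1alpha1.Model{}
            if err := c.Get(context.TODO(), types.NamespacedName{Namespace: *model.Namespace, Name: model.Name}, m); err != nil {
                return nil
            }
            if match(m) {
                return []reconcile.Request{{NamespacedName: client.ObjectKeyFromObject(o)}}
            }
            return nil
        }
    }

Usage would then be handler.EnqueueRequestsFromMapFunc(mapWorkerToRequests(r.Client, (*arcadiav1alpha1.Model).IsEmbeddingModel)) here, and the IsLLMModel variant in the LLM controller below.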
diff --git a/controllers/llm_controller.go b/controllers/llm_controller.go
index d7c52698f..0a8e32cd0 100644
--- a/controllers/llm_controller.go
+++ b/controllers/llm_controller.go
@@ -32,8 +32,11 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
 	"github.com/kubeagi/arcadia/pkg/llms"
@@ -118,6 +121,26 @@ func (r *LLMReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 func (r *LLMReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&arcadiav1alpha1.LLM{}, builder.WithPredicates(LLMPredicates{})).
+		Watches(&source.Kind{Type: &arcadiav1alpha1.Worker{}},
+			handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
+				worker := o.(*arcadiav1alpha1.Worker)
+				model := worker.Spec.Model.DeepCopy()
+				if model.Namespace == nil {
+					model.Namespace = &worker.Namespace
+				}
+				m := &arcadiav1alpha1.Model{}
+				if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: *model.Namespace, Name: model.Name}, m); err != nil {
+					return []ctrl.Request{}
+				}
+				if m.IsLLMModel() {
+					return []ctrl.Request{
+						reconcile.Request{
+							NamespacedName: client.ObjectKeyFromObject(o),
+						},
+					}
+				}
+				return []ctrl.Request{}
+			})).
 		Complete(r)
 }
 
@@ -182,6 +205,9 @@ func (r *LLMReconciler) checkWorkerLLM(ctx context.Context, logger logr.Logger,
 		return r.UpdateStatus(ctx, instance, "", err)
 	}
 	if !worker.Status.IsReady() {
+		if worker.Status.IsOffline() {
+			return r.UpdateStatus(ctx, instance, nil, errors.New("worker is offline"))
+		}
 		return r.UpdateStatus(ctx, instance, nil, errors.New("worker is not ready"))
 	}
diff --git a/controllers/worker_controller.go b/controllers/worker_controller.go
index e456e882c..ced896f2f 100644
--- a/controllers/worker_controller.go
+++ b/controllers/worker_controller.go
@@ -22,9 +22,7 @@ import (
 
 	"github.com/go-logr/logr"
 	"github.com/pkg/errors"
-	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -34,6 +32,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 	"sigs.k8s.io/controller-runtime/pkg/source"
 
 	arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
@@ -110,7 +109,7 @@ func (r *WorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
 	}
 
 	// initialize labels
-	requeue, err := r.Initialize(ctx, log, worker)
+	requeue, err := r.initialize(ctx, log, worker)
 	if err != nil {
 		log.V(1).Info("Failed to update labels")
 		return ctrl.Result{}, err
@@ -119,13 +118,16 @@ func (r *WorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
 		return ctrl.Result{Requeue: true}, nil
 	}
 
-	err = r.reconcile(ctx, log, worker)
+	// core reconcile for worker
+	reconciledWorker, err := r.reconcile(ctx, log, worker)
 	if err != nil {
 		log.Error(err, "Failed to reconcile worker")
-		r.setCondition(worker, worker.ErrorCondition(err.Error()))
+		// set the error condition on the instance that is actually patched below
+		r.setCondition(reconciledWorker, reconciledWorker.ErrorCondition(err.Error()))
 	}
 
-	if updateStatusErr := r.patchStatus(ctx, worker); updateStatusErr != nil {
+	// update status
+	updateStatusErr := r.patchStatus(ctx, reconciledWorker)
+	if updateStatusErr != nil {
 		log.Error(updateStatusErr, "Failed to patch worker status")
 		return ctrl.Result{Requeue: true}, updateStatusErr
 	}
@@ -133,7 +135,7 @@ func (r *WorkerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
 	return ctrl.Result{}, nil
 }
 
-func (r *WorkerReconciler) Initialize(ctx context.Context, logger logr.Logger, instance *arcadiav1alpha1.Worker) (bool, error) {
+func (r *WorkerReconciler) initialize(ctx context.Context, logger logr.Logger, instance *arcadiav1alpha1.Worker) (bool, error) {
 	instanceDeepCopy := instance.DeepCopy()
 
 	var update bool
@@ -157,104 +159,36 @@ func (r *WorkerReconciler) initialize(ctx context.Context, logger logr.Logger, i
 	return false, nil
 }
 
-func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, worker *arcadiav1alpha1.Worker) error {
-	// reconcile worker instance
-	system, err := config.GetSystemDatasource(ctx, r.Client, nil)
+func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, worker *arcadiav1alpha1.Worker) (*arcadiav1alpha1.Worker, error) {
+	logger.V(5).Info("GetSystemDatasource which hosts the worker's model files")
+	datasource, err := config.GetSystemDatasource(ctx, r.Client, nil)
 	if err != nil {
-		return errors.Wrap(err, "Failed to get system datasource")
+		return worker, errors.Wrap(err, "Failed to get system datasource")
 	}
-	w, err := arcadiaworker.NewPodWorker(ctx, r.Client, r.Scheme, worker, system)
+
+	// Only PodWorker (which hosts this worker via a single pod) is supported now
+	w, err := arcadiaworker.NewPodWorker(ctx, r.Client, r.Scheme, worker, datasource)
 	if err != nil {
-		return errors.Wrap(err, "Failed to new a pod worker")
+		return worker, errors.Wrap(err, "Failed to new a pod worker")
 	}
 
 	logger.V(5).Info("BeforeStart worker")
 	if err := w.BeforeStart(ctx); err != nil {
-		return errors.Wrap(err, "Failed to do BeforeStart")
+		return w.Worker(), errors.Wrap(err, "Failed to do BeforeStart")
 	}
+	logger.V(5).Info("Start worker")
 	if err := w.Start(ctx); err != nil {
-		return errors.Wrap(err, "Failed to do Start")
-	}
-	logger.V(5).Info("State worker")
-	status, err := w.State(ctx)
-	if err != nil {
-		return errors.Wrap(err, "Failed to do State")
-	}
-
-	// check & patch state
-	podStatus := status.(*corev1.PodStatus)
-	switch podStatus.Phase {
-	case corev1.PodRunning, corev1.PodSucceeded:
-		r.setCondition(worker, worker.ReadyCondition())
-	case corev1.PodPending, corev1.PodUnknown:
-		r.setCondition(worker, worker.PendingCondition())
-	case corev1.PodFailed:
-		r.setCondition(worker, worker.ErrorCondition("Pod failed"))
-	}
-
-	worker.Status.PodStatus = *podStatus
-
-	// further reconcile when worker is ready
-	if worker.Status.IsReady() {
-		if err := r.reconcileWhenWorkerReady(ctx, logger, worker, w.Model()); err != nil {
-			return errors.Wrap(err, "Failed to do further reconcile when worker is ready")
-		}
+		return w.Worker(), errors.Wrap(err, "Failed to do Start")
 	}
 
-	return nil
-}
-
-func (r *WorkerReconciler) reconcileWhenWorkerReady(ctx context.Context, logger logr.Logger, worker *arcadiav1alpha1.Worker, model *arcadiav1alpha1.Model) error {
-	// reconcile worker's Embedder when its model is a embedding model
-	if model.IsEmbeddingModel() {
-		embedder := &arcadiav1alpha1.Embedder{}
-		err := r.Client.Get(ctx, types.NamespacedName{Namespace: worker.Namespace, Name: worker.Name + "-worker"}, embedder)
-		switch arcadiaworker.ActionOnError(err) {
-		case arcadiaworker.Create:
-			// Create when not found
-			embedder = worker.BuildEmbedder()
-			if err = controllerutil.SetControllerReference(worker, embedder, r.Scheme); err != nil {
-				return err
-			}
-			if err = r.Client.Create(ctx, embedder); err != nil {
-				// Ignore error when already exists
-				if !k8serrors.IsAlreadyExists(err) {
-					return err
-				}
-			}
-		case arcadiaworker.Update:
-			// Skip update when found
-		case arcadiaworker.Panic:
-			return err
-		}
-	}
-
-	// reconcile worker's LLM when its model is a LLM model
-	if model.IsLLMModel() {
-		llm := &arcadiav1alpha1.LLM{}
-		err := r.Client.Get(ctx, types.NamespacedName{Namespace: worker.Namespace, Name: worker.Name + "-worker"}, llm)
-		switch arcadiaworker.ActionOnError(err) {
-		case arcadiaworker.Create:
-			// Create when not found
-			llm = worker.BuildLLM()
-			if err = controllerutil.SetControllerReference(worker, llm, r.Scheme); err != nil {
-				return err
-			}
-			if err = r.Client.Create(ctx, llm); err != nil {
-				// Ignore error when already exists
-				if !k8serrors.IsAlreadyExists(err) {
-					return err
-				}
-			}
-		case arcadiaworker.Update:
-			// Skip update when found
-		case arcadiaworker.Panic:
-			return err
-		}
+	logger.V(5).Info("AfterStart worker")
+	err = w.AfterStart(ctx)
+	if err != nil {
+		return w.Worker(), errors.Wrap(err, "Failed to do AfterStart")
 	}
 
-	return nil
+	return w.Worker(), nil
 }
 
 func (r *WorkerReconciler) setCondition(worker *arcadiav1alpha1.Worker, condition ...arcadiav1alpha1.Condition) *arcadiav1alpha1.Worker {
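Note: everything the old inline state-check and reconcileWhenWorkerReady did now lives behind the Worker lifecycle in pkg/worker (further down in this diff). Compressed, the sequence one reconcile pass drives looks like this (runWorkerLifecycle is an illustrative summary, not a function in the patch):

    func runWorkerLifecycle(ctx context.Context, r *WorkerReconciler, worker *arcadiav1alpha1.Worker, ds *arcadiav1alpha1.Datasource) error {
        w, err := arcadiaworker.NewPodWorker(ctx, r.Client, r.Scheme, worker, ds)
        if err != nil {
            return err
        }
        if err := w.BeforeStart(ctx); err != nil { // PVC (optional), Service, LLM/Embedder CRs
            return err
        }
        if err := w.Start(ctx); err != nil { // Deployment: loader init container + runner container
            return err
        }
        return w.AfterStart(ctx) // read pod state, set Ready/Pending/Offline/Error conditions
    }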
@@ -287,17 +221,19 @@ func (r *WorkerReconciler) SetupWithManager(mgr ctrl.Manager) error {
 				return !reflect.DeepEqual(oldWorker.Spec, newWorker.Spec) || newWorker.DeletionTimestamp != nil
 			},
 		})).
-		Watches(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
-			IsController: true,
-			OwnerType:    &arcadiav1alpha1.Worker{},
-		}, builder.WithPredicates(
-			predicate.Funcs{
-				UpdateFunc: func(ue event.UpdateEvent) bool {
-					oldDep := ue.ObjectOld.(*appsv1.Deployment)
-					newDep := ue.ObjectNew.(*appsv1.Deployment)
-					return !reflect.DeepEqual(oldDep.Status, newDep.Status)
-				},
-			},
-		)).
+		Watches(&source.Kind{Type: &corev1.Pod{}}, handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
+			pod := o.(*corev1.Pod)
+			if pod.Labels != nil && pod.Labels[arcadiav1alpha1.WorkerPodLabel] != "" {
+				return []ctrl.Request{
+					reconcile.Request{
+						NamespacedName: types.NamespacedName{
+							Namespace: pod.Namespace,
+							Name:      pod.Labels[arcadiav1alpha1.WorkerPodLabel],
+						},
+					},
+				}
+			}
+			return []ctrl.Request{}
+		})).
 		Complete(r)
 }
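Note: the watch above replaces Deployment ownership tracking with a label lookup on Pods. For a Worker named qwen-7b-chat, the pod started by PodWorker carries both labels introduced in api/base/v1alpha1/worker.go (values illustrative; the "-worker" suffix assumes that is what WokerCommonSuffix expands to):

    labels := map[string]string{
        arcadiav1alpha1.WorkerPodSelectorLabel: "qwen-7b-chat-worker", // SuffixedName(); ties the pod to its Service/Deployment
        arcadiav1alpha1.WorkerPodLabel:         "qwen-7b-chat",        // owning Worker's name; reverse-mapped by the watch
    }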
diff --git a/deploy/charts/arcadia/Chart.yaml b/deploy/charts/arcadia/Chart.yaml
index e0c7c9be8..440cc2a3c 100644
--- a/deploy/charts/arcadia/Chart.yaml
+++ b/deploy/charts/arcadia/Chart.yaml
@@ -2,7 +2,7 @@ apiVersion: v2
 name: arcadia
 description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
 type: application
-version: 0.1.48
+version: 0.1.49
 appVersion: "0.0.1"
 
 keywords:
diff --git a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml
index 54d8f6a19..552299a25 100644
--- a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml
+++ b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml
@@ -73,6 +73,12 @@ spec:
                 - kind
                 - name
                 type: object
+              replicas:
+                default: 1
+                description: Replicas of this worker instance (1 by default)
+                format: int32
+                maximum: 1
+                type: integer
               resources:
                 description: Resource request&limits including - CPU or GPU - Memory
                 properties:
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 44300a79a..081824528 100644
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -43,6 +43,10 @@ const (
 var (
 	ErrNotImplementedYet = errors.New("not implemented yet")
 	ErrModelNotReady     = errors.New("worker's model is not ready")
+
+	// Default replicas for a worker
+	// Only 1 replica is supported for now
+	DefaultWorkerReplicas int32 = 1
 )
 
 type Action string
@@ -64,13 +68,17 @@ func ActionOnError(err error) Action {
 
 // Worker implement the lifecycle management of a LLM worker
 type Worker interface {
+	// Worker that this is for
+	Worker() *arcadiav1alpha1.Worker
 	// Model that this worker is running for
 	Model() *arcadiav1alpha1.Model
 
 	// Actions to do before start this worker
 	BeforeStart(ctx context.Context) error
-	// Actiosn to do when Start this worker
+	// Actions to do when Start this worker
 	Start(ctx context.Context) error
+	// Actions to do after start this worker
+	AfterStart(ctx context.Context) error
 
 	// Actions to do before stop this worker
 	BeforeStop(ctx context.Context) error
@@ -83,6 +91,7 @@ type Worker interface {
 
 var _ Worker = (*PodWorker)(nil)
 
+// PodWorker hosts this worker in a single pod, with different loaders and runners based on the Worker's configuration
 type PodWorker struct {
 	c client.Client
 	s *runtime.Scheme
@@ -105,8 +114,8 @@ type PodWorker struct {
 	storage corev1.Volume
 }
 
-func (worker *PodWorker) SuffixedName() string {
-	return worker.Name + WokerCommonSuffix
+func (podWorker *PodWorker) SuffixedName() string {
+	return podWorker.Name + WokerCommonSuffix
 }
 
 func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *arcadiav1alpha1.Worker, d *arcadiav1alpha1.Datasource) (*PodWorker, error) {
@@ -115,7 +124,7 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
 		model.Namespace = &w.Namespace
 	}
 
-	worker := &PodWorker{
+	podWorker := &PodWorker{
 		c: c,
 		s: s,
 		w: w.DeepCopy(),
@@ -134,7 +143,7 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
 		klog.Errorf("%s/%s model is not ready", m.Namespace, m.Name)
 		return nil, ErrModelNotReady
 	}
-	worker.m = m
+	podWorker.m = m
 
 	// default fields in a worker
 	storage := corev1.Volume{
@@ -146,13 +155,13 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
 
 	service := corev1.Service{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      worker.SuffixedName(),
-			Namespace: worker.Namespace,
+			Name:      podWorker.SuffixedName(),
+			Namespace: podWorker.Namespace,
 		},
 		Spec: corev1.ServiceSpec{
 			Type: corev1.ServiceTypeClusterIP,
 			Selector: map[string]string{
-				"app.kubernetes.io/name": worker.SuffixedName(),
+				arcadiav1alpha1.WorkerPodSelectorLabel: podWorker.SuffixedName(),
 			},
 			Ports: []corev1.ServicePort{
 				{Name: "http", Port: 21002, TargetPort: intstr.Parse("http"), Protocol: corev1.ProtocolTCP},
@@ -162,22 +171,28 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
 
 	deployment := appsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      worker.SuffixedName(),
+			Name:      podWorker.SuffixedName(),
 			Namespace: w.Namespace,
 		},
 		Spec: appsv1.DeploymentSpec{
 			Selector: &metav1.LabelSelector{
 				MatchLabels: map[string]string{
-					"app.kubernetes.io/name": worker.SuffixedName(),
+					arcadiav1alpha1.WorkerPodSelectorLabel: podWorker.SuffixedName(),
 				},
 			},
 			Strategy: appsv1.DeploymentStrategy{Type: appsv1.RecreateDeploymentStrategyType},
+			Replicas: &DefaultWorkerReplicas,
 		},
 	}
-	worker.storage = storage
-	worker.service = service
-	worker.deployment = deployment
+	// set the worker replicas
+	if w.Spec.Replicas != nil {
+		deployment.Spec.Replicas = w.Spec.Replicas
+	}
+
+	podWorker.storage = storage
+	podWorker.service = service
+	podWorker.deployment = deployment
 
 	// init loader(Only oss supported yet)
 	endpoint := d.Spec.Enpoint.DeepCopy()
@@ -190,7 +205,7 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
 		if err != nil {
 			return nil, fmt.Errorf("failed to new a loader with %w", err)
 		}
-		worker.l = l
+		podWorker.l = l
 	default:
 		return nil, fmt.Errorf("datasource %s with type %s not supported in worker", d.Name, d.Spec.Type())
 	}
@@ -202,82 +217,136 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
 		if err != nil {
 			return nil, fmt.Errorf("failed to new a runner with %w", err)
 		}
-		worker.r = r
+		podWorker.r = r
 	case arcadiav1alpha1.WorkerTypeFastchatNormal:
 		r, err := NewRunnerFastchat(c, w.DeepCopy())
 		if err != nil {
 			return nil, fmt.Errorf("failed to new a runner with %w", err)
 		}
-		worker.r = r
+		podWorker.r = r
 	default:
 		return nil, fmt.Errorf("worker %s with type %s not supported in worker", w.Name, w.Type())
 	}
 
-	return worker, nil
+	return podWorker, nil
+}
+
+func (podWorker *PodWorker) Worker() *arcadiav1alpha1.Worker {
+	return podWorker.w
 }
 
 // Model that this worker is running for
-func (worker *PodWorker) Model() *arcadiav1alpha1.Model {
-	return worker.m.DeepCopy()
+func (podWorker *PodWorker) Model() *arcadiav1alpha1.Model {
+	return podWorker.m.DeepCopy()
 }
 
-func (worker *PodWorker) BeforeStart(ctx context.Context) error {
+// BeforeStart creates the resources related to this Worker:
+// a pvc (if configured), a service, an LLM (if an llm model), and an Embedder (if an embedding model)
+func (podWorker *PodWorker) BeforeStart(ctx context.Context) error {
 	var err error
 	// prepare pvc
-	if worker.w.Spec.Storage != nil {
+	if podWorker.Worker().Spec.Storage != nil {
 		pvc := &corev1.PersistentVolumeClaim{
 			ObjectMeta: metav1.ObjectMeta{
-				Namespace: worker.Namespace,
-				Name:      worker.SuffixedName(),
+				Namespace: podWorker.Namespace,
+				Name:      podWorker.SuffixedName(),
 			},
-			Spec: *worker.w.Spec.Storage.DeepCopy(),
+			Spec: *podWorker.Worker().Spec.Storage.DeepCopy(),
 		}
-		err = controllerutil.SetControllerReference(worker.w, pvc, worker.s)
+		err = controllerutil.SetControllerReference(podWorker.Worker(), pvc, podWorker.s)
 		if err != nil {
 			return fmt.Errorf("failed to set owner reference with %w", err)
 		}
-		err = worker.c.Get(ctx, types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}, &corev1.PersistentVolumeClaim{})
+		err = podWorker.c.Get(ctx, types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}, &corev1.PersistentVolumeClaim{})
 		switch ActionOnError(err) {
 		case Panic:
 			return err
 		case Update:
-			if err = worker.c.Update(ctx, pvc); err != nil {
+			if err = podWorker.c.Update(ctx, pvc); err != nil {
 				return err
 			}
 		case Create:
-			err = worker.c.Create(ctx, pvc)
+			err = podWorker.c.Create(ctx, pvc)
 			if err != nil {
 				return err
 			}
 		}
-		worker.storage = corev1.Volume{
+		podWorker.storage = corev1.Volume{
 			Name: "models",
 			VolumeSource: corev1.VolumeSource{
 				PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
-					ClaimName: worker.SuffixedName(),
+					ClaimName: podWorker.SuffixedName(),
 				},
 			},
 		}
 	}
 
 	// prepare svc
-	svc := worker.service.DeepCopy()
-	err = controllerutil.SetControllerReference(worker.w, svc, worker.s)
+	svc := podWorker.service.DeepCopy()
+	err = controllerutil.SetControllerReference(podWorker.Worker(), svc, podWorker.s)
 	if err != nil {
 		return err
 	}
-	err = worker.c.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, &corev1.Service{})
+	err = podWorker.c.Get(ctx, types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}, &corev1.Service{})
 	switch ActionOnError(err) {
 	case Panic:
 		return err
 	case Update:
-		if err := worker.c.Update(ctx, svc); err != nil {
+		if err := podWorker.c.Update(ctx, svc); err != nil {
 			return err
 		}
 	case Create:
-		if err := worker.c.Create(ctx, svc); err != nil {
+		if err := podWorker.c.Create(ctx, svc); err != nil {
+			return err
+		}
+	}
+
+	// prepare LLM/Embedder
+	model := podWorker.Model()
+	if model.IsEmbeddingModel() {
+		embedder := &arcadiav1alpha1.Embedder{}
+		err := podWorker.c.Get(ctx, types.NamespacedName{Namespace: podWorker.Namespace, Name: podWorker.Name}, embedder)
+		switch ActionOnError(err) {
+		case Create:
+			// Create when not found
+			embedder = podWorker.Worker().BuildEmbedder()
+			if err = controllerutil.SetControllerReference(podWorker.Worker(), embedder, podWorker.c.Scheme()); err != nil {
+				return err
+			}
+			if err = podWorker.c.Create(ctx, embedder); err != nil {
+				// Ignore error when already exists
+				if !k8serrors.IsAlreadyExists(err) {
+					return err
+				}
+			}
+		case Update:
+			// Skip update when found
+		case Panic:
+			return err
+		}
+	}
+
+	if model.IsLLMModel() {
+		llm := &arcadiav1alpha1.LLM{}
+		err := podWorker.c.Get(ctx, types.NamespacedName{Namespace: podWorker.Namespace, Name: podWorker.Name}, llm)
+		switch ActionOnError(err) {
+		case Create:
+			// Create when not found
+			llm = podWorker.Worker().BuildLLM()
+			if err = controllerutil.SetControllerReference(podWorker.Worker(), llm, podWorker.c.Scheme()); err != nil {
+				return err
+			}
+			if err = podWorker.c.Create(ctx, llm); err != nil {
+				// Ignore error when already exists
+				if !k8serrors.IsAlreadyExists(err) {
+					return err
+				}
+			}
+		case Update:
+			// Skip update when found
+		case Panic:
 			return err
 		}
 	}
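Note: all four create-or-update blocks in BeforeStart key off the pre-existing ActionOnError helper, whose body is not part of this diff. Judging from how the switches use it, it presumably classifies the preceding Get error roughly like this (a reconstruction, not the actual implementation):

    // actionOnErrorSketch shows the presumed mapping behind ActionOnError.
    func actionOnErrorSketch(err error) Action {
        switch {
        case err == nil:
            return Update // object already exists: update it, or skip for immutable kinds
        case k8serrors.IsNotFound(err):
            return Create // object missing: create it
        default:
            return Panic // unexpected error: bail out and let the caller return it
        }
    }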
@@ -285,58 +354,62 @@ func (worker *PodWorker) BeforeStart(ctx context.Context) error {
 	return nil
 }
 
-func (worker *PodWorker) Start(ctx context.Context) error {
+// Start will build and create the worker pod which will host the model service
+func (podWorker *PodWorker) Start(ctx context.Context) error {
 	var err error
 
 	// define the way to load model
-	loader, err := worker.l.Build(ctx, &arcadiav1alpha1.TypedObjectReference{Namespace: &worker.m.Namespace, Name: worker.m.Name})
+	loader, err := podWorker.l.Build(ctx, &arcadiav1alpha1.TypedObjectReference{Namespace: &podWorker.m.Namespace, Name: podWorker.m.Name})
 	if err != nil {
 		return fmt.Errorf("failed to build loader with %w", err)
 	}
 	conLoader, _ := loader.(*corev1.Container)
 
 	// define the way to run model
-	runner, err := worker.r.Build(ctx, &arcadiav1alpha1.TypedObjectReference{Namespace: &worker.m.Namespace, Name: worker.m.Name})
+	runner, err := podWorker.r.Build(ctx, &arcadiav1alpha1.TypedObjectReference{Namespace: &podWorker.m.Namespace, Name: podWorker.m.Name})
 	if err != nil {
 		return fmt.Errorf("failed to build runner with %w", err)
 	}
 	conRunner, _ := runner.(*corev1.Container)
 
 	// initialize deployment
-	desiredDep := worker.deployment.DeepCopy()
-	desiredDep.Spec.Template = corev1.PodTemplateSpec{
+	desiredDep := podWorker.deployment.DeepCopy()
+	// configure pod template
+	podSpecTemplate := corev1.PodTemplateSpec{
 		ObjectMeta: metav1.ObjectMeta{
 			Labels: map[string]string{
-				"app.kubernetes.io/name": worker.SuffixedName(),
+				arcadiav1alpha1.WorkerPodSelectorLabel: podWorker.SuffixedName(),
+				arcadiav1alpha1.WorkerPodLabel:         podWorker.Worker().Name,
 			},
 		},
 		Spec: corev1.PodSpec{
 			RestartPolicy:  corev1.RestartPolicyAlways,
 			InitContainers: []corev1.Container{*conLoader},
 			Containers:     []corev1.Container{*conRunner},
-			Volumes:        []corev1.Volume{worker.storage},
+			Volumes:        []corev1.Volume{podWorker.storage},
 		},
 	}
-	err = controllerutil.SetControllerReference(worker.w, desiredDep, worker.s)
+	desiredDep.Spec.Template = podSpecTemplate
+	err = controllerutil.SetControllerReference(podWorker.Worker(), desiredDep, podWorker.s)
 	if err != nil {
 		return fmt.Errorf("failed to set owner reference with %w", err)
 	}
 
 	currDep := &appsv1.Deployment{}
-	err = worker.c.Get(ctx, types.NamespacedName{Namespace: desiredDep.Namespace, Name: desiredDep.Name}, currDep)
+	err = podWorker.c.Get(ctx, types.NamespacedName{Namespace: desiredDep.Namespace, Name: desiredDep.Name}, currDep)
 	switch ActionOnError(err) {
 	case Panic:
 		return err
 	case Update:
 		merged := MakeMergedDeployment(currDep, desiredDep)
 		// Update only when spec changed
-		err = worker.c.Patch(ctx, merged, client.MergeFrom(currDep))
+		err = podWorker.c.Patch(ctx, merged, client.MergeFrom(currDep))
 		if err != nil {
 			return errors.Wrap(err, "Failed to update worker")
 		}
 	case Create:
-		err = worker.c.Create(ctx, desiredDep)
+		err = podWorker.c.Create(ctx, desiredDep)
 		if err != nil {
 			return fmt.Errorf("failed to create deployment with %w", err)
 		}
@@ -349,26 +422,59 @@ func MakeMergedDeployment(target *appsv1.Deployment, desired *appsv1.Deployment)
 	merged := target.DeepCopy()
 
	// merge the whole spec (not just the pod template) so that replica changes propagate
-	merged.Spec.Template.Spec = desired.Spec.Template.Spec
+	merged.Spec = desired.Spec
+
 	return merged
 }
 
+// Actions to do after start this worker
+func (podWorker *PodWorker) AfterStart(ctx context.Context) error {
+	// get worker's latest state
+	status, err := podWorker.State(ctx)
+	if err != nil {
+		return errors.Wrap(err, "Failed to do State")
+	}
+
+	// check & patch state
+	podStatus := status.(*corev1.PodStatus)
+	switch podStatus.Phase {
+	case corev1.PodRunning, corev1.PodSucceeded:
+		podWorker.Worker().Status.SetConditions(podWorker.Worker().ReadyCondition())
+	case corev1.PodPending:
+		podWorker.Worker().Status.SetConditions(podWorker.Worker().PendingCondition())
+	case corev1.PodUnknown:
+		// If the pod is unknown and replicas is zero, then this worker must be offline
+		if podWorker.Worker().Spec.Replicas != nil && *podWorker.Worker().Spec.Replicas == 0 {
+			podWorker.Worker().Status.SetConditions(podWorker.Worker().OfflineCondition())
+		} else {
+			podWorker.Worker().Status.SetConditions(podWorker.Worker().PendingCondition())
+		}
+
+	case corev1.PodFailed:
+		podWorker.Worker().Status.SetConditions(podWorker.Worker().ErrorCondition("Pod failed"))
+	}
+
+	podWorker.Worker().Status.PodStatus = *podStatus
+
+	return nil
+}
+
 // TODO: BeforeStop
-func (worker *PodWorker) BeforeStop(ctx context.Context) error {
+func (podWorker *PodWorker) BeforeStop(ctx context.Context) error {
 	return nil
 }
 
 // TODO: Stop
-func (worker *PodWorker) Stop(ctx context.Context) error {
+func (podWorker *PodWorker) Stop(ctx context.Context) error {
 	return nil
 }
 
 // State of this worker
-func (worker *PodWorker) State(ctx context.Context) (any, error) {
+func (podWorker *PodWorker) State(ctx context.Context) (any, error) {
 	podList := &corev1.PodList{}
-	err := worker.c.List(ctx, podList, &client.ListOptions{
+	err := podWorker.c.List(ctx, podList, &client.ListOptions{
 		LabelSelector: labels.Set{
-			"app.kubernetes.io/name": worker.SuffixedName(),
+			arcadiav1alpha1.WorkerPodSelectorLabel: podWorker.SuffixedName(),
 		}.AsSelector(),
 	})