From 5dc83dd2c6e718be5e6ef5af6b92c63f43e39608 Mon Sep 17 00:00:00 2001 From: 0xff-dev Date: Mon, 15 Jan 2024 14:21:51 +0800 Subject: [PATCH] feat: able to customize worker images --- api/base/v1alpha1/worker_types.go | 10 +++++++ api/base/v1alpha1/zz_generated.deepcopy.go | 17 +++++++++++ .../arcadia.kubeagi.k8s.com.cn_workers.yaml | 22 ++++++++++++++ .../arcadia_v1alpha1_worker_baichuan2-7b.yaml | 6 ++++ ...dia_v1alpha1_worker_bge-large-zh-v1.5.yaml | 6 ++++ .../arcadia_v1alpha1_worker_qwen-7b-chat.yaml | 6 ++++ .../arcadia.kubeagi.k8s.com.cn_workers.yaml | 22 ++++++++++++++ pkg/worker/loader.go | 30 ++++++++++++++----- pkg/worker/runner.go | 21 ++++++++++--- pkg/worker/worker.go | 4 +-- 10 files changed, 131 insertions(+), 13 deletions(-) diff --git a/api/base/v1alpha1/worker_types.go b/api/base/v1alpha1/worker_types.go index be1dad1c1..0599e293a 100644 --- a/api/base/v1alpha1/worker_types.go +++ b/api/base/v1alpha1/worker_types.go @@ -24,6 +24,13 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. 
+type Image struct { + Image string `json:"image,omitempty"` + + // +kubebuilder:default=IfNotPresent + ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"` +} + // WorkerSpec defines the desired state of Worker type WorkerSpec struct { CommonSpec `json:",inline"` @@ -52,6 +59,9 @@ type WorkerSpec struct { // Storage claimed to store model files Storage *corev1.PersistentVolumeClaimSpec `json:"storage,omitempty"` + + Loader Image `json:"loader"` + Runner Image `json:"runner"` } // WorkerStatus defines the observed state of Worker diff --git a/api/base/v1alpha1/zz_generated.deepcopy.go b/api/base/v1alpha1/zz_generated.deepcopy.go index 3ed9a4f39..28b5a1a28 100644 --- a/api/base/v1alpha1/zz_generated.deepcopy.go +++ b/api/base/v1alpha1/zz_generated.deepcopy.go @@ -600,6 +600,21 @@ func (in *FileStatus) DeepCopy() *FileStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Image) DeepCopyInto(out *Image) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Image. +func (in *Image) DeepCopy() *Image { + if in == nil { + return nil + } + out := new(Image) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KnowledgeBase) DeepCopyInto(out *KnowledgeBase) { *out = *in @@ -1485,6 +1500,8 @@ func (in *WorkerSpec) DeepCopyInto(out *WorkerSpec) { *out = new(v1.PersistentVolumeClaimSpec) (*in).DeepCopyInto(*out) } + out.Loader = in.Loader + out.Runner = in.Runner } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerSpec. 
diff --git a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml index c82505d0f..d0ff45a45 100644 --- a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml +++ b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml @@ -160,6 +160,16 @@ spec: displayName: description: DisplayName defines datasource display name type: string + loader: + properties: + image: + type: string + imagePullPolicy: + default: IfNotPresent + description: PullPolicy describes a policy for if/when to pull + a container image + type: string + type: object matchExpressions: description: NodeSelectorRequirement to schedule this worker items: @@ -243,6 +253,16 @@ spec: to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + runner: + properties: + image: + type: string + imagePullPolicy: + default: IfNotPresent + description: PullPolicy describes a policy for if/when to pull + a container image + type: string + type: object storage: description: Storage claimed to store model files properties: @@ -411,7 +431,9 @@ spec: description: Type for this worker type: string required: + - loader - model + - runner type: object status: description: WorkerStatus defines the observed state of Worker diff --git a/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml b/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml index b3ab879cf..806b55639 100644 --- a/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml +++ b/config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml @@ -21,6 +21,12 @@ spec: kind: "Models" name: "baichuan2-7b-chat" replicas: 1 + loader: + image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z + imagePullPolicy: IfNotPresent + runner: + image: kubeagi/arcadia-fastchat-worker:v0.2.0 + imagePullPolicy: IfNotPresent resources: limits: nvidia.com/gpu: "1" # request 1 GPU diff --git 
a/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml b/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml index 8cf01d91e..b2589426d 100644 --- a/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml +++ b/config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml @@ -8,6 +8,12 @@ spec: description: "这是一个Embedding模型服务,由BGE提供" type: "fastchat" replicas: 1 + loader: + image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z + imagePullPolicy: IfNotPresent + runner: + image: kubeagi/arcadia-fastchat-worker:v0.2.0 + imagePullPolicy: IfNotPresent model: kind: "Models" name: "bge-large-zh-v1.5" diff --git a/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml b/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml index b1579fd8f..61dbfba44 100644 --- a/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml +++ b/config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml @@ -11,6 +11,12 @@ spec: kind: "Models" name: "qwen-7b-chat" replicas: 1 + loader: + image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z + imagePullPolicy: IfNotPresent + runner: + image: kubeagi/arcadia-fastchat-worker:v0.2.0 + imagePullPolicy: IfNotPresent resources: limits: nvidia.com/gpu: "1" # request 1 GPU diff --git a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml index c82505d0f..d0ff45a45 100644 --- a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml +++ b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml @@ -160,6 +160,16 @@ spec: displayName: description: DisplayName defines datasource display name type: string + loader: + properties: + image: + type: string + imagePullPolicy: + default: IfNotPresent + description: PullPolicy describes a policy for if/when to pull + a container image + type: string + type: object matchExpressions: description: NodeSelectorRequirement to schedule this worker items: @@ -243,6 +253,16 @@ spec: to an 
implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/' type: object type: object + runner: + properties: + image: + type: string + imagePullPolicy: + default: IfNotPresent + description: PullPolicy describes a policy for if/when to pull + a container image + type: string + type: object storage: description: Storage claimed to store model files properties: @@ -411,7 +431,9 @@ spec: description: Type for this worker type: string required: + - loader - model + - runner type: object status: description: WorkerStatus defines the observed state of Worker diff --git a/pkg/worker/loader.go b/pkg/worker/loader.go index 49f537638..741af2858 100644 --- a/pkg/worker/loader.go +++ b/pkg/worker/loader.go @@ -29,6 +29,11 @@ import ( "github.com/kubeagi/arcadia/pkg/datasource" ) +const ( + defaultLoaderImage = "kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z" + defaultRDMALoaderImage = "wetman2023/floo:23.12" +) + // ModelLoader load models for worker type ModelLoader interface { Build(ctx context.Context, model *arcadiav1alpha1.TypedObjectReference) (any, error) @@ -42,9 +47,10 @@ type LoaderOSS struct { endpoint *arcadiav1alpha1.Endpoint oss *datasource.OSS + worker *arcadiav1alpha1.Worker } -func NewLoaderOSS(ctx context.Context, c client.Client, endpoint *arcadiav1alpha1.Endpoint) (ModelLoader, error) { +func NewLoaderOSS(ctx context.Context, c client.Client, endpoint *arcadiav1alpha1.Endpoint, worker *arcadiav1alpha1.Worker) (ModelLoader, error) { if endpoint == nil { return nil, errors.New("nil oss endpoint") } @@ -58,6 +64,7 @@ func NewLoaderOSS(ctx context.Context, c client.Client, endpoint *arcadiav1alpha c: c, endpoint: endpoint, oss: oss, + worker: worker, }, nil } @@ -98,10 +105,14 @@ func (loader *LoaderOSS) Build(ctx context.Context, model *arcadiav1alpha1.Typed url = loader.endpoint.SchemeInternalURL() } + img := defaultLoaderImage + if loader.worker.Spec.Loader.Image != "" { + img = 
loader.worker.Spec.Loader.Image + } container := &corev1.Container{ Name: "loader", - Image: "kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z", - ImagePullPolicy: "IfNotPresent", + Image: img, + ImagePullPolicy: loader.worker.Spec.Loader.ImagePullPolicy, Command: []string{ "/bin/bash", "-c", @@ -145,20 +156,25 @@ type RDMALoader struct { workerUID string datasource *arcadiav1alpha1.Datasource + worker *arcadiav1alpha1.Worker } -func NewRDMALoader(c client.Client, modelName, workerUID string, source *arcadiav1alpha1.Datasource) *RDMALoader { - return &RDMALoader{c: c, modelName: modelName, workerUID: workerUID, datasource: source} +func NewRDMALoader(c client.Client, modelName, workerUID string, source *arcadiav1alpha1.Datasource, worker *arcadiav1alpha1.Worker) *RDMALoader { + return &RDMALoader{c: c, modelName: modelName, workerUID: workerUID, datasource: source, worker: worker} } func (r *RDMALoader) Build(ctx context.Context, _ *arcadiav1alpha1.TypedObjectReference) (any, error) { rdmaEndpoint := r.datasource.Spec.Endpoint.URL remoteBaseSavePath := r.datasource.Spec.RDMA.Path + img := defaultRDMALoaderImage + if r.worker.Spec.Loader.Image != "" { + img = r.worker.Spec.Loader.Image + } container := &corev1.Container{ Name: "rdma-loader", - Image: "wetman2023/floo:23.12", - ImagePullPolicy: "IfNotPresent", + Image: img, + ImagePullPolicy: r.worker.Spec.Loader.ImagePullPolicy, Command: []string{ "/bin/bash", "-c", diff --git a/pkg/worker/runner.go b/pkg/worker/runner.go index 749a1a5dc..60dad6d5e 100644 --- a/pkg/worker/runner.go +++ b/pkg/worker/runner.go @@ -30,6 +30,11 @@ import ( "github.com/kubeagi/arcadia/pkg/config" ) +const ( + defaultFastChatImage = "kubeagi/arcadia-fastchat-worker:v0.2.0" + defaultFastChatLLMImage = "kubeagi/arcadia-fastchat-worker:vllm-v0.2.0" +) + // ModelRunner run a model service type ModelRunner interface { // Device used when running model @@ -72,11 +77,15 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model 
*arcadiav1alpha1. return nil, fmt.Errorf("failed to get arcadia config with %w", err) } + img := defaultFastChatImage + if runner.w.Spec.Runner.Image != "" { + img = runner.w.Spec.Runner.Image + } // read worker address container := &corev1.Container{ Name: "runner", - Image: "kubeagi/arcadia-fastchat-worker:v0.2.0", - ImagePullPolicy: "IfNotPresent", + Image: img, + ImagePullPolicy: runner.w.Spec.Runner.ImagePullPolicy, Env: []corev1.EnvVar{ {Name: "FASTCHAT_WORKER_NAME", Value: "fastchat.serve.model_worker"}, {Name: "FASTCHAT_WORKER_NAMESPACE", Value: runner.w.Namespace}, @@ -166,10 +175,14 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp klog.Infof("run worker with %s GPU", runner.NumberOfGPUs()) } + img := defaultFastChatLLMImage + if runner.w.Spec.Runner.Image != "" { + img = runner.w.Spec.Runner.Image + } container := &corev1.Container{ Name: "runner", - Image: "kubeagi/arcadia-fastchat-worker:vllm-v0.2.0", - ImagePullPolicy: "IfNotPresent", + Image: img, + ImagePullPolicy: runner.w.Spec.Runner.ImagePullPolicy, Env: []corev1.EnvVar{ {Name: "FASTCHAT_WORKER_NAME", Value: "fastchat.serve.vllm_worker"}, {Name: "FASTCHAT_WORKER_NAMESPACE", Value: runner.w.Namespace}, diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index c8d896716..4892daaa6 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -215,13 +215,13 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { endpoint.AuthSecret.WithNameSpace(d.Namespace) } - l, err := NewLoaderOSS(ctx, c, endpoint) + l, err := NewLoaderOSS(ctx, c, endpoint, w) if err != nil { return nil, fmt.Errorf("failed to new a loader with %w", err) } podWorker.l = l case arcadiav1alpha1.DatasourceTypeRDMA: - l := NewRDMALoader(c, w.Spec.Model.Name, string(w.GetUID()), d) + l := NewRDMALoader(c, w.Spec.Model.Name, string(w.GetUID()), d, w) podWorker.l = l default: return nil, 
fmt.Errorf("datasource %s with type %s not supported in worker", d.Name, d.Spec.Type())