Skip to content

Commit

Permalink
feat: able to customize workers' images
Browse files Browse the repository at this point in the history
  • Loading branch information
0xff-dev committed Jan 15, 2024
1 parent 80366f4 commit 5dc83dd
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 13 deletions.
10 changes: 10 additions & 0 deletions api/base/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ import (
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// Image describes a container image and the policy used to pull it.
type Image struct {
// Image is the container image reference. The field is optional in the
// serialized form (omitempty).
Image string `json:"image,omitempty"`

// ImagePullPolicy is the pull policy applied to the container that runs
// this image. The generated CRD schema defaults it to IfNotPresent.
// +kubebuilder:default=IfNotPresent
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
}

// WorkerSpec defines the desired state of Worker
type WorkerSpec struct {
CommonSpec `json:",inline"`
Expand Down Expand Up @@ -52,6 +59,9 @@ type WorkerSpec struct {

// Storage claimed to store model files
Storage *corev1.PersistentVolumeClaimSpec `json:"storage,omitempty"`

Loader Image `json:"loader"`
Runner Image `json:"runner"`
}

// WorkerStatus defines the observed state of Worker
Expand Down
17 changes: 17 additions & 0 deletions api/base/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions config/crd/bases/arcadia.kubeagi.k8s.com.cn_workers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ spec:
displayName:
description: DisplayName defines datasource display name
type: string
loader:
properties:
image:
type: string
imagePullPolicy:
default: IfNotPresent
description: PullPolicy describes a policy for if/when to pull
a container image
type: string
type: object
matchExpressions:
description: NodeSelectorRequirement to schedule this worker
items:
Expand Down Expand Up @@ -243,6 +253,16 @@ spec:
to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
type: object
type: object
runner:
properties:
image:
type: string
imagePullPolicy:
default: IfNotPresent
description: PullPolicy describes a policy for if/when to pull
a container image
type: string
type: object
storage:
description: Storage claimed to store model files
properties:
Expand Down Expand Up @@ -411,7 +431,9 @@ spec:
description: Type for this worker
type: string
required:
- loader
- model
- runner
type: object
status:
description: WorkerStatus defines the observed state of Worker
Expand Down
6 changes: 6 additions & 0 deletions config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ spec:
kind: "Models"
name: "baichuan2-7b-chat"
replicas: 1
loader:
image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
imagePullPolicy: IfNotPresent
runner:
image: kubeagi/arcadia-fastchat-worker:v0.2.0
imagePullPolicy: IfNotPresent
resources:
limits:
nvidia.com/gpu: "1" # request 1 GPU
6 changes: 6 additions & 0 deletions config/samples/arcadia_v1alpha1_worker_bge-large-zh-v1.5.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ spec:
description: "这是一个Embedding模型服务,由BGE提供"
type: "fastchat"
replicas: 1
loader:
image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
imagePullPolicy: IfNotPresent
runner:
image: kubeagi/arcadia-fastchat-worker:v0.2.0
imagePullPolicy: IfNotPresent
model:
kind: "Models"
name: "bge-large-zh-v1.5"
6 changes: 6 additions & 0 deletions config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ spec:
kind: "Models"
name: "qwen-7b-chat"
replicas: 1
loader:
image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
imagePullPolicy: IfNotPresent
runner:
image: kubeagi/arcadia-fastchat-worker:v0.2.0
imagePullPolicy: IfNotPresent
resources:
limits:
nvidia.com/gpu: "1" # request 1 GPU
22 changes: 22 additions & 0 deletions deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_workers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,16 @@ spec:
displayName:
description: DisplayName defines datasource display name
type: string
loader:
properties:
image:
type: string
imagePullPolicy:
default: IfNotPresent
description: PullPolicy describes a policy for if/when to pull
a container image
type: string
type: object
matchExpressions:
description: NodeSelectorRequirement to schedule this worker
items:
Expand Down Expand Up @@ -243,6 +253,16 @@ spec:
to an implementation-defined value. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
type: object
type: object
runner:
properties:
image:
type: string
imagePullPolicy:
default: IfNotPresent
description: PullPolicy describes a policy for if/when to pull
a container image
type: string
type: object
storage:
description: Storage claimed to store model files
properties:
Expand Down Expand Up @@ -411,7 +431,9 @@ spec:
description: Type for this worker
type: string
required:
- loader
- model
- runner
type: object
status:
description: WorkerStatus defines the observed state of Worker
Expand Down
30 changes: 23 additions & 7 deletions pkg/worker/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import (
"github.com/kubeagi/arcadia/pkg/datasource"
)

// Fallback images for the model-loading container. A loader uses its
// fallback only when the Worker's Spec.Loader.Image is empty.
const (
// defaultLoaderImage is the fallback used by LoaderOSS.Build.
defaultLoaderImage = "kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z"
// defaultRDMALoaderImage is the fallback used by RDMALoader.Build.
defaultRDMALoaderImage = "wetman2023/floo:23.12"
)

// ModelLoader load models for worker
type ModelLoader interface {
Build(ctx context.Context, model *arcadiav1alpha1.TypedObjectReference) (any, error)
Expand All @@ -42,9 +47,10 @@ type LoaderOSS struct {

endpoint *arcadiav1alpha1.Endpoint
oss *datasource.OSS
worker *arcadiav1alpha1.Worker
}

func NewLoaderOSS(ctx context.Context, c client.Client, endpoint *arcadiav1alpha1.Endpoint) (ModelLoader, error) {
func NewLoaderOSS(ctx context.Context, c client.Client, endpoint *arcadiav1alpha1.Endpoint, worker *arcadiav1alpha1.Worker) (ModelLoader, error) {
if endpoint == nil {
return nil, errors.New("nil oss endpoint")
}
Expand All @@ -58,6 +64,7 @@ func NewLoaderOSS(ctx context.Context, c client.Client, endpoint *arcadiav1alpha
c: c,
endpoint: endpoint,
oss: oss,
worker: worker,
}, nil
}

Expand Down Expand Up @@ -98,10 +105,14 @@ func (loader *LoaderOSS) Build(ctx context.Context, model *arcadiav1alpha1.Typed
url = loader.endpoint.SchemeInternalURL()
}

img := defaultLoaderImage
if loader.worker.Spec.Loader.Image != "" {
img = loader.worker.Spec.Loader.Image
}
container := &corev1.Container{
Name: "loader",
Image: "kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z",
ImagePullPolicy: "IfNotPresent",
Image: img,
ImagePullPolicy: loader.worker.Spec.Loader.ImagePullPolicy,
Command: []string{
"/bin/bash",
"-c",
Expand Down Expand Up @@ -145,20 +156,25 @@ type RDMALoader struct {
workerUID string

datasource *arcadiav1alpha1.Datasource
worker *arcadiav1alpha1.Worker
}

func NewRDMALoader(c client.Client, modelName, workerUID string, source *arcadiav1alpha1.Datasource) *RDMALoader {
return &RDMALoader{c: c, modelName: modelName, workerUID: workerUID, datasource: source}
func NewRDMALoader(c client.Client, modelName, workerUID string, source *arcadiav1alpha1.Datasource, worker *arcadiav1alpha1.Worker) *RDMALoader {
return &RDMALoader{c: c, modelName: modelName, workerUID: workerUID, datasource: source, worker: worker}
}

func (r *RDMALoader) Build(ctx context.Context, _ *arcadiav1alpha1.TypedObjectReference) (any, error) {
rdmaEndpoint := r.datasource.Spec.Endpoint.URL
remoteBaseSavePath := r.datasource.Spec.RDMA.Path

img := defaultRDMALoaderImage
if r.worker.Spec.Loader.Image != "" {
img = r.worker.Spec.Loader.Image
}
container := &corev1.Container{
Name: "rdma-loader",
Image: "wetman2023/floo:23.12",
ImagePullPolicy: "IfNotPresent",
Image: img,
ImagePullPolicy: r.worker.Spec.Loader.ImagePullPolicy,
Command: []string{
"/bin/bash",
"-c",
Expand Down
21 changes: 17 additions & 4 deletions pkg/worker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import (
"github.com/kubeagi/arcadia/pkg/config"
)

// Fallback images for the model-serving container. A runner uses its
// fallback only when the Worker's Spec.Runner.Image is empty.
const (
// defaultFastChatImage is the fallback used by RunnerFastchat.Build.
defaultFastChatImage = "kubeagi/arcadia-fastchat-worker:v0.2.0"
// defaultFastChatLLMImage is the fallback used by RunnerFastchatVLLM.Build.
defaultFastChatLLMImage = "kubeagi/arcadia-fastchat-worker:vllm-v0.2.0"
)

// ModelRunner run a model service
type ModelRunner interface {
// Device used when running model
Expand Down Expand Up @@ -72,11 +77,15 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1.
return nil, fmt.Errorf("failed to get arcadia config with %w", err)
}

img := defaultFastChatImage
if runner.w.Spec.Runner.Image != "" {
img = runner.w.Spec.Runner.Image
}
// read worker address
container := &corev1.Container{
Name: "runner",
Image: "kubeagi/arcadia-fastchat-worker:v0.2.0",
ImagePullPolicy: "IfNotPresent",
Image: img,
ImagePullPolicy: runner.w.Spec.Runner.ImagePullPolicy,
Env: []corev1.EnvVar{
{Name: "FASTCHAT_WORKER_NAME", Value: "fastchat.serve.model_worker"},
{Name: "FASTCHAT_WORKER_NAMESPACE", Value: runner.w.Namespace},
Expand Down Expand Up @@ -166,10 +175,14 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
klog.Infof("run worker with %s GPU", runner.NumberOfGPUs())
}

img := defaultFastChatLLMImage
if runner.w.Spec.Runner.Image != "" {
img = runner.w.Spec.Runner.Image
}
container := &corev1.Container{
Name: "runner",
Image: "kubeagi/arcadia-fastchat-worker:vllm-v0.2.0",
ImagePullPolicy: "IfNotPresent",
Image: img,
ImagePullPolicy: runner.w.Spec.Runner.ImagePullPolicy,
Env: []corev1.EnvVar{
{Name: "FASTCHAT_WORKER_NAME", Value: "fastchat.serve.vllm_worker"},
{Name: "FASTCHAT_WORKER_NAMESPACE", Value: runner.w.Namespace},
Expand Down
4 changes: 2 additions & 2 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar
if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
endpoint.AuthSecret.WithNameSpace(d.Namespace)
}
l, err := NewLoaderOSS(ctx, c, endpoint)
l, err := NewLoaderOSS(ctx, c, endpoint, w)
if err != nil {
return nil, fmt.Errorf("failed to new a loader with %w", err)
}
podWorker.l = l
case arcadiav1alpha1.DatasourceTypeRDMA:
l := NewRDMALoader(c, w.Spec.Model.Name, string(w.GetUID()), d)
l := NewRDMALoader(c, w.Spec.Model.Name, string(w.GetUID()), d, w)
podWorker.l = l
default:
return nil, fmt.Errorf("datasource %s with type %s not supported in worker", d.Name, d.Spec.Type())
Expand Down

0 comments on commit 5dc83dd

Please sign in to comment.