diff --git a/api/base/v1alpha1/datasource_types.go b/api/base/v1alpha1/datasource_types.go index 734386510..9c1d059de 100644 --- a/api/base/v1alpha1/datasource_types.go +++ b/api/base/v1alpha1/datasource_types.go @@ -38,10 +38,12 @@ type DatasourceSpec struct { } type RDMA struct { - // Path on a model storage server, the usual storage path is /path/ns/mode-name, and the path field is /path/, which must end in /. + // ServerSavePath on a model storage server, the usual storage path is /path/ns/mode-name, and the path field is /path/, which must end in /. // example: /opt/kubeagi/, /opt/, / // +kubebuilder:validation:Pattern=(^\/$)|(^\/[a-zA-Z0-9\_.@-]+(\/[a-zA-Z0-9\_.@-]+)*\/$) - Path string `json:"path"` + ServerSavePath string `json:"path"` + + NodePaths map[string]string `json:"nodePaths,omitempty"` } // OSS defines info for object storage service as datasource diff --git a/api/base/v1alpha1/zz_generated.deepcopy.go b/api/base/v1alpha1/zz_generated.deepcopy.go index 8d870cce1..c27531b81 100644 --- a/api/base/v1alpha1/zz_generated.deepcopy.go +++ b/api/base/v1alpha1/zz_generated.deepcopy.go @@ -357,7 +357,7 @@ func (in *DatasourceSpec) DeepCopyInto(out *DatasourceSpec) { if in.RDMA != nil { in, out := &in.RDMA, &out.RDMA *out = new(RDMA) - **out = **in + (*in).DeepCopyInto(*out) } } @@ -1087,6 +1087,13 @@ func (in *Provider) DeepCopy() *Provider { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RDMA) DeepCopyInto(out *RDMA) { *out = *in + if in.NodePaths != nil { + in, out := &in.NodePaths, &out.NodePaths + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RDMA. 
diff --git a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_datasources.yaml b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_datasources.yaml index 44784922c..0cf0ca1ff 100644 --- a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_datasources.yaml +++ b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_datasources.yaml @@ -103,10 +103,14 @@ spec: description: RDMA configure RDMA pulls the model file directly from the remote service to the host node. properties: + nodePaths: + additionalProperties: + type: string + type: object path: - description: 'Path on a model storage server, the usual storage - path is /path/ns/mode-name, and the path field is /path/, which - must end in /. example: /opt/kubeagi/, /opt/, /' + description: 'ServerSavePath on a model storage server, the usual + storage path is /path/ns/mode-name, and the path field is /path/, + which must end in /. example: /opt/kubeagi/, /opt/, /' pattern: (^\/$)|(^\/[a-zA-Z0-9\_.@-]+(\/[a-zA-Z0-9\_.@-]+)*\/$) type: string required: diff --git a/controllers/datasource_controller.go b/controllers/datasource_controller.go index 4ffa836eb..236980efb 100644 --- a/controllers/datasource_controller.go +++ b/controllers/datasource_controller.go @@ -169,6 +169,8 @@ func (r *DatasourceReconciler) Checkdatasource(ctx context.Context, logger logr. 
return r.UpdateStatus(ctx, instance, err) } info = instance.Spec.OSS.DeepCopy() + case arcadiav1alpha1.DatasourceTypeRDMA: + return r.UpdateStatus(ctx, instance, nil) default: ds, err = datasource.NewUnknown(ctx, r.Client) if err != nil { diff --git a/controllers/worker_controller.go b/controllers/worker_controller.go index ced896f2f..6c33ef6fb 100644 --- a/controllers/worker_controller.go +++ b/controllers/worker_controller.go @@ -161,9 +161,29 @@ func (r *WorkerReconciler) initialize(ctx context.Context, logger logr.Logger, i func (r *WorkerReconciler) reconcile(ctx context.Context, logger logr.Logger, worker *arcadiav1alpha1.Worker) (*arcadiav1alpha1.Worker, error) { logger.V(5).Info("GetSystemDatasource which hosts the worker's model files") - datasource, err := config.GetSystemDatasource(ctx, r.Client, nil) - if err != nil { - return worker, errors.Wrap(err, "Failed to get system datasource") + + m := arcadiav1alpha1.Model{} + ns := worker.Namespace + if worker.Spec.Model.Namespace != nil && *worker.Spec.Model.Namespace != "" { + ns = *worker.Spec.Model.Namespace + } + if err := r.Client.Get(ctx, types.NamespacedName{Name: worker.Spec.Model.Name, Namespace: ns}, &m); err != nil { + return worker, errors.Wrap(err, "failed to get model") + } + + var ( + datasource = &arcadiav1alpha1.Datasource{} + err error + ) + if m.Spec.Source != nil { + if err = r.Client.Get(ctx, types.NamespacedName{Namespace: ns, Name: m.Spec.Source.Name}, datasource); err != nil { + return worker, errors.Wrap(err, "model config datasource, but get it failed.") + } + } else { + datasource, err = config.GetSystemDatasource(ctx, r.Client, nil) + if err != nil { + return worker, errors.Wrap(err, "Failed to get system datasource") + } } // Only PodWorker(hosts this worker via a single pod) supported now diff --git a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_datasources.yaml b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_datasources.yaml index 44784922c..0cf0ca1ff 100644 
--- a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_datasources.yaml +++ b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_datasources.yaml @@ -103,10 +103,14 @@ spec: description: RDMA configure RDMA pulls the model file directly from the remote service to the host node. properties: + nodePaths: + additionalProperties: + type: string + type: object path: - description: 'Path on a model storage server, the usual storage - path is /path/ns/mode-name, and the path field is /path/, which - must end in /. example: /opt/kubeagi/, /opt/, /' + description: 'ServerSavePath on a model storage server, the usual + storage path is /path/ns/mode-name, and the path field is /path/, + which must end in /. example: /opt/kubeagi/, /opt/, /' pattern: (^\/$)|(^\/[a-zA-Z0-9\_.@-]+(\/[a-zA-Z0-9\_.@-]+)*\/$) type: string required: diff --git a/pkg/worker/loader.go b/pkg/worker/loader.go index 08ef616dc..ce2fd1b5f 100644 --- a/pkg/worker/loader.go +++ b/pkg/worker/loader.go @@ -132,3 +132,55 @@ type LoaderGit struct{} func (loader *LoaderGit) Build(ctx context.Context, model *arcadiav1alpha1.TypedObjectReference) (any, error) { return nil, ErrNotImplementedYet } + +var _ ModelLoader = (*RDMALoader)(nil) + +// RDMALoader Support for RDMA. 
+// Allow Pod to use hostpath and RDMA to pull models faster and start services +type RDMALoader struct { + c client.Client + + modelName string + // workerUID/modelname is the local model storage path + workerUID string + + datasource *arcadiav1alpha1.Datasource +} + +func NewRDMALoader(c client.Client, modelName, workerUID string, source *arcadiav1alpha1.Datasource) *RDMALoader { + return &RDMALoader{c: c, modelName: modelName, workerUID: workerUID, datasource: source} +} + +func (r *RDMALoader) Build(ctx context.Context, _ *arcadiav1alpha1.TypedObjectReference) (any, error) { + rdmaEndpoint := r.datasource.Spec.Endpoint.URL + remoteBaseSavePath := r.datasource.Spec.RDMA.ServerSavePath + + container := &corev1.Container{ + Name: "rdma-loader", + Image: "wetman2023/floo:23.12", + ImagePullPolicy: "IfNotPresent", + Command: []string{ + "/bin/bash", + "-c", + // pulls files from the service's 'rdmaEndpoint:/remoteBaseSavePath/modelName' directory to the local 'UID' directory. + fmt.Sprintf("floo_get --from=%s --to=$TO --srv=%s --dir=%s%s", rdmaEndpoint, r.workerUID, remoteBaseSavePath, r.modelName), + }, + Env: []corev1.EnvVar{ + { + Name: "TO", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "status.hostIP", + }, + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "tmp", + MountPath: "/tmp", + }, + }, + } + return container, nil +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 83b4f8e72..7465cd476 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -38,6 +38,8 @@ import ( const ( WokerCommonSuffix = "-worker" + + RDMANodeLabel = "arcadia.kubeagi.k8s.com.cn/rdma" ) var ( @@ -152,6 +154,17 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar EmptyDir: &corev1.EmptyDirVolumeSource{}, }, } + if d.Spec.Type() == arcadiav1alpha1.DatasourceTypeRDMA { + storage = corev1.Volume{ + Name: "models", + VolumeSource: corev1.VolumeSource{ + HostPath: 
&corev1.HostPathVolumeSource{ + // /rdma/abc/uid -> /data/models + Path: fmt.Sprintf("%s/%s", d.Spec.RDMA.ServerSavePath, w.GetUID()), + }, + }, + } + } service := corev1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -194,18 +207,21 @@ func NewPodWorker(ctx context.Context, c client.Client, s *runtime.Scheme, w *ar podWorker.service = service podWorker.deployment = deployment - // init loader(Only oss supported yet) - endpoint := d.Spec.Endpoint.DeepCopy() - if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { - endpoint.AuthSecret.WithNameSpace(d.Namespace) - } switch d.Spec.Type() { case arcadiav1alpha1.DatasourceTypeOSS: + // init loader(Only oss supported yet) + endpoint := d.Spec.Endpoint.DeepCopy() + if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { + endpoint.AuthSecret.WithNameSpace(d.Namespace) + } l, err := NewLoaderOSS(ctx, c, endpoint) if err != nil { return nil, fmt.Errorf("failed to new a loader with %w", err) } podWorker.l = l + case arcadiav1alpha1.DatasourceTypeRDMA: + l := NewRDMALoader(c, w.Spec.Model.Name, string(w.GetUID()), d) + podWorker.l = l default: return nil, fmt.Errorf("datasource %s with type %s not supported in worker", d.Name, d.Spec.Type()) } @@ -244,9 +260,8 @@ func (podWorker *PodWorker) Model() *arcadiav1alpha1.Model { // Now we have a pvc(if configured),service,LLM(if a llm model),Embedder(if a embedding model) func (podWorker *PodWorker) BeforeStart(ctx context.Context) error { var err error - - // prepare pvc - if podWorker.Worker().Spec.Storage != nil { + // If the local directory is mounted, there is no need to create the pvc + if podWorker.Worker().Spec.Storage != nil && podWorker.storage.HostPath == nil { pvc := &corev1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Namespace: podWorker.Namespace, @@ -372,6 +387,16 @@ func (podWorker *PodWorker) Start(ctx context.Context) error { } conRunner, _ := runner.(*corev1.Container) + if podWorker.storage.HostPath != nil { + 
conRunner.Lifecycle = &corev1.Lifecycle{ + PreStop: &corev1.LifecycleHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/bash", "-c", fmt.Sprintf("rm -rf /data/models/%s", podWorker.Model().Name)}, + }, + }, + } + } + // initialize deployment desiredDep := podWorker.deployment.DeepCopy() // configure pod template @@ -389,6 +414,33 @@ func (podWorker *PodWorker) Start(ctx context.Context) error { Volumes: []corev1.Volume{podWorker.storage}, }, } + if podWorker.storage.HostPath != nil { + podSpecTempalte.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Operator: corev1.NodeSelectorOpExists, + Key: RDMANodeLabel, + }, + }, + }, + }, + }, + }, + } + podSpecTempalte.Spec.Volumes = append(podSpecTempalte.Spec.Volumes, corev1.Volume{ + Name: "tmp", + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/tmp", + }, + }, + }) + } + desiredDep.Spec.Template = podSpecTempalte err = controllerutil.SetControllerReference(podWorker.Worker(), desiredDep, podWorker.s) if err != nil {