Skip to content

Commit

Permalink
Merge pull request #984 from nkwangleiGIT/main
Browse files: browse the repository at this point in the history
fix: update vllm to v0.4.0 hotfix version
  • Loading branch information
nkwangleiGIT authored Apr 11, 2024
2 parents bca8882 + 2ef657c commit 2f838b1
Show file tree
Hide file tree
Showing 16 changed files with 41 additions and 52 deletions.
2 changes: 1 addition & 1 deletion config/samples/arcadia_v1alpha1_worker_baichuan2-7b.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ spec:
image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
imagePullPolicy: IfNotPresent
runner:
image: kubeagi/arcadia-fastchat-worker:v0.2.36
image: kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix
imagePullPolicy: IfNotPresent
resources:
limits:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ spec:
image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
imagePullPolicy: IfNotPresent
runner:
image: kubeagi/arcadia-fastchat-worker:v0.2.36
image: kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix
imagePullPolicy: IfNotPresent
model:
kind: "Models"
Expand Down
2 changes: 1 addition & 1 deletion config/samples/arcadia_v1alpha1_worker_qwen-7b-chat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ spec:
image: kubeagi/minio-mc:RELEASE.2023-01-28T20-29-38Z
imagePullPolicy: IfNotPresent
runner:
image: kubeagi/arcadia-fastchat-worker:v0.2.36
image: kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix
imagePullPolicy: IfNotPresent
resources:
limits:
Expand Down
4 changes: 2 additions & 2 deletions config/samples/ray.io_v1_raycluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
runAsGroup: 0
fsGroup: 0
containers:
- image: kubeagi/ray-ml:2.9.3-py39-vllm
- image: kubeagi/ray-ml:2.9.3-py39-vllm-0.4.0
name: ray-head
resources:
limits:
Expand Down Expand Up @@ -48,7 +48,7 @@ spec:
app.kubernetes.io/name: kuberay
spec:
containers:
- image: kubeagi/ray-ml:2.9.3-py39-vllm
- image: kubeagi/ray-ml:2.9.3-py39-vllm-0.4.0
name: ray-worker
resources:
limits:
Expand Down
2 changes: 1 addition & 1 deletion deploy/charts/arcadia/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: arcadia
description: A Helm chart(Also a KubeBB Component) for KubeAGI Arcadia
type: application
version: 0.3.29
version: 0.3.30
appVersion: "0.2.1"

keywords:
Expand Down
3 changes: 0 additions & 3 deletions deploy/charts/llm-worker/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,3 @@ sources:
keywords:
- kubeagi
- LLMOps
maintainers:
- name: lanture1064
url: https://github.com/lanture1064
2 changes: 1 addition & 1 deletion deploy/charts/llm-worker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ image:
repository: kubeagi/arcadia-fastchat-worker
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "v0.2.0"
tag: "vllm-v0.4.0-hotfix"
env:
- name: FASTCHAT_MODEL_NAME
value: "baichuan2-7b"
Expand Down
2 changes: 1 addition & 1 deletion deploy/llms/start-worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ python3.9 -m $FASTCHAT_WORKER_NAME --model-names $FASTCHAT_REGISTRATION_MODEL_NA
--model-path $FASTCHAT_MODEL_NAME_PATH --worker-address $FASTCHAT_WORKER_ADDRESS \
--controller-address $FASTCHAT_CONTROLLER_ADDRESS \
--num-gpus $NUMBER_GPUS \
--host 0.0.0.0 --port 21002 $EXTRA_ARGS
--host 0.0.0.0 --port 21002 $SYSTEM_ARGS $EXTRA_ARGS
6 changes: 3 additions & 3 deletions pkg/appruntime/agent/streamhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ type StreamHandler struct {
var _ callbacks.Handler = StreamHandler{}

func (handler StreamHandler) HandleStreamingFunc(ctx context.Context, chunk []byte) {
if _, ok := handler.args[base.OutputAnserStreamChanKeyInArg]; ok {
if _, ok := handler.args[base.OutputAnswerStreamChanKeyInArg]; ok {
logger := klog.FromContext(ctx)
streamChan, ok := handler.args[base.OutputAnserStreamChanKeyInArg].(chan string)
streamChan, ok := handler.args[base.OutputAnswerStreamChanKeyInArg].(chan string)
if !ok {
err := fmt.Errorf("answer_stream is not chan string, but %T", handler.args[base.OutputAnserStreamChanKeyInArg])
err := fmt.Errorf("answer_stream is not chan string, but %T", handler.args[base.OutputAnswerStreamChanKeyInArg])
logger.Error(err, "answer_stream is not chan string")
return
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/appruntime/app_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (a *Application) Run(ctx context.Context, cli client.Client, respStream cha
out := map[string]any{
base.InputQuestionKeyInArg: input.Question,
"files": input.Files,
base.OutputAnserStreamChanKeyInArg: respStream,
base.OutputAnswerStreamChanKeyInArg: respStream,
base.InputIsNeedStreamKeyInArg: input.NeedStream,
base.LangchaingoChatMessageHistoryKeyInArg: input.History,
// Use an empty context before run
Expand Down Expand Up @@ -205,7 +205,7 @@ func (a *Application) Run(ctx context.Context, cli client.Client, respStream cha
var er *base.RetrieverGetNullDocError
if errors.As(err, &er) {
agentReturnNothing := true
v, ok := out[base.OutputAnserKeyInArg]
v, ok := out[base.OutputAnswerKeyInArg]
if ok {
if answer, ok := v.(string); ok && len(answer) > 0 {
agentReturnNothing = false
Expand All @@ -229,7 +229,7 @@ func (a *Application) Run(ctx context.Context, cli client.Client, respStream cha
waitRunningNodes.PushBack(n)
}
}
if a, ok := out[base.OutputAnserKeyInArg]; ok {
if a, ok := out[base.OutputAnswerKeyInArg]; ok {
if answer, ok := a.(string); ok && len(answer) > 0 {
output = Output{Answer: answer}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/appruntime/base/keyword.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ const (
InputQuestionKeyInArg = "question"
InputIsNeedStreamKeyInArg = "_need_stream"
LangchaingoChatMessageHistoryKeyInArg = "_history"
OutputAnserKeyInArg = "_answer"
OutputAnswerKeyInArg = "_answer"
AgentOutputInArg = "_agent_answer"
MapReduceDocumentOutputInArg = "_mapreduce_document_answer"
OutputAnserStreamChanKeyInArg = "_answer_stream"
OutputAnswerStreamChanKeyInArg = "_answer_stream"
RuntimeRetrieverReferencesKeyInArg = "_references"
LangchaingoRetrieverKeyInArg = "retriever"
LangchaingoLLMKeyInArg = "llm"
Expand Down
2 changes: 1 addition & 1 deletion pkg/appruntime/chain/apichain.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (l *APIChain) Run(ctx context.Context, _ client.Client, args map[string]any
out, err = handleNoErrNoOut(ctx, needStream, out, err, l.APIChain, args, options)
klog.FromContext(ctx).V(5).Info("use apichain, blocking out:" + out)
if err == nil {
args[base.OutputAnserKeyInArg] = out
args[base.OutputAnswerKeyInArg] = out
return args, nil
}
return args, fmt.Errorf("apichain run error: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/appruntime/chain/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import (

func stream(res map[string]any) func(ctx context.Context, chunk []byte) error {
return func(ctx context.Context, chunk []byte) error {
if _, ok := res[base.OutputAnserStreamChanKeyInArg]; ok {
if _, ok := res[base.OutputAnswerStreamChanKeyInArg]; ok {
logger := klog.FromContext(ctx)
streamChan, ok := res[base.OutputAnserStreamChanKeyInArg].(chan string)
streamChan, ok := res[base.OutputAnswerStreamChanKeyInArg].(chan string)
if !ok {
err := fmt.Errorf("answer_stream is not chan string, but %T", res[base.OutputAnserStreamChanKeyInArg])
err := fmt.Errorf("answer_stream is not chan string, but %T", res[base.OutputAnswerStreamChanKeyInArg])
logger.Error(err, "answer_stream is not chan string")
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/appruntime/chain/llmchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (l *LLMChain) Run(ctx context.Context, cli client.Client, args map[string]a
out, err = handleNoErrNoOut(ctx, needStream, out, err, l.LLMChain, args, options)
klog.FromContext(ctx).V(5).Info("use llmchain, blocking out:" + out)
if err == nil {
args[base.OutputAnserKeyInArg] = out
args[base.OutputAnswerKeyInArg] = out
return args, nil
}
return args, fmt.Errorf("llmchain run error: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/appruntime/chain/retrievalqachain.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (l *RetrievalQAChain) Run(ctx context.Context, cli client.Client, args map[
out, err = handleNoErrNoOut(ctx, needStream, out, err, l.ConversationalRetrievalQA, args, options)
klog.FromContext(ctx).V(5).Info("use retrievalqachain, blocking out:" + out)
if err == nil {
args[base.OutputAnserKeyInArg] = out
args[base.OutputAnswerKeyInArg] = out
return args, nil
}
return args, fmt.Errorf("retrievalqachain run error: %w", err)
Expand Down
46 changes: 19 additions & 27 deletions pkg/worker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (

const (
// tag is the same version as fastchat
defaultFastChatImage = "kubeagi/arcadia-fastchat-worker:v0.2.36"
defaultFastChatImage = "kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix"
// For ease of maintenance and stability, the VLLM module is now included in the standard image as a default feature.
defaultFastchatVLLMImage = "kubeagi/arcadia-fastchat-worker:v0.2.36"
defaultFastchatVLLMImage = "kubeagi/arcadia-fastchat-worker:vllm-v0.4.0-hotfix"
// defaultKubeAGIImage for RunnerKubeAGI
defaultKubeAGIImage = "kubeagi/core-library-cli:v0.0.1"

Expand Down Expand Up @@ -93,24 +93,16 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1.
return nil, fmt.Errorf("failed to get arcadia config with %w", err)
}

extraAgrs := ""
for _, envItem := range runner.w.Spec.AdditionalEnvs {
if envItem.Name == "EXTRA_ARGS" {
extraAgrs = envItem.Value
break
}
}

modelFileDir := fmt.Sprintf("%s/%s", defaultModelMountPath, model.Name)
additionalEnvs := []corev1.EnvVar{}
extraArgs := fmt.Sprintf("--device %s %s", runner.Device().String(), extraAgrs)
systemArgs := fmt.Sprintf("--device %s", runner.Device().String())
if runner.modelFileFromRemote {
m := arcadiav1alpha1.Model{}
if err := runner.c.Get(ctx, types.NamespacedName{Namespace: *model.Namespace, Name: model.Name}, &m); err != nil {
return nil, err
}
if m.Spec.Revision != "" {
extraArgs += fmt.Sprintf(" --revision %s ", m.Spec.Revision)
systemArgs += fmt.Sprintf(" --revision %s ", m.Spec.Revision)
}
if m.Spec.ModelSource == modelSourceFromHugginfFace {
modelFileDir = m.Spec.HuggingFaceRepo
Expand Down Expand Up @@ -139,7 +131,6 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1.
{Name: "FASTCHAT_WORKER_ADDRESS", Value: fmt.Sprintf("http://%s.%s:%d", runner.w.Name+WokerCommonSuffix, runner.w.Namespace, arcadiav1alpha1.DefaultWorkerPort)},
{Name: "FASTCHAT_CONTROLLER_ADDRESS", Value: gw.Controller},
{Name: "NUMBER_GPUS", Value: runner.NumberOfGPUs()},
{Name: "EXTRA_ARGS", Value: extraArgs},
},
Ports: []corev1.ContainerPort{
{Name: "http", ContainerPort: arcadiav1alpha1.DefaultWorkerPort},
Expand All @@ -149,6 +140,7 @@ func (runner *RunnerFastchat) Build(ctx context.Context, model *arcadiav1alpha1.
},
Resources: runner.w.Spec.Resources,
}
additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "SYSTEM_ARGS", Value: systemArgs})

container.Env = append(container.Env, additionalEnvs...)
return container, nil
Expand Down Expand Up @@ -193,12 +185,12 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
return nil, fmt.Errorf("failed to get arcadia config with %w", err)
}

extraAgrs := ""
systemArgs := ""
additionalEnvs := []corev1.EnvVar{}

// configure ray cluster
resources := runner.w.Spec.Resources
gpus := runner.NumberOfGPUs()
gpuEnvExist := false
// default ray cluster, which can only utilize GPUs on a single node
rayCluster := config.DefaultRayCluster()
for _, envItem := range runner.w.Spec.AdditionalEnvs {
Expand All @@ -223,12 +215,10 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
// By default, gpu_memory_utilization will be 0.9
if envItem.Name == "GPU_MEMORY_UTILIZATION" {
gpuMemoryUtilization, _ := strconv.ParseFloat(envItem.Value, 64)
extraAgrs += fmt.Sprintf(" --gpu_memory_utilization %f", gpuMemoryUtilization)
systemArgs += fmt.Sprintf(" --gpu_memory_utilization %f", gpuMemoryUtilization)
}

// extra arguments to run llm
if envItem.Name == "EXTRA_ARGS" {
extraAgrs = envItem.Value
if envItem.Name == "NUMBER_GPUS" {
gpuEnvExist = true
}
}
klog.V(5).Infof("run worker with raycluster:\n %s", rayCluster.String())
Expand All @@ -245,20 +235,16 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
Name: "PYTHON_VERSION",
Value: rayCluster.GetPythonVersion(),
})
// Set gpu number to the number of GPUs in the worker's resource
additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "NUMBER_GPUS", Value: gpus})

modelFileDir := fmt.Sprintf("%s/%s", defaultModelMountPath, model.Name)
// --enforce-eager to disable cupy
// TODO: remove --enforce-eager when https://github.com/kubeagi/arcadia/issues/878 is fixed
extraAgrs = fmt.Sprintf("%s --trust-remote-code --enforce-eager", extraAgrs)
systemArgs = fmt.Sprintf("%s --trust-remote-code", systemArgs)
if runner.modelFileFromRemote {
m := arcadiav1alpha1.Model{}
if err := runner.c.Get(ctx, types.NamespacedName{Namespace: *model.Namespace, Name: model.Name}, &m); err != nil {
return nil, err
}
if m.Spec.Revision != "" {
extraAgrs += fmt.Sprintf(" --revision %s", m.Spec.Revision)
systemArgs += fmt.Sprintf(" --revision %s", m.Spec.Revision)
}
if m.Spec.ModelSource == modelSourceFromHugginfFace {
modelFileDir = m.Spec.HuggingFaceRepo
Expand All @@ -285,7 +271,6 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
{Name: "FASTCHAT_MODEL_NAME", Value: model.Name},
{Name: "FASTCHAT_WORKER_ADDRESS", Value: fmt.Sprintf("http://%s.%s:%d", runner.w.Name+WokerCommonSuffix, runner.w.Namespace, arcadiav1alpha1.DefaultWorkerPort)},
{Name: "FASTCHAT_CONTROLLER_ADDRESS", Value: gw.Controller},
{Name: "EXTRA_ARGS", Value: extraAgrs},
},
Ports: []corev1.ContainerPort{
{Name: "http", ContainerPort: arcadiav1alpha1.DefaultWorkerPort},
Expand All @@ -297,6 +282,13 @@ func (runner *RunnerFastchatVLLM) Build(ctx context.Context, model *arcadiav1alp
},
Resources: resources,
}
if !gpuEnvExist {
// if env doesn't exist, set gpu number to the number of GPUs in the worker's resource
additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "NUMBER_GPUS", Value: runner.NumberOfGPUs()})
}

additionalEnvs = append(additionalEnvs, corev1.EnvVar{Name: "SYSTEM_ARGS", Value: systemArgs})

container.Env = append(container.Env, additionalEnvs...)
return container, nil
}
Expand Down

0 comments on commit 2f838b1

Please sign in to comment.