diff --git a/api/evaluation/v1alpha1/common.go b/api/evaluation/v1alpha1/common.go index fd6da9669..cef526144 100644 --- a/api/evaluation/v1alpha1/common.go +++ b/api/evaluation/v1alpha1/common.go @@ -16,6 +16,11 @@ limitations under the License. package v1alpha1 +import ( + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" +) + type MetricsKind string const ( @@ -102,3 +107,30 @@ func RagStatusChanged(a, b RAGStatus) bool { return ac[0].Type != bc[0].Type || ac[0].Status != bc[0].Status || ac[0].Reason != bc[0].Reason || ac[0].Message != bc[0].Message } + +const ( + ING = "ing" // evaluating + COMPLETE = "complete" + FAILED = "failed" + SUSPEND = "suspend" +) + +func RagStatus(rag *RAG) (string, RAGPhase, string) { + phase := rag.Status.Phase + status, phaseMsg := ING, "" + + if len(rag.Status.Conditions) > 0 { + cond := rag.Status.Conditions[0] + phaseMsg = rag.Status.Conditions[0].Message + if phase == CompletePhase && cond.Type == batchv1.JobComplete && cond.Status == corev1.ConditionTrue { + status = COMPLETE + } + if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue { + status = FAILED + } + } + if rag.Spec.Suspend { + status = SUSPEND + } + return status, phase, phaseMsg +} diff --git a/apiserver/graph/generated/generated.go b/apiserver/graph/generated/generated.go index 6add886df..421c80fbf 100644 --- a/apiserver/graph/generated/generated.go +++ b/apiserver/graph/generated/generated.go @@ -604,8 +604,8 @@ type ComplexityRoot struct { RAGMutation struct { CreateRag func(childComplexity int, input CreateRAGInput) int - DeleteRag func(childComplexity int, input *DeleteRAGInput) int - UpdateRag func(childComplexity int, input *UpdateRAGInput) int + DeleteRag func(childComplexity int, input DeleteRAGInput) int + UpdateRag func(childComplexity int, input UpdateRAGInput) int } RAGQuery struct { @@ -877,8 +877,8 @@ type RAGResolver interface { } type RAGMutationResolver interface { CreateRag(ctx context.Context, obj *RAGMutation, input CreateRAGInput) (*Rag, error) - UpdateRag(ctx context.Context, obj *RAGMutation, input *UpdateRAGInput) (*Rag, error) - DeleteRag(ctx context.Context, obj *RAGMutation, input *DeleteRAGInput) (*string, error) + UpdateRag(ctx context.Context, obj *RAGMutation, input UpdateRAGInput) (*Rag, error) + DeleteRag(ctx context.Context, obj *RAGMutation, input DeleteRAGInput) (*string, error) } type RAGQueryResolver interface { GetRag(ctx context.Context, obj *RAGQuery, name string, namespace string) (*Rag, error) @@ -3738,7 +3738,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.RAGMutation.DeleteRag(childComplexity, args["input"].(*DeleteRAGInput)), true + return e.complexity.RAGMutation.DeleteRag(childComplexity, args["input"].(DeleteRAGInput)), true case "RAGMutation.updateRAG": if e.complexity.RAGMutation.UpdateRag == nil { @@ -3750,7 +3750,7 @@ func (e *executableSchema) Complexity(typeName, field string, childComplexity in return 0, false } - return e.complexity.RAGMutation.UpdateRag(childComplexity, args["input"].(*UpdateRAGInput)), true + return e.complexity.RAGMutation.UpdateRag(childComplexity, args["input"].(UpdateRAGInput)), true case "RAGQuery.getRAG": if e.complexity.RAGQuery.GetRag == nil { @@ -5908,7 +5908,7 @@ type PersistentVolumeClaimSpec { accessModes: [String!] selector: Selector resources: Resource - volumeName: String! + volumeName: String storageClassName: String volumeMode: String datasource: TypedObjectReference @@ -6786,7 +6786,7 @@ type RAG { } input CreateRAGInput { - name: String! + name: String namespace: String! labels: Map annotations: Map @@ -6830,12 +6830,23 @@ input ListRAGInput { status: String """根据名字,displayName字段获取""" keyword: String + """ + 分页页码, + 规则: 从1开始,默认是1 + """ + page: Int + + """ + 每页数量, + 规则: 默认10 + """ + pageSize: Int } type RAGMutation { createRAG(input: CreateRAGInput!): RAG! - updateRAG(input: UpdateRAGInput): RAG! - deleteRAG(input: DeleteRAGInput): Void + updateRAG(input: UpdateRAGInput!): RAG! + deleteRAG(input: DeleteRAGInput!): Void } type RAGQuery { @@ -8321,10 +8332,10 @@ func (ec *executionContext) field_RAGMutation_createRAG_args(ctx context.Context func (ec *executionContext) field_RAGMutation_deleteRAG_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} - var arg0 *DeleteRAGInput + var arg0 DeleteRAGInput if tmp, ok := rawArgs["input"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("input")) - arg0, err = ec.unmarshalODeleteRAGInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDeleteRAGInput(ctx, tmp) + arg0, err = ec.unmarshalNDeleteRAGInput2githubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDeleteRAGInput(ctx, tmp) if err != nil { return nil, err } @@ -8336,10 +8347,10 @@ func (ec *executionContext) field_RAGMutation_deleteRAG_args(ctx context.Context func (ec *executionContext) field_RAGMutation_updateRAG_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} - var arg0 *UpdateRAGInput + var arg0 UpdateRAGInput if tmp, ok := rawArgs["input"]; ok { ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("input")) - arg0, err = ec.unmarshalOUpdateRAGInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐUpdateRAGInput(ctx, tmp) + arg0, err = ec.unmarshalNUpdateRAGInput2githubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐUpdateRAGInput(ctx, tmp) if err != nil { return nil, err } @@ -23887,14 +23898,11 @@ func (ec *executionContext) _PersistentVolumeClaimSpec_volumeName(ctx context.Co return graphql.Null } if resTmp == nil { - if !graphql.HasFieldError(ctx, fc) { - ec.Errorf(ctx, "must not be null") - } return graphql.Null } - res := resTmp.(string) + res := resTmp.(*string) fc.Result = res - return ec.marshalNString2string(ctx, field.Selections, res) + return ec.marshalOString2ᚖstring(ctx, field.Selections, res) } func (ec *executionContext) fieldContext_PersistentVolumeClaimSpec_volumeName(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { @@ -26211,7 +26219,7 @@ func (ec *executionContext) _RAGMutation_updateRAG(ctx context.Context, field gr }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.RAGMutation().UpdateRag(rctx, obj, fc.Args["input"].(*UpdateRAGInput)) + return ec.resolvers.RAGMutation().UpdateRag(rctx, obj, fc.Args["input"].(UpdateRAGInput)) }) if err != nil { ec.Error(ctx, err) @@ -26306,7 +26314,7 @@ func (ec *executionContext) _RAGMutation_deleteRAG(ctx context.Context, field gr }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.resolvers.RAGMutation().DeleteRag(rctx, obj, fc.Args["input"].(*DeleteRAGInput)) + return ec.resolvers.RAGMutation().DeleteRag(rctx, obj, fc.Args["input"].(DeleteRAGInput)) }) if err != nil { ec.Error(ctx, err) @@ -33164,7 +33172,7 @@ func (ec *executionContext) unmarshalInputCreateRAGInput(ctx context.Context, ob var err error ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("name")) - data, err := ec.unmarshalNString2string(ctx, v) + data, err := ec.unmarshalOString2ᚖstring(ctx, v) if err != nil { return it, err } @@ -34648,7 +34656,7 @@ func (ec *executionContext) unmarshalInputListRAGInput(ctx context.Context, obj asMap[k] = v } - fieldsInOrder := [...]string{"appName", "namespace", "status", "keyword"} + fieldsInOrder := [...]string{"appName", "namespace", "status", "keyword", "page", "pageSize"} for _, k := range fieldsInOrder { v, ok := asMap[k] if !ok { @@ -34691,6 +34699,24 @@ func (ec *executionContext) unmarshalInputListRAGInput(ctx context.Context, obj return it, err } it.Keyword = data + case "page": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("page")) + data, err := ec.unmarshalOInt2ᚖint(ctx, v) + if err != nil { + return it, err + } + it.Page = data + case "pageSize": + var err error + + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("pageSize")) + data, err := ec.unmarshalOInt2ᚖint(ctx, v) + if err != nil { + return it, err + } + it.PageSize = data } } @@ -41277,9 +41303,6 @@ func (ec *executionContext) _PersistentVolumeClaimSpec(ctx context.Context, sel out.Values[i] = ec._PersistentVolumeClaimSpec_resources(ctx, field, obj) case "volumeName": out.Values[i] = ec._PersistentVolumeClaimSpec_volumeName(ctx, field, obj) - if out.Values[i] == graphql.Null { - out.Invalids++ - } case "storageClassName": out.Values[i] = ec._PersistentVolumeClaimSpec_storageClassName(ctx, field, obj) case "volumeMode": @@ -43892,6 +43915,11 @@ func (ec *executionContext) unmarshalNDeleteCommonInput2githubᚗcomᚋkubeagi return res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) unmarshalNDeleteRAGInput2githubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDeleteRAGInput(ctx context.Context, v interface{}) (DeleteRAGInput, error) { + res, err := ec.unmarshalInputDeleteRAGInput(ctx, v) + return res, graphql.ErrorOnPath(ctx, err) +} + func (ec *executionContext) unmarshalNDeleteVersionedDatasetInput2githubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDeleteVersionedDatasetInput(ctx context.Context, v interface{}) (DeleteVersionedDatasetInput, error) { res, err := ec.unmarshalInputDeleteVersionedDatasetInput(ctx, v) return res, graphql.ErrorOnPath(ctx, err) @@ -44360,6 +44388,11 @@ func (ec *executionContext) unmarshalNUpdateApplicationMetadataInput2githubᚗco return res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) unmarshalNUpdateRAGInput2githubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐUpdateRAGInput(ctx context.Context, v interface{}) (UpdateRAGInput, error) { + res, err := ec.unmarshalInputUpdateRAGInput(ctx, v) + return res, graphql.ErrorOnPath(ctx, err) +} + func (ec *executionContext) unmarshalNUpdateVersionedDatasetInput2githubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐUpdateVersionedDatasetInput(ctx context.Context, v interface{}) (UpdateVersionedDatasetInput, error) { res, err := ec.unmarshalInputUpdateVersionedDatasetInput(ctx, v) return res, graphql.ErrorOnPath(ctx, err) @@ -45248,14 +45281,6 @@ func (ec *executionContext) unmarshalODeleteDataProcessInput2ᚖgithubᚗcomᚋk return &res, graphql.ErrorOnPath(ctx, err) } -func (ec *executionContext) unmarshalODeleteRAGInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐDeleteRAGInput(ctx context.Context, v interface{}) (*DeleteRAGInput, error) { - if v == nil { - return nil, nil - } - res, err := ec.unmarshalInputDeleteRAGInput(ctx, v) - return &res, graphql.ErrorOnPath(ctx, err) -} - func (ec *executionContext) marshalOEmbedderMutation2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐEmbedderMutation(ctx context.Context, sel ast.SelectionSet, v *EmbedderMutation) graphql.Marshaler { if v == nil { return graphql.Null @@ -46195,14 +46220,6 @@ func (ec *executionContext) unmarshalOUpdateModelServiceInput2ᚖgithubᚗcomᚋ return &res, graphql.ErrorOnPath(ctx, err) } -func (ec *executionContext) unmarshalOUpdateRAGInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐUpdateRAGInput(ctx context.Context, v interface{}) (*UpdateRAGInput, error) { - if v == nil { - return nil, nil - } - res, err := ec.unmarshalInputUpdateRAGInput(ctx, v) - return &res, graphql.ErrorOnPath(ctx, err) -} - func (ec *executionContext) unmarshalOUpdateWorkerInput2ᚖgithubᚗcomᚋkubeagiᚋarcadiaᚋapiserverᚋgraphᚋgeneratedᚐUpdateWorkerInput(ctx context.Context, v interface{}) (*UpdateWorkerInput, error) { if v == nil { return nil, nil diff --git a/apiserver/graph/generated/models_gen.go b/apiserver/graph/generated/models_gen.go index dc5b5a273..34483e2d8 100644 --- a/apiserver/graph/generated/models_gen.go +++ b/apiserver/graph/generated/models_gen.go @@ -319,7 +319,7 @@ type CreateModelServiceInput struct { } type CreateRAGInput struct { - Name string `json:"name"` + Name *string `json:"name,omitempty"` Namespace string `json:"namespace"` Labels map[string]interface{} `json:"labels,omitempty"` Annotations map[string]interface{} `json:"annotations,omitempty"` @@ -1050,6 +1050,12 @@ type ListRAGInput struct { Status *string `json:"status,omitempty"` // 根据名字,displayName字段获取 Keyword *string `json:"keyword,omitempty"` + // 分页页码, + // 规则: 从1开始,默认是1 + Page *int `json:"page,omitempty"` + // 每页数量, + // 规则: 默认10 + PageSize *int `json:"pageSize,omitempty"` } type ListVersionedDatasetInput struct { @@ -1250,7 +1256,7 @@ type PersistentVolumeClaimSpec struct { AccessModes []string `json:"accessModes,omitempty"` Selector *Selector `json:"selector,omitempty"` Resources *Resource `json:"resources,omitempty"` - VolumeName string `json:"volumeName"` + VolumeName *string `json:"volumeName,omitempty"` StorageClassName *string `json:"storageClassName,omitempty"` VolumeMode *string `json:"volumeMode,omitempty"` Datasource *TypedObjectReference `json:"datasource,omitempty"` diff --git a/apiserver/graph/impl/rag.resolvers.go b/apiserver/graph/impl/rag.resolvers.go index 85ab3b654..e2cda9e54 100644 --- a/apiserver/graph/impl/rag.resolvers.go +++ b/apiserver/graph/impl/rag.resolvers.go @@ -9,61 +9,112 @@ import ( "fmt" "github.com/kubeagi/arcadia/apiserver/graph/generated" + "github.com/kubeagi/arcadia/apiserver/pkg/application" + "github.com/kubeagi/arcadia/apiserver/pkg/llm" + "github.com/kubeagi/arcadia/apiserver/pkg/rag" ) // Rag is the resolver for the RAG field. func (r *mutationResolver) Rag(ctx context.Context) (*generated.RAGMutation, error) { - panic(fmt.Errorf("not implemented: Rag - RAG")) + return &generated.RAGMutation{}, nil } // Rag is the resolver for the RAG field. func (r *queryResolver) Rag(ctx context.Context) (*generated.RAGQuery, error) { - panic(fmt.Errorf("not implemented: Rag - RAG")) + return &generated.RAGQuery{}, nil } // Application is the resolver for the application field. func (r *rAGResolver) Application(ctx context.Context, obj *generated.Rag) (*generated.Application, error) { - panic(fmt.Errorf("not implemented: Application - application")) + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + r1, err := rag.GetV1alpha1RAG(ctx, c, obj.Name, obj.Namespace) + if err != nil { + return nil, err + } + ns := obj.Namespace + if r1.Spec.Application.Namespace != nil { + ns = *r1.Spec.Application.Namespace + } + return application.GetApplication(ctx, c, r1.Spec.Application.Name, ns) } // Datasets is the resolver for the datasets field. func (r *rAGResolver) Datasets(ctx context.Context, obj *generated.Rag) ([]*generated.RAGDataset, error) { - panic(fmt.Errorf("not implemented: Datasets - datasets")) + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return rag.GetRAGDatasets(ctx, c, obj.Name, obj.Namespace) } // JudgeLlm is the resolver for the judgeLLM field. func (r *rAGResolver) JudgeLlm(ctx context.Context, obj *generated.Rag) (*generated.Llm, error) { - panic(fmt.Errorf("not implemented: JudgeLlm - judgeLLM")) + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + r1, err := rag.GetV1alpha1RAG(ctx, c, obj.Name, obj.Namespace) + if err != nil { + return nil, err + } + ns := obj.Namespace + if r1.Spec.Application.Namespace != nil { + ns = *r1.Spec.Application.Namespace + } + return llm.ReadLLM(ctx, c, r1.Spec.JudgeLLM.Name, ns) } // Metrics is the resolver for the metrics field. func (r *rAGResolver) Metrics(ctx context.Context, obj *generated.Rag) ([]*generated.RAGMetric, error) { - panic(fmt.Errorf("not implemented: Metrics - metrics")) + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return rag.GetRAGMetrics(ctx, c, obj.Name, obj.Namespace) } // CreateRag is the resolver for the createRAG field. func (r *rAGMutationResolver) CreateRag(ctx context.Context, obj *generated.RAGMutation, input generated.CreateRAGInput) (*generated.Rag, error) { - panic(fmt.Errorf("not implemented: CreateRag - createRAG")) + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return rag.CreateRAG(ctx, c, &input) } // UpdateRag is the resolver for the updateRAG field. -func (r *rAGMutationResolver) UpdateRag(ctx context.Context, obj *generated.RAGMutation, input *generated.UpdateRAGInput) (*generated.Rag, error) { +func (r *rAGMutationResolver) UpdateRag(ctx context.Context, obj *generated.RAGMutation, input generated.UpdateRAGInput) (*generated.Rag, error) { panic(fmt.Errorf("not implemented: UpdateRag - updateRAG")) } // DeleteRag is the resolver for the deleteRAG field. -func (r *rAGMutationResolver) DeleteRag(ctx context.Context, obj *generated.RAGMutation, input *generated.DeleteRAGInput) (*string, error) { - panic(fmt.Errorf("not implemented: DeleteRag - deleteRAG")) +func (r *rAGMutationResolver) DeleteRag(ctx context.Context, obj *generated.RAGMutation, input generated.DeleteRAGInput) (*string, error) { + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return nil, rag.DeleteRAG(ctx, c, &input) } // GetRag is the resolver for the getRAG field. func (r *rAGQueryResolver) GetRag(ctx context.Context, obj *generated.RAGQuery, name string, namespace string) (*generated.Rag, error) { - panic(fmt.Errorf("not implemented: GetRag - getRAG")) + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return rag.GetRAG(ctx, c, name, namespace) } // ListRag is the resolver for the listRAG field. func (r *rAGQueryResolver) ListRag(ctx context.Context, obj *generated.RAGQuery, input generated.ListRAGInput) (*generated.PaginatedResult, error) { - panic(fmt.Errorf("not implemented: ListRag - listRAG")) + c, err := getClientFromCtx(ctx) + if err != nil { + return nil, err + } + return rag.ListRAG(ctx, c, &input) } // RAG returns generated.RAGResolver implementation. diff --git a/apiserver/graph/schema/k8s.graphqls b/apiserver/graph/schema/k8s.graphqls index 56497302d..ea7db753b 100644 --- a/apiserver/graph/schema/k8s.graphqls +++ b/apiserver/graph/schema/k8s.graphqls @@ -33,7 +33,7 @@ type PersistentVolumeClaimSpec { accessModes: [String!] selector: Selector resources: Resource - volumeName: String! + volumeName: String storageClassName: String volumeMode: String datasource: TypedObjectReference diff --git a/apiserver/graph/schema/rag.gql b/apiserver/graph/schema/rag.gql index ea13004c2..7f37531e7 100644 --- a/apiserver/graph/schema/rag.gql +++ b/apiserver/graph/schema/rag.gql @@ -22,6 +22,10 @@ query listRAG($input: ListRAGInput!){ operator } } + resources { + limits + requests + } volumeName storageClassName volumeMode @@ -40,6 +44,26 @@ query listRAG($input: ListRAGInput!){ displayName } } + datasets { + source { + apiGroup + kind + name + namespace + displayName + } + } + judgeLLM { + name + namespace + baseUrl + models + provider + type + status + message + displayName + } serviceAccountName suspend status @@ -74,6 +98,10 @@ query getRAG($name: String!, $namespace: String!){ volumeName storageClassName volumeMode + resources { + limits + requests + } datasource { apiGroup kind @@ -190,6 +218,10 @@ mutation createRAG($input: CreateRAGInput!){ volumeName storageClassName volumeMode + resources { + limits + requests + } datasource { apiGroup kind @@ -238,6 +270,10 @@ mutation updateRAG($input: UpdateRAGInput!){ volumeName storageClassName volumeMode + resources { + limits + requests + } datasource { apiGroup kind diff --git a/apiserver/graph/schema/rag.graphqls b/apiserver/graph/schema/rag.graphqls index ccaddb260..08faa4089 100644 --- a/apiserver/graph/schema/rag.graphqls +++ b/apiserver/graph/schema/rag.graphqls @@ -121,7 +121,7 @@ type RAG { } input CreateRAGInput { - name: String! + name: String namespace: String! labels: Map annotations: Map @@ -165,12 +165,23 @@ input ListRAGInput { status: String """根据名字,displayName字段获取""" keyword: String + """ + 分页页码, + 规则: 从1开始,默认是1 + """ + page: Int + + """ + 每页数量, + 规则: 默认10 + """ + pageSize: Int } type RAGMutation { createRAG(input: CreateRAGInput!): RAG! - updateRAG(input: UpdateRAGInput): RAG! - deleteRAG(input: DeleteRAGInput): Void + updateRAG(input: UpdateRAGInput!): RAG! + deleteRAG(input: DeleteRAGInput!): Void } type RAGQuery { diff --git a/apiserver/pkg/common/common_filter.go b/apiserver/pkg/common/common_filter.go index af53a2f60..0a54713b5 100644 --- a/apiserver/pkg/common/common_filter.go +++ b/apiserver/pkg/common/common_filter.go @@ -25,6 +25,7 @@ import ( "k8s.io/client-go/dynamic" "github.com/kubeagi/arcadia/api/base/v1alpha1" + evav1alpha1 "github.com/kubeagi/arcadia/api/evaluation/v1alpha1" "github.com/kubeagi/arcadia/apiserver/graph/generated" ) @@ -224,3 +225,28 @@ func FilterWorkerByType(c dynamic.Interface, namespace, modelType string) Resour return strings.Contains(cache[w.Spec.Model.Name], modelType) } } + +// RAG Filter + +func FilterRAGByStatus(status string) ResourceFilter { + return func(u *unstructured.Unstructured) bool { + rag := evav1alpha1.RAG{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &rag); err != nil { + return false + } + ragStatus, _, _ := evav1alpha1.RagStatus(&rag) + return ragStatus == status + } +} + +func FilterByRAGKeyword(keyword string) ResourceFilter { + return func(u *unstructured.Unstructured) bool { + rag := evav1alpha1.RAG{} + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &rag); err != nil { + return false + } + return strings.Contains(rag.Name, keyword) || + strings.Contains(rag.Spec.DisplayName, keyword) || + strings.Contains(rag.Spec.Description, keyword) + } +} diff --git a/apiserver/pkg/rag/rag.go b/apiserver/pkg/rag/rag.go new file mode 100644 index 000000000..4ce5a7699 --- /dev/null +++ b/apiserver/pkg/rag/rag.go @@ -0,0 +1,502 @@ +/* +Copyright 2024 KubeAGI. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package rag + +import ( + "context" + "fmt" + "math/rand" + "time" + + "github.com/minio/minio-go/v7" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + + "github.com/kubeagi/arcadia/api/base/v1alpha1" + evav1alpha1 "github.com/kubeagi/arcadia/api/evaluation/v1alpha1" + "github.com/kubeagi/arcadia/apiserver/graph/generated" + "github.com/kubeagi/arcadia/apiserver/pkg/common" + "github.com/kubeagi/arcadia/pkg/utils" +) + +const ( + letterBytes = "abcdefghijklmnopqrstuvwxyz0123456789" +) + +func generateRandomString(length int) string { + r := rand.New(rand.NewSource(time.Now().UnixMicro())) + b := make([]byte, length) + for i := range b { + b[i] = letterBytes[r.Intn(len(letterBytes))] + } + return string(b) +} + +func generateKubernetesResourceName(prefix string, length int) string { + randomString := generateRandomString(length) + return fmt.Sprintf("%s-%s", prefix, randomString) +} +func setRAGSatus(rag *evav1alpha1.RAG, r *generated.Rag) { + status, phase, phaseMsg := evav1alpha1.RagStatus(rag) + *r.Phase = string(phase) + *r.PhaseMessage = phaseMsg + r.Status = status + r.Suspend = rag.Spec.Suspend +} + +func gen2storage(p generated.PersistentVolumeClaimSpecInput) *corev1.PersistentVolumeClaimSpec { + pvc := &corev1.PersistentVolumeClaimSpec{ + VolumeName: p.VolumeName, + } + if len(pvc.AccessModes) > 0 { + pvc.AccessModes = make([]corev1.PersistentVolumeAccessMode, 0) + for _, s := range pvc.AccessModes { + pvc.AccessModes = append(pvc.AccessModes, corev1.PersistentVolumeAccessMode(s)) + } + } + if p.Selector != nil { + pvc.Selector = &v1.LabelSelector{} + if len(p.Selector.MatchLabels) > 0 { + pvc.Selector.MatchLabels = make(map[string]string) + for k, v := range p.Selector.MatchLabels { + pvc.Selector.MatchLabels[k] = v.(string) + } + } + if len(p.Selector.MatchExpressions) > 0 { + pvc.Selector.MatchExpressions = make([]v1.LabelSelectorRequirement, 0) + for _, item := range p.Selector.MatchExpressions { + i := v1.LabelSelectorRequirement{ + Key: *item.Key, + Values: make([]string, 0), + Operator: v1.LabelSelectorOperator(*item.Operator), + } + for _, s := range item.Values { + i.Values = append(i.Values, *s) + } + pvc.Selector.MatchExpressions = append(pvc.Selector.MatchExpressions, i) + } + } + } + if p.Resources != nil { + pvc.Resources = corev1.ResourceRequirements{} + if len(p.Resources.Limits) > 0 { + pvc.Resources.Limits = make(corev1.ResourceList) + for k, v := range p.Resources.Limits { + q, _ := resource.ParseQuantity(v.(string)) + pvc.Resources.Limits[corev1.ResourceName(k)] = q + } + } + if len(p.Resources.Requests) > 0 { + pvc.Resources.Requests = make(corev1.ResourceList) + for k, v := range p.Resources.Requests { + q, _ := resource.ParseQuantity(v.(string)) + pvc.Resources.Requests[corev1.ResourceName(k)] = q + } + } + } + + if p.StorageClassName != nil { + pvc.StorageClassName = p.StorageClassName + } + if p.VolumeMode != nil { + a := corev1.PersistentVolumeMode(*p.VolumeMode) + pvc.VolumeMode = &a + } + // TODO set datasource + return pvc +} +func storage2gen(pvcSpec *corev1.PersistentVolumeClaimSpec) generated.PersistentVolumeClaimSpec { + pvc := generated.PersistentVolumeClaimSpec{} + if pvcSpec.VolumeName != "" { + pvc.VolumeName = new(string) + *pvc.VolumeName = pvcSpec.VolumeName + } + if pvcSpec.StorageClassName != nil { + pvc.StorageClassName = new(string) + *pvc.StorageClassName = *pvcSpec.StorageClassName + } + if pvcSpec.VolumeMode != nil { + pvc.VolumeMode = new(string) + *pvc.VolumeMode = string(*pvcSpec.VolumeMode) + } + + for _, am := range pvcSpec.AccessModes { + pvc.AccessModes = append(pvc.AccessModes, string(am)) + } + + if pvcSpec.Selector != nil { + pvc.Selector = new(generated.Selector) + if len(pvcSpec.Selector.MatchLabels) > 0 { + pvc.Selector.MatchLabels = make(map[string]interface{}) + for k, v := range pvcSpec.Selector.MatchLabels { + pvc.Selector.MatchLabels[k] = v + } + } + if len(pvcSpec.Selector.MatchExpressions) > 0 { + pvc.Selector.MatchExpressions = make([]*generated.LabelSelectorRequirement, 0) + for _, item := range pvcSpec.Selector.MatchExpressions { + a := &generated.LabelSelectorRequirement{ + Key: new(string), + Operator: new(string), + Values: make([]*string, 0), + } + *a.Key = item.Key + *a.Operator = string(item.Operator) + for i := 0; i < len(item.Values); i++ { + a.Values = append(a.Values, &item.Values[i]) + } + } + } + } + + if len(pvcSpec.Resources.Requests) > 0 || len(pvcSpec.Resources.Limits) > 0 { + pvc.Resources = new(generated.Resource) + } + if len(pvcSpec.Resources.Limits) > 0 { + pvc.Resources.Limits = make(map[string]interface{}) + for k, v := range pvcSpec.Resources.Limits { + pvc.Resources.Limits[string(k)] = v + } + } + if len(pvcSpec.Resources.Requests) > 0 { + pvc.Resources.Requests = make(map[string]interface{}) + for k, v := range pvcSpec.Resources.Requests { + pvc.Resources.Requests[string(k)] = v + } + } + + if pvcSpec.DataSource != nil { + pvc.Datasource = new(generated.TypedObjectReference) + pvc.Datasource.APIGroup = pvcSpec.DataSource.APIGroup + pvc.Datasource.Kind = pvcSpec.DataSource.Kind + pvc.Datasource.Name = pvcSpec.DataSource.Name + } + if pvcSpec.DataSourceRef != nil { + pvc.DataSourceRef = new(generated.TypedObjectReference) + pvc.DataSourceRef.APIGroup = pvcSpec.DataSourceRef.APIGroup + pvc.DataSourceRef.Kind = pvcSpec.DataSource.Kind + pvc.DataSourceRef.Name = pvcSpec.DataSource.Name + } + + return pvc +} + +func rag2modelConverter(u *unstructured.Unstructured) (generated.PageNode, error) { + return rag2model(u) +} + +func rag2model(o *unstructured.Unstructured) (*generated.Rag, error) { + r := &generated.Rag{ + Name: o.GetName(), + Namespace: o.GetNamespace(), + Labels: map[string]interface{}{}, + Annotations: map[string]interface{}{}, + Creator: new(string), + DisplayName: new(string), + Description: new(string), + CreationTimestamp: new(time.Time), + CompleteTimestamp: new(time.Time), + ServiceAccountName: "", + Suspend: false, + Status: "", + Phase: new(string), + PhaseMessage: new(string), + } + + structuredRag := evav1alpha1.RAG{} + err := runtime.DefaultUnstructuredConverter.FromUnstructured(o.Object, &structuredRag) + if err != nil { + return nil, err + } + for k, v := range o.GetLabels() { + r.Labels[k] = v + } + for k, v := range o.GetAnnotations() { + r.Annotations[k] = v + } + *r.Creator = structuredRag.Spec.Creator + *r.DisplayName = structuredRag.Spec.DisplayName + *r.Description = structuredRag.Spec.Description + *r.CreationTimestamp = o.GetCreationTimestamp().Time + if structuredRag.Status.CompletionTime != nil { + *r.CompleteTimestamp = structuredRag.Status.CompletionTime.Time + } + r.ServiceAccountName = structuredRag.Spec.ServiceAccountName + r.Storage = storage2gen(structuredRag.Spec.Storage) + setRAGSatus(&structuredRag, r) + return r, nil +} + +func CreateRAG(ctx context.Context, kubeClient dynamic.Interface, input *generated.CreateRAGInput) (*generated.Rag, error) { + rag := &evav1alpha1.RAG{ + TypeMeta: v1.TypeMeta{ + Kind: "RAG", + APIVersion: evav1alpha1.GroupVersion.String(), + }, + ObjectMeta: v1.ObjectMeta{ + Namespace: input.Namespace, + Labels: make(map[string]string), + Annotations: make(map[string]string), + }, + Spec: evav1alpha1.RAGSpec{}, + } + name := generateKubernetesResourceName("rag", 10) + if input.Name != nil { + name = *input.Name + } + rag.Name = name + if input.DisplayName != nil { + rag.Spec.DisplayName = *input.DisplayName + } + if input.Description != nil { + rag.Spec.Description = *input.Description + } + rag.Spec.Application = &v1alpha1.TypedObjectReference{ + APIGroup: input.Application.APIGroup, + Kind: input.Application.Kind, + Name: input.Application.Name, + Namespace: input.Application.Namespace, + } + rag.Spec.Datasets = make([]evav1alpha1.Dataset, 0) + for i, set := range input.Datasets { + ds := evav1alpha1.Dataset{ + Source: &v1alpha1.TypedObjectReference{ + APIGroup: input.Datasets[i].Source.APIGroup, + Kind: input.Datasets[i].Source.Kind, + Name: input.Datasets[i].Source.Name, + Namespace: input.Datasets[i].Source.Namespace, + }, + Files: set.Files, + } + rag.Spec.Datasets = append(rag.Spec.Datasets, ds) + } + + rag.Spec.JudgeLLM = &v1alpha1.TypedObjectReference{ + APIGroup: input.JudgeLlm.APIGroup, + Kind: input.JudgeLlm.Kind, + Name: input.JudgeLlm.Name, + Namespace: input.JudgeLlm.Namespace, + } + + rag.Spec.Metrics = make([]evav1alpha1.Metric, 0) + for _, m := range input.Metrics { + mm := evav1alpha1.Metric{ + Parameters: make([]evav1alpha1.Parameter, 0), + } + if m.MetricKind != nil { + mm.Kind = evav1alpha1.MetricsKind(*m.MetricKind) + } + if m.ToleranceThreshbold != nil { + mm.ToleranceThreshbold = *m.ToleranceThreshbold + } + for _, p := range m.Parameters { + mm.Parameters = append(mm.Parameters, evav1alpha1.Parameter{ + Key: *p.Key, + Value: *p.Value, + }) + } + rag.Spec.Metrics = append(rag.Spec.Metrics, mm) + } + + rag.Spec.Storage = gen2storage(input.Storage) + if input.ServiceAccountName != nil { + rag.Spec.ServiceAccountName = *input.ServiceAccountName + } + if input.Suspend != nil { + rag.Spec.Suspend = *input.Suspend + } + + u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(rag) + if err != nil { + return nil, err + } + u1, err := kubeClient.Resource(schema.GroupVersionResource{Group: evav1alpha1.Group, Version: evav1alpha1.Version, Resource: "rags"}).Namespace(input.Namespace).Create(ctx, &unstructured.Unstructured{Object: u}, v1.CreateOptions{}) + if err != nil { + return nil, err + } + return rag2model(u1) +} + +func ListRAG(ctx context.Context, kubeClient dynamic.Interface, input *generated.ListRAGInput) (*generated.PaginatedResult, error) { + listOptions := v1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", evav1alpha1.EvaluationApplicationLabel, input.AppName), + } + page, size := 1, 10 + if input.Page != nil && *input.Page > 0 { + page = *input.Page + } + if input.PageSize != nil && *input.PageSize > 0 { + size = *input.PageSize + } + filter := make([]common.ResourceFilter, 0) + if input.Keyword != nil { + filter = append(filter, common.FilterByRAGKeyword(*input.Keyword)) + } + if input.Status != nil { + filter = append(filter, common.FilterRAGByStatus(*input.Status)) + } + list, err := kubeClient.Resource(schema.GroupVersionResource{ + Group: evav1alpha1.Group, + Version: evav1alpha1.Version, + Resource: "rags", + }).Namespace(input.Namespace).List(ctx, listOptions) + if err != nil { + return nil, err + } + return common.ListReources(list, page, size, rag2modelConverter, filter...) +} + +func GetRAG(ctx context.Context, kubeClient dynamic.Interface, name, namespace string) (*generated.Rag, error) { + u, err := kubeClient.Resource(schema.GroupVersionResource{ + Group: evav1alpha1.Group, + Version: evav1alpha1.Version, + Resource: "rags", + }).Namespace(namespace).Get(ctx, name, v1.GetOptions{}) + if err != nil { + return nil, err + } + return rag2model(u) +} + +func GetV1alpha1RAG(ctx context.Context, kubeClient dynamic.Interface, name, namespace string) (*evav1alpha1.RAG, error) { + u, err := kubeClient.Resource(schema.GroupVersionResource{ + Group: evav1alpha1.Group, + Version: evav1alpha1.Version, + Resource: "rags", + }).Namespace(namespace).Get(ctx, name, v1.GetOptions{}) + if err != nil { + return nil, err + } + structuredRag := evav1alpha1.RAG{} + err = runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &structuredRag) + if err != nil { + return nil, err + } + return &structuredRag, nil +} + +func getFiles(ctx context.Context, kubeClient dynamic.Interface, bucket string, files []string) ([]*generated.F, error) { + oss, err := common.SystemDatasourceOSS(ctx, nil, kubeClient) + if err != nil { + return nil, err + } + fs := make([]*generated.F, 0) + for _, f := range files { + obj, err := oss.Client.StatObject(ctx, bucket, f, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + + size := utils.BytesToSizedStr(obj.Size) + gf := &generated.F{ + Path: f, + Size: &size, + } + tags, err := oss.Client.GetObjectTagging(ctx, bucket, f, minio.GetObjectTaggingOptions{}) + if err != nil { + return nil, err + } + tagsMap := tags.ToMap() + if v, ok := tagsMap[v1alpha1.ObjectTypeTag]; ok { + gf.FileType = v + } + + if v, ok := tagsMap[v1alpha1.ObjectCountTag]; ok { + gf.Count = &v + } + fs = append(fs, gf) + } + return fs, nil +} +func GetRAGMetrics(ctx context.Context, kubeClient dynamic.Interface, name, namespace string) ([]*generated.RAGMetric, error) { + rag, err := GetV1alpha1RAG(ctx, kubeClient, name, namespace) + if err != nil { + return nil, err + } + metrics := make([]*generated.RAGMetric, 0) + for _, m := range rag.Spec.Metrics { + mm := &generated.RAGMetric{ + ToleranceThreshbold: new(int), + Parameters: make([]*generated.Parameter, 0), + } + *mm.ToleranceThreshbold = m.ToleranceThreshbold + if r := string(m.Kind); r != "" { + mm.MetricKind = new(string) + *mm.MetricKind = r + } + for _, p := range m.Parameters { + pp := &generated.Parameter{ + Key: &p.Key, + Value: &p.Value, + } + mm.Parameters = append(mm.Parameters, pp) + } + metrics = append(metrics, mm) + } + return metrics, nil +} + +func GetRAGDatasets(ctx context.Context, kubeClient dynamic.Interface, name, namespace string) ([]*generated.RAGDataset, error) { + rag, err := GetV1alpha1RAG(ctx, kubeClient, name, namespace) + if err != nil { + return nil, err + } + if err != nil { + return nil, err + } + nodes := make([]*generated.RAGDataset, 0) + for _, ds := range rag.Spec.Datasets { + // TODO, enen, versioneddataset is ok + if ds.Source.Kind == "VersionedDataset" { + ns := namespace + if ds.Source.Namespace != nil { + ns = *ds.Source.Namespace + } + files, err := getFiles(ctx, kubeClient, ns, ds.Files) + if err != nil { + return nil, err + } + nodes = append(nodes, &generated.RAGDataset{ + Source: &generated.TypedObjectReference{ + APIGroup: ds.Source.APIGroup, + Kind: ds.Source.Kind, + Name: ds.Source.Name, + Namespace: ds.Source.Namespace, + }, + Files: files, + }) + } + } + return nodes, nil +} + +func DeleteRAG(ctx context.Context, kubeClient dynamic.Interface, input *generated.DeleteRAGInput) error { + if input.Name != "" { + return kubeClient.Resource(schema.GroupVersionResource{Group: evav1alpha1.Group, Version: evav1alpha1.Version, Resource: "rags"}). + Namespace(input.Namespace).Delete(ctx, input.Name, v1.DeleteOptions{}) + } + if input.LabelSelector != nil { + return kubeClient.Resource(schema.GroupVersionResource{Group: evav1alpha1.Group, Version: evav1alpha1.Version, Resource: "rags"}). + Namespace(input.Namespace).DeleteCollection(ctx, v1.DeleteOptions{}, v1.ListOptions{ + LabelSelector: *input.LabelSelector, + }) + } + return nil +}