diff --git a/api/base/v1alpha1/datasource.go b/api/base/v1alpha1/datasource.go index ded3e0380..fa00ff10f 100644 --- a/api/base/v1alpha1/datasource.go +++ b/api/base/v1alpha1/datasource.go @@ -21,6 +21,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// TypedObjectReference returns a reference to this Datasource built from its name and namespace. +func (datasource Datasource) TypedObjectReference() *TypedObjectReference { + // copy the group so that writes through APIGroup cannot reach the package-level GroupVersion + group := GroupVersion.Group + return &TypedObjectReference{ + APIGroup: &group, + Kind: "Datasource", + Name: datasource.Name, + Namespace: &datasource.Namespace, + } +} + const ( LabelDatasourceType = Group + "/datasource-type" ) diff --git a/api/base/v1alpha1/embedder.go b/api/base/v1alpha1/embedder.go index be81ed316..fee03bea0 100644 --- a/api/base/v1alpha1/embedder.go +++ b/api/base/v1alpha1/embedder.go @@ -26,6 +26,18 @@ import ( "github.com/kubeagi/arcadia/pkg/embeddings" ) +// TypedObjectReference returns a reference to this Embedder built from its name and namespace. +func (e Embedder) TypedObjectReference() *TypedObjectReference { + // copy the group so that writes through APIGroup cannot reach the package-level GroupVersion + group := GroupVersion.Group + return &TypedObjectReference{ + APIGroup: &group, + Kind: "Embedder", + Name: e.Name, + Namespace: &e.Namespace, + } +} + func (e Embedder) AuthAPIKey(ctx context.Context, c client.Client) (string, error) { if e.Spec.Endpoint == nil { return "", nil diff --git a/api/base/v1alpha1/knowledgebase.go b/api/base/v1alpha1/knowledgebase.go index d2b977862..2cb1ac87c 100644 --- a/api/base/v1alpha1/knowledgebase.go +++ b/api/base/v1alpha1/knowledgebase.go @@ -6,6 +6,26 @@ import ( "k8s.io/utils/pointer" ) +const ( + // LabelKnowledgeBaseType is the label key that records the type of a knowledgebase + LabelKnowledgeBaseType = Group + "/knowledgebase-type" +) + +// KnowledgeBaseType is the type of a knowledgebase, as stored in KnowledgeBaseSpec.Type +type KnowledgeBaseType string + +const ( + // KnowledgeBaseTypeNormal is the "normal" knowledgebase type + KnowledgeBaseTypeNormal KnowledgeBaseType = "normal" + // KnowledgeBaseTypeConversation is the "conversation" knowledgebase type + KnowledgeBaseTypeConversation KnowledgeBaseType = "conversation" +) + +// IsTypeConversation reports whether Spec.Type of the knowledgebase is KnowledgeBaseTypeConversation +func (kb *KnowledgeBase) IsTypeConversation() bool { + return kb.Spec.Type == KnowledgeBaseTypeConversation +} + const ( // UpdateSourceFileAnnotationKey is the key of the update source file annotation UpdateSourceFileAnnotationKey = Group + "/update-source-file-time" diff --git a/api/base/v1alpha1/knowledgebase_types.go 
b/api/base/v1alpha1/knowledgebase_types.go index 2be2fbd2b..40a8119bf 100644 --- a/api/base/v1alpha1/knowledgebase_types.go +++ b/api/base/v1alpha1/knowledgebase_types.go @@ -24,6 +24,10 @@ import ( type KnowledgeBaseSpec struct { CommonSpec `json:",inline"` + // Type defines the type of knowledgebase + // +kubebuilder:default=normal + Type KnowledgeBaseType `json:"type,omitempty"` + // Embedder defines the embedder to embedding files Embedder *TypedObjectReference `json:"embedder,omitempty"` @@ -112,6 +116,7 @@ type KnowledgeBaseStatus struct { //+kubebuilder:object:root=true //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="display-name",type=string,JSONPath=`.spec.displayName` +//+kubebuilder:printcolumn:name="type",type=string,JSONPath=`.spec.type` // KnowledgeBase is the Schema for the knowledgebases API type KnowledgeBase struct { diff --git a/api/base/v1alpha1/vectorstore.go b/api/base/v1alpha1/vectorstore.go index b86d33fb8..e0f4b3132 100644 --- a/api/base/v1alpha1/vectorstore.go +++ b/api/base/v1alpha1/vectorstore.go @@ -21,6 +21,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// TypedObjectReference returns a reference to this VectorStore built from its name and namespace. +func (vs VectorStore) TypedObjectReference() *TypedObjectReference { + // copy the group so that writes through APIGroup cannot reach the package-level GroupVersion + group := GroupVersion.Group + return &TypedObjectReference{ + APIGroup: &group, + Kind: "VectorStore", + Name: vs.Name, + Namespace: &vs.Namespace, + } +} + const ( LabelVectorStoreType = Group + "/vectorstore-type" ) diff --git a/apiserver/pkg/chat/chat_docs.go b/apiserver/pkg/chat/chat_docs.go index 5f219cce7..49bcf213c 100644 --- a/apiserver/pkg/chat/chat_docs.go +++ b/apiserver/pkg/chat/chat_docs.go @@ -29,13 +29,17 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/minio/minio-go/v7" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1" "github.com/kubeagi/arcadia/apiserver/pkg/auth" 
"github.com/kubeagi/arcadia/apiserver/pkg/chat/storage" + pkgclient "github.com/kubeagi/arcadia/apiserver/pkg/client" "github.com/kubeagi/arcadia/apiserver/pkg/common" + "github.com/kubeagi/arcadia/pkg/config" ) // ReceiveConversationDocs receive and process docs for a conversation @@ -118,6 +122,13 @@ func (cs *ChatServer) ReceiveConversationFile(ctx context.Context, messageID str Object: objectPath, } + // build/update conversation knowledgebase + err = cs.BuildConversationKnowledgeBase(ctx, req, document) + if err != nil { + // only log error + klog.Errorf("failed to build conversation knowledgebase %s with error %s", req.ConversationID, err.Error()) + } + // process document with map-reduce message := storage.Message{ ID: messageID, @@ -157,3 +168,77 @@ func (cs *ChatServer) ReceiveConversationFile(ctx context.Context, messageID str }, }, nil } + +// BuildConversationKnowledgeBase creates or updates the knowledgebase for this conversation. +// The conversation ID is the knowledgebase name and the document path is placed under the filegroup of the system datasource. +// The knowledgebase will embed the document into the vectorstore, which can be used in this conversation as references (similarity search). +// A path that is already listed is not added again, so receiving the same document twice leaves the filegroups unchanged. +func (cs *ChatServer) BuildConversationKnowledgeBase(ctx context.Context, req ConversationFilesReqBody, document storage.Document) error { + // get system embedding suite + embedder, vs, err := common.SystemEmbeddingSuite(ctx, cs.cli) + if err != nil { + return err + } + + // new knowledgebase + kb := &arcadiav1alpha1.KnowledgeBase{ + ObjectMeta: v1.ObjectMeta{ + Name: req.ConversationID, + Namespace: req.AppNamespace, + Labels: map[string]string{ + arcadiav1alpha1.LabelKnowledgeBaseType: string(arcadiav1alpha1.KnowledgeBaseTypeConversation), + }, + }, + Spec: arcadiav1alpha1.KnowledgeBaseSpec{ + CommonSpec: arcadiav1alpha1.CommonSpec{ + DisplayName: "Conversation", + Description: "Knowledgebase built for conversation", + }, + Type: arcadiav1alpha1.KnowledgeBaseTypeConversation, + Embedder: 
embedder.TypedObjectReference(), + VectorStore: vs.TypedObjectReference(), + FileGroups: make([]arcadiav1alpha1.FileGroup, 0), + }, + } + + // app as ownerreference + app, _, err := cs.getApp(ctx, req.APPName, req.AppNamespace) + if err != nil { + return err + } + // systemDatasource which stores the document + systemDatasource, err := config.GetSystemDatasource(ctx, cs.cli) + if err != nil { + return err + } + // create or update the conversation knowledgebase + _, err = controllerutil.CreateOrUpdate(ctx, cs.cli, kb, func() error { + if err := controllerutil.SetControllerReference(app, kb, pkgclient.Scheme); err != nil { + return err + } + // merge the document path into the filegroup of the system datasource when there is one, + // so repeated uploads neither pile up filegroups for the same source nor list a path twice + source := systemDatasource.TypedObjectReference() + for i := range kb.Spec.FileGroups { + group := &kb.Spec.FileGroups[i] + if group.Source == nil || group.Source.Kind != source.Kind || group.Source.Name != source.Name || group.Source.GetNamespace(kb.Namespace) != systemDatasource.Namespace { + continue + } + for _, p := range group.Paths { + if p == document.Object { + return nil + } + } + group.Paths = append(group.Paths, document.Object) + return nil + } + // first document from this source: start a new filegroup + kb.Spec.FileGroups = append(kb.Spec.FileGroups, arcadiav1alpha1.FileGroup{ + Source: source, + Paths: []string{document.Object}, + }) + return nil + }) + + return err +} diff --git a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml index ed3d6ef6e..a21d6ae0a 100644 --- a/config/crd/bases/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml +++ b/config/crd/bases/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml @@ -19,6 +19,9 @@ spec: - jsonPath: .spec.displayName name: display-name type: string + - jsonPath: .spec.type + name: type + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -117,6 +120,10 @@ spec: type: object type: object type: array + type: + default: normal + description: Type defines the type of knowledgebase + type: string vectorStore: description: VectorStore defines the vectorstore to store results properties: diff --git a/controllers/base/knowledgebase_controller.go b/controllers/base/knowledgebase_controller.go index 26749326c..6a86aa226 100644 --- a/controllers/base/knowledgebase_controller.go +++ b/controllers/base/knowledgebase_controller.go @@ -24,6 +24,7 @@ import ( "io" "path/filepath" "reflect" + "strings" "sync" "time" @@ -353,34 +354,64 @@ func (r 
*KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo if group.Source == nil { return errNoSource } - versionedDataset := &arcadiav1alpha1.VersionedDataset{} + ns := kb.Namespace if group.Source.Namespace != nil { ns = *group.Source.Namespace } - if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, versionedDataset); err != nil { - if apierrors.IsNotFound(err) { - return errNoSource - } - return err - } - if !versionedDataset.Status.IsReady() { - return errDataSourceNotReady - } - system, err := config.GetSystemDatasource(ctx, r.Client) - if err != nil { - return err - } - endpoint := system.Spec.Endpoint.DeepCopy() - if endpoint != nil && endpoint.AuthSecret != nil { - endpoint.AuthSecret.WithNameSpace(system.Namespace) - } - ds, err := datasource.NewLocal(ctx, r.Client, endpoint) - if err != nil { - return err - } + var ds datasource.Datasource info := &arcadiav1alpha1.OSS{Bucket: ns} + var vsBasePath string + switch strings.ToLower(group.Source.Kind) { + case "versioneddataset": + versionedDataset := &arcadiav1alpha1.VersionedDataset{} + if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, versionedDataset); err != nil { + if apierrors.IsNotFound(err) { + return errNoSource + } + return err + } + if versionedDataset.Spec.Dataset == nil { + return fmt.Errorf("versionedDataset.Spec.Dataset is nil") + } + if !versionedDataset.Status.IsReady() { + return errDataSourceNotReady + } + system, err := config.GetSystemDatasource(ctx, r.Client) + if err != nil { + return err + } + endpoint := system.Spec.Endpoint.DeepCopy() + if endpoint != nil && endpoint.AuthSecret != nil { + endpoint.AuthSecret.WithNameSpace(system.Namespace) + } + ds, err = datasource.NewLocal(ctx, r.Client, endpoint) + if err != nil { + return err + } + // basepath for this versioneddataset + vsBasePath = filepath.Join("dataset", versionedDataset.Spec.Dataset.Name, versionedDataset.Spec.Version) + case "datasource": + dsObj 
:= &arcadiav1alpha1.Datasource{} + if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, dsObj); err != nil { + if apierrors.IsNotFound(err) { + return errNoSource + } + return err + } + if !dsObj.Status.IsReady() { + return errDataSourceNotReady + } + ds, err = datasource.NewOSS(ctx, r.Client, &dsObj.Spec.Endpoint) + if err != nil { + return err + } + // for none-conversation knowledgebase, bucket is the same as datasource. + if kb.Spec.Type != arcadiav1alpha1.KnowledgeBaseTypeConversation { + info.Bucket = dsObj.Spec.OSS.Bucket + } + } if len(kb.Status.FileGroupDetail) == 0 { // brand new knowledgebase, init status. @@ -391,7 +422,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo var fileGroupDetail *arcadiav1alpha1.FileGroupDetail pathMap := make(map[string]*arcadiav1alpha1.FileDetails, 1) for i, detail := range kb.Status.FileGroupDetail { - if detail.Source != nil && detail.Source.Name == versionedDataset.Name && detail.Source.GetNamespace(kb.GetNamespace()) == versionedDataset.GetNamespace() { + if detail.Source != nil && detail.Source.Name == group.Source.Name && detail.Source.GetNamespace(kb.GetNamespace()) == ns { fileGroupDetail = &kb.Status.FileGroupDetail[i] for i, detail := range fileGroupDetail.FileDetails { pathMap[detail.Path] = &fileGroupDetail.FileDetails[i] @@ -430,13 +461,15 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo }) fileDetail = &fileGroupDetail.FileDetails[len(fileGroupDetail.FileDetails)-1] } - if versionedDataset.Spec.Dataset == nil { - err = fmt.Errorf("versionedDataset.Spec.Dataset is nil") - errs = append(errs, err) - fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed) - continue + + switch strings.ToLower(group.Source.Kind) { + case "versioneddataset": + // info.Object has been + info.Object = filepath.Join(vsBasePath, path) + case "datasource": + info.Object = path } - info.Object = filepath.Join("dataset", 
versionedDataset.Spec.Dataset.Name, versionedDataset.Spec.Version, path) + stat, err := ds.StatFile(ctx, info) log.V(5).Info(fmt.Sprintf("raw StatFile:%#v", stat), "path", path) if err != nil { diff --git a/deploy/charts/arcadia/Chart.yaml b/deploy/charts/arcadia/Chart.yaml index ee2ddbe6d..5615037a5 100644 --- a/deploy/charts/arcadia/Chart.yaml +++ b/deploy/charts/arcadia/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 name: arcadia description: A Helm chart(Also a KubeBB Component) for KubeAGI Arcadia type: application -version: 0.3.16 +version: 0.3.17 appVersion: "0.2.0" keywords: diff --git a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml index ed3d6ef6e..a21d6ae0a 100644 --- a/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml +++ b/deploy/charts/arcadia/crds/arcadia.kubeagi.k8s.com.cn_knowledgebases.yaml @@ -19,6 +19,9 @@ spec: - jsonPath: .spec.displayName name: display-name type: string + - jsonPath: .spec.type + name: type + type: string name: v1alpha1 schema: openAPIV3Schema: @@ -117,6 +120,10 @@ spec: type: object type: object type: array + type: + default: normal + description: Type defines the type of knowledgebase + type: string vectorStore: description: VectorStore defines the vectorstore to store results properties: