Skip to content

Commit

Permalink
Merge pull request kubeagi#873 from bjwswang/main
Browse files Browse the repository at this point in the history
feat: build knowledgebase for conversation
  • Loading branch information
bjwswang authored Mar 15, 2024
2 parents fbd6bfb + 052bbfc commit c1ebe9b
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 30 deletions.
9 changes: 9 additions & 0 deletions api/base/v1alpha1/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (datasource Datasource) TypedObjectReference() *TypedObjectReference {
return &TypedObjectReference{
APIGroup: &GroupVersion.Group,
Kind: "Datasource",
Name: datasource.Name,
Namespace: &datasource.Namespace,
}
}

const (
LabelDatasourceType = Group + "/datasource-type"
)
Expand Down
9 changes: 9 additions & 0 deletions api/base/v1alpha1/embedder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
"github.com/kubeagi/arcadia/pkg/embeddings"
)

func (e Embedder) TypedObjectReference() *TypedObjectReference {
return &TypedObjectReference{
APIGroup: &GroupVersion.Group,
Kind: "Embedder",
Name: e.Name,
Namespace: &e.Namespace,
}
}

func (e Embedder) AuthAPIKey(ctx context.Context, c client.Client) (string, error) {
if e.Spec.Endpoint == nil {
return "", nil
Expand Down
15 changes: 15 additions & 0 deletions api/base/v1alpha1/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@ import (
"k8s.io/utils/pointer"
)

const (
LabelKnowledgeBaseType = Group + "/knowledgebase-type"
)

type KnowledgeBaseType string

const (
KnowledgeBaseTypeNormal KnowledgeBaseType = "normal"
KnowledgeBaseTypeConversation KnowledgeBaseType = "conversation"
)

func (kb *KnowledgeBase) IsTypeConversation() bool {
return kb.Spec.Type == KnowledgeBaseTypeConversation
}

const (
// UpdateSourceFileAnnotationKey is the key of the update source file annotation
UpdateSourceFileAnnotationKey = Group + "/update-source-file-time"
Expand Down
5 changes: 5 additions & 0 deletions api/base/v1alpha1/knowledgebase_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
type KnowledgeBaseSpec struct {
CommonSpec `json:",inline"`

// Type defines the type of knowledgebase
// +kubebuilder:default=normal
Type KnowledgeBaseType `json:"type,omitempty"`

// Embedder defines the embedder to embedding files
Embedder *TypedObjectReference `json:"embedder,omitempty"`

Expand Down Expand Up @@ -112,6 +116,7 @@ type KnowledgeBaseStatus struct {
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="display-name",type=string,JSONPath=`.spec.displayName`
//+kubebuilder:printcolumn:name="type",type=string,JSONPath=`.spec.type`

// KnowledgeBase is the Schema for the knowledgebases API
type KnowledgeBase struct {
Expand Down
9 changes: 9 additions & 0 deletions api/base/v1alpha1/vectorstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (vs VectorStore) TypedObjectReference() *TypedObjectReference {
return &TypedObjectReference{
APIGroup: &GroupVersion.Group,
Kind: "VectorStore",
Name: vs.Name,
Namespace: &vs.Namespace,
}
}

const (
LabelVectorStoreType = Group + "/vectorstore-type"
)
Expand Down
68 changes: 68 additions & 0 deletions apiserver/pkg/chat/chat_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ import (

"github.com/jackc/pgx/v5/pgconn"
"github.com/minio/minio-go/v7"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

arcadiav1alpha1 "github.com/kubeagi/arcadia/api/base/v1alpha1"
"github.com/kubeagi/arcadia/apiserver/pkg/auth"
"github.com/kubeagi/arcadia/apiserver/pkg/chat/storage"
pkgclient "github.com/kubeagi/arcadia/apiserver/pkg/client"
"github.com/kubeagi/arcadia/apiserver/pkg/common"
"github.com/kubeagi/arcadia/pkg/config"
)

// ReceiveConversationDocs receive and process docs for a conversation
Expand Down Expand Up @@ -118,6 +122,13 @@ func (cs *ChatServer) ReceiveConversationFile(ctx context.Context, messageID str
Object: objectPath,
}

// build/update conversation knowledgebase
err = cs.BuildConversationKnowledgeBase(ctx, req, document)
if err != nil {
// only log error
klog.Errorf("failed to build conversation knowledgebase %s with error %s", req.ConversationID, err.Error())
}

// process document with map-reduce
message := storage.Message{
ID: messageID,
Expand Down Expand Up @@ -157,3 +168,60 @@ func (cs *ChatServer) ReceiveConversationFile(ctx context.Context, messageID str
},
}, nil
}

// BuildConversationKnowledgeBase create/updates knowledgebase for this conversation.
// Conversation ID will be the knowledgebase name and document will be placed unde filegroup
// Knoweledgebase will embed the document into vectorstore which can be used in this conversation as references(similarity search)
func (cs *ChatServer) BuildConversationKnowledgeBase(ctx context.Context, req ConversationFilesReqBody, document storage.Document) error {
// get system embedding suite
embedder, vs, err := common.SystemEmbeddingSuite(ctx, cs.cli)
if err != nil {
return err
}

// new knowledgebase
kb := &arcadiav1alpha1.KnowledgeBase{
ObjectMeta: v1.ObjectMeta{
Name: req.ConversationID,
Namespace: req.AppNamespace,
Labels: map[string]string{
arcadiav1alpha1.LabelKnowledgeBaseType: string(arcadiav1alpha1.KnowledgeBaseTypeConversation),
},
},
Spec: arcadiav1alpha1.KnowledgeBaseSpec{
CommonSpec: arcadiav1alpha1.CommonSpec{
DisplayName: "Conversation",
Description: "Knowledgebase built for conversation",
},
Type: arcadiav1alpha1.KnowledgeBaseTypeConversation,
Embedder: embedder.TypedObjectReference(),
VectorStore: vs.TypedObjectReference(),
FileGroups: make([]arcadiav1alpha1.FileGroup, 0),
},
}

// app as ownerreference
app, _, err := cs.getApp(ctx, req.APPName, req.AppNamespace)
if err != nil {
return err
}
// systemDatasource which stores the document
systemDatasource, err := config.GetSystemDatasource(ctx, cs.cli)
if err != nil {
return err
}
// create or update the conversation knowledgebase
_, err = controllerutil.CreateOrUpdate(ctx, cs.cli, kb, func() error {
if err := controllerutil.SetControllerReference(app, kb, pkgclient.Scheme); err != nil {
return err
}
// append document path
kb.Spec.FileGroups = append(kb.Spec.FileGroups, arcadiav1alpha1.FileGroup{
Source: systemDatasource.TypedObjectReference(),
Paths: []string{document.Object},
})
return nil
})

return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ spec:
- jsonPath: .spec.displayName
name: display-name
type: string
- jsonPath: .spec.type
name: type
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -117,6 +120,10 @@ spec:
type: object
type: object
type: array
type:
default: normal
description: Type defines the type of knowledgebase
type: string
vectorStore:
description: VectorStore defines the vectorstore to store results
properties:
Expand Down
91 changes: 62 additions & 29 deletions controllers/base/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"
"path/filepath"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -353,34 +354,64 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
if group.Source == nil {
return errNoSource
}
versionedDataset := &arcadiav1alpha1.VersionedDataset{}

ns := kb.Namespace
if group.Source.Namespace != nil {
ns = *group.Source.Namespace
}
if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, versionedDataset); err != nil {
if apierrors.IsNotFound(err) {
return errNoSource
}
return err
}
if !versionedDataset.Status.IsReady() {
return errDataSourceNotReady
}

system, err := config.GetSystemDatasource(ctx, r.Client)
if err != nil {
return err
}
endpoint := system.Spec.Endpoint.DeepCopy()
if endpoint != nil && endpoint.AuthSecret != nil {
endpoint.AuthSecret.WithNameSpace(system.Namespace)
}
ds, err := datasource.NewLocal(ctx, r.Client, endpoint)
if err != nil {
return err
}
var ds datasource.Datasource
info := &arcadiav1alpha1.OSS{Bucket: ns}
var vsBasePath string
switch strings.ToLower(group.Source.Kind) {
case "versioneddataset":
versionedDataset := &arcadiav1alpha1.VersionedDataset{}
if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, versionedDataset); err != nil {
if apierrors.IsNotFound(err) {
return errNoSource
}
return err
}
if versionedDataset.Spec.Dataset == nil {
return fmt.Errorf("versionedDataset.Spec.Dataset is nil")
}
if !versionedDataset.Status.IsReady() {
return errDataSourceNotReady
}
system, err := config.GetSystemDatasource(ctx, r.Client)
if err != nil {
return err
}
endpoint := system.Spec.Endpoint.DeepCopy()
if endpoint != nil && endpoint.AuthSecret != nil {
endpoint.AuthSecret.WithNameSpace(system.Namespace)
}
ds, err = datasource.NewLocal(ctx, r.Client, endpoint)
if err != nil {
return err
}
// basepath for this versioneddataset
vsBasePath = filepath.Join("dataset", versionedDataset.Spec.Dataset.Name, versionedDataset.Spec.Version)
case "datasource":
dsObj := &arcadiav1alpha1.Datasource{}
if err = r.Get(ctx, types.NamespacedName{Name: group.Source.Name, Namespace: ns}, dsObj); err != nil {
if apierrors.IsNotFound(err) {
return errNoSource
}
return err
}
if !dsObj.Status.IsReady() {
return errDataSourceNotReady
}
ds, err = datasource.NewOSS(ctx, r.Client, &dsObj.Spec.Endpoint)
if err != nil {
return err
}
// for none-conversation knowledgebase, bucket is the same as datasource.
if kb.Spec.Type != arcadiav1alpha1.KnowledgeBaseTypeConversation {
info.Bucket = dsObj.Spec.OSS.Bucket
}
}

if len(kb.Status.FileGroupDetail) == 0 {
// brand new knowledgebase, init status.
Expand All @@ -391,7 +422,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
var fileGroupDetail *arcadiav1alpha1.FileGroupDetail
pathMap := make(map[string]*arcadiav1alpha1.FileDetails, 1)
for i, detail := range kb.Status.FileGroupDetail {
if detail.Source != nil && detail.Source.Name == versionedDataset.Name && detail.Source.GetNamespace(kb.GetNamespace()) == versionedDataset.GetNamespace() {
if detail.Source != nil && detail.Source.Name == group.Source.Name && detail.Source.GetNamespace(kb.GetNamespace()) == ns {
fileGroupDetail = &kb.Status.FileGroupDetail[i]
for i, detail := range fileGroupDetail.FileDetails {
pathMap[detail.Path] = &fileGroupDetail.FileDetails[i]
Expand Down Expand Up @@ -430,13 +461,15 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
})
fileDetail = &fileGroupDetail.FileDetails[len(fileGroupDetail.FileDetails)-1]
}
if versionedDataset.Spec.Dataset == nil {
err = fmt.Errorf("versionedDataset.Spec.Dataset is nil")
errs = append(errs, err)
fileDetail.UpdateErr(err, arcadiav1alpha1.FileProcessPhaseFailed)
continue

switch strings.ToLower(group.Source.Kind) {
case "versioneddataset":
// info.Object has been
info.Object = filepath.Join(vsBasePath, path)
case "datasource":
info.Object = path
}
info.Object = filepath.Join("dataset", versionedDataset.Spec.Dataset.Name, versionedDataset.Spec.Version, path)

stat, err := ds.StatFile(ctx, info)
log.V(5).Info(fmt.Sprintf("raw StatFile:%#v", stat), "path", path)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion deploy/charts/arcadia/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: arcadia
description: A Helm chart(Also a KubeBB Component) for KubeAGI Arcadia
type: application
version: 0.3.16
version: 0.3.17
appVersion: "0.2.0"

keywords:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ spec:
- jsonPath: .spec.displayName
name: display-name
type: string
- jsonPath: .spec.type
name: type
type: string
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -117,6 +120,10 @@ spec:
type: object
type: object
type: array
type:
default: normal
description: Type defines the type of knowledgebase
type: string
vectorStore:
description: VectorStore defines the vectorstore to store results
properties:
Expand Down

0 comments on commit c1ebe9b

Please sign in to comment.