Skip to content

Commit

Permalink
Merge pull request #557 from Abirdcfly/kn
Browse files Browse the repository at this point in the history
feat: knowledgebase support more granular update
  • Loading branch information
bjwswang authored Jan 16, 2024
2 parents 0a8f671 + b01bbdb commit c36cbe1
Show file tree
Hide file tree
Showing 18 changed files with 429 additions and 134 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/pgvector_image_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ name: Build pgvector images
on:
pull_request:
branches: [main]
paths:
- 'deploy/pgvector/Dockerfile'
push:
branches: [main]
paths:
Expand Down
5 changes: 5 additions & 0 deletions api/base/v1alpha1/knowledgebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// UpdateSourceFileAnnotationKey is the key of the update source file annotation
UpdateSourceFileAnnotationKey = Group + "/update-source-file-time"
)

func (kb *KnowledgeBase) VectorStoreCollectionName() string {
return kb.Namespace + "_" + kb.Name
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/app-node/chain/llmchain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package chain

import (
"context"
"reflect"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -127,6 +128,9 @@ func (r *LLMChainReconciler) patchStatus(ctx context.Context, instance *api.LLMC
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(instance), latest); err != nil {
return err
}
if reflect.DeepEqual(instance.Status, latest.Status) {
return nil
}
patch := client.MergeFrom(latest.DeepCopy())
latest.Status = instance.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("LLMChain-controller"))
Expand Down
4 changes: 4 additions & 0 deletions controllers/app-node/chain/retrieval_qa_chain_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package chain

import (
"context"
"reflect"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -127,6 +128,9 @@ func (r *RetrievalQAChainReconciler) patchStatus(ctx context.Context, instance *
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(instance), latest); err != nil {
return err
}
if reflect.DeepEqual(instance.Status, latest.Status) {
return nil
}
patch := client.MergeFrom(latest.DeepCopy())
latest.Status = instance.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("RetrievalQAChain-controller"))
Expand Down
4 changes: 4 additions & 0 deletions controllers/app-node/prompt/prompt_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package chain

import (
"context"
"reflect"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -125,6 +126,9 @@ func (r *PromptReconciler) patchStatus(ctx context.Context, instance *api.Prompt
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(instance), latest); err != nil {
return err
}
if reflect.DeepEqual(instance.Status, latest.Status) {
return nil
}
patch := client.MergeFrom(latest.DeepCopy())
latest.Status = instance.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("Prompt-controller"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package chain

import (
"context"
"reflect"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -127,6 +128,9 @@ func (r *KnowledgeBaseRetrieverReconciler) patchStatus(ctx context.Context, inst
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(instance), latest); err != nil {
return err
}
if reflect.DeepEqual(instance.Status, latest.Status) {
return nil
}
patch := client.MergeFrom(latest.DeepCopy())
latest.Status = instance.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("KnowledgeBaseRetriever-controller"))
Expand Down
4 changes: 4 additions & 0 deletions controllers/base/application_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"reflect"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -192,6 +193,9 @@ func (r *ApplicationReconciler) patchStatus(ctx context.Context, app *arcadiav1a
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(app), latest); err != nil {
return err
}
if reflect.DeepEqual(app.Status, latest.Status) {
return nil
}
patch := client.MergeFrom(latest.DeepCopy())
latest.Status = app.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("application-controller"))
Expand Down
136 changes: 94 additions & 42 deletions controllers/base/knowledgebase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"path/filepath"
"reflect"
"sync"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/tmc/langchaingo/documentloaders"
"github.com/tmc/langchaingo/schema"
"github.com/tmc/langchaingo/textsplitter"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -68,6 +70,8 @@ type KnowledgeBaseReconciler struct {
Scheme *runtime.Scheme
mu sync.Mutex
HasHandledSuccessPath map[string]bool
readyMu sync.Mutex
ReadyMap map[string]bool
}

//+kubebuilder:rbac:groups=arcadia.kubeagi.k8s.com.cn,resources=knowledgebases,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -129,19 +133,29 @@ func (r *KnowledgeBaseReconciler) Reconcile(ctx context.Context, req ctrl.Reques
kb, result, err = r.reconcile(ctx, log, kb)

// Update status after reconciliation.
if updateStatusErr := r.patchStatus(ctx, kb); updateStatusErr != nil {
if updateStatusErr := r.patchStatus(ctx, log, kb); updateStatusErr != nil {
log.Error(updateStatusErr, "unable to update status after reconciliation")
return ctrl.Result{Requeue: true}, updateStatusErr
}
log.V(5).Info("Reconcile done")

return result, err
}

func (r *KnowledgeBaseReconciler) patchStatus(ctx context.Context, kb *arcadiav1alpha1.KnowledgeBase) error {
func (r *KnowledgeBaseReconciler) patchStatus(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) error {
latest := &arcadiav1alpha1.KnowledgeBase{}
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(kb), latest); err != nil {
return err
}
if reflect.DeepEqual(kb.Status, latest.Status) {
log.V(5).Info("status not changed, skip")
return nil
}
if r.isReady(kb) && !kb.Status.IsReady() {
log.V(5).Info("status is ready,but not get it from cluster, has cache, skip update status")
return nil
}
log.V(5).Info(fmt.Sprintf("try to patch status %#v", kb.Status))
patch := client.MergeFrom(latest.DeepCopy())
latest.Status = kb.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner("knowledgebase-controller"))
Expand All @@ -155,45 +169,60 @@ func (r *KnowledgeBaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *KnowledgeBaseReconciler) reconcile(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) (*arcadiav1alpha1.KnowledgeBase, ctrl.Result, error) {
// Observe generation change
if kb.Status.ObservedGeneration != kb.Generation {
kb.Status.ObservedGeneration = kb.Generation
kb = r.setCondition(kb, kb.InitCondition())
if updateStatusErr := r.patchStatus(ctx, kb); updateStatusErr != nil {
// Observe generation change or manual update
if kb.Status.ObservedGeneration != kb.Generation || kb.Annotations[arcadiav1alpha1.UpdateSourceFileAnnotationKey] != "" {
r.cleanupHasHandledSuccessPath(kb)
if kb.Status.ObservedGeneration != kb.Generation {
log.Info("Generation changed")
kb.Status.ObservedGeneration = kb.Generation
}
kb = r.setCondition(log, kb, kb.InitCondition())
if updateStatusErr := r.patchStatus(ctx, log, kb); updateStatusErr != nil {
log.Error(updateStatusErr, "unable to update status after generation update")
return kb, ctrl.Result{Requeue: true}, updateStatusErr
}
if kb.Annotations[arcadiav1alpha1.UpdateSourceFileAnnotationKey] != "" {
log.Info("Manual update")
kbNew := kb.DeepCopy()
delete(kbNew.Annotations, arcadiav1alpha1.UpdateSourceFileAnnotationKey)
err := r.Patch(ctx, kbNew, client.MergeFrom(kb))
if err != nil {
return kb, ctrl.Result{Requeue: true}, err
}
}
return kb, ctrl.Result{}, nil
}

if kb.Status.IsReady() {
if kb.Status.IsReady() || r.isReady(kb) {
log.Info("KnowledgeBase is ready, skip reconcile")
return kb, ctrl.Result{}, nil
}

embedderReq := kb.Spec.Embedder
vectorStoreReq := kb.Spec.VectorStore
fileGroupsReq := kb.Spec.FileGroups
if embedderReq == nil || vectorStoreReq == nil || len(fileGroupsReq) == 0 {
kb = r.setCondition(kb, kb.PendingCondition("embedder or vectorstore or filegroups is not setting"))
kb = r.setCondition(log, kb, kb.PendingCondition("embedder or vectorstore or filegroups is not setting"))
return kb, ctrl.Result{}, nil
}

embedder := &arcadiav1alpha1.Embedder{}
if err := r.Get(ctx, types.NamespacedName{Name: kb.Spec.Embedder.Name, Namespace: kb.Spec.Embedder.GetNamespace(kb.GetNamespace())}, embedder); err != nil {
if apierrors.IsNotFound(err) {
kb = r.setCondition(kb, kb.PendingCondition("embedder is not found"))
kb = r.setCondition(log, kb, kb.PendingCondition("embedder is not found"))
return kb, ctrl.Result{RequeueAfter: waitLonger}, nil
}
kb = r.setCondition(kb, kb.ErrorCondition(err.Error()))
kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error()))
return kb, ctrl.Result{}, err
}

vectorStore := &arcadiav1alpha1.VectorStore{}
if err := r.Get(ctx, types.NamespacedName{Name: kb.Spec.VectorStore.Name, Namespace: kb.Spec.VectorStore.GetNamespace(kb.GetNamespace())}, vectorStore); err != nil {
if apierrors.IsNotFound(err) {
kb = r.setCondition(kb, kb.PendingCondition("vectorStore is not found"))
kb = r.setCondition(log, kb, kb.PendingCondition("vectorStore is not found"))
return kb, ctrl.Result{RequeueAfter: waitLonger}, nil
}
kb = r.setCondition(kb, kb.ErrorCondition(err.Error()))
kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error()))
return kb, ctrl.Result{}, err
}

Expand All @@ -205,24 +234,35 @@ func (r *KnowledgeBaseReconciler) reconcile(ctx context.Context, log logr.Logger
}
}
if err := errors.Join(errs...); err != nil {
kb = r.setCondition(kb, kb.ErrorCondition(err.Error()))
kb = r.setCondition(log, kb, kb.ErrorCondition(err.Error()))
return kb, ctrl.Result{RequeueAfter: waitMedium}, nil
} else {
for _, fileGroupDetail := range kb.Status.FileGroupDetail {
for _, fileDetail := range fileGroupDetail.FileDetails {
if fileDetail.Phase == arcadiav1alpha1.FileProcessPhaseFailed && fileDetail.ErrMessage != "" {
kb = r.setCondition(kb, kb.ErrorCondition(fileDetail.ErrMessage))
kb = r.setCondition(log, kb, kb.ErrorCondition(fileDetail.ErrMessage))
return kb, ctrl.Result{RequeueAfter: waitMedium}, nil
}
}
}
kb = r.setCondition(kb, kb.ReadyCondition())
kb = r.setCondition(log, kb, kb.ReadyCondition())
}

return kb, ctrl.Result{}, nil
}

func (r *KnowledgeBaseReconciler) setCondition(kb *arcadiav1alpha1.KnowledgeBase, condition ...arcadiav1alpha1.Condition) *arcadiav1alpha1.KnowledgeBase {
func (r *KnowledgeBaseReconciler) setCondition(log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase, condition ...arcadiav1alpha1.Condition) *arcadiav1alpha1.KnowledgeBase {
ready := false
for _, c := range condition {
if c.Type == arcadiav1alpha1.TypeReady && c.Status == corev1.ConditionTrue {
ready = true
break
}
}
if ready {
r.ready(log, kb)
} else {
r.unready(log, kb)
}
kb.Status.SetConditions(condition...)
return kb
}
Expand Down Expand Up @@ -270,6 +310,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
// brand new knowledgebase, init status.
kb.Status.FileGroupDetail = make([]arcadiav1alpha1.FileGroupDetail, 1)
kb.Status.FileGroupDetail[0].Init(group)
log.V(5).Info("init filegroupdetail status")
}
var fileGroupDetail *arcadiav1alpha1.FileGroupDetail
pathMap := make(map[string]*arcadiav1alpha1.FileDetails, 1)
Expand All @@ -284,6 +325,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
}
if fileGroupDetail == nil {
// this group is newly added
log.V(5).Info("new added group, init filegroupdetail status")
fileGroupDetail = &arcadiav1alpha1.FileGroupDetail{}
fileGroupDetail.Init(group)
kb.Status.FileGroupDetail = append(kb.Status.FileGroupDetail, *fileGroupDetail)
Expand Down Expand Up @@ -376,6 +418,7 @@ func (r *KnowledgeBaseReconciler) reconcileFileGroup(ctx context.Context, log lo
r.HasHandledSuccessPath[r.hasHandledPathKey(kb, group, path)] = true
r.mu.Unlock()
fileDetail.UpdateErr(nil, arcadiav1alpha1.FileProcessPhaseSucceeded)
log.Info("handle FileGroup succeeded")
}
return errors.Join(errs...)
}
Expand Down Expand Up @@ -458,38 +501,18 @@ func (r *KnowledgeBaseReconciler) handleFile(ctx context.Context, log logr.Logge
return err
}
}
for i, doc := range documents {
log.V(5).Info(fmt.Sprintf("document[%d]: embedding:%s, metadata:%v", i, doc.PageContent, doc.Metadata))
}
s, finish, err := vectorstore.NewVectorStore(ctx, store, em, kb.VectorStoreCollectionName(), r.Client, nil)
if err != nil {
return err
}
log.Info("handle file: add documents to embedder")
if _, err = s.AddDocuments(ctx, documents); err != nil {
return err
}
if finish != nil {
finish()
}
log.Info("handle file succeeded")
return nil
return vectorstore.AddDocuments(ctx, log, store, em, kb.VectorStoreCollectionName(), r.Client, nil, documents)
}

func (r *KnowledgeBaseReconciler) reconcileDelete(ctx context.Context, log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) {
r.mu.Lock()
for _, fg := range kb.Spec.FileGroups {
for _, path := range fg.Paths {
delete(r.HasHandledSuccessPath, r.hasHandledPathKey(kb, fg, path))
}
}
r.mu.Unlock()
r.cleanupHasHandledSuccessPath(kb)
r.unready(log, kb)
vectorStore := &arcadiav1alpha1.VectorStore{}
if err := r.Get(ctx, types.NamespacedName{Name: kb.Spec.VectorStore.Name, Namespace: kb.Spec.VectorStore.GetNamespace(kb.GetNamespace())}, vectorStore); err != nil {
log.Error(err, "reconcile delete: get vector store error, may leave garbage data")
return
}
_ = vectorstore.RemoveCollection(ctx, log, vectorStore, kb.VectorStoreCollectionName())
_ = vectorstore.RemoveCollection(ctx, log, vectorStore, kb.VectorStoreCollectionName(), r.Client, nil)
}

func (r *KnowledgeBaseReconciler) hasHandledPathKey(kb *arcadiav1alpha1.KnowledgeBase, filegroup arcadiav1alpha1.FileGroup, path string) string {
Expand All @@ -499,3 +522,32 @@ func (r *KnowledgeBaseReconciler) hasHandledPathKey(kb *arcadiav1alpha1.Knowledg
}
return kb.Name + "/" + kb.Namespace + "/" + sourceName + "/" + path
}

func (r *KnowledgeBaseReconciler) cleanupHasHandledSuccessPath(kb *arcadiav1alpha1.KnowledgeBase) {
r.mu.Lock()
for _, fg := range kb.Spec.FileGroups {
for _, path := range fg.Paths {
delete(r.HasHandledSuccessPath, r.hasHandledPathKey(kb, fg, path))
}
}
r.mu.Unlock()
}

func (r *KnowledgeBaseReconciler) ready(log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) {
r.readyMu.Lock()
defer r.readyMu.Unlock()
log.V(5).Info("ready")
r.ReadyMap[string(kb.GetUID())] = true
}

func (r *KnowledgeBaseReconciler) unready(log logr.Logger, kb *arcadiav1alpha1.KnowledgeBase) {
r.readyMu.Lock()
defer r.readyMu.Unlock()
log.V(5).Info("unready")
delete(r.ReadyMap, string(kb.GetUID()))
}

func (r *KnowledgeBaseReconciler) isReady(kb *arcadiav1alpha1.KnowledgeBase) bool {
v, ok := r.ReadyMap[string(kb.GetUID())]
return ok && v
}
2 changes: 1 addition & 1 deletion deploy/charts/arcadia/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apiVersion: v2
name: arcadia
description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
type: application
version: 0.2.11
version: 0.2.12
appVersion: "0.1.0"

keywords:
Expand Down
Loading

0 comments on commit c36cbe1

Please sign in to comment.