diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 13a39bf22..01ae04585 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -28,6 +28,14 @@ rules: verbs: - get - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - watch - apiGroups: - "" resources: diff --git a/config/samples/arcadia_v1alpha1_embedders.yaml b/config/samples/arcadia_v1alpha1_embedders.yaml index f2b73e0c1..f17b19b69 100644 --- a/config/samples/arcadia_v1alpha1_embedders.yaml +++ b/config/samples/arcadia_v1alpha1_embedders.yaml @@ -5,7 +5,7 @@ metadata: namespace: arcadia type: Opaque data: - apiKey: "NGZjY2VjZWIxNjY2Y2QxMTgwOGMyMThkNmQ2MTk5NTAuVENYVXZhUUNXRnlJa3hCMw==" # replace this with your API key + apiKey: "MjZlMzA4YzljNTA2NjcxNmFjYjdhM2M0OGUyZGU0YTcuck9zWktDVEJ0RzljV2JKNw==" # replace this with your API key --- apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1 kind: Embedder diff --git a/config/samples/arcadia_v1alpha1_llm.yaml b/config/samples/arcadia_v1alpha1_llm.yaml index 53728413d..b91f4e7ee 100644 --- a/config/samples/arcadia_v1alpha1_llm.yaml +++ b/config/samples/arcadia_v1alpha1_llm.yaml @@ -4,7 +4,7 @@ metadata: name: zhipuai type: Opaque data: - apiKey: "NGZjY2VjZWIxNjY2Y2QxMTgwOGMyMThkNmQ2MTk5NTAuVENYVXZhUUNXRnlJa3hCMw==" # replace this with your API key + apiKey: "MjZlMzA4YzljNTA2NjcxNmFjYjdhM2M0OGUyZGU0YTcuck9zWktDVEJ0RzljV2JKNw==" # replace this with your API key --- apiVersion: arcadia.kubeagi.k8s.com.cn/v1alpha1 kind: LLM diff --git a/controllers/namespace_controller.go b/controllers/namespace_controller.go new file mode 100644 index 000000000..94300590d --- /dev/null +++ b/controllers/namespace_controller.go @@ -0,0 +1,169 @@ +/* +Copyright 2023 KubeAGI. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/minio/minio-go/v7" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/kubeagi/arcadia/pkg/config" + "github.com/kubeagi/arcadia/pkg/datasource" + "github.com/kubeagi/arcadia/pkg/utils" +) + +const ( + BucketNotEmpty = "The bucket you tried to delete is not empty" + BucketNotExist = "The specified bucket does not exist" + + // this is the name of a configmap under the same namespace as operator. the key of the data field is the name of each namespace not to be handled. + SkipNamespaceConfigMap = "skip-namespaces" +) + +type NamespacetReconciler struct { + client.Client + Scheme *runtime.Scheme +} + +//+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the Dataset object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.2/pkg/reconcile +func (r *NamespacetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + _ = log.FromContext(ctx) + + var err error + instance := &v1.Namespace{} + if err = r.Client.Get(ctx, req.NamespacedName, instance); err != nil { + if errors.IsNotFound(err) { + err = r.removeBucket(ctx, req.Name) + return reconcile.Result{RequeueAfter: 3 * time.Second}, err + } + return reconcile.Result{}, err + } + skip, err := r.checkSkippedNamespace(ctx, instance.Name) + if err != nil { + return reconcile.Result{}, err + } + if skip { + klog.Infof("namespace %s is in the filter list and will not be created, delete the corresponding bucket.", instance.Name) + return reconcile.Result{}, nil + } + + err = r.syncBucket(ctx, instance.Name) + return ctrl.Result{}, err +} + +// SetupWithManager sets up the controller with the Manager. +func (r *NamespacetReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1.Namespace{}, builder.WithPredicates(predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + return false + }, + })). + Complete(r) +} + +func (r *NamespacetReconciler) ossClient(ctx context.Context) (*datasource.OSS, error) { + systemDatasource, err := config.GetSystemDatasource(ctx, r.Client) + if err != nil { + klog.Errorf("get system datasource error %s", err) + return nil, err + } + endpoint := systemDatasource.Spec.Enpoint.DeepCopy() + if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { + endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) + } + oss, err := datasource.NewOSS(ctx, r.Client, endpoint) + if err != nil { + klog.Errorf("generate new minio client error %s", err) + return nil, err + } + return oss, nil +} + +func (r *NamespacetReconciler) syncBucket(ctx context.Context, namespace string) error { + oss, err := r.ossClient(ctx) + if err != nil { + err = fmt.Errorf("sync bucket: failed to get oss client error %w", err) + klog.Error(err) + return err + } + exists, err := oss.Client.BucketExists(ctx, namespace) + if err != nil { + err = fmt.Errorf("check if the bucket exists and an error occurs, error %w", err) + klog.Error(err) + return err + } + if !exists { + klog.Infof("bucket %s does not exist, ready to create bucket", namespace) + if err = oss.Client.MakeBucket(ctx, namespace, minio.MakeBucketOptions{}); err != nil { + err = fmt.Errorf("and error osccured creating the bucket, error %w", err) + klog.Error(err) + return err + } + } + return nil +} + +func (r *NamespacetReconciler) removeBucket(ctx context.Context, namespace string) error { + oss, err := r.ossClient(ctx) + if err != nil { + err = fmt.Errorf("remove bucket: failed to get oss client error %w", err) + klog.Error(err) + return err + } + err = oss.Client.RemoveBucket(ctx, namespace) + if err == nil || err.Error() == BucketNotExist { + return nil + } + return err +} + +func (r *NamespacetReconciler) checkSkippedNamespace(ctx context.Context, namespace string) (bool, error) { + cm := v1.ConfigMap{} + if err := r.Client.Get(ctx, types.NamespacedName{Namespace: utils.GetSelfNamespace(), Name: SkipNamespaceConfigMap}, &cm); err != nil { + if errors.IsNotFound(err) { + return false, nil + } + return false, err + } + _, ok := cm.Data[namespace] + return ok, nil +} diff --git a/deploy/charts/arcadia/Chart.yaml b/deploy/charts/arcadia/Chart.yaml index 4cf69aeea..65188e52d 100644 --- a/deploy/charts/arcadia/Chart.yaml +++ b/deploy/charts/arcadia/Chart.yaml @@ -2,7 +2,7 @@ apiVersion: v2 name: arcadia description: A Helm chart(KubeBB Component) for KubeAGI Arcadia type: application -version: 0.1.31 +version: 0.1.32 appVersion: "0.0.1" keywords: diff --git a/deploy/charts/arcadia/templates/rbac.yaml b/deploy/charts/arcadia/templates/rbac.yaml index ed502c84f..53c1c8bab 100644 --- a/deploy/charts/arcadia/templates/rbac.yaml +++ b/deploy/charts/arcadia/templates/rbac.yaml @@ -45,6 +45,14 @@ rules: verbs: - get - watch +- apiGroups: + - "" + resources: + - namespaces + verbs: + - get + - list + - watch - apiGroups: - "" resources: @@ -152,6 +160,19 @@ rules: - patch - update - watch +- apiGroups: + - arcadia.kubeagi.k8s.com.cn + resources: + - embedders + - llms + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - arcadia.kubeagi.k8s.com.cn resources: @@ -166,6 +187,15 @@ rules: - get - patch - update +- apiGroups: + - arcadia.kubeagi.k8s.com.cn + resources: + - embedders/status + - llms/status + verbs: + - get + - patch + - update - apiGroups: - arcadia.kubeagi.k8s.com.cn resources: diff --git a/main.go b/main.go index 68da8fff3..b2d9bd160 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strconv" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -50,6 +51,7 @@ func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(arcadiav1alpha1.AddToScheme(scheme)) + utilruntime.Must(v1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -204,6 +206,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "VectorStore") os.Exit(1) } + if err = (&controllers.NamespacetReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "Namespace") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {