From abd7b73bd2cd673d9cc28a59f08a4a331e2d6f55 Mon Sep 17 00:00:00 2001 From: 0xff-dev Date: Wed, 13 Dec 2023 13:42:39 +0800 Subject: [PATCH] fix: fixed an issue where services deployed via helm are dependent on each other and do not start --- controllers/datasource_controller.go | 2 +- controllers/namespace_controller.go | 2 +- controllers/versioneddataset_controller.go | 4 +- graphql-server/go-server/pkg/common/common.go | 15 +++ graphql-server/go-server/pkg/model/model.go | 2 +- .../pkg/versioneddataset/versioned_dataset.go | 2 +- .../go-server/service/minio_server.go | 107 +++++++++++++----- pkg/datasource/datasource.go | 2 +- pkg/datasource/oss.go | 88 +++++--------- pkg/scheduler/scheduler.go | 2 +- pkg/worker/loader.go | 2 +- 11 files changed, 129 insertions(+), 99 deletions(-) diff --git a/controllers/datasource_controller.go b/controllers/datasource_controller.go index 84e73602e..094f9987e 100644 --- a/controllers/datasource_controller.go +++ b/controllers/datasource_controller.go @@ -164,7 +164,7 @@ func (r *DatasourceReconciler) Checkdatasource(ctx context.Context, logger logr. if endpoint.AuthSecret != nil { endpoint.AuthSecret.WithNameSpace(instance.Namespace) } - ds, err = datasource.NewOSS(ctx, r.Client, endpoint) + ds, err = datasource.NewOSS(ctx, r.Client, nil, endpoint) if err != nil { return r.UpdateStatus(ctx, instance, err) } diff --git a/controllers/namespace_controller.go b/controllers/namespace_controller.go index 34cd30208..8362b847a 100644 --- a/controllers/namespace_controller.go +++ b/controllers/namespace_controller.go @@ -132,7 +132,7 @@ func (r *NamespaceReconciler) ossClient(ctx context.Context) (*datasource.OSS, e if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) } - oss, err := datasource.NewOSS(ctx, r.Client, endpoint) + oss, err := datasource.NewOSS(ctx, r.Client, nil, endpoint) if err != nil { klog.Errorf("generate new minio client error %s", err) return nil, err diff --git a/controllers/versioneddataset_controller.go b/controllers/versioneddataset_controller.go index 8a202df64..5a6364f5d 100644 --- a/controllers/versioneddataset_controller.go +++ b/controllers/versioneddataset_controller.go @@ -222,7 +222,7 @@ func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger log if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) } - oss, err := datasource.NewOSS(ctx, r.Client, endpoint) + oss, err := datasource.NewOSS(ctx, r.Client, nil, endpoint) if err != nil { logger.Error(err, "Failed to generate new minio client") return false, nil, err @@ -242,7 +242,7 @@ func (r *VersionedDatasetReconciler) removeBucketFiles(ctx context.Context, logg if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) } - oss, err := datasource.NewOSS(ctx, r.Client, endpoint) + oss, err := datasource.NewOSS(ctx, r.Client, nil, endpoint) if err != nil { logger.Error(err, "Failed to generate new minio client") return err diff --git a/graphql-server/go-server/pkg/common/common.go b/graphql-server/go-server/pkg/common/common.go index 7775436e4..541cef9d5 100644 --- a/graphql-server/go-server/pkg/common/common.go +++ b/graphql-server/go-server/pkg/common/common.go @@ -23,8 +23,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kubeagi/arcadia/graphql-server/go-server/graph/generated" + "github.com/kubeagi/arcadia/pkg/config" + "github.com/kubeagi/arcadia/pkg/datasource" ) var ( @@ -62,3 +65,15 @@ func ResouceUpdate(ctx context.Context, c dynamic.Interface, resource generated. Object: newObject, }, options, subresources...) } + +func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client, dynamicClient dynamic.Interface) (*datasource.OSS, error) { + systemDatasource, err := config.GetSystemDatasource(ctx, mgrClient, dynamicClient) + if err != nil { + return nil, err + } + endpoint := systemDatasource.Spec.Enpoint.DeepCopy() + if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { + endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) + } + return datasource.NewOSS(ctx, mgrClient, dynamicClient, endpoint) +} diff --git a/graphql-server/go-server/pkg/model/model.go b/graphql-server/go-server/pkg/model/model.go index 383fade04..b5f85b01b 100644 --- a/graphql-server/go-server/pkg/model/model.go +++ b/graphql-server/go-server/pkg/model/model.go @@ -289,7 +289,7 @@ func ModelFiles(ctx context.Context, c dynamic.Interface, modelName, namespace s endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) } - oss, err := datasource.NewOSSWithDynamciClient(ctx, c, endpoint) + oss, err := datasource.NewOSS(ctx, nil, c, endpoint) if err != nil { return nil, err } diff --git a/graphql-server/go-server/pkg/versioneddataset/versioned_dataset.go b/graphql-server/go-server/pkg/versioneddataset/versioned_dataset.go index 5d86a3b01..9d2d5112a 100644 --- a/graphql-server/go-server/pkg/versioneddataset/versioned_dataset.go +++ b/graphql-server/go-server/pkg/versioneddataset/versioned_dataset.go @@ -113,7 +113,7 @@ func VersionFiles(ctx context.Context, c dynamic.Interface, input *generated.Ver endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) } - oss, err := datasource.NewOSSWithDynamciClient(ctx, c, endpoint) + oss, err := datasource.NewOSS(ctx, nil, c, endpoint) if err != nil { return nil, err } diff --git a/graphql-server/go-server/service/minio_server.go b/graphql-server/go-server/service/minio_server.go index f0d64d4df..9086ba571 100644 --- a/graphql-server/go-server/service/minio_server.go +++ b/graphql-server/go-server/service/minio_server.go @@ -25,6 +25,7 @@ import ( "github.com/gin-gonic/gin" "github.com/minio/minio-go/v7" + "k8s.io/client-go/dynamic" "k8s.io/klog/v2" "github.com/kubeagi/arcadia/api/base/v1alpha1" @@ -34,16 +35,14 @@ import ( "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/common" "github.com/kubeagi/arcadia/graphql-server/go-server/pkg/oidc" "github.com/kubeagi/arcadia/pkg/cache" - "github.com/kubeagi/arcadia/pkg/config" "github.com/kubeagi/arcadia/pkg/datasource" ) type ( minioAPI struct { conf gqlconfig.ServerConfig - source *datasource.OSS - - store cache.Cache + client dynamic.Interface + store cache.Cache } Chunk struct { @@ -138,8 +137,16 @@ func (m *minioAPI) GetSuccessChunks(ctx *gin.Context) { Done: false, } + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } // First check if the file already exists in minio - anyObject, err := m.source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{Bucket: bucketName, Object: objectName}) + anyObject, err := source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{Bucket: bucketName, Object: objectName}) if err == nil { objectInfo, ok := anyObject.(minio.ObjectInfo) if !ok { @@ -187,7 +194,7 @@ func (m *minioAPI) GetSuccessChunks(ctx *gin.Context) { // Checking already uploaded chunks r.UploadID = fileChunk.UploadID r.Chunks = make([]Chunk, 0) - result, err := m.source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(bucketName), + result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(bucketName), datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileChunk.FileName), datasource.WithUploadID(fileChunk.UploadID)) if err != nil { @@ -249,7 +256,15 @@ func (m *minioAPI) NewMultipart(ctx *gin.Context) { return } - uploadID, err := m.source.NewMultipartIdentifier(ctx.Request.Context(), datasource.WithBucket(body.Bucket), + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + uploadID, err := source.NewMultipartIdentifier(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(body.FileName), datasource.WithAnnotations(map[string]string{ "size": fmt.Sprintf("%d", body.Size), @@ -332,7 +347,15 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) { } fileChunk := fc.(*common.FileChunk) - result, err := m.source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(body.Bucket), + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(fileChunk.FileName), datasource.WithUploadID(body.UploadID)) if err != nil { @@ -352,7 +375,7 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) { } } - url, err := m.source.GenMultipartSignedURL(ctx.Request.Context(), + url, err := source.GenMultipartSignedURL(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath), datasource.WithUploadID(body.UploadID), @@ -382,7 +405,15 @@ func (m *minioAPI) CompleteMultipart(ctx *gin.Context) { }) return } - err := m.source.Complete(ctx.Request.Context(), + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + err = source.Complete(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath), datasource.WithUploadID(body.UploadID), @@ -408,11 +439,19 @@ func (m *minioAPI) DeleteFiles(ctx *gin.Context) { }) return } + source, err := common.SystemDatasourceOSS(context.TODO(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } bucketPath := strings.TrimSuffix(body.BucketPath, "/") for _, f := range body.Files { go func(fn string) { - if err := m.source.Remove(context.TODO(), &v1alpha1.OSS{Bucket: body.Bucket, Object: fmt.Sprintf("%s/%s", bucketPath, fn)}); err != nil { + if err := source.Remove(context.TODO(), &v1alpha1.OSS{Bucket: body.Bucket, Object: fmt.Sprintf("%s/%s", bucketPath, fn)}); err != nil { klog.Errorf("faile to delete file %s/%s from bucket %s, error %s", bucketPath, fn, body.Bucket, err) } }(f) @@ -430,7 +469,15 @@ func (m *minioAPI) Abort(ctx *gin.Context) { return } - if err := m.source.Abort(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath), + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + if err := source.Abort(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(body.FileName), datasource.WithUploadID(body.UploadID)); err != nil { klog.Errorf("failed to stop file upload, error %s", err) ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ @@ -438,6 +485,7 @@ func (m *minioAPI) Abort(ctx *gin.Context) { }) return } + ctx.JSON(http.StatusOK, "success") } @@ -446,7 +494,15 @@ func (m *minioAPI) StatFile(ctx *gin.Context) { bucket := ctx.Query(bucketQuery) bucketPath := ctx.Query(bucketPathQuery) - anyObject, err := m.source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{ + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + anyObject, err := source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{ Object: fmt.Sprintf("%s/%s", bucketPath, fileName), Bucket: bucket, }) @@ -493,7 +549,15 @@ func (m *minioAPI) Download(ctx *gin.Context) { objectName := fmt.Sprintf("%s/%s", bucketPath, fileName) opt := minio.GetObjectOptions{} _ = opt.SetRange(from, end) - info, err := m.source.Client.GetObject(ctx.Request.Context(), bucket, objectName, opt) + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + info, err := source.Client.GetObject(ctx.Request.Context(), bucket, objectName, opt) if err != nil { klog.Errorf("failed to get object %s/%s range %d-%d errro %s", bucket, objectName, from, end, err) ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ @@ -509,21 +573,8 @@ func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) { if err != nil { panic(err) } - systemDatasource, err := config.GetSystemDatasource(context.TODO(), nil, c) - if err != nil { - panic(err) - } - endpoint := systemDatasource.Spec.Enpoint.DeepCopy() - if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { - endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) - } - - oss, err := datasource.NewOSSWithDynamciClient(context.TODO(), c, endpoint) - if err != nil { - panic(err) - } - api := minioAPI{conf: conf, store: cache.NewMemCache(), source: oss} + api := minioAPI{conf: conf, store: cache.NewMemCache(), client: c} { // model apis diff --git a/pkg/datasource/datasource.go b/pkg/datasource/datasource.go index 7ea8982aa..8533e3b2e 100644 --- a/pkg/datasource/datasource.go +++ b/pkg/datasource/datasource.go @@ -168,7 +168,7 @@ type Local struct { } func NewLocal(ctx context.Context, c client.Client, endpoint *v1alpha1.Endpoint) (*Local, error) { - oss, err := NewOSS(ctx, c, endpoint) + oss, err := NewOSS(ctx, c, nil, endpoint) if err != nil { return nil, err } diff --git a/pkg/datasource/oss.go b/pkg/datasource/oss.go index 8790c0cf1..2743dae1c 100644 --- a/pkg/datasource/oss.go +++ b/pkg/datasource/oss.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kubeagi/arcadia/api/base/v1alpha1" + "github.com/kubeagi/arcadia/pkg/utils" ) var ( @@ -58,78 +59,41 @@ var ( ossDefaultGetTagOpt = minio.GetObjectTaggingOptions{} ) -func NewOSSWithDynamciClient(ctx context.Context, c dynamic.Interface, endpoint *v1alpha1.Endpoint) (*OSS, error) { +func NewOSS(ctx context.Context, c client.Client, dc dynamic.Interface, endpoint *v1alpha1.Endpoint) (*OSS, error) { var accessKeyID, secretAccessKey string if endpoint.AuthSecret != nil { if endpoint.AuthSecret.Namespace == nil { return nil, errors.New("no namepsace found for endpoint.authsecret") } - secret, err := c.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}). - Namespace(*endpoint.AuthSecret.Namespace).Get(ctx, endpoint.AuthSecret.Name, v1.GetOptions{}) - if err != nil { + if err := utils.ValidateClient(c, dc); err != nil { return nil, err } - data, _, _ := unstructured.NestedStringMap(secret.Object, "data") - - if ds, err := base64.StdEncoding.DecodeString(data["rootUser"]); err == nil { - accessKeyID = string(ds) - } - if ds, err := base64.StdEncoding.DecodeString(data["rootPassword"]); err == nil { - secretAccessKey = string(ds) - } - // TODO: implement https(secure check) - // if !endpoint.Insecure { - // } - } - - mc, err := minio.New(endpoint.URL, &minio.Options{ - Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), - Secure: !endpoint.Insecure, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, - }) - if err != nil { - return nil, err - } - - core, err := minio.NewCore(endpoint.URL, &minio.Options{ - Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), - Secure: !endpoint.Insecure, - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, - }) - if err != nil { - return nil, err - } - - return &OSS{Client: mc, Core: core}, nil -} + if dc != nil { + secret, err := dc.Resource(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}). + Namespace(*endpoint.AuthSecret.Namespace).Get(ctx, endpoint.AuthSecret.Name, v1.GetOptions{}) + if err != nil { + return nil, err + } + data, _, _ := unstructured.NestedStringMap(secret.Object, "data") -func NewOSS(ctx context.Context, c client.Client, endpoint *v1alpha1.Endpoint) (*OSS, error) { - var accessKeyID, secretAccessKey string - if endpoint.AuthSecret != nil { - if endpoint.AuthSecret.Namespace == nil { - return nil, errors.New("no namepsace found for endpoint.authsecret") + if ds, err := base64.StdEncoding.DecodeString(data["rootUser"]); err == nil { + accessKeyID = string(ds) + } + if ds, err := base64.StdEncoding.DecodeString(data["rootPassword"]); err == nil { + secretAccessKey = string(ds) + } } - secret := corev1.Secret{} - if err := c.Get(ctx, types.NamespacedName{ - Namespace: *endpoint.AuthSecret.Namespace, - Name: endpoint.AuthSecret.Name, - }, &secret); err != nil { - return nil, err + if c != nil { + secret := corev1.Secret{} + if err := c.Get(ctx, types.NamespacedName{ + Namespace: *endpoint.AuthSecret.Namespace, + Name: endpoint.AuthSecret.Name, + }, &secret); err != nil { + return nil, err + } + accessKeyID = string(secret.Data["rootUser"]) + secretAccessKey = string(secret.Data["rootPassword"]) } - accessKeyID = string(secret.Data["rootUser"]) - secretAccessKey = string(secret.Data["rootPassword"]) - - // TODO: implement https(secure check) - // if !endpoint.Insecure { - // } } mc, err := minio.New(endpoint.URL, &minio.Options{ diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 2ae47af9e..59b536485 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -59,7 +59,7 @@ func NewScheduler(ctx context.Context, c client.Client, instance *v1alpha1.Versi if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil { endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace) } - oss, err := datasource.NewOSS(ctx1, c, endpoint) + oss, err := datasource.NewOSS(ctx1, c, nil, endpoint) if err != nil { cancel() klog.Errorf("generate new minio client error %s", err) diff --git a/pkg/worker/loader.go b/pkg/worker/loader.go index 6a8d16f77..08ef616dc 100644 --- a/pkg/worker/loader.go +++ b/pkg/worker/loader.go @@ -49,7 +49,7 @@ func NewLoaderOSS(ctx context.Context, c client.Client, endpoint *arcadiav1alpha return nil, errors.New("nil oss endpoint") } - oss, err := datasource.NewOSS(ctx, c, endpoint) + oss, err := datasource.NewOSS(ctx, c, nil, endpoint) if err != nil { return nil, fmt.Errorf("failed to new oss client with %w", err) }