Skip to content

Commit

Permalink
Merge pull request #363 from 0xff-dev/main
Browse files Browse the repository at this point in the history
fix: fixed an issue where services deployed via helm are dependent on…
  • Loading branch information
bjwswang authored Dec 13, 2023
2 parents 5059ecf + abd7b73 commit d5039c6
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 99 deletions.
2 changes: 1 addition & 1 deletion controllers/datasource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (r *DatasourceReconciler) Checkdatasource(ctx context.Context, logger logr.
if endpoint.AuthSecret != nil {
endpoint.AuthSecret.WithNameSpace(instance.Namespace)
}
ds, err = datasource.NewOSS(ctx, r.Client, endpoint)
ds, err = datasource.NewOSS(ctx, r.Client, nil, endpoint)
if err != nil {
return r.UpdateStatus(ctx, instance, err)
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r *NamespaceReconciler) ossClient(ctx context.Context) (*datasource.OSS, e
if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}
oss, err := datasource.NewOSS(ctx, r.Client, endpoint)
oss, err := datasource.NewOSS(ctx, r.Client, nil, endpoint)
if err != nil {
klog.Errorf("generate new minio client error %s", err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions controllers/versioneddataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (r *VersionedDatasetReconciler) checkStatus(ctx context.Context, logger log
if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}
oss, err := datasource.NewOSS(ctx, r.Client, endpoint)
oss, err := datasource.NewOSS(ctx, r.Client, nil, endpoint)
if err != nil {
logger.Error(err, "Failed to generate new minio client")
return false, nil, err
Expand All @@ -242,7 +242,7 @@ func (r *VersionedDatasetReconciler) removeBucketFiles(ctx context.Context, logg
if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}
oss, err := datasource.NewOSS(ctx, r.Client, endpoint)
oss, err := datasource.NewOSS(ctx, r.Client, nil, endpoint)
if err != nil {
logger.Error(err, "Failed to generate new minio client")
return err
Expand Down
15 changes: 15 additions & 0 deletions graphql-server/go-server/pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kubeagi/arcadia/graphql-server/go-server/graph/generated"
"github.com/kubeagi/arcadia/pkg/config"
"github.com/kubeagi/arcadia/pkg/datasource"
)

var (
Expand Down Expand Up @@ -62,3 +65,15 @@ func ResouceUpdate(ctx context.Context, c dynamic.Interface, resource generated.
Object: newObject,
}, options, subresources...)
}

func SystemDatasourceOSS(ctx context.Context, mgrClient client.Client, dynamicClient dynamic.Interface) (*datasource.OSS, error) {
systemDatasource, err := config.GetSystemDatasource(ctx, mgrClient, dynamicClient)
if err != nil {
return nil, err
}
endpoint := systemDatasource.Spec.Enpoint.DeepCopy()
if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}
return datasource.NewOSS(ctx, mgrClient, dynamicClient, endpoint)
}
2 changes: 1 addition & 1 deletion graphql-server/go-server/pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func ModelFiles(ctx context.Context, c dynamic.Interface, modelName, namespace s
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}

oss, err := datasource.NewOSSWithDynamciClient(ctx, c, endpoint)
oss, err := datasource.NewOSS(ctx, nil, c, endpoint)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func VersionFiles(ctx context.Context, c dynamic.Interface, input *generated.Ver
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}

oss, err := datasource.NewOSSWithDynamciClient(ctx, c, endpoint)
oss, err := datasource.NewOSS(ctx, nil, c, endpoint)
if err != nil {
return nil, err
}
Expand Down
107 changes: 79 additions & 28 deletions graphql-server/go-server/service/minio_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/minio/minio-go/v7"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"

"github.com/kubeagi/arcadia/api/base/v1alpha1"
Expand All @@ -34,16 +35,14 @@ import (
"github.com/kubeagi/arcadia/graphql-server/go-server/pkg/common"
"github.com/kubeagi/arcadia/graphql-server/go-server/pkg/oidc"
"github.com/kubeagi/arcadia/pkg/cache"
"github.com/kubeagi/arcadia/pkg/config"
"github.com/kubeagi/arcadia/pkg/datasource"
)

type (
minioAPI struct {
conf gqlconfig.ServerConfig
source *datasource.OSS

store cache.Cache
client dynamic.Interface
store cache.Cache
}

Chunk struct {
Expand Down Expand Up @@ -138,8 +137,16 @@ func (m *minioAPI) GetSuccessChunks(ctx *gin.Context) {
Done: false,
}

source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
// First check if the file already exists in minio
anyObject, err := m.source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{Bucket: bucketName, Object: objectName})
anyObject, err := source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{Bucket: bucketName, Object: objectName})
if err == nil {
objectInfo, ok := anyObject.(minio.ObjectInfo)
if !ok {
Expand Down Expand Up @@ -187,7 +194,7 @@ func (m *minioAPI) GetSuccessChunks(ctx *gin.Context) {
// Checking already uploaded chunks
r.UploadID = fileChunk.UploadID
r.Chunks = make([]Chunk, 0)
result, err := m.source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(bucketName),
result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(bucketName),
datasource.WithBucketPath(bucketPath), datasource.WithFileName(fileChunk.FileName),
datasource.WithUploadID(fileChunk.UploadID))
if err != nil {
Expand Down Expand Up @@ -249,7 +256,15 @@ func (m *minioAPI) NewMultipart(ctx *gin.Context) {
return
}

uploadID, err := m.source.NewMultipartIdentifier(ctx.Request.Context(), datasource.WithBucket(body.Bucket),
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
uploadID, err := source.NewMultipartIdentifier(ctx.Request.Context(), datasource.WithBucket(body.Bucket),
datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(body.FileName),
datasource.WithAnnotations(map[string]string{
"size": fmt.Sprintf("%d", body.Size),
Expand Down Expand Up @@ -332,7 +347,15 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) {
}
fileChunk := fc.(*common.FileChunk)

result, err := m.source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(body.Bucket),
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
result, err := source.CompletedChunks(ctx.Request.Context(), datasource.WithBucket(body.Bucket),
datasource.WithBucketPath(body.BucketPath), datasource.WithFileName(fileChunk.FileName),
datasource.WithUploadID(body.UploadID))
if err != nil {
Expand All @@ -352,7 +375,7 @@ func (m *minioAPI) GetMultipartUploadURL(ctx *gin.Context) {
}
}

url, err := m.source.GenMultipartSignedURL(ctx.Request.Context(),
url, err := source.GenMultipartSignedURL(ctx.Request.Context(),
datasource.WithBucket(body.Bucket),
datasource.WithBucketPath(body.BucketPath),
datasource.WithUploadID(body.UploadID),
Expand Down Expand Up @@ -382,7 +405,15 @@ func (m *minioAPI) CompleteMultipart(ctx *gin.Context) {
})
return
}
err := m.source.Complete(ctx.Request.Context(),
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
err = source.Complete(ctx.Request.Context(),
datasource.WithBucket(body.Bucket),
datasource.WithBucketPath(body.BucketPath),
datasource.WithUploadID(body.UploadID),
Expand All @@ -408,11 +439,19 @@ func (m *minioAPI) DeleteFiles(ctx *gin.Context) {
})
return
}
source, err := common.SystemDatasourceOSS(context.TODO(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}

bucketPath := strings.TrimSuffix(body.BucketPath, "/")
for _, f := range body.Files {
go func(fn string) {
if err := m.source.Remove(context.TODO(), &v1alpha1.OSS{Bucket: body.Bucket, Object: fmt.Sprintf("%s/%s", bucketPath, fn)}); err != nil {
if err := source.Remove(context.TODO(), &v1alpha1.OSS{Bucket: body.Bucket, Object: fmt.Sprintf("%s/%s", bucketPath, fn)}); err != nil {
klog.Errorf("faile to delete file %s/%s from bucket %s, error %s", bucketPath, fn, body.Bucket, err)
}
}(f)
Expand All @@ -430,14 +469,23 @@ func (m *minioAPI) Abort(ctx *gin.Context) {
return
}

if err := m.source.Abort(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath),
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
if err := source.Abort(ctx.Request.Context(), datasource.WithBucket(body.Bucket), datasource.WithBucketPath(body.BucketPath),
datasource.WithFileName(body.FileName), datasource.WithUploadID(body.UploadID)); err != nil {
klog.Errorf("failed to stop file upload, error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}

ctx.JSON(http.StatusOK, "success")
}

Expand All @@ -446,7 +494,15 @@ func (m *minioAPI) StatFile(ctx *gin.Context) {
bucket := ctx.Query(bucketQuery)
bucketPath := ctx.Query(bucketPathQuery)

anyObject, err := m.source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
anyObject, err := source.StatFile(ctx.Request.Context(), &v1alpha1.OSS{
Object: fmt.Sprintf("%s/%s", bucketPath, fileName),
Bucket: bucket,
})
Expand Down Expand Up @@ -493,7 +549,15 @@ func (m *minioAPI) Download(ctx *gin.Context) {
objectName := fmt.Sprintf("%s/%s", bucketPath, fileName)
opt := minio.GetObjectOptions{}
_ = opt.SetRange(from, end)
info, err := m.source.Client.GetObject(ctx.Request.Context(), bucket, objectName, opt)
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
info, err := source.Client.GetObject(ctx.Request.Context(), bucket, objectName, opt)
if err != nil {
klog.Errorf("failed to get object %s/%s range %d-%d errro %s", bucket, objectName, from, end, err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
Expand All @@ -509,21 +573,8 @@ func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) {
if err != nil {
panic(err)
}
systemDatasource, err := config.GetSystemDatasource(context.TODO(), nil, c)
if err != nil {
panic(err)
}
endpoint := systemDatasource.Spec.Enpoint.DeepCopy()
if endpoint.AuthSecret != nil && endpoint.AuthSecret.Namespace == nil {
endpoint.AuthSecret.WithNameSpace(systemDatasource.Namespace)
}

oss, err := datasource.NewOSSWithDynamciClient(context.TODO(), c, endpoint)
if err != nil {
panic(err)
}

api := minioAPI{conf: conf, store: cache.NewMemCache(), source: oss}
api := minioAPI{conf: conf, store: cache.NewMemCache(), client: c}

{
// model apis
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ type Local struct {
}

func NewLocal(ctx context.Context, c client.Client, endpoint *v1alpha1.Endpoint) (*Local, error) {
oss, err := NewOSS(ctx, c, endpoint)
oss, err := NewOSS(ctx, c, nil, endpoint)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit d5039c6

Please sign in to comment.