// ReadCSV parses o as CSV and returns at most lines records, beginning with
// the 1-based record number startLine. A startLine below 1 behaves like 1.
//
// The returned error also tells the caller whether more data is available:
//   - nil: the page is full and at least one further record follows it;
//   - io.EOF: the input ended, so the returned records (possibly none) are
//     the last ones;
//   - any other error: reading or parsing failed and no records are returned.
//
// To distinguish a full page from the end of the input, one record past the
// page is read and discarded.
//
// A non-positive lines asks for nothing, so ReadCSV returns (nil, nil)
// immediately without reading from o.
func ReadCSV(o io.Reader, startLine, lines int64) ([][]string, error) {
	if lines <= 0 {
		return nil, nil
	}

	var (
		result [][]string
		cur    int64 // number of records read so far
	)

	csvReader := csv.NewReader(o)
	for {
		record, err := csvReader.Read()
		if err == io.EOF {
			// Input exhausted: hand back whatever was collected and let
			// io.EOF signal that there is no further page.
			return result, io.EOF
		}
		if err != nil {
			return nil, err
		}

		cur++
		if cur < startLine {
			// Still before the requested page.
			continue
		}
		if int64(len(result)) >= lines {
			// This is the look-ahead record: the page is complete and the
			// successful read proves more data exists.
			return result, nil
		}
		result = append(result, record)
	}
}
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package common + +import ( + "bytes" + "io" + "reflect" + "testing" +) + +const csvData = `a,a,a +b,b,b +c,c,c +d,d,d +e,e,e +f,f,f +g,g,g +i,i,i +1,1,1 +2,2,2 +3,3,3 +4,4,4 +5,5,5 +6,6,6` + +func TestReadCSV(t *testing.T) { + type input struct { + startLine, size int64 + exp [][]string + expErr error + } + + reader := bytes.NewReader([]byte(csvData)) + for _, tc := range []input{ + { + 1, 1, [][]string{{"a", "a", "a"}}, nil, + }, + { + 1, 2, [][]string{{"a", "a", "a"}, {"b", "b", "b"}}, nil, + }, + { + 2, 1, [][]string{{"b", "b", "b"}}, nil, + }, + { + 2, 2, [][]string{{"b", "b", "b"}, {"c", "c", "c"}}, nil, + }, + { + 9, 10, [][]string{{"1", "1", "1"}, {"2", "2", "2"}, {"3", "3", "3"}, {"4", "4", "4"}, {"5", "5", "5"}, {"6", "6", "6"}}, io.EOF, + }, + { + 14, 1, [][]string{{"6", "6", "6"}}, io.EOF, + }, + { + 14, 2, [][]string{{"6", "6", "6"}}, io.EOF, + }, + { + 8, 3, [][]string{{"i", "i", "i"}, {"1", "1", "1"}, {"2", "2", "2"}}, nil, + }, + { + 1, 15, [][]string{ + {"a", "a", "a"}, + {"b", "b", "b"}, + {"c", "c", "c"}, + {"d", "d", "d"}, + {"e", "e", "e"}, + {"f", "f", "f"}, + {"g", "g", "g"}, + {"i", "i", "i"}, + {"1", "1", "1"}, + {"2", "2", "2"}, + {"3", "3", "3"}, + {"4", "4", "4"}, + {"5", "5", "5"}, + {"6", "6", "6"}, + }, io.EOF, + }, + { + 1, 14, [][]string{ + {"a", "a", "a"}, + {"b", "b", "b"}, + {"c", "c", "c"}, + {"d", "d", "d"}, + {"e", "e", "e"}, + {"f", "f", "f"}, + {"g", "g", "g"}, + {"i", "i", "i"}, + {"1", "1", "1"}, + {"2", "2", "2"}, + {"3", "3", "3"}, + {"4", "4", "4"}, + {"5", "5", "5"}, + {"6", "6", "6"}, + }, io.EOF, + }, + { + 15, 2, nil, 
io.EOF, + }, + { + // page=1, size=3 + 1, 3, [][]string{ + {"a", "a", "a"}, + {"b", "b", "b"}, + {"c", "c", "c"}, + }, nil, + }, + { + // page=2,size=3 + 4, 3, [][]string{ + {"d", "d", "d"}, + {"e", "e", "e"}, + {"f", "f", "f"}, + }, nil, + }, + { + // page=3,size=3 + 7, 3, [][]string{ + {"g", "g", "g"}, + {"i", "i", "i"}, + {"1", "1", "1"}, + }, nil, + }, + { + // page=4,size=3 + 10, 3, [][]string{ + {"2", "2", "2"}, + {"3", "3", "3"}, + {"4", "4", "4"}, + }, nil, + }, + { + // page=5,size=3 + 13, 3, [][]string{ + {"5", "5", "5"}, + {"6", "6", "6"}, + }, io.EOF, + }, + } { + r, err := ReadCSV(reader, tc.startLine, tc.size) + if err != tc.expErr || !reflect.DeepEqual(tc.exp, r) { + t.Fatalf("expect %v get %v, expect error %v get %v", tc.exp, r, tc.expErr, err) + } + _, _ = reader.Seek(0, io.SeekStart) + } +} diff --git a/graphql-server/go-server/service/minio_server.go b/graphql-server/go-server/service/minio_server.go index 9086ba571..297e84633 100644 --- a/graphql-server/go-server/service/minio_server.go +++ b/graphql-server/go-server/service/minio_server.go @@ -96,12 +96,19 @@ type ( FileName string `json:"fileName"` UploadID string `json:"uploadID"` } + + ReadCSVResp struct { + Rows [][]string `json:"rows"` + More bool `json:"more"` + } ) const ( bucketQuery = "bucket" bucketPathQuery = "bucketPath" md5Query = "md5" + + maxCSVLines = 100 ) /* @@ -568,6 +575,68 @@ func (m *minioAPI) Download(ctx *gin.Context) { _, _ = io.Copy(ctx.Writer, info) } +func (m *minioAPI) ReadCSVLines(ctx *gin.Context) { + var ( + page int64 + lines int64 + + bucket, bucketPath string + fileName string + ) + _, _ = fmt.Sscanf(ctx.Query("page"), "%d", &page) + _, _ = fmt.Sscanf(ctx.Query("size"), "%d", &lines) + if page <= 0 { + klog.Errorf("the minimum page should be 1") + ctx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ + "message": "the minimum page should be 1", + }) + return + } + if lines <= 0 || lines > maxCSVLines { + klog.Errorf("the number of lines read should be greater 
than zero and less than or equal to %d", maxCSVLines) + ctx.AbortWithStatusJSON(http.StatusBadGateway, gin.H{ + "message": fmt.Sprintf("the number of lines read should be greater than zero and less than or equal to %d", maxCSVLines), + }) + return + } + bucket = ctx.Query(bucketQuery) + bucketPath = ctx.Query(bucketPathQuery) + fileName = ctx.Query("fileName") + + objectName := fmt.Sprintf("%s/%s", bucketPath, fileName) + source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client) + if err != nil { + klog.Errorf("failed to get system datasource error %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + + object, err := source.Client.GetObject(context.TODO(), bucket, objectName, minio.GetObjectOptions{}) + if err != nil { + klog.Errorf("failed to get data, error is %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + startLine := (page-1)*lines + 1 + result, err := common.ReadCSV(object, startLine, lines) + if err != nil && err != io.EOF { + klog.Errorf("there is an error reading the csv file, the error is %s", err) + ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ + "message": err.Error(), + }) + return + } + resp := ReadCSVResp{ + Rows: result, + More: err == nil, + } + ctx.JSON(http.StatusOK, resp) +} + func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) { c, err := client.GetClient(nil) if err != nil { @@ -598,5 +667,6 @@ func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) { group.DELETE("/versioneddataset/files", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "delete", "versioneddatasets"), api.DeleteFiles) group.GET("/versioneddataset/files/stat", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.StatFile) group.GET("/versioneddataset/files/download", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, 
"get", "versioneddatasets"), api.Download) + group.GET("/versioneddataset/files/csv", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.ReadCSVLines) } }