Skip to content

Commit

Permalink
Merge pull request kubeagi#369 from 0xff-dev/read-csv
Browse files Browse the repository at this point in the history
feat: support for reading csv files with several rows
  • Loading branch information
bjwswang authored Dec 14, 2023
2 parents 8465356 + 340e454 commit 1891ebf
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 0 deletions.
55 changes: 55 additions & 0 deletions graphql-server/go-server/pkg/common/read_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2023 KubeAGI.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common

import (
"encoding/csv"
"io"
)

// ReadCSV function reads the data in lines from startLine and returns an error if there is an error,
// you can determine if there is still data by determining if err is io.
func ReadCSV(o io.Reader, startLine, lines int64) ([][]string, error) {
var (
line []string
err error
cur = int64(0)
recordLines = int64(0)
result [][]string
)

csvReader := csv.NewReader(o)

for {
line, err = csvReader.Read()
if err != nil {
if err != io.EOF {
return nil, err
}
break
}
cur++
if cur >= startLine {
if recordLines >= lines {
break
}
result = append(result, line)
recordLines++
}
}

return result, err
}
158 changes: 158 additions & 0 deletions graphql-server/go-server/pkg/common/read_csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
Copyright 2023 KubeAGI.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package common

import (
"bytes"
"io"
"reflect"
"testing"
)

const csvData = `a,a,a
b,b,b
c,c,c
d,d,d
e,e,e
f,f,f
g,g,g
i,i,i
1,1,1
2,2,2
3,3,3
4,4,4
5,5,5
6,6,6`

func TestReadCSV(t *testing.T) {
type input struct {
startLine, size int64
exp [][]string
expErr error
}

reader := bytes.NewReader([]byte(csvData))
for _, tc := range []input{
{
1, 1, [][]string{{"a", "a", "a"}}, nil,
},
{
1, 2, [][]string{{"a", "a", "a"}, {"b", "b", "b"}}, nil,
},
{
2, 1, [][]string{{"b", "b", "b"}}, nil,
},
{
2, 2, [][]string{{"b", "b", "b"}, {"c", "c", "c"}}, nil,
},
{
9, 10, [][]string{{"1", "1", "1"}, {"2", "2", "2"}, {"3", "3", "3"}, {"4", "4", "4"}, {"5", "5", "5"}, {"6", "6", "6"}}, io.EOF,
},
{
14, 1, [][]string{{"6", "6", "6"}}, io.EOF,
},
{
14, 2, [][]string{{"6", "6", "6"}}, io.EOF,
},
{
8, 3, [][]string{{"i", "i", "i"}, {"1", "1", "1"}, {"2", "2", "2"}}, nil,
},
{
1, 15, [][]string{
{"a", "a", "a"},
{"b", "b", "b"},
{"c", "c", "c"},
{"d", "d", "d"},
{"e", "e", "e"},
{"f", "f", "f"},
{"g", "g", "g"},
{"i", "i", "i"},
{"1", "1", "1"},
{"2", "2", "2"},
{"3", "3", "3"},
{"4", "4", "4"},
{"5", "5", "5"},
{"6", "6", "6"},
}, io.EOF,
},
{
1, 14, [][]string{
{"a", "a", "a"},
{"b", "b", "b"},
{"c", "c", "c"},
{"d", "d", "d"},
{"e", "e", "e"},
{"f", "f", "f"},
{"g", "g", "g"},
{"i", "i", "i"},
{"1", "1", "1"},
{"2", "2", "2"},
{"3", "3", "3"},
{"4", "4", "4"},
{"5", "5", "5"},
{"6", "6", "6"},
}, io.EOF,
},
{
15, 2, nil, io.EOF,
},
{
// page=1, size=3
1, 3, [][]string{
{"a", "a", "a"},
{"b", "b", "b"},
{"c", "c", "c"},
}, nil,
},
{
// page=2,size=3
4, 3, [][]string{
{"d", "d", "d"},
{"e", "e", "e"},
{"f", "f", "f"},
}, nil,
},
{
// page=3,size=3
7, 3, [][]string{
{"g", "g", "g"},
{"i", "i", "i"},
{"1", "1", "1"},
}, nil,
},
{
// page=4,size=3
10, 3, [][]string{
{"2", "2", "2"},
{"3", "3", "3"},
{"4", "4", "4"},
}, nil,
},
{
// page=5,size=3
13, 3, [][]string{
{"5", "5", "5"},
{"6", "6", "6"},
}, io.EOF,
},
} {
r, err := ReadCSV(reader, tc.startLine, tc.size)
if err != tc.expErr || !reflect.DeepEqual(tc.exp, r) {
t.Fatalf("expect %v get %v, expect error %v get %v", tc.exp, r, tc.expErr, err)
}
_, _ = reader.Seek(0, io.SeekStart)
}
}
70 changes: 70 additions & 0 deletions graphql-server/go-server/service/minio_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,19 @@ type (
FileName string `json:"fileName"`
UploadID string `json:"uploadID"`
}

ReadCSVResp struct {
Rows [][]string `json:"rows"`
More bool `json:"more"`
}
)

const (
bucketQuery = "bucket"
bucketPathQuery = "bucketPath"
md5Query = "md5"

maxCSVLines = 100
)

/*
Expand Down Expand Up @@ -568,6 +575,68 @@ func (m *minioAPI) Download(ctx *gin.Context) {
_, _ = io.Copy(ctx.Writer, info)
}

func (m *minioAPI) ReadCSVLines(ctx *gin.Context) {
var (
page int64
lines int64

bucket, bucketPath string
fileName string
)
_, _ = fmt.Sscanf(ctx.Query("page"), "%d", &page)
_, _ = fmt.Sscanf(ctx.Query("size"), "%d", &lines)
if page <= 0 {
klog.Errorf("the minimum page should be 1")
ctx.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"message": "the minimum page should be 1",
})
return
}
if lines <= 0 || lines > maxCSVLines {
klog.Errorf("the number of lines read should be greater than zero and less than or equal to %d", maxCSVLines)
ctx.AbortWithStatusJSON(http.StatusBadGateway, gin.H{
"message": fmt.Sprintf("the number of lines read should be greater than zero and less than or equal to %d", maxCSVLines),
})
return
}
bucket = ctx.Query(bucketQuery)
bucketPath = ctx.Query(bucketPathQuery)
fileName = ctx.Query("fileName")

objectName := fmt.Sprintf("%s/%s", bucketPath, fileName)
source, err := common.SystemDatasourceOSS(ctx.Request.Context(), nil, m.client)
if err != nil {
klog.Errorf("failed to get system datasource error %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}

object, err := source.Client.GetObject(context.TODO(), bucket, objectName, minio.GetObjectOptions{})
if err != nil {
klog.Errorf("failed to get data, error is %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
startLine := (page-1)*lines + 1
result, err := common.ReadCSV(object, startLine, lines)
if err != nil && err != io.EOF {
klog.Errorf("there is an error reading the csv file, the error is %s", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"message": err.Error(),
})
return
}
resp := ReadCSVResp{
Rows: result,
More: err == nil,
}
ctx.JSON(http.StatusOK, resp)
}

func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) {
c, err := client.GetClient(nil)
if err != nil {
Expand Down Expand Up @@ -598,5 +667,6 @@ func RegisterMinIOAPI(group *gin.RouterGroup, conf gqlconfig.ServerConfig) {
group.DELETE("/versioneddataset/files", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "delete", "versioneddatasets"), api.DeleteFiles)
group.GET("/versioneddataset/files/stat", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.StatFile)
group.GET("/versioneddataset/files/download", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.Download)
group.GET("/versioneddataset/files/csv", auth.AuthInterceptor(conf.EnableOIDC, oidc.Verifier, "get", "versioneddatasets"), api.ReadCSVLines)
}
}

0 comments on commit 1891ebf

Please sign in to comment.