Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multipart blob download #5715

Open
wants to merge 24 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 101 additions & 20 deletions flytecopilot/data/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"sync"

"github.com/ghodss/yaml"
"github.com/golang/protobuf/jsonpb"
Expand All @@ -31,40 +33,120 @@
mode core.IOStrategy_DownloadMode
}

// TODO add support for multipart blobs
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toFilePath string) (interface{}, error) {
ref := storage.DataReference(blob.Uri)
scheme, _, _, err := ref.Split()
func (d Downloader) handleBlob(ctx context.Context, blob *core.Blob, toPath string) (interface{}, error) {
blobRef := storage.DataReference(blob.Uri)
scheme, c, _, err := blobRef.Split()
if err != nil {
return nil, errors.Wrapf(err, "Blob uri incorrectly formatted")
}
var reader io.ReadCloser
if scheme == "http" || scheme == "https" {
reader, err = DownloadFileFromHTTP(ctx, ref)
} else {
if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
logger.Warnf(ctx, "Currently only single part blobs are supported, we will force multipart to be 'path/00000'")
ref, err = d.store.ConstructReference(ctx, ref, "000000")
if err != nil {

if blob.GetMetadata().GetType().Dimensionality == core.BlobType_MULTIPART {
maxItems := 100
cursor := storage.NewCursorAtStart()
var items []storage.DataReference
var keys []string
for {
items, cursor, err = d.store.List(ctx, blobRef, maxItems, cursor)
if err != nil || len(items) == 0 {
logger.Errorf(ctx, "failed to collect items from multipart blob [%s]", blobRef)
return nil, err
}
for _, item := range items {
keys = append(keys, item.String())
}
if storage.IsCursorEnd(cursor) {
break
}
}
reader, err = DownloadFileFromStorage(ctx, ref, d.store)

success := 0
var mu sync.Mutex
var wg sync.WaitGroup
for _, k := range keys {
absPath := fmt.Sprintf("%s://%s/%s", scheme, c, k)

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if err := recover(); err != nil {
logger.Errorf(ctx, "recover receives error: %s", err)

Check warning on line 73 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L73

Added line #L73 was not covered by tests
}
}()

ref := storage.DataReference(absPath)
reader, err := DownloadFileFromStorage(ctx, ref, d.store)
if err != nil {
logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
return

Check warning on line 81 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L80-L81

Added lines #L80 - L81 were not covered by tests
}
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)

Check warning on line 86 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L86

Added line #L86 was not covered by tests
}
}()

_, _, k, err := ref.Split()
if err != nil {
logger.Errorf(ctx, "Failed to parse ref [%s]", ref)
return

Check warning on line 93 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}
newPath := filepath.Join(toPath, k)
dir := filepath.Dir(newPath)

mu.Lock()
// 0755: the directory can be read by anyone but can only be written by the owner
os.MkdirAll(dir, 0755)
mu.Unlock()
writer, err := os.Create(newPath)
if err != nil {
wayner0628 marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf(ctx, "failed to open file at path %s", newPath)
return

Check warning on line 105 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L104-L105

Added lines #L104 - L105 were not covered by tests
}
defer func() {
err := writer.Close()
if err != nil {
logger.Errorf(ctx, "failed to close File write stream. Error: %s", err)

Check warning on line 110 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L110

Added line #L110 was not covered by tests
}
}()

_, err = io.Copy(writer, reader)
if err != nil {
logger.Errorf(ctx, "failed to write remote data to local filesystem")
return

Check warning on line 117 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L116-L117

Added lines #L116 - L117 were not covered by tests
}
mu.Lock()
success += 1
mu.Unlock()
}()
}
wg.Wait()
logger.Infof(ctx, "Successfully copied [%d] remote files from [%s] to local [%s]", success, blobRef, toPath)
return toPath, nil
}

// reader should be declared here (avoid being shared across all goroutines)
var reader io.ReadCloser
if scheme == "http" || scheme == "https" {
reader, err = DownloadFileFromHTTP(ctx, blobRef)
} else {
reader, err = DownloadFileFromStorage(ctx, blobRef, d.store)
}
if err != nil {
logger.Errorf(ctx, "Failed to download from ref [%s]", ref)
logger.Errorf(ctx, "Failed to download from ref [%s]", blobRef)

Check warning on line 137 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L137

Added line #L137 was not covered by tests
return nil, err
}
defer func() {
err := reader.Close()
if err != nil {
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", ref, err)
logger.Errorf(ctx, "failed to close Blob read stream @ref [%s]. Error: %s", blobRef, err)

Check warning on line 143 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L143

Added line #L143 was not covered by tests
}
}()

writer, err := os.Create(toFilePath)
writer, err := os.Create(toPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to open file at path %s", toFilePath)
return nil, errors.Wrapf(err, "failed to open file at path %s", toPath)

Check warning on line 149 in flytecopilot/data/download.go

View check run for this annotation

Codecov / codecov/patch

flytecopilot/data/download.go#L149

Added line #L149 was not covered by tests
}
defer func() {
err := writer.Close()
Expand All @@ -76,12 +158,11 @@
if err != nil {
return nil, errors.Wrapf(err, "failed to write remote data to local filesystem")
}
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, ref, toFilePath)
return toFilePath, nil
logger.Infof(ctx, "Successfully copied [%d] bytes remote data from [%s] to local [%s]", v, blobRef, toPath)
return toPath, nil
}

// handleSchema downloads a schema output by delegating to handleBlob.
// A schema is treated as a multipart blob (a directory of files under the
// schema URI), so the URI is wrapped in a Blob with MULTIPART dimensionality.
// TODO: handle the schema type explicitly instead of reusing the blob path.
func (d Downloader) handleSchema(ctx context.Context, schema *core.Schema, toFilePath string) (interface{}, error) {
	asBlob := &core.Blob{
		Uri: schema.Uri,
		Metadata: &core.BlobMetadata{
			Type: &core.BlobType{
				Dimensionality: core.BlobType_MULTIPART,
			},
		},
	}
	return d.handleBlob(ctx, asBlob, toFilePath)
}

Expand Down
151 changes: 151 additions & 0 deletions flytecopilot/data/download_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package data

import (
"bytes"
"context"
"os"
"path/filepath"
"testing"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/stretchr/testify/assert"
)

// TestHandleBlobMultipart verifies that a MULTIPART blob is downloaded as a
// local directory tree mirroring the remote keys, and that listing an empty
// prefix surfaces an error.
func TestHandleBlobMultipart(t *testing.T) {
	t.Run("Successful Query", func(t *testing.T) {
		s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
		assert.NoError(t, err)

		// Seed two parts under the multipart prefix. The WriteRaw error was
		// previously discarded; a failed write would make the test pass/fail
		// for the wrong reason.
		ref := storage.DataReference("s3://container/folder/file1")
		assert.NoError(t, s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})))
		ref = storage.DataReference("s3://container/folder/file2")
		assert.NoError(t, s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})))

		d := Downloader{store: s}

		blob := &core.Blob{
			Uri: "s3://container/folder",
			Metadata: &core.BlobMetadata{
				Type: &core.BlobType{
					Dimensionality: core.BlobType_MULTIPART,
				},
			},
		}

		toPath := "./inputs"
		defer func() {
			if err := os.RemoveAll(toPath); err != nil {
				t.Errorf("Failed to delete directory: %v", err)
			}
		}()

		result, err := d.handleBlob(context.Background(), blob, toPath)
		assert.NoError(t, err)
		assert.Equal(t, toPath, result)

		// Each remote key must materialize as a local file under toPath,
		// preserving the "folder/" path component of the key.
		for _, file := range []string{"file1", "file2"} {
			if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) {
				t.Errorf("expected file %s to exist", file)
			}
		}
	})

	t.Run("No Items", func(t *testing.T) {
		s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
		assert.NoError(t, err)

		d := Downloader{store: s}

		// No objects were written under this prefix, so the multipart
		// download must fail rather than succeed vacuously.
		blob := &core.Blob{
			Uri: "s3://container/folder",
			Metadata: &core.BlobMetadata{
				Type: &core.BlobType{
					Dimensionality: core.BlobType_MULTIPART,
				},
			},
		}

		toPath := "./inputs"
		defer func() {
			if err := os.RemoveAll(toPath); err != nil {
				t.Errorf("Failed to delete directory: %v", err)
			}
		}()

		result, err := d.handleBlob(context.Background(), blob, toPath)
		assert.Error(t, err)
		assert.Nil(t, result)
	})
}

// TestHandleBlobSinglePart verifies that a SINGLE blob is downloaded to
// exactly the requested local path.
func TestHandleBlobSinglePart(t *testing.T) {
	s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	assert.NoError(t, err)

	// Seed the object; previously the WriteRaw error was silently ignored.
	ref := storage.DataReference("s3://container/file")
	assert.NoError(t, s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})))

	d := Downloader{store: s}

	blob := &core.Blob{
		Uri: "s3://container/file",
		Metadata: &core.BlobMetadata{
			Type: &core.BlobType{
				Dimensionality: core.BlobType_SINGLE,
			},
		},
	}

	toPath := "./input"
	defer func() {
		if err := os.RemoveAll(toPath); err != nil {
			t.Errorf("Failed to delete file: %v", err)
		}
	}()

	result, err := d.handleBlob(context.Background(), blob, toPath)
	assert.NoError(t, err)
	assert.Equal(t, toPath, result)

	// The single-part download writes directly to toPath (no subdirectory).
	if _, err := os.Stat(toPath); os.IsNotExist(err) {
		t.Errorf("expected file %s to exist", toPath)
	}
}

// TestHandleBlobHTTP verifies that an http(s) blob URI is fetched over HTTP
// rather than through the configured store.
//
// NOTE: this test requires network access to raw.githubusercontent.com; it is
// skipped under `go test -short` so offline/CI runs are not flaky.
func TestHandleBlobHTTP(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping network-dependent test in short mode")
	}

	s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
	assert.NoError(t, err)
	d := Downloader{store: s}

	blob := &core.Blob{
		Uri: "https://raw.githubusercontent.com/flyteorg/flyte/master/README.md",
		Metadata: &core.BlobMetadata{
			Type: &core.BlobType{
				Dimensionality: core.BlobType_SINGLE,
			},
		},
	}

	toPath := "./input"
	defer func() {
		if err := os.RemoveAll(toPath); err != nil {
			t.Errorf("Failed to delete file: %v", err)
		}
	}()

	result, err := d.handleBlob(context.Background(), blob, toPath)
	assert.NoError(t, err)
	assert.Equal(t, toPath, result)

	// The downloaded README must exist at the requested local path.
	if _, err := os.Stat(toPath); os.IsNotExist(err) {
		t.Errorf("expected file %s to exist", toPath)
	}
}
23 changes: 22 additions & 1 deletion flytestdlib/storage/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
	"io"
	"io/ioutil"
	"os"
	"sort"
	"strings"

	"github.com/flyteorg/flyte/flytestdlib/logger"
)

type rawFile = []byte
Expand Down Expand Up @@ -55,7 +58,25 @@
}

func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
return nil, NewCursorAtEnd(), fmt.Errorf("Not implemented yet")
var items []DataReference
prefix := strings.TrimSuffix(string(reference), "/") + "/"

Check warning on line 62 in flytestdlib/storage/mem_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/mem_store.go#L61-L62

Added lines #L61 - L62 were not covered by tests

for ref := range s.cache {
if strings.HasPrefix(ref.String(), prefix) {
_, _, k, err := ref.Split()
if err != nil {
logger.Errorf(ctx, "failed to split reference [%s]", ref)
continue

Check warning on line 69 in flytestdlib/storage/mem_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/mem_store.go#L64-L69

Added lines #L64 - L69 were not covered by tests
}
items = append(items, DataReference(k))

Check warning on line 71 in flytestdlib/storage/mem_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/mem_store.go#L71

Added line #L71 was not covered by tests
}
}

if len(items) == 0 {
return nil, NewCursorAtEnd(), os.ErrNotExist

Check warning on line 76 in flytestdlib/storage/mem_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/mem_store.go#L75-L76

Added lines #L75 - L76 were not covered by tests
}

return items, NewCursorAtEnd(), nil

Check warning on line 79 in flytestdlib/storage/mem_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/mem_store.go#L79

Added line #L79 was not covered by tests
}

func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
Expand Down
6 changes: 5 additions & 1 deletion flytestdlib/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@
}
}

func IsCursorEnd(cursor Cursor) bool {
return cursor.cursorState == AtEndCursorState

Check warning on line 79 in flytestdlib/storage/storage.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/storage.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}

// DataStore is a simplified interface for accessing and storing data in one of the Cloud stores.
// Today we rely on Stow for multi-cloud support, but this interface abstracts that part
type DataStore struct {
Expand Down Expand Up @@ -113,7 +117,7 @@
// Head gets metadata about the reference. This should generally be a light weight operation.
Head(ctx context.Context, reference DataReference) (Metadata, error)

// List gets a list of items given a prefix, using a paginated API
// List gets a list of items (relative path to the reference input) given a prefix, using a paginated API
List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)

// ReadRaw retrieves a byte array from the Blob store or an error
Expand Down
Loading