This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Make catalog async client array-aware #39

Open. Wants to merge 1 commit into base: master.
go/tasks/pluginmachinery/catalog/async_client.go (61 changes: 56 additions & 5 deletions)
@@ -3,6 +3,10 @@ package catalog
import (
"context"

"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

"github.com/lyft/flytestdlib/bitarray"

"github.com/lyft/flytestdlib/errors"
@@ -24,7 +28,8 @@ const (

type UploadRequest struct {
Key Key
ArtifactData io.OutputReader
ArtifactData io.OutputFilePaths
DataStore *storage.DataStore
ArtifactMetadata Metadata
}

@@ -49,8 +54,9 @@ type UploadFuture interface {

// Catalog Download Request to represent async operation download request.
type DownloadRequest struct {
Key Key
Target io.OutputWriter
Key Key
Target io.OutputWriter
DataStore *storage.DataStore
}

// Catalog download future to represent async process of downloading catalog artifacts.
@@ -73,13 +79,58 @@ type DownloadResponse interface {
GetCachedCount() int
}

// Catalog Download Request to represent async operation download request.
type DownloadArrayRequest struct {
// Identifier is the same among all subtasks of the array
Identifier core.Identifier
// Cache version is the same among all subtasks of the array
CacheVersion string
// Interface is the same among all subtasks of the array
TypedInterface core.TypedInterface

dataStore *storage.DataStore

// Base input reader to build subtasks input readers from
BaseInputReader io.InputReader
// Base output writer to build subtasks output writers from
BaseTarget io.OutputWriter
Indexes *bitarray.BitSet
Count int
}

type UploadArrayRequest struct {
// Identifier is the same among all subtasks of the array
Identifier core.Identifier
// Cache version is the same among all subtasks of the array
CacheVersion string
// Interface is the same among all subtasks of the array
TypedInterface core.TypedInterface
// ArtifactMetadata is the same among all subtasks of the array
ArtifactMetadata Metadata

dataStore *storage.DataStore

// Base input reader to build subtasks input readers from
BaseInputReader io.InputReader
// Base artifact data to build subtasks output file paths from
BaseArtifactData io.OutputFilePaths
Indexes *bitarray.BitSet
Count int
}

// An interface that helps async interaction with catalog service
type AsyncClient interface {
// Returns if an entry exists for the given task and input. It returns the data as a LiteralMap
Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error)
Download(ctx context.Context, request DownloadRequest) (outputFuture DownloadFuture, err error)

// Adds a new entry to catalog for the given task execution context and the generated output
Upload(ctx context.Context, request UploadRequest) (putFuture UploadFuture, err error)

// DownloadArray checks which subtasks of the array have catalog entries and downloads the cached outputs of those that do
DownloadArray(ctx context.Context, request DownloadArrayRequest) (outputFuture DownloadFuture, err error)

// Adds a new entry to catalog for the given task execution context and the generated output
Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error)
UploadArray(ctx context.Context, requests UploadArrayRequest) (putFuture UploadFuture, err error)
}

var _ AsyncClient = AsyncClientImpl{}
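
For orientation, here is a rough sketch of how a consuming array plugin might drive the new array-aware lookup; it is not part of this change. The package name arrayplugin, the helper checkArrayCache, and the literal cache version are invented for illustration, and the import paths are the ones implied by the diff above; the request fields and bitarray calls mirror the definitions in this file.

package arrayplugin // hypothetical consumer package, for illustration only

import (
	"context"

	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
	"github.com/lyft/flytestdlib/bitarray"
)

// checkArrayCache queues a single array-aware catalog lookup covering every subtask.
func checkArrayCache(ctx context.Context, client catalog.AsyncClient, taskID core.Identifier,
	iface core.TypedInterface, inputs io.InputReader, outputs io.OutputWriter,
	subTaskCount int) (catalog.DownloadFuture, error) {

	// Mark all subtasks for lookup; a real plugin could skip indexes it already resolved.
	indexes := bitarray.NewBitSet(uint(subTaskCount))
	for i := 0; i < subTaskCount; i++ {
		indexes.Set(uint(i))
	}

	// Poll the returned DownloadFuture on later rounds; once ready, its response
	// reports which subtask outputs were served from the catalog.
	return client.DownloadArray(ctx, catalog.DownloadArrayRequest{
		Identifier:      taskID,  // same identifier for all subtasks
		CacheVersion:    "1.0",   // placeholder cache version
		TypedInterface:  iface,   // same interface for all subtasks
		BaseInputReader: inputs,  // base reader the subtask inputs derive from
		BaseTarget:      outputs, // base writer the subtask outputs derive from
		Indexes:         indexes,
		Count:           subTaskCount,
	})
}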
go/tasks/pluginmachinery/catalog/async_client_impl.go (175 changes: 93 additions & 82 deletions)
@@ -9,8 +9,6 @@ import (

"github.com/lyft/flytestdlib/promutils"

"github.com/lyft/flytestdlib/bitarray"

"github.com/lyft/flytestdlib/errors"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue"
@@ -22,12 +20,8 @@ var base32Encoder = base32.NewEncoding(specialEncoderKey).WithPadding(base32.NoPadding)

// An async-client for catalog that can queue download and upload requests on workqueues.
type AsyncClientImpl struct {
Reader workqueue.IndexedWorkQueue
Writer workqueue.IndexedWorkQueue
}

func formatWorkItemID(key Key, idx int, suffix string) string {
return fmt.Sprintf("%v-%v-%v", key, idx, suffix)
ArrayReader workqueue.IndexedWorkQueue
ArrayWriter workqueue.IndexedWorkQueue
}

func consistentHash(str string) (string, error) {
@@ -41,117 +35,134 @@ func consistentHash(str string) (string, error) {
return base32Encoder.EncodeToString(b), nil
}

func (c AsyncClientImpl) Download(ctx context.Context, requests ...DownloadRequest) (outputFuture DownloadFuture, err error) {
status := ResponseStatusReady
cachedResults := bitarray.NewBitSet(uint(len(requests)))
cachedCount := 0
var respErr error
for idx, request := range requests {
uniqueOutputLoc, err := consistentHash(request.Target.GetOutputPrefixPath().String())
if err != nil {
return nil, err
}
// DownloadArray checks which subtasks of the array have catalog entries and downloads the cached outputs of those that do
func (c AsyncClientImpl) DownloadArray(ctx context.Context, request DownloadArrayRequest) (outputFuture DownloadFuture, err error) {
workItemID := fmt.Sprintf("%v-%v-%v-%v-%v-%v", request.Identifier.String(), request.Count,
request.BaseTarget.GetOutputPrefixPath(), request.TypedInterface, request.BaseInputReader.GetInputPrefixPath(),
request.CacheVersion)

workItemID := formatWorkItemID(request.Key, idx, uniqueOutputLoc)
err = c.Reader.Queue(ctx, workItemID, NewReaderWorkItem(
request.Key,
request.Target))
hashedID, err := consistentHash(workItemID)
if err != nil {
return nil, err
}

if err != nil {
return nil, err
}
err = c.ArrayReader.Queue(ctx, hashedID, NewArrayReaderWorkItem(request))
if err != nil {
return nil, err
}

info, found, err := c.Reader.Get(workItemID)
if err != nil {
return nil, errors.Wrapf(ErrSystemError, err, "Failed to lookup from reader workqueue for info: %v", workItemID)
}
info, found, err := c.ArrayReader.Get(hashedID)
if err != nil {
return nil, errors.Wrapf(ErrSystemError, err, "Failed to lookup from reader workqueue for info: %v", workItemID)
}

if !found {
return nil, errors.Errorf(ErrSystemError, "Item not found in the reader workqueue even though it was just added. ItemID: %v", workItemID)
}
if !found {
return nil, errors.Errorf(ErrSystemError, "Item not found in the reader workqueue even though it was just added. ItemID: %v", workItemID)
}

switch info.Status() {
case workqueue.WorkStatusSucceeded:
readerWorkItem, casted := info.Item().(*ReaderWorkItem)
if !casted {
return nil, errors.Errorf(ErrSystemError, "Item wasn't casted to ReaderWorkItem. ItemID: %v. Type: %v", workItemID, reflect.TypeOf(info))
}

if readerWorkItem.IsCached() {
cachedResults.Set(uint(idx))
cachedCount++
}
case workqueue.WorkStatusFailed:
respErr = info.Error()
case workqueue.WorkStatusNotDone:
status = ResponseStatusNotReady
switch info.Status() {
case workqueue.WorkStatusSucceeded:
readerWorkItem, casted := info.Item().(*ArrayReaderWorkItem)
if !casted {
return nil, errors.Errorf(ErrSystemError, "Item wasn't casted to ReaderWorkItem. ItemID: %v. Type: %v", workItemID, reflect.TypeOf(info))
}
}

return newDownloadFuture(status, respErr, cachedResults, len(requests), cachedCount), nil
return newDownloadFuture(ResponseStatusReady, nil, readerWorkItem.CachedResults(), request.Count), nil
case workqueue.WorkStatusFailed:
return newDownloadFuture(ResponseStatusReady, info.Error(), nil, request.Count), nil
default:
return newDownloadFuture(ResponseStatusNotReady, nil, nil, request.Count), nil
}
}

func (c AsyncClientImpl) Upload(ctx context.Context, requests ...UploadRequest) (putFuture UploadFuture, err error) {
status := ResponseStatusReady
var respErr error
for idx, request := range requests {
workItemID := formatWorkItemID(request.Key, idx, "")
err := c.Writer.Queue(ctx, workItemID, NewWriterWorkItem(
request.Key,
request.ArtifactData,
request.ArtifactMetadata))

if err != nil {
return nil, err
}
// UploadArray adds catalog entries for the selected subtasks of the array and their generated outputs
func (c AsyncClientImpl) UploadArray(ctx context.Context, request UploadArrayRequest) (putFuture UploadFuture, err error) {
workItemID := fmt.Sprintf("%v-%v-%v-%v-%v", request.Identifier.String(), request.Count,
request.TypedInterface, request.BaseInputReader.GetInputPrefixPath(), request.CacheVersion)

info, found, err := c.Writer.Get(workItemID)
if err != nil {
return nil, errors.Wrapf(ErrSystemError, err, "Failed to lookup from writer workqueue for info: %v", workItemID)
}
hashedID, err := consistentHash(workItemID)
if err != nil {
return nil, err
}

if !found {
return nil, errors.Errorf(ErrSystemError, "Item not found in the writer workqueue even though it was just added. ItemID: %v", workItemID)
}
err = c.ArrayWriter.Queue(ctx, hashedID, NewArrayWriterWorkItem(request))
if err != nil {
return nil, err
}

switch info.Status() {
case workqueue.WorkStatusNotDone:
status = ResponseStatusNotReady
case workqueue.WorkStatusFailed:
respErr = info.Error()
}
info, found, err := c.ArrayWriter.Get(hashedID)
if err != nil {
return nil, errors.Wrapf(ErrSystemError, err, "Failed to lookup from reader workqueue for info: %v", workItemID)
}

if !found {
return nil, errors.Errorf(ErrSystemError, "Item not found in the reader workqueue even though it was just added. ItemID: %v", workItemID)
}

switch info.Status() {
case workqueue.WorkStatusSucceeded:
return newUploadFuture(ResponseStatusReady, nil), nil
case workqueue.WorkStatusFailed:
return newUploadFuture(ResponseStatusReady, info.Error()), nil
default:
return newUploadFuture(ResponseStatusNotReady, nil), nil
}
}

func (c AsyncClientImpl) Download(ctx context.Context, request DownloadRequest) (outputFuture DownloadFuture, err error) {
return c.DownloadArray(ctx, DownloadArrayRequest{
Identifier: request.Key.Identifier,
CacheVersion: request.Key.CacheVersion,
TypedInterface: request.Key.TypedInterface,
BaseInputReader: request.Key.InputReader,
BaseTarget: request.Target,
dataStore: request.DataStore,
Indexes: nil,
Count: 0,
})
}

return newUploadFuture(status, respErr), nil
func (c AsyncClientImpl) Upload(ctx context.Context, request UploadRequest) (putFuture UploadFuture, err error) {
return c.UploadArray(ctx, UploadArrayRequest{
Identifier: request.Key.Identifier,
CacheVersion: request.Key.CacheVersion,
TypedInterface: request.Key.TypedInterface,
ArtifactMetadata: request.ArtifactMetadata,
dataStore: request.DataStore,
BaseInputReader: request.Key.InputReader,
BaseArtifactData: request.ArtifactData,
Indexes: nil,
Count: 0,
})
}

func (c AsyncClientImpl) Start(ctx context.Context) error {
if err := c.Reader.Start(ctx); err != nil {
if err := c.ArrayReader.Start(ctx); err != nil {
return errors.Wrapf(ErrSystemError, err, "Failed to start reader queue.")
}

if err := c.Writer.Start(ctx); err != nil {
if err := c.ArrayWriter.Start(ctx); err != nil {
return errors.Wrapf(ErrSystemError, err, "Failed to start writer queue.")
}

return nil
}

func NewAsyncClient(client Client, cfg Config, scope promutils.Scope) (AsyncClientImpl, error) {
readerWorkQueue, err := workqueue.NewIndexedWorkQueue("reader", NewReaderProcessor(client), cfg.ReaderWorkqueueConfig,
arrayReaderWorkQueue, err := workqueue.NewIndexedWorkQueue("reader", NewArrayReaderProcessor(client, cfg.Reader.MaxItemsPerRound), cfg.Reader.Workqueue,
scope.NewSubScope("reader"))
if err != nil {
return AsyncClientImpl{}, err
}

writerWorkQueue, err := workqueue.NewIndexedWorkQueue("writer", NewWriterProcessor(client), cfg.WriterWorkqueueConfig,
arrayWriterWorkQueue, err := workqueue.NewIndexedWorkQueue("writer", NewWriterArrayProcessor(client, cfg.Writer.MaxItemsPerRound), cfg.Writer.Workqueue,
scope.NewSubScope("writer"))
if err != nil {
return AsyncClientImpl{}, err
}

return AsyncClientImpl{
Reader: readerWorkQueue,
Writer: writerWorkQueue,
ArrayWriter: arrayWriterWorkQueue,
ArrayReader: arrayReaderWorkQueue,
}, nil
}
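
NewAsyncClient now reads cfg.Reader.Workqueue, cfg.Reader.MaxItemsPerRound, and the writer equivalents, but the updated config.go is not part of this diff. A minimal shape consistent with those references might look like the sketch below; the struct names, field layout, and the assumption that Workqueue is a workqueue.Config are inferred from the calls above, not taken from the change itself.

// Inferred sketch only: the real definitions would live in this package's config.go,
// which is not included in this diff.
type ProcessorConfig struct {
	Workqueue        workqueue.Config // sizing/retry settings passed to NewIndexedWorkQueue
	MaxItemsPerRound int              // subtask items one work item may process per round
}

type Config struct {
	Reader ProcessorConfig
	Writer ProcessorConfig
}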