Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding copy operation to Azure Blob Storage, AWS S3, and GCP Bucket bindings #3585

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bindings/aws/s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ binding:
description: "Delete blob"
- name: list
description: "List blob"
- name: copy
description: "Copy blob"
capabilities: []
builtinAuthenticationProfiles:
- name: "aws"
Expand Down
54 changes: 52 additions & 2 deletions bindings/aws/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (s *AWSS3) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
presignOperation,
}
}
Expand Down Expand Up @@ -240,7 +241,7 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi
}, nil
}

func (s *AWSS3) presign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (s *AWSS3) presign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
metadata, err := s.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err)
Expand Down Expand Up @@ -389,6 +390,53 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding
}, nil
}

func (s *AWSS3) copy(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
_, err := s.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err)
}

source := req.Metadata["source"]
if source == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'source' missing")
}

destinationBucket := req.Metadata["bucket"]
if destinationBucket == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'bucket' missing")
}

destinationKey := req.Metadata["destinationKey"]
if destinationKey == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'destinationKey' missing")
}

_, err = s.s3Client.CopyObject(&s3.CopyObjectInput{
// Bucket is the destination bucket.
Bucket: ptr.Of(destinationBucket),

// CopySource is the source bucket and key.
CopySource: ptr.Of(source),

// Key is the key of the destination object.
Key: ptr.Of(destinationKey),

// MetadataDirective is the directive to apply to the metadata of the destination object.
MetadataDirective: ptr.Of(s3.MetadataDirectiveCopy),
})
if err != nil {
return nil, fmt.Errorf("s3 binding error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"source": source,
"destinationBucket": destinationBucket,
"destinationKey": destinationKey,
},
}, nil
}

func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
Expand All @@ -399,8 +447,10 @@ func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi
return s.delete(ctx, req)
case bindings.ListOperation:
return s.list(ctx, req)
case bindings.CopyOperation:
return s.copy(req)
case presignOperation:
return s.presign(ctx, req)
return s.presign(req)
default:
return nil, fmt.Errorf("s3 binding error: unsupported operation %s", req.Operation)
}
Expand Down
118 changes: 113 additions & 5 deletions bindings/azure/blobstorage/blobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ import (
"io"
"reflect"
"strconv"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/google/uuid"

"github.com/dapr/components-contrib/bindings"
storagecommon "github.com/dapr/components-contrib/common/component/azure/blobstorage"
contribMetadata "github.com/dapr/components-contrib/metadata"
contribmetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
Expand All @@ -57,8 +59,9 @@ const (
// Specifies the maximum number of blobs to return, including all BlobPrefix elements. If the request does not
// specify maxresults the server will return up to 5,000 items.
// See: https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs#uri-parameters
maxResults int32 = 5000
endpointKey = "endpoint"
maxResults int32 = 5000
endpointKey = "endpoint"
presignOperation = "presign"
)

var ErrMissingBlobName = errors.New("blobName is a required attribute")
Expand All @@ -76,6 +79,10 @@ type createResponse struct {
BlobName string `json:"blobName"`
}

type presignResponse struct {
PresignURL string `json:"presignURL"`
}

type listInclude struct {
Copy bool `json:"copy"`
Metadata bool `json:"metadata"`
Expand Down Expand Up @@ -112,6 +119,7 @@ func (a *AzureBlobStorage) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
}
}

Expand Down Expand Up @@ -344,6 +352,102 @@ func (a *AzureBlobStorage) list(ctx context.Context, req *bindings.InvokeRequest
}, nil
}

func (a *AzureBlobStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
sourceBlobName := req.Metadata["sourceBlobName"]
if sourceBlobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceBlobName' missing")
}

destinationBlobName := req.Metadata["destinationBlobName"]
if destinationBlobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationBlobName' missing")
}

sourceContainerName := req.Metadata["sourceContainerName"]
if sourceContainerName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceContainerName' missing")
}

destinationContainerName := req.Metadata["destinationContainerName"]
if destinationContainerName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationContainerName' missing")
}

sourceBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", sourceContainerName, sourceBlobName))
destinationBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", destinationContainerName, destinationBlobName))

copyURL := sourceBlobClient.URL()
_, err := destinationBlobClient.StartCopyFromURL(ctx, copyURL, nil)
if err != nil {
return nil, fmt.Errorf("azure blob storage error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"sourceBlobName": sourceBlobName,
"destinationBlobName": destinationBlobName,
},
}, nil
}

func (a *AzureBlobStorage) presign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
blobName := req.Metadata[metadataKeyBlobName]
if blobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata '%s' missing", metadataKeyBlobName)
}

presignTTL := req.Metadata["presignTTL"]
if presignTTL == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'presignTTL' missing")
}

ttl, err := time.ParseDuration(presignTTL)
if err != nil {
return nil, fmt.Errorf("azure blob storage error: cannot parse duration %s: %w", presignTTL, err)
}

blobClient := a.containerClient.NewBlockBlobClient(blobName)
sasURL, err := a.generateSASURL(blobClient, ttl)
if err != nil {
return nil, fmt.Errorf("azure blob storage error: %w", err)
}

jsonResponse, err := json.Marshal(presignResponse{
PresignURL: sasURL,
})
if err != nil {
return nil, fmt.Errorf("s3 binding error: error marshalling presign response: %w", err)
}

return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

func (a *AzureBlobStorage) generateSASURL(blobClient *blockblob.Client, ttl time.Duration) (string, error) {
permissions := sas.AccountPermissions{
Read: true,
}

sasValues := sas.AccountSignatureValues{
Protocol: sas.ProtocolHTTPS,
ExpiryTime: time.Now().UTC().Add(ttl),
Permissions: permissions.String(),
}

credential, err := azblob.NewSharedKeyCredential(a.metadata.AccountName, a.metadata.AccountKey)
if err != nil {
return "", fmt.Errorf("error creating shared key credential: %w", err)
}

sasQueryParams, err := sasValues.SignWithSharedKey(credential)
if err != nil {
return "", fmt.Errorf("error generating SAS query parameters: %w", err)
}

return fmt.Sprintf("%s?%s", blobClient.URL(), sasQueryParams.Encode()), nil
}

func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
Expand All @@ -354,6 +458,10 @@ func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeReque
return a.delete(ctx, req)
case bindings.ListOperation:
return a.list(ctx, req)
case bindings.CopyOperation:
return a.copy(ctx, req)
case presignOperation:
return a.presign(req)
default:
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
}
Expand All @@ -371,9 +479,9 @@ func (a *AzureBlobStorage) isValidDeleteSnapshotsOptionType(accessType azblob.De
}

// GetComponentMetadata returns the metadata of the component.
func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) {
func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribmetadata.MetadataMap) {
metadataStruct := storagecommon.BlobStorageMetadata{}
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.BindingType)
contribmetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribmetadata.BindingType)
return
}

Expand Down
2 changes: 2 additions & 0 deletions bindings/azure/blobstorage/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ binding:
description: "Delete blob"
- name: list
description: "List blob"
- name: copy
description: "Copy blob"
capabilities: []
builtinAuthenticationProfiles:
- name: "azuread"
Expand Down
34 changes: 32 additions & 2 deletions bindings/gcp/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
signOperation,
}
}
Expand All @@ -153,8 +154,10 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
return g.delete(ctx, req)
case bindings.ListOperation:
return g.list(ctx, req)
case bindings.CopyOperation:
return g.copy(ctx, req)
case signOperation:
return g.sign(ctx, req)
return g.sign(req)
default:
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
}
Expand Down Expand Up @@ -307,6 +310,33 @@ func (g *GCPStorage) list(ctx context.Context, req *bindings.InvokeRequest) (*bi
}, nil
}

func (g *GCPStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
sourceKey := req.Metadata["sourceKey"]
if sourceKey == "" {
return nil, fmt.Errorf("gcp bucket binding error: required metadata 'sourceKey' missing")
}

destinationKey := req.Metadata["destinationKey"]
if destinationKey == "" {
return nil, fmt.Errorf("gcp bucket binding error: required metadata 'destinationKey' missing")
}

src := g.client.Bucket(g.metadata.Bucket).Object(sourceKey)
dst := g.client.Bucket(g.metadata.Bucket).Object(destinationKey)

_, err := dst.CopierFrom(src).Run(ctx)
if err != nil {
return nil, fmt.Errorf("gcp bucket binding error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"sourceKey": sourceKey,
"destinationKey": destinationKey,
},
}, nil
}

func (g *GCPStorage) Close() error {
return g.client.Close()
}
Expand Down Expand Up @@ -345,7 +375,7 @@ func (g *GCPStorage) GetComponentMetadata() (metadataInfo metadata.MetadataMap)
return
}

func (g *GCPStorage) sign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (g *GCPStorage) sign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
metadata, err := g.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("gcp binding error. error merge metadata : %w", err)
Expand Down
8 changes: 4 additions & 4 deletions bindings/gcp/bucket/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func TestParseMetadata(t *testing.T) {
t.Run("check backward compatibility", func(t *testing.T) {
gs := GCPStorage{logger: logger.NewLogger("test")}

request := bindings.InvokeRequest{}
request.Operation = bindings.CreateOperation
request.Metadata = map[string]string{
"name": "my_file.txt",
request := bindings.InvokeRequest{
Metadata: map[string]string{
"name": "my_file.txt",
},
}
result := gs.handleBackwardCompatibilityForMetadata(request.Metadata)
assert.NotEmpty(t, result["key"])
Expand Down
1 change: 1 addition & 0 deletions bindings/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
CreateOperation OperationKind = "create"
DeleteOperation OperationKind = "delete"
ListOperation OperationKind = "list"
CopyOperation OperationKind = "copy"
)

// GetMetadataAsBool parses metadata as bool.
Expand Down