Skip to content

Commit

Permalink
Adding copy operation to Azure Blob Storage and AWS S3 bindings
Browse files Browse the repository at this point in the history
Signed-off-by: ytimocin <[email protected]>
  • Loading branch information
ytimocin committed Oct 28, 2024
1 parent ab9422d commit 5b0d917
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 9 deletions.
2 changes: 2 additions & 0 deletions bindings/aws/s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ binding:
description: "Delete blob"
- name: list
description: "List blob"
- name: copy
description: "Copy blob"
capabilities: []
builtinAuthenticationProfiles:
- name: "aws"
Expand Down
45 changes: 45 additions & 0 deletions bindings/aws/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (s *AWSS3) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
presignOperation,
}
}
Expand Down Expand Up @@ -389,6 +390,48 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding
}, nil
}

func (s *AWSS3) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
metadata, err := s.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err)
}

source := req.Metadata["source"]
if source == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'source' missing")
}

destinationKey := req.Metadata["destinationKey"]
if destinationKey == "" {
return nil, fmt.Errorf("s3 binding error: required metadata 'destinationKey' missing")
}

_, err = s.s3Client.CopyObject(&s3.CopyObjectInput{
// Bucket is the destination bucket.
Bucket: ptr.Of(metadata.Bucket),

// CopySource is the source bucket and key.
CopySource: ptr.Of(source),

// Key is the key of the destination object.
Key: ptr.Of(destinationKey),

// MetadataDirective is the directive to apply to the metadata of the destination object.
MetadataDirective: ptr.Of(s3.MetadataDirectiveCopy),
})
if err != nil {
return nil, fmt.Errorf("s3 binding error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"source": source,
"destinationBucket": metadata.Bucket,
"destinationKey": destinationKey,
},
}, nil
}

func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
Expand All @@ -399,6 +442,8 @@ func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi
return s.delete(ctx, req)
case bindings.ListOperation:
return s.list(ctx, req)
case bindings.CopyOperation:
return s.copy(ctx, req)
case presignOperation:
return s.presign(ctx, req)
default:
Expand Down
47 changes: 44 additions & 3 deletions bindings/azure/blobstorage/blobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

"github.com/dapr/components-contrib/bindings"
storagecommon "github.com/dapr/components-contrib/common/component/azure/blobstorage"
contribMetadata "github.com/dapr/components-contrib/metadata"
contribmetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
Expand Down Expand Up @@ -112,6 +112,7 @@ func (a *AzureBlobStorage) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
}
}

Expand Down Expand Up @@ -344,6 +345,44 @@ func (a *AzureBlobStorage) list(ctx context.Context, req *bindings.InvokeRequest
}, nil
}

func (a *AzureBlobStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
sourceBlobName := req.Metadata["sourceBlobName"]
if sourceBlobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceBlobName' missing")
}

destinationBlobName := req.Metadata["destinationBlobName"]
if destinationBlobName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationBlobName' missing")
}

sourceContainerName := req.Metadata["sourceContainerName"]
if sourceContainerName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceContainerName' missing")
}

destinationContainerName := req.Metadata["destinationContainerName"]
if destinationContainerName == "" {
return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationContainerName' missing")
}

sourceBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", sourceContainerName, sourceBlobName))
destinationBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", destinationContainerName, destinationBlobName))

copyURL := sourceBlobClient.URL()
_, err := destinationBlobClient.StartCopyFromURL(ctx, copyURL, nil)
if err != nil {
return nil, fmt.Errorf("azure blob storage error: copy operation failed: %w", err)
}

return &bindings.InvokeResponse{
Metadata: map[string]string{
"sourceBlobName": sourceBlobName,
"destinationBlobName": destinationBlobName,
},
}, nil
}

func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
Expand All @@ -354,6 +393,8 @@ func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeReque
return a.delete(ctx, req)
case bindings.ListOperation:
return a.list(ctx, req)
case bindings.CopyOperation:
return a.copy(ctx, req)
default:
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
}
Expand All @@ -371,9 +412,9 @@ func (a *AzureBlobStorage) isValidDeleteSnapshotsOptionType(accessType azblob.De
}

// GetComponentMetadata returns the metadata of the component.
func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) {
func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribmetadata.MetadataMap) {
metadataStruct := storagecommon.BlobStorageMetadata{}
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.BindingType)
contribmetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribmetadata.BindingType)
return
}

Expand Down
2 changes: 2 additions & 0 deletions bindings/azure/blobstorage/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ binding:
description: "Delete blob"
- name: list
description: "List blob"
- name: copy
description: "Copy blob"
capabilities: []
builtinAuthenticationProfiles:
- name: "azuread"
Expand Down
37 changes: 35 additions & 2 deletions bindings/gcp/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
bindings.GetOperation,
bindings.DeleteOperation,
bindings.ListOperation,
bindings.CopyOperation,
signOperation,
}
}
Expand All @@ -153,8 +154,10 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
return g.delete(ctx, req)
case bindings.ListOperation:
return g.list(ctx, req)
case bindings.CopyOperation:
return g.copy(ctx, req)
case signOperation:
return g.sign(ctx, req)
return g.sign(req)
default:
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
}
Expand Down Expand Up @@ -307,6 +310,36 @@ func (g *GCPStorage) list(ctx context.Context, req *bindings.InvokeRequest) (*bi
}, nil
}

func (g *GCPStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
// Extract metadata
sourceKey := req.Metadata["sourceKey"]
if sourceKey == "" {
return nil, fmt.Errorf("gcp bucket binding error: required metadata 'sourceKey' missing")
}

destinationKey := req.Metadata["destinationKey"]
if destinationKey == "" {
return nil, fmt.Errorf("gcp bucket binding error: required metadata 'destinationKey' missing")
}

// Perform the copy operation
src := g.client.Bucket(g.metadata.Bucket).Object(sourceKey)
dst := g.client.Bucket(g.metadata.Bucket).Object(destinationKey)

_, err := dst.CopierFrom(src).Run(ctx)
if err != nil {
return nil, fmt.Errorf("gcp bucket binding error: copy operation failed: %w", err)
}

// Return the response
return &bindings.InvokeResponse{
Metadata: map[string]string{
"sourceKey": sourceKey,
"destinationKey": destinationKey,
},
}, nil
}

func (g *GCPStorage) Close() error {
return g.client.Close()
}
Expand Down Expand Up @@ -345,7 +378,7 @@ func (g *GCPStorage) GetComponentMetadata() (metadataInfo metadata.MetadataMap)
return
}

func (g *GCPStorage) sign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
func (g *GCPStorage) sign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
metadata, err := g.metadata.mergeWithRequestMetadata(req)
if err != nil {
return nil, fmt.Errorf("gcp binding error. error merge metadata : %w", err)
Expand Down
8 changes: 4 additions & 4 deletions bindings/gcp/bucket/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ func TestParseMetadata(t *testing.T) {
t.Run("check backward compatibility", func(t *testing.T) {
gs := GCPStorage{logger: logger.NewLogger("test")}

request := bindings.InvokeRequest{}
request.Operation = bindings.CreateOperation
request.Metadata = map[string]string{
"name": "my_file.txt",
request := bindings.InvokeRequest{
Metadata: map[string]string{
"name": "my_file.txt",
},
}
result := gs.handleBackwardCompatibilityForMetadata(request.Metadata)
assert.NotEmpty(t, result["key"])
Expand Down
1 change: 1 addition & 0 deletions bindings/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
CreateOperation OperationKind = "create"
DeleteOperation OperationKind = "delete"
ListOperation OperationKind = "list"
CopyOperation OperationKind = "copy"
)

// GetMetadataAsBool parses metadata as bool.
Expand Down

0 comments on commit 5b0d917

Please sign in to comment.