From 5b0d9178260816698cd26728c614cea32f32ae61 Mon Sep 17 00:00:00 2001 From: ytimocin Date: Wed, 23 Oct 2024 18:19:17 -0700 Subject: [PATCH] Adding copy operation to Azure Blob Storage and AWS S3 bindings Signed-off-by: ytimocin --- bindings/aws/s3/metadata.yaml | 2 + bindings/aws/s3/s3.go | 45 ++++++++++++++++++++++ bindings/azure/blobstorage/blobstorage.go | 47 +++++++++++++++++++++-- bindings/azure/blobstorage/metadata.yaml | 2 + bindings/gcp/bucket/bucket.go | 37 +++++++++++++++++- bindings/gcp/bucket/bucket_test.go | 8 ++-- bindings/requests.go | 1 + 7 files changed, 133 insertions(+), 9 deletions(-) diff --git a/bindings/aws/s3/metadata.yaml b/bindings/aws/s3/metadata.yaml index e33fcbad3c..b18720a8db 100644 --- a/bindings/aws/s3/metadata.yaml +++ b/bindings/aws/s3/metadata.yaml @@ -19,6 +19,8 @@ binding: description: "Delete blob" - name: list description: "List blob" + - name: copy + description: "Copy blob" capabilities: [] builtinAuthenticationProfiles: - name: "aws" diff --git a/bindings/aws/s3/s3.go b/bindings/aws/s3/s3.go index cc67cec94f..de577f787d 100644 --- a/bindings/aws/s3/s3.go +++ b/bindings/aws/s3/s3.go @@ -157,6 +157,7 @@ func (s *AWSS3) Operations() []bindings.OperationKind { bindings.GetOperation, bindings.DeleteOperation, bindings.ListOperation, + bindings.CopyOperation, presignOperation, } } @@ -389,6 +390,48 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding }, nil } +func (s *AWSS3) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + metadata, err := s.metadata.mergeWithRequestMetadata(req) + if err != nil { + return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err) + } + + source := req.Metadata["source"] + if source == "" { + return nil, fmt.Errorf("s3 binding error: required metadata 'source' missing") + } + + destinationKey := req.Metadata["destinationKey"] + if destinationKey == "" { + return nil, fmt.Errorf("s3 binding error: required metadata 'destinationKey' missing") + } + + _, err = s.s3Client.CopyObject(&s3.CopyObjectInput{ + // Bucket is the destination bucket. + Bucket: ptr.Of(metadata.Bucket), + + // CopySource is the source bucket and key. + CopySource: ptr.Of(source), + + // Key is the key of the destination object. + Key: ptr.Of(destinationKey), + + // MetadataDirective is the directive to apply to the metadata of the destination object. + MetadataDirective: ptr.Of(s3.MetadataDirectiveCopy), + }) + if err != nil { + return nil, fmt.Errorf("s3 binding error: copy operation failed: %w", err) + } + + return &bindings.InvokeResponse{ + Metadata: map[string]string{ + "source": source, + "destinationBucket": metadata.Bucket, + "destinationKey": destinationKey, + }, + }, nil +} + func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { switch req.Operation { case bindings.CreateOperation: @@ -399,6 +442,8 @@ func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi return s.delete(ctx, req) case bindings.ListOperation: return s.list(ctx, req) + case bindings.CopyOperation: + return s.copy(ctx, req) case presignOperation: return s.presign(ctx, req) default: diff --git a/bindings/azure/blobstorage/blobstorage.go b/bindings/azure/blobstorage/blobstorage.go index b132f9bf4a..6c07ab2585 100644 --- a/bindings/azure/blobstorage/blobstorage.go +++ b/bindings/azure/blobstorage/blobstorage.go @@ -32,7 +32,7 @@ import ( "github.com/dapr/components-contrib/bindings" storagecommon "github.com/dapr/components-contrib/common/component/azure/blobstorage" - contribMetadata "github.com/dapr/components-contrib/metadata" + contribmetadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" "github.com/dapr/kit/ptr" ) @@ -112,6 +112,7 @@ func (a *AzureBlobStorage) Operations() []bindings.OperationKind { bindings.GetOperation, bindings.DeleteOperation, bindings.ListOperation, + bindings.CopyOperation, } } @@ -344,6 +345,44 @@ func (a *AzureBlobStorage) list(ctx context.Context, req *bindings.InvokeRequest }, nil } +func (a *AzureBlobStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + sourceBlobName := req.Metadata["sourceBlobName"] + if sourceBlobName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceBlobName' missing") + } + + destinationBlobName := req.Metadata["destinationBlobName"] + if destinationBlobName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationBlobName' missing") + } + + sourceContainerName := req.Metadata["sourceContainerName"] + if sourceContainerName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceContainerName' missing") + } + + destinationContainerName := req.Metadata["destinationContainerName"] + if destinationContainerName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationContainerName' missing") + } + + sourceBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", sourceContainerName, sourceBlobName)) + destinationBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", destinationContainerName, destinationBlobName)) + + copyURL := sourceBlobClient.URL() + _, err := destinationBlobClient.StartCopyFromURL(ctx, copyURL, nil) + if err != nil { + return nil, fmt.Errorf("azure blob storage error: copy operation failed: %w", err) + } + + return &bindings.InvokeResponse{ + Metadata: map[string]string{ + "sourceBlobName": sourceBlobName, + "destinationBlobName": destinationBlobName, + }, + }, nil +} + func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { switch req.Operation { case bindings.CreateOperation: @@ -354,6 +393,8 @@ func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeReque return a.delete(ctx, req) case bindings.ListOperation: return a.list(ctx, req) + case bindings.CopyOperation: + return a.copy(ctx, req) default: return nil, fmt.Errorf("unsupported operation %s", req.Operation) } @@ -371,9 +412,9 @@ func (a *AzureBlobStorage) isValidDeleteSnapshotsOptionType(accessType azblob.De } // GetComponentMetadata returns the metadata of the component. -func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) { +func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribmetadata.MetadataMap) { metadataStruct := storagecommon.BlobStorageMetadata{} - contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.BindingType) + contribmetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribmetadata.BindingType) return } diff --git a/bindings/azure/blobstorage/metadata.yaml b/bindings/azure/blobstorage/metadata.yaml index 0c436bb02e..ec28df2738 100644 --- a/bindings/azure/blobstorage/metadata.yaml +++ b/bindings/azure/blobstorage/metadata.yaml @@ -19,6 +19,8 @@ binding: description: "Delete blob" - name: list description: "List blob" + - name: copy + description: "Copy blob" capabilities: [] builtinAuthenticationProfiles: - name: "azuread" diff --git a/bindings/gcp/bucket/bucket.go b/bindings/gcp/bucket/bucket.go index 2a8b5e1faa..d4cd81e6eb 100644 --- a/bindings/gcp/bucket/bucket.go +++ b/bindings/gcp/bucket/bucket.go @@ -137,6 +137,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind { bindings.GetOperation, bindings.DeleteOperation, bindings.ListOperation, + bindings.CopyOperation, signOperation, } } @@ -153,8 +154,10 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (* return g.delete(ctx, req) case bindings.ListOperation: return g.list(ctx, req) + case bindings.CopyOperation: + return g.copy(ctx, req) case signOperation: - return g.sign(ctx, req) + return g.sign(req) default: return nil, fmt.Errorf("unsupported operation %s", req.Operation) } @@ -307,6 +310,36 @@ func (g *GCPStorage) list(ctx context.Context, req *bindings.InvokeRequest) (*bi }, nil } +func (g *GCPStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + // Extract metadata + sourceKey := req.Metadata["sourceKey"] + if sourceKey == "" { + return nil, fmt.Errorf("gcp bucket binding error: required metadata 'sourceKey' missing") + } + + destinationKey := req.Metadata["destinationKey"] + if destinationKey == "" { + return nil, fmt.Errorf("gcp bucket binding error: required metadata 'destinationKey' missing") + } + + // Perform the copy operation + src := g.client.Bucket(g.metadata.Bucket).Object(sourceKey) + dst := g.client.Bucket(g.metadata.Bucket).Object(destinationKey) + + _, err := dst.CopierFrom(src).Run(ctx) + if err != nil { + return nil, fmt.Errorf("gcp bucket binding error: copy operation failed: %w", err) + } + + // Return the response + return &bindings.InvokeResponse{ + Metadata: map[string]string{ + "sourceKey": sourceKey, + "destinationKey": destinationKey, + }, + }, nil +} + func (g *GCPStorage) Close() error { return g.client.Close() } @@ -345,7 +378,7 @@ func (g *GCPStorage) GetComponentMetadata() (metadataInfo metadata.MetadataMap) return } -func (g *GCPStorage) sign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (g *GCPStorage) sign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { metadata, err := g.metadata.mergeWithRequestMetadata(req) if err != nil { return nil, fmt.Errorf("gcp binding error. error merge metadata : %w", err) diff --git a/bindings/gcp/bucket/bucket_test.go b/bindings/gcp/bucket/bucket_test.go index 6922050acb..098d463f0a 100644 --- a/bindings/gcp/bucket/bucket_test.go +++ b/bindings/gcp/bucket/bucket_test.go @@ -76,10 +76,10 @@ func TestParseMetadata(t *testing.T) { t.Run("check backward compatibility", func(t *testing.T) { gs := GCPStorage{logger: logger.NewLogger("test")} - request := bindings.InvokeRequest{} - request.Operation = bindings.CreateOperation - request.Metadata = map[string]string{ - "name": "my_file.txt", + request := bindings.InvokeRequest{ + Metadata: map[string]string{ + "name": "my_file.txt", + }, } result := gs.handleBackwardCompatibilityForMetadata(request.Metadata) assert.NotEmpty(t, result["key"]) diff --git a/bindings/requests.go b/bindings/requests.go index 385ae4ecc6..9d4ea2bbcb 100644 --- a/bindings/requests.go +++ b/bindings/requests.go @@ -34,6 +34,7 @@ const ( CreateOperation OperationKind = "create" DeleteOperation OperationKind = "delete" ListOperation OperationKind = "list" + CopyOperation OperationKind = "copy" ) // GetMetadataAsBool parses metadata as bool.