Skip to content

Commit

Permalink
Merge pull request #1026 from mskanth972/APreuse
Browse files Browse the repository at this point in the history
Reusing Access Points
  • Loading branch information
k8s-ci-robot committed Sep 19, 2023
2 parents e6523c0 + 53bdc4e commit 44c9b85
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 83 deletions.
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The following CSI interfaces are implemented:
| subPathPattern | | `/${.PV.name}` | true | The template used to construct the subPath under which each of the access points created under Dynamic Provisioning. Can be made up of fixed strings and limited variables, is akin to the 'subPathPattern' variable on the [nfs-subdir-external-provisioner](https://github.com/kubernetes-sigs/nfs-subdir-external-provisioner) chart. Supports `.PVC.name`,`.PVC.namespace` and `.PV.name` |
| ensureUniqueDirectory | | true | true | **NOTE: Only set this to false if you're sure this is the behaviour you want**.<br/> Used when dynamic provisioning is enabled, if set to true, appends the a UID to the pattern specified in `subPathPattern` to ensure that access points will not accidentally point at the same directory. |
| az | | "" | true | Used for cross-account mount. `az` under storage class parameter is optional. If specified, mount target associated with the az will be used for cross-account mount. If not specified, a random mount target will be picked for cross account mount |
| reuseAccessPoint | | false | true | When set to true, it creates Accesspoint client-token from the provided PVC name. So that the AccessPoint can be re-used from a differen cluster if same PVC name and storageclass configuration are used. |

**Note**
* Custom Posix group Id range for Access Point root directory must include both `gidRangeStart` and `gidRangeEnd` parameters. These parameters are optional only if both are omitted. If you specify one, the other becomes mandatory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ parameters:
gidRangeEnd: "2000" # optional
basePath: "/dynamic_provisioning" # optional
subPathPattern: "${.PVC.namespace}/${.PVC.name}" # optional
ensureUniqueDirectory: "true" # optional
ensureUniqueDirectory: "true" # optional
reuseAccessPoint: "false" # optional
61 changes: 57 additions & 4 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
)

const (
AccessDeniedException = "AccessDeniedException"
AccessDeniedException = "AccessDeniedException"
AccessPointAlreadyExists = "AccessPointAlreadyExists"
PvcNameTagKey = "pvcName"
)

var (
Expand Down Expand Up @@ -94,7 +96,7 @@ type Efs interface {

type Cloud interface {
GetMetadata() MetadataService
CreateAccessPoint(ctx context.Context, volumeName string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error)
CreateAccessPoint(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions, reuseAccessPoint bool) (accessPoint *AccessPoint, err error)
DeleteAccessPoint(ctx context.Context, accessPointId string) (err error)
DescribeAccessPoint(ctx context.Context, accessPointId string) (accessPoint *AccessPoint, err error)
ListAccessPoints(ctx context.Context, fileSystemId string) (accessPoints []*AccessPoint, err error)
Expand Down Expand Up @@ -161,10 +163,28 @@ func (c *cloud) GetMetadata() MetadataService {
return c.metadata
}

func (c *cloud) CreateAccessPoint(ctx context.Context, volumeName string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error) {
func (c *cloud) CreateAccessPoint(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions, reuseAccessPoint bool) (accessPoint *AccessPoint, err error) {
efsTags := parseEfsTags(accessPointOpts.Tags)

//if reuseAccessPoint is true, check for AP with same Root Directory exists in efs
// if found reuse that AP
if reuseAccessPoint {
existingAP, err := c.findAccessPointByClientToken(ctx, clientToken, accessPointOpts)
if err != nil {
return nil, fmt.Errorf("failed to find access point: %v", err)
}
if existingAP != nil {
//AP path already exists
klog.V(2).Infof("Existing AccessPoint found : %+v", existingAP)
return &AccessPoint{
AccessPointId: existingAP.AccessPointId,
FileSystemId: existingAP.FileSystemId,
CapacityGiB: accessPointOpts.CapacityGiB,
}, nil
}
}
createAPInput := &efs.CreateAccessPointInput{
ClientToken: &volumeName,
ClientToken: &clientToken,
FileSystemId: &accessPointOpts.FileSystemId,
PosixUser: &efs.PosixUser{
Gid: &accessPointOpts.Gid,
Expand All @@ -189,6 +209,7 @@ func (c *cloud) CreateAccessPoint(ctx context.Context, volumeName string, access
}
return nil, fmt.Errorf("Failed to create access point: %v", err)
}
klog.V(5).Infof("Create AP response : %+v", res)

return &AccessPoint{
AccessPointId: *res.AccessPointId,
Expand Down Expand Up @@ -240,6 +261,38 @@ func (c *cloud) DescribeAccessPoint(ctx context.Context, accessPointId string) (
}, nil
}

func (c *cloud) findAccessPointByClientToken(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error) {
klog.V(5).Infof("AccessPointOptions to find AP : %+v", accessPointOpts)
klog.V(2).Infof("ClientToken to find AP : %s", clientToken)
describeAPInput := &efs.DescribeAccessPointsInput{
FileSystemId: &accessPointOpts.FileSystemId,
MaxResults: aws.Int64(1000),
}
res, err := c.efs.DescribeAccessPointsWithContext(ctx, describeAPInput)
if err != nil {
if isAccessDenied(err) {
return
}
if isFileSystemNotFound(err) {
return
}
err = fmt.Errorf("failed to list Access Points of efs = %s : %v", accessPointOpts.FileSystemId, err)
return
}
for _, ap := range res.AccessPoints {
// check if AP exists with same client token
if aws.StringValue(ap.ClientToken) == clientToken {
return &AccessPoint{
AccessPointId: *ap.AccessPointId,
FileSystemId: *ap.FileSystemId,
AccessPointRootDir: *ap.RootDirectory.Path,
}, nil
}
}
klog.V(2).Infof("Access point does not exist")
return nil, nil
}

func (c *cloud) ListAccessPoints(ctx context.Context, fileSystemId string) (accessPoints []*AccessPoint, err error) {
describeAPInput := &efs.DescribeAccessPointsInput{
FileSystemId: &fileSystemId,
Expand Down
133 changes: 129 additions & 4 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloud
import (
"context"
"errors"
"reflect"
"testing"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -27,13 +28,14 @@ func TestCreateAccessPoint(t *testing.T) {
directoryPerms = "0777"
directoryPath = "/test"
volName = "volName"
clientToken = volName
)
testCases := []struct {
name string
testFunc func(t *testing.T)
}{
{
name: "Success",
name: "Success - AP does not exist",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
mockEfs := mocks.NewMockEfs(mockCtl)
Expand Down Expand Up @@ -72,9 +74,63 @@ func TestCreateAccessPoint(t *testing.T) {
},
}

describeAPOutput := &efs.DescribeAccessPointsOutput{
AccessPoints: nil,
}

ctx := context.Background()
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAPOutput, nil)
mockEfs.EXPECT().CreateAccessPointWithContext(gomock.Eq(ctx), gomock.Any()).Return(output, nil)
res, err := c.CreateAccessPoint(ctx, volName, req)
res, err := c.CreateAccessPoint(ctx, clientToken, req, true)

if err != nil {
t.Fatalf("CreateAccessPointFailed is failed: %v", err)
}

if res == nil {
t.Fatal("Result is nil")
}

if accessPointId != res.AccessPointId {
t.Fatalf("AccessPointId mismatched. Expected: %v, Actual: %v", accessPointId, res.AccessPointId)
}

if fsId != res.FileSystemId {
t.Fatalf("FileSystemId mismatched. Expected: %v, Actual: %v", fsId, res.FileSystemId)
}
mockCtl.Finish()
},
},
{
name: "Success - AP already exists",
testFunc: func(t *testing.T) {
mockCtl := gomock.NewController(t)
mockEfs := mocks.NewMockEfs(mockCtl)
c := &cloud{
efs: mockEfs,
}

tags := make(map[string]string)
tags["cluster"] = "efs"

req := &AccessPointOptions{
FileSystemId: fsId,
Uid: uid,
Gid: gid,
DirectoryPerms: directoryPerms,
DirectoryPath: directoryPath,
Tags: tags,
}

describeAPOutput := &efs.DescribeAccessPointsOutput{
AccessPoints: []*efs.AccessPointDescription{
{AccessPointId: aws.String(accessPointId), FileSystemId: aws.String(fsId), ClientToken: aws.String(clientToken), RootDirectory: &efs.RootDirectory{Path: aws.String(directoryPath)}, Tags: []*efs.Tag{{Key: aws.String(PvcNameTagKey), Value: aws.String(volName)}}},
},
}

ctx := context.Background()
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAPOutput, nil)
res, err := c.CreateAccessPoint(ctx, clientToken, req, true)

if err != nil {
t.Fatalf("CreateAccessPointFailed is failed: %v", err)
Expand Down Expand Up @@ -108,10 +164,14 @@ func TestCreateAccessPoint(t *testing.T) {
DirectoryPerms: directoryPerms,
DirectoryPath: directoryPath,
}
describeAPOutput := &efs.DescribeAccessPointsOutput{
AccessPoints: nil,
}

ctx := context.Background()
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Eq(ctx), gomock.Any()).Return(describeAPOutput, nil)
mockEfs.EXPECT().CreateAccessPointWithContext(gomock.Eq(ctx), gomock.Any()).Return(nil, errors.New("CreateAccessPointWithContext failed"))
_, err := c.CreateAccessPoint(ctx, volName, req)
_, err := c.CreateAccessPoint(ctx, clientToken, req, true)
if err == nil {
t.Fatalf("CreateAccessPoint did not fail")
}
Expand All @@ -135,7 +195,7 @@ func TestCreateAccessPoint(t *testing.T) {

ctx := context.Background()
mockEfs.EXPECT().CreateAccessPointWithContext(gomock.Eq(ctx), gomock.Any()).Return(nil, awserr.New(AccessDeniedException, "Access Denied", errors.New("Access Denied")))
_, err := c.CreateAccessPoint(ctx, volName, req)
_, err := c.CreateAccessPoint(ctx, clientToken, req, false)
if err == nil {
t.Fatalf("CreateAccessPoint did not fail")
}
Expand Down Expand Up @@ -862,3 +922,68 @@ func testResult(t *testing.T, funcName string, ret interface{}, err error, expec
}
}
}

func Test_findAccessPointByPath(t *testing.T) {
fsId := "testFsId"
clientToken := "testPvcName"
dirPath := "testPath"
diffClientToken := aws.String("diff")

mockctl := gomock.NewController(t)
defer mockctl.Finish()
mockEfs := mocks.NewMockEfs(mockctl)

expectedSingleAP := &AccessPoint{
AccessPointId: "testApId",
AccessPointRootDir: dirPath,
FileSystemId: fsId,
}

type args struct {
clientToken string
accessPointOpts *AccessPointOptions
}
tests := []struct {
name string
args args
prepare func(*mocks.MockEfs)
wantAccessPoint *AccessPoint
wantErr bool
}{
{name: "Expected_ClientToken_Not_Found", args: args{clientToken, &AccessPointOptions{FileSystemId: fsId, DirectoryPath: dirPath}}, prepare: func(mockEfs *mocks.MockEfs) {
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Any(), gomock.Any()).Return(&efs.DescribeAccessPointsOutput{
AccessPoints: []*efs.AccessPointDescription{{FileSystemId: aws.String(fsId), ClientToken: diffClientToken, AccessPointId: aws.String(expectedSingleAP.AccessPointId), RootDirectory: &efs.RootDirectory{Path: aws.String("differentPath")}}},
}, nil)
}, wantAccessPoint: nil, wantErr: false},
{name: "Expected_Path_Found_In_Multiple_APs_And_One_AP_Filtered_By_ClientToken", args: args{clientToken, &AccessPointOptions{FileSystemId: fsId, DirectoryPath: dirPath}}, prepare: func(mockEfs *mocks.MockEfs) {
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Any(), gomock.Any()).Return(&efs.DescribeAccessPointsOutput{
AccessPoints: []*efs.AccessPointDescription{
{FileSystemId: aws.String(fsId), ClientToken: diffClientToken, AccessPointId: aws.String("differentApId"), RootDirectory: &efs.RootDirectory{Path: aws.String(expectedSingleAP.AccessPointRootDir)}},
{FileSystemId: aws.String(fsId), ClientToken: &clientToken, AccessPointId: aws.String(expectedSingleAP.AccessPointId), RootDirectory: &efs.RootDirectory{Path: aws.String(expectedSingleAP.AccessPointRootDir)}},
},
}, nil)
}, wantAccessPoint: expectedSingleAP, wantErr: false},
{name: "Fail_DescribeAccessPoints", args: args{clientToken, &AccessPointOptions{FileSystemId: fsId, DirectoryPath: dirPath}}, prepare: func(mockEfs *mocks.MockEfs) {
mockEfs.EXPECT().DescribeAccessPointsWithContext(gomock.Any(), gomock.Any()).Return(nil, errors.New("access_denied"))
}, wantAccessPoint: nil, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &cloud{efs: mockEfs}
ctx := context.Background()

if tt.prepare != nil {
tt.prepare(mockEfs)
}

gotAccessPoint, err := c.findAccessPointByClientToken(ctx, tt.args.clientToken, tt.args.accessPointOpts)
if (err != nil) != tt.wantErr {
t.Errorf("findAccessPointByClientToken() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotAccessPoint, tt.wantAccessPoint) {
t.Errorf("findAccessPointByClientToken() gotAccessPoint = %v, want %v", gotAccessPoint, tt.wantAccessPoint)
}
})
}
}
6 changes: 3 additions & 3 deletions pkg/cloud/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (c *FakeCloudProvider) GetMetadata() MetadataService {
return c.m
}

func (c *FakeCloudProvider) CreateAccessPoint(ctx context.Context, volumeName string, accessPointOpts *AccessPointOptions) (accessPoint *AccessPoint, err error) {
ap, exists := c.accessPoints[volumeName]
func (c *FakeCloudProvider) CreateAccessPoint(ctx context.Context, clientToken string, accessPointOpts *AccessPointOptions, usePvcName bool) (accessPoint *AccessPoint, err error) {
ap, exists := c.accessPoints[clientToken]
if exists {
if accessPointOpts.CapacityGiB == ap.CapacityGiB {
return ap, nil
Expand All @@ -45,7 +45,7 @@ func (c *FakeCloudProvider) CreateAccessPoint(ctx context.Context, volumeName st
CapacityGiB: accessPointOpts.CapacityGiB,
}

c.accessPoints[volumeName] = ap
c.accessPoints[clientToken] = ap
return ap, nil
}

Expand Down
31 changes: 28 additions & 3 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package driver

import (
"context"
"crypto/sha256"
"fmt"
"github.com/google/uuid"
"os"
Expand Down Expand Up @@ -56,6 +57,8 @@ const (
SubPathPattern = "subPathPattern"
TempMountPathPrefix = "/var/lib/csi/pv"
Uid = "uid"
ReuseAccessPointKey = "reuseAccessPoint"
PvcNameKey = "csi.storage.k8s.io/pvc/name"
)

var (
Expand All @@ -74,7 +77,25 @@ var (

func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
klog.V(4).Infof("CreateVolume: called with args %+v", *req)

var reuseAccessPoint bool
var err error
volumeParams := req.GetParameters()
volName := req.GetName()
clientToken := volName

// if true, then use sha256 hash of pvcName as clientToken instead of PVC Id
// This allows users to reconnect to the same AP from different k8s cluster
if reuseAccessPointStr, ok := volumeParams[ReuseAccessPointKey]; ok {
reuseAccessPoint, err = strconv.ParseBool(reuseAccessPointStr)
if err != nil {
return nil, status.Error(codes.InvalidArgument, "Invalid value for reuseAccessPoint parameter")
}
if reuseAccessPoint {
clientToken = get64LenHash(volumeParams[PvcNameKey])
klog.V(5).Infof("Client token : %s", clientToken)
}
}
if volName == "" {
return nil, status.Error(codes.InvalidArgument, "Volume name not provided")
}
Expand All @@ -98,7 +119,6 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
var (
azName string
basePath string
err error
gid int64
gidMin int
gidMax int
Expand All @@ -109,7 +129,6 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
)

//Parse parameters
volumeParams := req.GetParameters()
if value, ok := volumeParams[ProvisioningMode]; ok {
provisioningMode = value
//TODO: Add FS provisioning mode check when implemented
Expand Down Expand Up @@ -287,7 +306,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
accessPointsOptions.Gid = gid
accessPointsOptions.DirectoryPath = rootDir

accessPointId, err := localCloud.CreateAccessPoint(ctx, volName, accessPointsOptions)
accessPointId, err := localCloud.CreateAccessPoint(ctx, clientToken, accessPointsOptions, reuseAccessPoint)
if err != nil {
if err == cloud.ErrAccessDenied {
return nil, status.Errorf(codes.Unauthenticated, "Access Denied. Please ensure you have the right AWS permissions: %v", err)
Expand Down Expand Up @@ -565,3 +584,9 @@ func validateEfsPathRequirements(proposedPath string) (bool, error) {
return true, nil
}
}

func get64LenHash(text string) string {
h := sha256.New()
h.Write([]byte(text))
return fmt.Sprintf("%x", h.Sum(nil))
}
Loading

0 comments on commit 44c9b85

Please sign in to comment.