Skip to content

Commit

Permalink
Feature: Multi-Attach for io2 block devices
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Oct 25, 2023
1 parent fdb9866 commit d316ff8
Show file tree
Hide file tree
Showing 8 changed files with 623 additions and 126 deletions.
81 changes: 81 additions & 0 deletions docs/multi-attach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Multi-Attach

The multi-attach capability allows you to attach a single EBS volume to multiple EC2 instances located within the same Availability Zone (AZ). This shared volume can be utilized by several pods running on distinct nodes.

## Important

- EBS Multi-Attach does not support standard file systems. Standard file systems such as `XFS`, `EXT3`, `EXT4`, and `NTFS` aren't designed to be simultaneously accessed by multiple servers or EC2 instances.
- Simultaneous access to a standard file system can result in data corruption or data loss.
- You should use a clustered file system to ensure data resiliency and reliability for your production workloads.
- Multi-Attach is only enabled for `IO2` block devices.

Refer to the official AWS documentation on [Multi-Attach](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-volumes-multi.html) for more information, best practices, and limitations of this capability.

## Example

1. Create a `StorageClass` referencing an `IO2` volume type:
```
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: ebs-sc
provisioner: ebs.csi.aws.com
volumeBindingMode: WaitForFirstConsumer
parameters:
type: io2
iops: "1000"
```

2. Create a `PersistentVolumeClaim` referencing the `ReadWriteMany` access and `Block` device modes:
```
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: block-claim
spec:
accessModes:
- ReadWriteMany
volumeMode: Block
storageClassName: ebs-sc
resources:
requests:
storage: 4Gi
```

3. Create a `DaemonSet` to deploy the driver on all nodes:
```
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: app-daemon
spec:
selector:
matchLabels:
name: app
template:
metadata:
labels:
name: app
spec:
containers:
- name: app
image: busybox
command: ["/bin/sh", "-c"]
args: ["tail -f /dev/null"]
volumeDevices:
- name: data
devicePath: /dev/xvda
volumes:
- name: data
persistentVolumeClaim:
claimName: block-claim
```

4. Verify the `DaemonSet` is running:
```
$ kubectl get pods -A
NAMESPACE NAME READY STATUS RESTARTS AGE
default app-daemon-9hdgw 1/1 Running 0 18s
default app-daemon-xm8zr 1/1 Running 0 18s
```
80 changes: 18 additions & 62 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,9 @@ type DiskOptions struct {
BlockExpress bool
// KmsKeyID represents a fully qualified resource name to the key to use for encryption.
// example: arn:aws:kms:us-east-1:012345678910:key/abcd1234-a123-456a-a12b-a123b4cd56ef
KmsKeyID string
SnapshotID string
KmsKeyID string
SnapshotID string
MultiAttachEnabled bool
}

// ModifyDiskOptions represents parameters to modify an EBS volume
Expand Down Expand Up @@ -345,6 +346,10 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return nil, fmt.Errorf("invalid AWS VolumeType %q", diskOptions.VolumeType)
}

if diskOptions.MultiAttachEnabled && createType != VolumeTypeIO2 {
return nil, fmt.Errorf("CreateDisk: multi-attach is only supported for io2 volumes")
}

if maxIops > 0 {
if diskOptions.IOPS > 0 {
requestedIops = int64(diskOptions.IOPS)
Expand Down Expand Up @@ -381,11 +386,12 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
clientToken := sha256.Sum256([]byte(volumeName))

requestInput := &ec2.CreateVolumeInput{
AvailabilityZone: aws.String(zone),
ClientToken: aws.String(hex.EncodeToString(clientToken[:])),
Size: aws.Int64(capacityGiB),
VolumeType: aws.String(createType),
Encrypted: aws.Bool(diskOptions.Encrypted),
AvailabilityZone: aws.String(zone),
ClientToken: aws.String(hex.EncodeToString(clientToken[:])),
Size: aws.Int64(capacityGiB),
VolumeType: aws.String(createType),
Encrypted: aws.Bool(diskOptions.Encrypted),
MultiAttachEnabled: aws.Bool(diskOptions.MultiAttachEnabled),
}

if !util.IsSBE(zone) {
Expand Down Expand Up @@ -549,40 +555,19 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string

resp, attachErr := c.ec2.AttachVolumeWithContext(ctx, request)
if attachErr != nil {
var awsErr awserr.Error
if errors.As(attachErr, &awsErr) {
if awsErr.Code() == "VolumeInUse" {
return "", ErrVolumeInUse
}
}
return "", fmt.Errorf("could not attach volume %q to node %q: %w", volumeID, nodeID, attachErr)
}
klog.V(5).InfoS("[Debug] AttachVolume", "volumeID", volumeID, "nodeID", nodeID, "resp", resp)
}

attachment, err := c.WaitForAttachmentState(ctx, volumeID, volumeAttachedState, *instance.InstanceId, device.Path, device.IsAlreadyAssigned)
_, err = c.WaitForAttachmentState(ctx, volumeID, volumeAttachedState, *instance.InstanceId, device.Path, device.IsAlreadyAssigned)

// This is the only situation where we taint the device
if err != nil {
device.Taint()
return "", err
}

// Double check the attachment to be 100% sure we attached the correct volume at the correct mountpoint
// It could happen otherwise that we see the volume attached from a previous/separate AttachVolume call,
// which could theoretically be against a different device (or even instance).
if attachment == nil {
// Impossible?
return "", fmt.Errorf("unexpected state: attachment nil after attached %q to %q", volumeID, nodeID)
}
if device.Path != aws.StringValue(attachment.Device) {
// Already checked in waitForAttachmentState(), but just to be sure...
return "", fmt.Errorf("disk attachment of %q to %q failed: requested device %q but found %q", volumeID, nodeID, device.Path, aws.StringValue(attachment.Device))
}
if *instance.InstanceId != aws.StringValue(attachment.InstanceId) {
return "", fmt.Errorf("disk attachment of %q to %q failed: requested instance %q but found %q", volumeID, nodeID, *instance.InstanceId, aws.StringValue(attachment.InstanceId))
}

// TODO: Check volume capability matches for ALREADY_EXISTS
// This could happen when request volume already attached to request node,
// but is incompatible with the specified volume_capability or readonly flag
Expand Down Expand Up @@ -674,41 +659,12 @@ func (c *cloud) WaitForAttachmentState(ctx context.Context, volumeID, expectedSt
return false, nil
}

if len(volume.Attachments) > 1 {
// Shouldn't happen; log so we know if it is
klog.InfoS("Found multiple attachments for volume", "volumeID", volumeID, "volume", volume)
}
attachmentState := ""
attachmentState := volumeDetachedState

for _, a := range volume.Attachments {
if attachmentState != "" {
// Shouldn't happen; log so we know if it is
klog.InfoS("Found multiple attachments for volume", "volumeID", volumeID, "volume", volume)
}
if a.State != nil {
if a.State != nil && aws.StringValue(a.InstanceId) == expectedInstance && aws.StringValue(a.Device) == expectedDevice {
attachmentState = aws.StringValue(a.State)
attachment = a
attachmentState = *a.State
} else {
// Shouldn't happen; log so we know if it is
klog.InfoS("Ignoring nil attachment state for volume", "volumeID", volumeID, "attachment", a)
}
}
if attachmentState == "" {
attachmentState = volumeDetachedState
}
if attachment != nil {
// AWS eventual consistency can go back in time.
// For example, we're waiting for a volume to be attached as /dev/xvdba, but AWS can tell us it's
// attached as /dev/xvdbb, where it was attached before and it was already detached.
// Retry couple of times, hoping AWS starts reporting the right status.
device := aws.StringValue(attachment.Device)
if expectedDevice != "" && device != "" && device != expectedDevice {
klog.InfoS("Expected device for volume not found", "expectedDevice", expectedDevice, "expectedState", expectedState, "volumeID", volumeID, "device", device, "attachmentState", attachmentState)
return false, nil
}
instanceID := aws.StringValue(attachment.InstanceId)
if expectedInstance != "" && instanceID != "" && instanceID != expectedInstance {
klog.InfoS("Expected instance for volume not found", "expectedInstance", expectedInstance, "expectedState", expectedState, "volumeID", volumeID, "instanceID", instanceID, "attachmentState", attachmentState)
return false, nil
}
}

Expand Down
102 changes: 96 additions & 6 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,43 @@ func TestCreateDisk(t *testing.T) {
},
expErr: nil,
},
{
name: "success: multi-attach with IO2",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(4),
Tags: map[string]string{VolumeNameTagKey: "vol-test", AwsEbsDriverTagKey: "true"},
VolumeType: VolumeTypeIO2,
MultiAttachEnabled: true,
IOPSPerGB: 10000,
},
expDisk: &Disk{
VolumeID: "vol-test",
CapacityGiB: 4,
AvailabilityZone: defaultZone,
},
expCreateVolumeInput: &ec2.CreateVolumeInput{
Iops: aws.Int64(2000),
},
expErr: nil,
},
{
name: "failure: multi-attach with GP3",
volumeName: "vol-test-name",
diskOptions: &DiskOptions{
CapacityBytes: util.GiBToBytes(4),
Tags: map[string]string{VolumeNameTagKey: "vol-test", AwsEbsDriverTagKey: "true"},
VolumeType: VolumeTypeGP3,
MultiAttachEnabled: true,
IOPSPerGB: 10000,
},
expDisk: &Disk{
VolumeID: "vol-test",
CapacityGiB: 4,
AvailabilityZone: defaultZone,
},
expErr: fmt.Errorf("CreateDisk: multi-attach is only supported for io2 volumes"),
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -715,17 +752,18 @@ func TestAttachDisk(t *testing.T) {
name string
volumeID string
nodeID string
nodeID2 string
path string
expErr error
mockFunc func(*MockEC2API, context.Context, string, string, string, dm.DeviceManager)
mockFunc func(*MockEC2API, context.Context, string, string, string, string, dm.DeviceManager)
}{
{
name: "success: AttachVolume normal",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
path: defaultPath,
expErr: nil,
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) {
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
volumeRequest := createVolumeRequest(volumeID)
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)
Expand All @@ -743,7 +781,7 @@ func TestAttachDisk(t *testing.T) {
nodeID: defaultNodeID,
path: defaultPath,
expErr: nil,
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) {
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
volumeRequest := createVolumeRequest(volumeID)
instanceRequest := createInstanceRequest(nodeID)

Expand All @@ -762,7 +800,7 @@ func TestAttachDisk(t *testing.T) {
nodeID: defaultNodeID,
path: defaultPath,
expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, errors.New("AttachVolume error")),
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) {
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)

Expand All @@ -778,7 +816,7 @@ func TestAttachDisk(t *testing.T) {
nodeID: defaultNodeID,
path: defaultPath,
expErr: fmt.Errorf("could not attach volume %q to node %q: %w", defaultVolumeID, defaultNodeID, ErrVolumeInUse),
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, path string, dm dm.DeviceManager) {
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)

Expand All @@ -788,6 +826,52 @@ func TestAttachDisk(t *testing.T) {
)
},
},
{
name: "success: AttachVolume multi-attach",
volumeID: defaultVolumeID,
nodeID: defaultNodeID,
nodeID2: "node-1239",
path: defaultPath,
expErr: nil,
mockFunc: func(mockEC2 *MockEC2API, ctx context.Context, volumeID, nodeID, nodeID2, path string, dm dm.DeviceManager) {
volumeRequest := createVolumeRequest(volumeID)
instanceRequest := createInstanceRequest(nodeID)
attachRequest := createAttachRequest(volumeID, nodeID, path)

createInstanceRequest2 := createInstanceRequest(nodeID2)
attachRequest2 := createAttachRequest(volumeID, nodeID2, path)

dvOutput := &ec2.DescribeVolumesOutput{
Volumes: []*ec2.Volume{
{
VolumeId: aws.String(volumeID),
Attachments: []*ec2.VolumeAttachment{
{
Device: aws.String(path),
InstanceId: aws.String(nodeID),
State: aws.String("attached"),
},
{
Device: aws.String(path),
InstanceId: aws.String(nodeID2),
State: aws.String("attached"),
},
},
},
},
}

gomock.InOrder(
mockEC2.EXPECT().DescribeInstancesWithContext(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest).Return(createAttachVolumeOutput(volumeID, nodeID, path, "attached"), nil),
mockEC2.EXPECT().DescribeVolumesWithContext(ctx, volumeRequest).Return(createDescribeVolumesOutput(volumeID, nodeID, path, "attached"), nil),

mockEC2.EXPECT().DescribeInstancesWithContext(ctx, createInstanceRequest2).Return(newDescribeInstancesOutput(nodeID2), nil),
mockEC2.EXPECT().AttachVolumeWithContext(ctx, attachRequest2).Return(createAttachVolumeOutput(volumeID, nodeID2, path, "attached"), nil),
mockEC2.EXPECT().DescribeVolumesWithContext(ctx, volumeRequest).Return(dvOutput, nil),
)
},
},
}

for _, tc := range testCases {
Expand All @@ -799,7 +883,7 @@ func TestAttachDisk(t *testing.T) {
ctx := context.Background()
dm := c.(*cloud).dm

tc.mockFunc(mockEC2, ctx, tc.volumeID, tc.nodeID, tc.path, dm)
tc.mockFunc(mockEC2, ctx, tc.volumeID, tc.nodeID, tc.nodeID2, tc.path, dm)

devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID)

Expand All @@ -811,6 +895,12 @@ func TestAttachDisk(t *testing.T) {
assert.Equal(t, tc.path, devicePath)
}

if tc.nodeID2 != "" {
devicePath, err := c.AttachDisk(ctx, tc.volumeID, tc.nodeID2)
assert.NoError(t, err)
assert.Equal(t, tc.path, devicePath)
}

mockCtrl.Finish()
})
}
Expand Down
Loading

0 comments on commit d316ff8

Please sign in to comment.