Skip to content

Commit

Permalink
Request coalescing for resizing and modifying volume
Browse files Browse the repository at this point in the history
Signed-off-by: Hanyue Liang <[email protected]>
  • Loading branch information
hanyuel committed Jul 12, 2023
1 parent 2bac0f8 commit 5fdd5df
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 8 deletions.
92 changes: 92 additions & 0 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,98 @@ func (c *cloud) ModifyDisk(ctx context.Context, volumeID string, options *Modify
return c.waitForVolumeSize(ctx, volumeID)
}

// ResizeOrModifyDisk grows a volume and/or changes its type, IOPS and
// throughput through a single EC2 ModifyVolume call, then waits for the
// modification to settle.
//
// newSizeBytes is the requested capacity in bytes (rounded up to whole GiB);
// 0 means the size is left alone. options carries the non-size changes: zero
// valued fields are not sent, and a nil options is treated as "no changes".
//
// When a resize was requested, the returned size is whatever checkDesiredSize
// reports on success; on failure it is either 0 or the size observed before
// the call (in GiB), depending on where the failure happened. When no resize
// was requested the returned size is always 0.
func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) {
	klog.V(4).InfoS("Received ResizeOrModifyDisk request", "volumeID", volumeID, "options", options, "newSizeBytes", newSizeBytes)
	if options == nil {
		// Callers that only resize may not have any options to pass.
		options = &ModifyDiskOptions{}
	}

	newSizeGiB := util.RoundUpGiB(newSizeBytes)
	// oldSizeGiB stays 0 unless a resize was requested.
	var oldSizeGiB int64
	if newSizeBytes != 0 {
		request := &ec2.DescribeVolumesInput{
			VolumeIds: []*string{
				aws.String(volumeID),
			},
		}
		volume, err := c.getVolume(ctx, request)
		if err != nil {
			return 0, err
		}

		// AWS resizes in chunks of GiB (not GB)
		oldSizeGiB = aws.Int64Value(volume.Size)

		latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

		// A modification is already in flight: wait for it instead of issuing
		// another one, then verify the resulting size.
		// NOTE(review): options are not sent on this path — confirm that is intended.
		if latestMod != nil && modFetchError == nil {
			state := aws.StringValue(latestMod.ModificationState)
			if state == ec2.VolumeModificationStateModifying {
				err = c.waitForVolumeSize(ctx, volumeID)
				if err != nil {
					return oldSizeGiB, err
				}
				return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
			}
		}

		// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
		// that means we have an API problem.
		if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
			return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
		}

		// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
		// volume modifications objects or volume has completed previously issued modification request.
		// NOTE(review): options are not sent on this path either — confirm that is intended.
		if oldSizeGiB >= newSizeGiB {
			klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
			err = c.waitForVolumeSize(ctx, volumeID)
			if err != nil && !errors.Is(err, VolumeNotBeingModified) {
				return oldSizeGiB, err
			}
			return oldSizeGiB, nil
		}
	}

	// Only the attributes that were actually requested are put on the wire.
	req := &ec2.ModifyVolumeInput{
		VolumeId: aws.String(volumeID),
	}
	if newSizeBytes != 0 {
		req.Size = aws.Int64(newSizeGiB)
	}
	if options.IOPS != 0 {
		req.Iops = aws.Int64(int64(options.IOPS))
	}
	if options.VolumeType != "" {
		req.VolumeType = aws.String(options.VolumeType)
	}
	// NOTE(review): throughput is sent for gp3 even when it is 0, and is ignored
	// when the type is not part of the request — confirm both are intended.
	if options.VolumeType == VolumeTypeGP3 {
		req.Throughput = aws.Int64(int64(options.Throughput))
	}

	response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
	if err != nil {
		return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
	}

	mod := response.VolumeModification
	state := aws.StringValue(mod.ModificationState)

	if volumeModificationDone(state) {
		if newSizeBytes == 0 {
			return 0, nil
		}
		return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
	}

	// Wait exactly once and report that wait's outcome. oldSizeGiB is 0 on the
	// modify-only path, so the same return serves both cases.
	if err := c.waitForVolumeSize(ctx, volumeID); err != nil {
		return oldSizeGiB, err
	}
	if newSizeBytes == 0 {
		return 0, nil
	}
	return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: &volumeID}
if _, err := c.ec2.DeleteVolumeWithContext(ctx, request); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/cloud_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Cloud interface {
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error)
ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int64, err error)
WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error)
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
Expand Down
37 changes: 30 additions & 7 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ const isManagedByDriver = "true"

// controllerService represents the controller service of CSI driver
type controllerService struct {
	// cloud is the cloud-provider client used for volume operations.
	cloud cloud.Cloud
	// inFlight is created with internal.NewInFlight by the constructor.
	inFlight *internal.InFlight
	// driverOptions holds the options the service was constructed with.
	driverOptions *DriverOptions
	// modifyVolumeManager keeps the per-volume handlers that coalesce
	// resize and modify requests.
	modifyVolumeManager *modifyVolumeManager

	rpc.UnimplementedModifyServer
}
Expand Down Expand Up @@ -97,9 +98,10 @@ func newControllerService(driverOptions *DriverOptions) controllerService {
}

return controllerService{
cloud: cloudSrv,
inFlight: internal.NewInFlight(),
driverOptions: driverOptions,
cloud: cloudSrv,
inFlight: internal.NewInFlight(),
driverOptions: driverOptions,
modifyVolumeManager: newModifyVolumeManager(),
}
}

Expand Down Expand Up @@ -508,11 +510,32 @@ func (d *controllerService) ControllerExpandVolume(ctx context.Context, req *csi
return nil, status.Error(codes.InvalidArgument, "After round-up, volume size exceeds the limit specified")
}

actualSizeGiB, err := d.cloud.ResizeDisk(ctx, volumeID, newSize)
modifyVolumeRequest := modifyVolumeRequest{
newSize: newSize,
}

err := d.addModifyVolumeRequest(ctx, volumeID, &modifyVolumeRequest)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err)
}

var actualSizeGiB int64
for {
select {
case err := <-d.modifyVolumeManager.requestHandlerMap[volumeID].errChan:
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err)
}
break
case size := <-d.modifyVolumeManager.requestHandlerMap[volumeID].resultChan:
actualSizeGiB = size
break
default:
}
}

d.modifyVolumeManager.deleteRequestHandler(volumeID)

nodeExpansionRequired := true
// if this is a raw block device, no expansion should be necessary on the node
cap := req.GetVolumeCapability()
Expand Down
162 changes: 161 additions & 1 deletion pkg/driver/controller_modify_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package driver

import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
Expand All @@ -17,8 +20,148 @@ const (
ModificationKeyIOPS = "iops"

ModificationKeyThroughput = "throughput"

modifyVolumeRequestHandlerTimeout = 5 * time.Second
)

// modifyVolumeRequest describes one requested change to a volume: a new size,
// a set of disk options, or both.
type modifyVolumeRequest struct {
// Requested size, passed to the cloud layer as a byte count; 0 means no resize was requested.
newSize int64
// Requested IOPS, throughput and volume type; zero-valued fields mean "not requested".
modifyDiskOptions cloud.ModifyDiskOptions
}

// modifyVolumeRequestHandler holds the per-volume state used to coalesce
// several modify/resize requests into one merged request.
type modifyVolumeRequestHandler struct {
// ID of the volume this handler serves
volumeID string
// Channel for sending requests to the Timer thread for the volume
requestChan chan *modifyVolumeRequest
// Channel for sending errors of ec2 ModifyVolume API call
errChan chan error
// Channel for sending the actual volume size after the ec2 ModifyVolume API call
resultChan chan int64
// Merged request from the requests that have been accepted for the volume
mergedRequest *modifyVolumeRequest
// Held while mergedRequest is validated against or merged with a new request
mux sync.Mutex
}

// modifyVolumeManager tracks the active request handler of every volume that
// currently has a pending modification.
type modifyVolumeManager struct {
// Map of volume ID to modifyVolumeRequestHandler
requestHandlerMap map[string]*modifyVolumeRequestHandler
// Taken by deleteRequestHandler and addModifyVolumeRequest around their accesses to requestHandlerMap
mux *sync.RWMutex
}

// newModifyVolumeManager returns a manager with an empty handler map and a
// freshly allocated lock.
func newModifyVolumeManager() *modifyVolumeManager {
	manager := new(modifyVolumeManager)
	manager.mux = new(sync.RWMutex)
	manager.requestHandlerMap = map[string]*modifyVolumeRequestHandler{}
	return manager
}

// deleteRequestHandler drops the handler registered for volumeID, if there is
// one, while holding the manager's write lock.
func (m *modifyVolumeManager) deleteRequestHandler(volumeID string) {
	m.mux.Lock()
	delete(m.requestHandlerMap, volumeID)
	m.mux.Unlock()
}

// validateModifyVolumeRequest checks a new request against the request already
// merged for the volume.
//
// A property (size, IOPS, throughput, volume type) may be set by at most one
// accepted request. If the new request sets a property that the merged request
// already carries, an error naming that property is returned and the request
// must be rejected; otherwise nil is returned and the request may be merged.
// The handler lock is held for the duration of the check.
func (h *modifyVolumeRequestHandler) validateModifyVolumeRequest(r *modifyVolumeRequest) error {
	h.mux.Lock()
	defer h.mux.Unlock()

	// Conflicts are reported in a fixed order: size, IOPS, throughput, type.
	switch {
	case r.newSize != 0 && h.mergedRequest.newSize != 0:
		return fmt.Errorf("There's already pending request for resizing the volume. Rejecting the new resizing request")
	case r.modifyDiskOptions.IOPS != 0 && h.mergedRequest.modifyDiskOptions.IOPS != 0:
		return fmt.Errorf("IOPS was already requested by a previous request.")
	case r.modifyDiskOptions.Throughput != 0 && h.mergedRequest.modifyDiskOptions.Throughput != 0:
		return fmt.Errorf("Throughput was already requested by a previous request.")
	case r.modifyDiskOptions.VolumeType != "" && h.mergedRequest.modifyDiskOptions.VolumeType != "":
		return fmt.Errorf("Volume type was already requested by a previous request.")
	}
	return nil
}

// mergeModifyVolumeRequest folds r into the handler's merged request: every
// property that r sets (non-zero size, IOPS, throughput, non-empty volume type)
// overwrites the corresponding property of the merged request. Properties r
// leaves unset are not touched. The handler lock is held while merging.
func (h *modifyVolumeRequestHandler) mergeModifyVolumeRequest(r *modifyVolumeRequest) {
	h.mux.Lock()
	defer h.mux.Unlock()

	if size := r.newSize; size != 0 {
		h.mergedRequest.newSize = size
	}
	if iops := r.modifyDiskOptions.IOPS; iops != 0 {
		h.mergedRequest.modifyDiskOptions.IOPS = iops
	}
	if throughput := r.modifyDiskOptions.Throughput; throughput != 0 {
		h.mergedRequest.modifyDiskOptions.Throughput = throughput
	}
	if volumeType := r.modifyDiskOptions.VolumeType; volumeType != "" {
		h.mergedRequest.modifyDiskOptions.VolumeType = volumeType
	}
}

// addModifyVolumeRequest registers r for volumeID.
//
// If the volume has no handler yet, one is created with r as its initial
// merged request, stored in the manager, and a timer goroutine is started for
// it. Otherwise r is validated against the handler's merged request and, when
// accepted, handed to the handler's goroutine over requestChan.
//
// It returns the validation error when r conflicts with the merged request,
// ctx.Err() when ctx ends before the handler's goroutine picks r up, and nil
// otherwise.
func (d *controllerService) addModifyVolumeRequest(ctx context.Context, volumeID string, r *modifyVolumeRequest) error {
	manager := d.modifyVolumeManager

	// Look up and, if needed, insert under one write lock so that two
	// concurrent first requests for a volume cannot each create a handler.
	manager.mux.Lock()
	handler, exists := manager.requestHandlerMap[volumeID]
	if !exists {
		handler = &modifyVolumeRequestHandler{
			volumeID:      volumeID,
			requestChan:   make(chan *modifyVolumeRequest),
			errChan:       make(chan error),
			resultChan:    make(chan int64),
			mergedRequest: r,
		}
		manager.requestHandlerMap[volumeID] = handler
	}
	manager.mux.Unlock()

	if !exists {
		go d.startVolumeTimer(ctx, handler)
		return nil
	}

	if err := handler.validateModifyVolumeRequest(r); err != nil {
		return err
	}

	// requestChan is unbuffered; do not block forever if the handler's
	// goroutine is no longer receiving.
	select {
	case handler.requestChan <- r:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// startVolumeTimer runs the coalescing loop for one volume handler.
//
// Requests arriving on h.requestChan are merged into h.mergedRequest. Once no
// request has arrived for modifyVolumeRequestHandlerTimeout, the merged request
// is executed and its outcome is sent on h.errChan (failure) or h.resultChan
// (success), after which the loop ends. The loop also ends, without sending
// anything, when ctx is done.
func (d *controllerService) startVolumeTimer(ctx context.Context, h *modifyVolumeRequestHandler) {
	for {
		// The select must block: with a default case the loop would spin and
		// create a new timer on every pass, so the timeout could never fire.
		select {
		case req := <-h.requestChan:
			h.mergeModifyVolumeRequest(req)
		case <-time.After(modifyVolumeRequestHandlerTimeout):
			actualSizeGiB, err := d.executeModifyVolumeRequest(ctx, h.volumeID, h.mergedRequest)
			if err != nil {
				h.errChan <- err
			} else {
				h.resultChan <- actualSizeGiB
			}
			return
		case <-ctx.Done():
			return
		}
	}
}

// executeModifyVolumeRequest applies req to the volume through the cloud
// layer's ResizeOrModifyDisk. On success it returns the size reported by that
// call; on failure it returns 0 and a gRPC Internal status error wrapping the
// cause.
func (d *controllerService) executeModifyVolumeRequest(ctx context.Context, volumeID string, req *modifyVolumeRequest) (int64, error) {
	sizeGiB, modifyErr := d.cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions)
	if modifyErr == nil {
		return sizeGiB, nil
	}
	return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, modifyErr)
}

func (d *controllerService) GetCSIDriverModificationCapability(
_ context.Context,
_ *rpc.GetCSIDriverModificationCapabilityRequest,
Expand Down Expand Up @@ -55,9 +198,26 @@ func (d *controllerService) ModifyVolumeProperties(
modifyOptions.VolumeType = value
}
}
if err := d.cloud.ModifyDisk(ctx, name, &modifyOptions); err != nil {
modifyVolumeRequest := modifyVolumeRequest{
modifyDiskOptions: modifyOptions,
}
if err := d.addModifyVolumeRequest(ctx, name, &modifyVolumeRequest); err != nil {
return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err)
}
for {
select {
case err := <-d.modifyVolumeManager.requestHandlerMap[name].errChan:
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err)
}
break
case <-d.modifyVolumeManager.requestHandlerMap[name].resultChan:
break
default:
}
}
d.modifyVolumeManager.deleteRequestHandler(name)

return &rpc.ModifyVolumePropertiesResponse{}, nil
}

Expand Down

0 comments on commit 5fdd5df

Please sign in to comment.