diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index 70ee16a740..a6e2e53585 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -498,6 +498,98 @@ func (c *cloud) ModifyDisk(ctx context.Context, volumeID string, options *Modify return c.waitForVolumeSize(ctx, volumeID) } +func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) { + klog.V(4).InfoS("Received ResizeOrModifyDisk request", "volumeID", volumeID, "options", options, "newSizeBytes", newSizeBytes) + newSizeGiB := util.RoundUpGiB(newSizeBytes) + var oldSizeGiB int64 + if newSizeBytes != 0 { + request := &ec2.DescribeVolumesInput{ + VolumeIds: []*string{ + aws.String(volumeID), + }, + } + volume, err := c.getVolume(ctx, request) + if err != nil { + return 0, err + } + + // AWS resizes in chunks of GiB (not GB) + oldSizeGiB = aws.Int64Value(volume.Size) + + latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID) + + if latestMod != nil && modFetchError == nil { + state := aws.StringValue(latestMod.ModificationState) + if state == ec2.VolumeModificationStateModifying { + err = c.waitForVolumeSize(ctx, volumeID) + if err != nil { + return oldSizeGiB, err + } + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } + } + + // if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error + // that means we have an API problem. + if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) { + return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError) + } + + // Even if existing volume size is greater than user requested size, we should ensure that there are no pending + // volume modifications objects or volume has completed previously issued modification request. + if oldSizeGiB >= newSizeGiB { + klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB) + err = c.waitForVolumeSize(ctx, volumeID) + if err != nil && !errors.Is(err, VolumeNotBeingModified) { + return oldSizeGiB, err + } + return oldSizeGiB, nil + } + } + + req := &ec2.ModifyVolumeInput{ + VolumeId: aws.String(volumeID), + } + if newSizeBytes != 0 { + req.Size = aws.Int64(newSizeGiB) + } + if options.IOPS != 0 { + req.Iops = aws.Int64(int64(options.IOPS)) + } + if options.VolumeType != "" { + req.VolumeType = aws.String(options.VolumeType) + } + if options.VolumeType == VolumeTypeGP3 { + req.Throughput = aws.Int64(int64(options.Throughput)) + } + + response, err := c.ec2.ModifyVolumeWithContext(ctx, req) + if err != nil { + return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err) + } + + mod := response.VolumeModification + state := aws.StringValue(mod.ModificationState) + + if volumeModificationDone(state) { + if newSizeBytes != 0 { + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } else { + return 0, nil + } + } + + err = c.waitForVolumeSize(ctx, volumeID) + if newSizeBytes != 0 { + if err != nil { + return oldSizeGiB, err + } + return c.checkDesiredSize(ctx, volumeID, newSizeGiB) + } else { + return 0, c.waitForVolumeSize(ctx, volumeID) + } +} + func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) { request := &ec2.DeleteVolumeInput{VolumeId: &volumeID} if _, err := c.ec2.DeleteVolumeWithContext(ctx, request); err != nil { diff --git a/pkg/cloud/cloud_interface.go b/pkg/cloud/cloud_interface.go index 72852819ac..0022d8237a 100644 --- a/pkg/cloud/cloud_interface.go +++ b/pkg/cloud/cloud_interface.go @@ -13,6 +13,7 @@ type Cloud interface { AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error) DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error) ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error) + ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int64, err error) WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error) GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error) GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index 7104a83c38..178e73090c 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -61,9 +61,10 @@ const isManagedByDriver = "true" // controllerService represents the controller service of CSI driver type controllerService struct { - cloud cloud.Cloud - inFlight *internal.InFlight - driverOptions *DriverOptions + cloud cloud.Cloud + inFlight *internal.InFlight + driverOptions *DriverOptions + modifyVolumeManager *modifyVolumeManager rpc.UnimplementedModifyServer } @@ -97,9 +98,10 @@ func newControllerService(driverOptions *DriverOptions) controllerService { } return controllerService{ - cloud: cloudSrv, - inFlight: internal.NewInFlight(), - driverOptions: driverOptions, + cloud: cloudSrv, + inFlight: internal.NewInFlight(), + driverOptions: driverOptions, + modifyVolumeManager: newModifyVolumeManager(), } } @@ -508,11 +510,32 @@ func (d *controllerService) ControllerExpandVolume(ctx context.Context, req *csi return nil, status.Error(codes.InvalidArgument, "After round-up, volume size exceeds the limit specified") } - actualSizeGiB, err := d.cloud.ResizeDisk(ctx, volumeID, newSize) + modifyVolumeRequest := modifyVolumeRequest{ + newSize: newSize, + } + + err := d.addModifyVolumeRequest(ctx, volumeID, &modifyVolumeRequest) if err != nil { return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err) } + var actualSizeGiB int64 + for { + select { + case err := <-d.modifyVolumeManager.requestHandlerMap[volumeID].errChan: + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err) + } + break + case size := <-d.modifyVolumeManager.requestHandlerMap[volumeID].resultChan: + actualSizeGiB = size + break + default: + } + } + + d.modifyVolumeManager.deleteRequestHandler(volumeID) + nodeExpansionRequired := true // if this is a raw block device, no expansion should be necessary on the node cap := req.GetVolumeCapability() diff --git a/pkg/driver/controller_modify_volume.go b/pkg/driver/controller_modify_volume.go index ba2db7f561..74514f0579 100644 --- a/pkg/driver/controller_modify_volume.go +++ b/pkg/driver/controller_modify_volume.go @@ -2,7 +2,10 @@ package driver import ( "context" + "fmt" "strconv" + "sync" + "time" "github.com/awslabs/volume-modifier-for-k8s/pkg/rpc" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" @@ -17,8 +20,148 @@ const ( ModificationKeyIOPS = "iops" ModificationKeyThroughput = "throughput" + + modifyVolumeRequestHandlerTimeout = 5 * time.Second ) +type modifyVolumeRequest struct { + newSize int64 + modifyDiskOptions cloud.ModifyDiskOptions +} + +type modifyVolumeRequestHandler struct { + volumeID string + // Channel for sending requests to the Timer thread for the volume + requestChan chan *modifyVolumeRequest + // Channel for sending errors of ec2 ModifyVolume API call + errChan chan error + // Channel for sending the actual volume size after the ec2 ModifyVolume API call + resultChan chan int64 + // Merged request from the requests that have been accepted for the volume + mergedRequest *modifyVolumeRequest + mux sync.Mutex +} + +type modifyVolumeManager struct { + // Map of volume ID to modifyVolumeRequestHandler + requestHandlerMap map[string]*modifyVolumeRequestHandler + mux *sync.RWMutex +} + +func newModifyVolumeManager() *modifyVolumeManager { + return &modifyVolumeManager{ + requestHandlerMap: make(map[string]*modifyVolumeRequestHandler), + mux: &sync.RWMutex{}, + } +} + +func (m *modifyVolumeManager) deleteRequestHandler(volumeID string) { + m.mux.Lock() + defer m.mux.Unlock() + delete(m.requestHandlerMap, volumeID) +} + +// This function validates the new request against the merged request for the volume. +// If any of the volume properties of the merged request overlaps with the volume properties of the new request, this function +// will return an error and the new request will be rejected. +// If none of the volume properties of the merged request overlaps with the properties of the new request, this function +// won't return an error and the request will be accepted for further processing. +func (h *modifyVolumeRequestHandler) validateModifyVolumeRequest(r *modifyVolumeRequest) error { + h.mux.Lock() + defer h.mux.Unlock() + if r.newSize != 0 && h.mergedRequest.newSize != 0 { + return fmt.Errorf("There's already pending request for resizing the volume. Rejecting the new resizing request") + } + if r.modifyDiskOptions.IOPS != 0 && h.mergedRequest.modifyDiskOptions.IOPS != 0 { + return fmt.Errorf("IOPS was already requested by a previous request.") + } + if r.modifyDiskOptions.Throughput != 0 && h.mergedRequest.modifyDiskOptions.Throughput != 0 { + return fmt.Errorf("Throughput was already requested by a previous request.") + } + if r.modifyDiskOptions.VolumeType != "" && h.mergedRequest.modifyDiskOptions.VolumeType != "" { + return fmt.Errorf("Volume type was already requested by a previous request.") + } + return nil +} + +func (h *modifyVolumeRequestHandler) mergeModifyVolumeRequest(r *modifyVolumeRequest) { + h.mux.Lock() + defer h.mux.Unlock() + if r.newSize != 0 { + h.mergedRequest.newSize = r.newSize + } + if r.modifyDiskOptions.IOPS != 0 { + h.mergedRequest.modifyDiskOptions.IOPS = r.modifyDiskOptions.IOPS + } + if r.modifyDiskOptions.Throughput != 0 { + h.mergedRequest.modifyDiskOptions.Throughput = r.modifyDiskOptions.Throughput + } + if r.modifyDiskOptions.VolumeType != "" { + h.mergedRequest.modifyDiskOptions.VolumeType = r.modifyDiskOptions.VolumeType + } +} + +func (d *controllerService) addModifyVolumeRequest(ctx context.Context, volumeID string, r *modifyVolumeRequest) error { + d.modifyVolumeManager.mux.RLock() + requestHandler, ok := d.modifyVolumeManager.requestHandlerMap[volumeID] + d.modifyVolumeManager.mux.RUnlock() + if ok { + err := requestHandler.validateModifyVolumeRequest(r) + if err == nil { + requestHandler.requestChan <- r + } else { + return err + } + } else { + d.modifyVolumeManager.mux.Lock() + defer d.modifyVolumeManager.mux.Unlock() + + requestChan := make(chan *modifyVolumeRequest) + errChan := make(chan error) + resultChan := make(chan int64) + requestHandler := modifyVolumeRequestHandler{ + requestChan: requestChan, + errChan: errChan, + resultChan: resultChan, + mergedRequest: r, + volumeID: volumeID, + mux: sync.Mutex{}, + } + d.modifyVolumeManager.requestHandlerMap[volumeID] = &requestHandler + go d.startVolumeTimer(ctx, &requestHandler) + } + return nil +} + +func (d *controllerService) startVolumeTimer(ctx context.Context, h *modifyVolumeRequestHandler) { + for { + select { + case req := <-h.requestChan: + h.mergeModifyVolumeRequest(req) + case <-time.After(modifyVolumeRequestHandlerTimeout): + actualSizeGiB, err := d.executeModifyVolumeRequest(ctx, h.volumeID, h.mergedRequest) + if err != nil { + h.errChan <- err + } else { + h.resultChan <- actualSizeGiB + } + return + case <-ctx.Done(): + return + default: + } + } +} + +func (d *controllerService) executeModifyVolumeRequest(ctx context.Context, volumeID string, req *modifyVolumeRequest) (int64, error) { + actualSizeGiB, err := d.cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions) + if err != nil { + return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, err) + } else { + return actualSizeGiB, nil + } +} + func (d *controllerService) GetCSIDriverModificationCapability( _ context.Context, _ *rpc.GetCSIDriverModificationCapabilityRequest, @@ -55,9 +198,26 @@ func (d *controllerService) ModifyVolumeProperties( modifyOptions.VolumeType = value } } - if err := d.cloud.ModifyDisk(ctx, name, &modifyOptions); err != nil { + modifyVolumeRequest := modifyVolumeRequest{ + modifyDiskOptions: modifyOptions, + } + if err := d.addModifyVolumeRequest(ctx, name, &modifyVolumeRequest); err != nil { return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err) } + for { + select { + case err := <-d.modifyVolumeManager.requestHandlerMap[name].errChan: + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err) + } + break + case <-d.modifyVolumeManager.requestHandlerMap[name].resultChan: + break + default: + } + } + d.modifyVolumeManager.deleteRequestHandler(name) + return &rpc.ModifyVolumePropertiesResponse{}, nil }