Skip to content

Commit

Permalink
WIP Add code path to launch instances using AWS Fleet API
Browse files Browse the repository at this point in the history
  • Loading branch information
adammw committed May 24, 2024
1 parent cf606a1 commit da087e7
Show file tree
Hide file tree
Showing 6 changed files with 260 additions and 19 deletions.
22 changes: 22 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,20 @@ type launchTemplate struct {

// mixedInstancesPolicy is the cached subset of an ASG's MixedInstancesPolicy
// that the AWS cloud provider works with.
type mixedInstancesPolicy struct {
// launchTemplate is the policy-level launch template, built from the
// policy's LaunchTemplateSpecification.
launchTemplate *launchTemplate
// launchTemplateOverrides keeps the raw per-override entries from the ASG
// so each override (instance type, optional own launch template) can be
// translated into a CreateFleet launch template config.
launchTemplateOverrides []*autoscaling.LaunchTemplateOverrides
// instanceTypesOverrides is derived from the overrides via getInstanceTypes.
instanceTypesOverrides []string
// instanceRequirementsOverrides is derived from the overrides via
// getInstanceTypeRequirements. buildAsgFromAWS rejects ASGs that have both
// this and instanceTypesOverrides set.
instanceRequirementsOverrides *autoscaling.InstanceRequirements
// NOTE(review): not populated in the code visible here; presumably the EC2
// representation of instanceRequirementsOverrides - confirm where it is set.
instanceRequirements *ec2.InstanceRequirements
}

// instancesDistribution is a plain-value copy of the InstancesDistribution
// section of an ASG's MixedInstancesPolicy. buildAsgFromAWS fills it with the
// aws.StringValue/aws.Int64Value helpers, so a field that is unset on the AWS
// side ends up as "" or 0 here.
type instancesDistribution struct {
// onDemandAllocationStrategy is passed through to CreateFleet's OnDemandOptions.
onDemandAllocationStrategy string
// onDemandBaseCapacity is the number of instances that are filled with
// on-demand capacity before the percentage split below applies.
onDemandBaseCapacity int
// onDemandPercentageAboveBaseCapacity is the percentage (0-100) of capacity
// beyond the base that is on-demand; the remainder is spot.
onDemandPercentageAboveBaseCapacity int
// spotAllocationStrategy is passed through to CreateFleet's SpotOptions.
spotAllocationStrategy string
// spotMaxPrice, when non-empty, is applied as MaxPrice on fleet overrides.
spotMaxPrice string
}

type asg struct {
AwsRef

Expand All @@ -73,9 +82,11 @@ type asg struct {
lastUpdateTime time.Time

AvailabilityZones []string
SubnetIDs []string
LaunchConfigurationName string
LaunchTemplate *launchTemplate
MixedInstancesPolicy *mixedInstancesPolicy
InstancesDistribution *instancesDistribution
Tags []*autoscaling.TagDescription
}

Expand Down Expand Up @@ -545,6 +556,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {

curSize: int(aws.Int64Value(g.DesiredCapacity)),
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
SubnetIDs: strings.Split(aws.StringValue(g.VPCZoneIdentifier), ","),
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
Tags: g.Tags,
}
Expand Down Expand Up @@ -573,6 +585,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {

asg.MixedInstancesPolicy = &mixedInstancesPolicy{
launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification),
launchTemplateOverrides: g.MixedInstancesPolicy.LaunchTemplate.Overrides,
instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
instanceRequirementsOverrides: getInstanceTypeRequirements(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
}
Expand All @@ -586,6 +599,15 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
if len(asg.MixedInstancesPolicy.instanceTypesOverrides) != 0 && asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil {
return nil, fmt.Errorf("invalid setup of both instance type and instance requirements overrides configured")
}

asg.InstancesDistribution = &instancesDistribution{
onDemandAllocationStrategy: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.OnDemandAllocationStrategy),
onDemandBaseCapacity: int(aws.Int64Value(g.MixedInstancesPolicy.InstancesDistribution.OnDemandBaseCapacity)),
onDemandPercentageAboveBaseCapacity: int(aws.Int64Value(g.MixedInstancesPolicy.InstancesDistribution.OnDemandPercentageAboveBaseCapacity)),
spotAllocationStrategy: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.SpotAllocationStrategy),
// TODO: support SpotInstancePools?
spotMaxPrice: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.SpotMaxPrice),
}
}

return asg, nil
Expand Down
31 changes: 20 additions & 11 deletions cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,17 @@ var (

// awsCloudProvider implements CloudProvider interface.
type awsCloudProvider struct {
awsManager *AwsManager
resourceLimiter *cloudprovider.ResourceLimiter
awsManager *AwsManager
resourceLimiter *cloudprovider.ResourceLimiter
useCreateFleetAndAttachAPI bool
}

// BuildAwsCloudProvider builds CloudProvider implementation for AWS.
func BuildAwsCloudProvider(awsManager *AwsManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) {
func BuildAwsCloudProvider(awsManager *AwsManager, resourceLimiter *cloudprovider.ResourceLimiter, useCreateFleetAndAttachAPI bool) (cloudprovider.CloudProvider, error) {
aws := &awsCloudProvider{
awsManager: awsManager,
resourceLimiter: resourceLimiter,
awsManager: awsManager,
resourceLimiter: resourceLimiter,
useCreateFleetAndAttachAPI: useCreateFleetAndAttachAPI,
}
return aws, nil
}
Expand Down Expand Up @@ -99,8 +101,9 @@ func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
ngs := make([]cloudprovider.NodeGroup, 0, len(asgs))
for _, asg := range asgs {
ngs = append(ngs, &AwsNodeGroup{
asg: asg,
awsManager: aws.awsManager,
asg: asg,
awsManager: aws.awsManager,
useCreateFleetAndAttachAPI: aws.useCreateFleetAndAttachAPI,
})
}

Expand Down Expand Up @@ -217,8 +220,9 @@ func AwsRefFromProviderId(id string) (*AwsInstanceRef, error) {

// AwsNodeGroup implements NodeGroup interface.
type AwsNodeGroup struct {
awsManager *AwsManager
asg *asg
awsManager *AwsManager
asg *asg
useCreateFleetAndAttachAPI bool
}

// MaxSize returns maximum size of the node group.
Expand Down Expand Up @@ -277,7 +281,12 @@ func (ng *AwsNodeGroup) IncreaseSize(delta int) error {
if size+delta > ng.asg.maxSize {
return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.asg.maxSize)
}
return ng.awsManager.SetAsgSize(ng.asg, size+delta)

if ng.useCreateFleetAndAttachAPI {
return ng.awsManager.LaunchAndAttach(ng.asg, delta)
} else {
return ng.awsManager.SetAsgSize(ng.asg, size+delta)
}
}

// AtomicIncreaseSize is not implemented.
Expand Down Expand Up @@ -460,7 +469,7 @@ func BuildAWS(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
klog.Fatalf("Failed to create AWS Manager: %v", err)
}

provider, err := BuildAwsCloudProvider(manager, rl)
provider, err := BuildAwsCloudProvider(manager, rl, opts.AWSUseCreateFleetAndAttachAPI)
if err != nil {
klog.Fatalf("Failed to create AWS cloud provider: %v", err)
}
Expand Down
202 changes: 202 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ package aws
import (
"errors"
"fmt"
"math"
"math/rand"
"regexp"
"strconv"
"strings"
"sync"
"time"

apiv1 "k8s.io/api/core/v1"
Expand All @@ -34,6 +36,7 @@ import (

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws/awserr"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/eks"
Expand All @@ -46,6 +49,7 @@ const (
operationPollInterval = 100 * time.Millisecond
maxRecordsReturnedByAPI = 100
maxAsgNamesPerDescribe = 100
maxAttachInstanceCount = 20
refreshInterval = 1 * time.Minute
autoDiscovererTypeASG = "asg"
asgAutoDiscovererKeyTag = "tag"
Expand Down Expand Up @@ -156,6 +160,204 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
return m.asgCache.SetAsgSize(asg, size)
}

// LaunchAndAttach launches size instances through an "instant" EC2 CreateFleet
// request built from the ASG's launch template(s), subnets and instances
// distribution, then attaches the launched instances to the ASG in batches of
// at most maxAttachInstanceCount.
//
// It returns an error if the fleet request cannot be built or fails, or if any
// batch of instances could not be attached. If the fleet launched fewer
// instances than requested, the shortfall is delegated to the ASG by setting
// its desired capacity to the originally intended total.
func (m *AwsManager) LaunchAndAttach(asg *asg, size int) error {
	// TODO: needs locking
	// TODO: needs to inform asgCache to increment its size

	// ASGs without a mixed instances policy have no distribution to split by;
	// request everything as on-demand (the fleet's default capacity type below).
	spotCapacity, onDemandCapacity := 0, size
	if asg.InstancesDistribution != nil {
		spotCapacity, onDemandCapacity = m.calculateSpotCapacity(asg, size)
	}

	tags := m.getInstanceTags(asg)
	launchTemplateConfigs, err := m.getFleetLaunchTemplateConfigs(asg)
	if err != nil {
		return fmt.Errorf("getting launch template configs, %w", err)
	}

	// Call Fleet API to immediately trigger EC2 instance launch
	params := &ec2.CreateFleetInput{
		Type:                  aws.String(ec2.FleetTypeInstant),
		LaunchTemplateConfigs: launchTemplateConfigs,
		TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{
			OnDemandTargetCapacity:    aws.Int64(int64(onDemandCapacity)),
			SpotTargetCapacity:        aws.Int64(int64(spotCapacity)),
			TotalTargetCapacity:       aws.Int64(int64(size)),
			DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO: what should this default be, does it matter?
			// TODO: support attribute-based instance type capacity selection
		},
	}
	// Only send tag specifications when there is something to tag.
	// NOTE(review): EC2 is believed to reject a TagSpecification with an empty
	// tag list - confirm against the CreateFleet API reference.
	if len(tags) > 0 {
		params.TagSpecifications = []*ec2.TagSpecification{
			{ResourceType: aws.String(ec2.ResourceTypeInstance), Tags: tags},
			{ResourceType: aws.String(ec2.ResourceTypeVolume), Tags: tags},
			{ResourceType: aws.String(ec2.ResourceTypeFleet), Tags: tags},
		}
	}
	// Allocation strategies only exist for mixed-instances ASGs, and an unset
	// strategy is cached as ""; leave the option out so EC2 applies its default
	// instead of receiving an empty strategy name.
	if dist := asg.InstancesDistribution; dist != nil {
		if dist.spotAllocationStrategy != "" {
			params.SpotOptions = &ec2.SpotOptionsRequest{
				AllocationStrategy: aws.String(dist.spotAllocationStrategy),
			}
		}
		if dist.onDemandAllocationStrategy != "" {
			params.OnDemandOptions = &ec2.OnDemandOptionsRequest{
				AllocationStrategy: aws.String(dist.onDemandAllocationStrategy),
			}
		}
	}

	fleetOutput, err := m.awsService.CreateFleet(params)
	if err != nil {
		return fmt.Errorf("creating fleet, %w", err)
	}

	// extract created instance IDs
	var instanceIDs []*string
	for _, instance := range fleetOutput.Instances {
		instanceIDs = append(instanceIDs, instance.InstanceIds...)
	}

	// Attach the instances to the ASG concurrently, maxAttachInstanceCount per call.
	var (
		wg         sync.WaitGroup
		mu         sync.Mutex // guards attachErrs, which is written from every batch goroutine
		attachErrs []error
	)
	for i := 0; i < len(instanceIDs); i += maxAttachInstanceCount {
		end := i + maxAttachInstanceCount
		if end > len(instanceIDs) {
			end = len(instanceIDs)
		}
		wg.Add(1)

		go func(batch []*string) {
			defer wg.Done()

			if err := m.attachInstanceBatch(asg, batch); err != nil {
				mu.Lock()
				attachErrs = append(attachErrs, err)
				mu.Unlock()
			}
		}(instanceIDs[i:end])
	}
	wg.Wait()

	// Return any errors that occurred during instance attachment
	// TODO: terminate instances that failed to attach and/or fail back to ASG SetDesiredCapacity
	if len(attachErrs) > 0 {
		return fmt.Errorf("attaching instances to ASG %q: %+v", asg.Name, attachErrs)
	}

	// Anything the fleet did not launch is handed back to the ASG. Note that
	// fleetOutput.Errors has one entry per failed launch configuration, not per
	// instance, so the shortfall is derived from the launched instance count.
	failedLaunchCount := size - len(instanceIDs)
	if failedLaunchCount > 0 {
		klog.Warningf("failed to launch %d instances for %s via CreateFleet call - falling back to SetDesiredCapacity: %+v",
			failedLaunchCount, asg.Name, fleetOutput.Errors)
		// AttachInstances already raised the ASG's desired capacity by the number
		// of attached instances while asg.curSize still holds the pre-launch
		// value, so the target is the full intended size; a smaller target could
		// make the ASG scale in the instances that were just attached.
		return m.SetAsgSize(asg, asg.curSize+size)
	}

	return nil
}

// attachInstanceBatch attaches one batch of instance IDs to the ASG, retrying
// while the API rejects the call because instances are still pending. It gives
// up with an error once the retry window has elapsed.
func (m *AwsManager) attachInstanceBatch(asg *asg, instanceIDs []*string) error {
	// Upper bound on waiting for freshly launched instances to leave the
	// pending state, so a stuck instance cannot block the caller forever.
	const attachTimeout = 2 * time.Minute

	params := &autoscaling.AttachInstancesInput{
		InstanceIds:          instanceIDs,
		AutoScalingGroupName: aws.String(asg.Name),
	}

	deadline := time.Now().Add(attachTimeout)
	for {
		_, err := m.awsService.AttachInstances(params)
		if err == nil {
			return nil
		}

		// retry on pending instances ValidationError, anything else is final
		var aerr awserr.Error
		pending := errors.As(err, &aerr) && aerr.Code() == "ValidationError" && strings.Contains(aerr.Message(), "pending")
		if !pending {
			return err
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out after %s waiting for pending instances: %w", attachTimeout, err)
		}
		time.Sleep(operationPollInterval)
	}
}

// getInstanceTags converts the ASG tags that are marked PropagateAtLaunch into
// EC2 tags for use in a fleet request. Keys starting with "aws:" are rewritten
// to "reserved:aws:..." (presumably because callers may not set tags in the
// "aws:" namespace themselves - confirm).
func (m *AwsManager) getInstanceTags(asg *asg) []*ec2.Tag {
	propagated := make([]*ec2.Tag, 0, len(asg.Tags))
	for _, tag := range asg.Tags {
		if tag.PropagateAtLaunch == nil || !*tag.PropagateAtLaunch {
			continue
		}

		tagKey := tag.Key
		if name := aws.StringValue(tagKey); strings.HasPrefix(name, "aws:") {
			tagKey = aws.String(fmt.Sprintf("reserved:%s", name))
		}
		propagated = append(propagated, &ec2.Tag{Key: tagKey, Value: tag.Value})
	}
	return propagated
}

// calculateSpotCapacity splits a request for size new instances into spot and
// on-demand capacity according to the ASG's instances distribution.
//
// New instances first fill whatever is missing from the on-demand base
// capacity (onDemandBaseCapacity minus the ASG's current size); the rest is
// split by onDemandPercentageAboveBaseCapacity, rounding the on-demand share
// down. An ASG without an instances distribution gets on-demand capacity only,
// and a non-positive size yields (0, 0).
func (m *AwsManager) calculateSpotCapacity(asg *asg, size int) (spotCapacity int, onDemandCapacity int) {
	if size <= 0 {
		return 0, 0
	}

	dist := asg.InstancesDistribution
	if dist == nil {
		return 0, size
	}

	// Only the shortfall below the base is forced to on-demand, not the whole
	// request: once the base is reached the percentage split takes over.
	if shortfall := dist.onDemandBaseCapacity - asg.curSize; shortfall > 0 {
		onDemandCapacity = shortfall
		if onDemandCapacity > size {
			onDemandCapacity = size
		}
	}

	// TODO: should this consider the current ratio of spot/on-demand instances?
	aboveBase := size - onDemandCapacity
	onDemandAboveBase := int(math.Floor(float64(aboveBase) * float64(dist.onDemandPercentageAboveBaseCapacity) / 100))
	onDemandCapacity += onDemandAboveBase
	spotCapacity = aboveBase - onDemandAboveBase

	return spotCapacity, onDemandCapacity
}

// getFleetLaunchTemplateConfigs translates the ASG's launch template settings
// into CreateFleet launch template configs.
//
// For an ASG with a mixed instances policy, every launch template override
// becomes one config holding an (instance type, subnet) override per ASG
// subnet; the override's own launch template is used when it has one,
// otherwise the policy-level template. If that produces no configs, a single
// config is returned that uses the default launch template with subnet-only
// overrides. An error is returned when no launch template can be found.
func (m *AwsManager) getFleetLaunchTemplateConfigs(asg *asg) ([]*ec2.FleetLaunchTemplateConfigRequest, error) {
	// One subnet-only override per ASG subnet.
	subnetOnly := make([]*ec2.FleetLaunchTemplateOverridesRequest, 0, len(asg.SubnetIDs))
	for _, id := range asg.SubnetIDs {
		subnetOnly = append(subnetOnly, &ec2.FleetLaunchTemplateOverridesRequest{
			SubnetId: aws.String(id),
		})
	}

	var defaultSpec *ec2.FleetLaunchTemplateSpecificationRequest
	if lt := asg.LaunchTemplate; lt != nil {
		defaultSpec = &ec2.FleetLaunchTemplateSpecificationRequest{
			LaunchTemplateName: aws.String(lt.name),
			Version:            aws.String(lt.version),
		}
	}

	var configs []*ec2.FleetLaunchTemplateConfigRequest
	if policy := asg.MixedInstancesPolicy; policy != nil {
		// The policy-level template takes precedence over the ASG-level one.
		defaultSpec = &ec2.FleetLaunchTemplateSpecificationRequest{
			LaunchTemplateName: aws.String(policy.launchTemplate.name),
			Version:            aws.String(policy.launchTemplate.version),
		}

		for _, lto := range policy.launchTemplateOverrides {
			spec := defaultSpec
			if own := lto.LaunchTemplateSpecification; own != nil {
				spec = &ec2.FleetLaunchTemplateSpecificationRequest{
					LaunchTemplateName: own.LaunchTemplateName,
					Version:            own.Version,
				}
			}

			perSubnet := make([]*ec2.FleetLaunchTemplateOverridesRequest, 0, len(subnetOnly))
			for _, subnet := range subnetOnly {
				override := &ec2.FleetLaunchTemplateOverridesRequest{
					SubnetId:     subnet.SubnetId,
					InstanceType: lto.InstanceType,
					// TODO: support weighted capacity and instance requirements
				}
				if maxPrice := asg.InstancesDistribution.spotMaxPrice; maxPrice != "" {
					override.MaxPrice = aws.String(maxPrice)
				}
				perSubnet = append(perSubnet, override)
			}

			configs = append(configs, &ec2.FleetLaunchTemplateConfigRequest{
				LaunchTemplateSpecification: spec,
				Overrides:                   perSubnet,
			})
		}
	}

	if len(configs) > 0 {
		return configs, nil
	}

	// No per-instance-type configs: fall back to the default template alone.
	if defaultSpec == nil {
		return nil, fmt.Errorf("cannot find LaunchTemplate for ASG %q", asg.Name)
	}
	return []*ec2.FleetLaunchTemplateConfigRequest{{
		LaunchTemplateSpecification: defaultSpec,
		Overrides:                   subnetOnly,
	}}, nil
}

// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error {
if err := m.asgCache.DeleteInstances(instances); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

// autoScalingI is the interface abstracting specific API calls of the auto-scaling service provided by AWS SDK for use in CA
type autoScalingI interface {
AttachInstances(input *autoscaling.AttachInstancesInput) (*autoscaling.AttachInstancesOutput, error)
DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error)
DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error)
Expand All @@ -40,6 +41,7 @@ type autoScalingI interface {

// ec2I is the interface abstracting specific API calls of the EC2 service provided by AWS SDK for use in CA
type ec2I interface {
CreateFleet(input *ec2.CreateFleetInput) (*ec2.CreateFleetOutput, error)
DescribeImages(input *ec2.DescribeImagesInput) (*ec2.DescribeImagesOutput, error)
DescribeLaunchTemplateVersions(input *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error)
GetInstanceTypesFromInstanceRequirementsPages(input *ec2.GetInstanceTypesFromInstanceRequirementsInput, fn func(*ec2.GetInstanceTypesFromInstanceRequirementsOutput, bool) bool) error
Expand Down
3 changes: 3 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ type AutoscalingOptions struct {
BalancingLabels []string
// AWSUseStaticInstanceList tells if AWS cloud provider use static instance type list or dynamically fetch from remote APIs.
AWSUseStaticInstanceList bool
// AWSUseCreateFleetAndAttachAPI tells the AWS cloud provider to grow ASGs by launching instances directly via the
// CreateFleet API and attaching them to the ASG, rather than raising the ASG's desired capacity, to speed up scale-up.
AWSUseCreateFleetAndAttachAPI bool
// GCEOptions contain autoscaling options specific to GCE cloud provider.
GCEOptions GCEOptions
// KubeClientOpts specify options for kube client
Expand Down
Loading

0 comments on commit da087e7

Please sign in to comment.