From da087e7ff66abb668a97d083ae03e07955b97d63 Mon Sep 17 00:00:00 2001 From: Adam Malcontenti-Wilson Date: Fri, 24 Nov 2023 17:28:45 +1100 Subject: [PATCH] WIP Add code path to launch instances using AWS Fleet API --- .../cloudprovider/aws/auto_scaling_groups.go | 22 ++ .../cloudprovider/aws/aws_cloud_provider.go | 31 ++- .../cloudprovider/aws/aws_manager.go | 202 ++++++++++++++++++ .../cloudprovider/aws/aws_wrapper.go | 2 + .../config/autoscaling_options.go | 3 + cluster-autoscaler/main.go | 19 +- 6 files changed, 260 insertions(+), 19 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 4667e285b153..307e5fba2124 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -59,11 +59,20 @@ type launchTemplate struct { type mixedInstancesPolicy struct { launchTemplate *launchTemplate + launchTemplateOverrides []*autoscaling.LaunchTemplateOverrides instanceTypesOverrides []string instanceRequirementsOverrides *autoscaling.InstanceRequirements instanceRequirements *ec2.InstanceRequirements } +type instancesDistribution struct { + onDemandAllocationStrategy string + onDemandBaseCapacity int + onDemandPercentageAboveBaseCapacity int + spotAllocationStrategy string + spotMaxPrice string +} + type asg struct { AwsRef @@ -73,9 +82,11 @@ type asg struct { lastUpdateTime time.Time AvailabilityZones []string + SubnetIDs []string LaunchConfigurationName string LaunchTemplate *launchTemplate MixedInstancesPolicy *mixedInstancesPolicy + InstancesDistribution *instancesDistribution Tags []*autoscaling.TagDescription } @@ -545,6 +556,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { curSize: int(aws.Int64Value(g.DesiredCapacity)), AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones), + SubnetIDs: strings.Split(aws.StringValue(g.VPCZoneIdentifier), ","), LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName), Tags: g.Tags, } @@ -573,6 +585,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { asg.MixedInstancesPolicy = &mixedInstancesPolicy{ launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification), + launchTemplateOverrides: g.MixedInstancesPolicy.LaunchTemplate.Overrides, instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides), instanceRequirementsOverrides: getInstanceTypeRequirements(g.MixedInstancesPolicy.LaunchTemplate.Overrides), } @@ -586,6 +599,15 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) { if len(asg.MixedInstancesPolicy.instanceTypesOverrides) != 0 && asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil { return nil, fmt.Errorf("invalid setup of both instance type and instance requirements overrides configured") } + + asg.InstancesDistribution = &instancesDistribution{ + onDemandAllocationStrategy: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.OnDemandAllocationStrategy), + onDemandBaseCapacity: int(aws.Int64Value(g.MixedInstancesPolicy.InstancesDistribution.OnDemandBaseCapacity)), + onDemandPercentageAboveBaseCapacity: int(aws.Int64Value(g.MixedInstancesPolicy.InstancesDistribution.OnDemandPercentageAboveBaseCapacity)), + spotAllocationStrategy: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.SpotAllocationStrategy), + // TODO: support SpotInstancePools? + spotMaxPrice: aws.StringValue(g.MixedInstancesPolicy.InstancesDistribution.SpotMaxPrice), + } } return asg, nil diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go index f515530b7ccb..d2eb7582003e 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider.go @@ -53,15 +53,17 @@ var ( // awsCloudProvider implements CloudProvider interface. type awsCloudProvider struct { - awsManager *AwsManager - resourceLimiter *cloudprovider.ResourceLimiter + awsManager *AwsManager + resourceLimiter *cloudprovider.ResourceLimiter + useCreateFleetAndAttachAPI bool } // BuildAwsCloudProvider builds CloudProvider implementation for AWS. -func BuildAwsCloudProvider(awsManager *AwsManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) { +func BuildAwsCloudProvider(awsManager *AwsManager, resourceLimiter *cloudprovider.ResourceLimiter, useCreateFleetAndAttachAPI bool) (cloudprovider.CloudProvider, error) { aws := &awsCloudProvider{ - awsManager: awsManager, - resourceLimiter: resourceLimiter, + awsManager: awsManager, + resourceLimiter: resourceLimiter, + useCreateFleetAndAttachAPI: useCreateFleetAndAttachAPI, } return aws, nil } @@ -99,8 +101,9 @@ func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup { ngs := make([]cloudprovider.NodeGroup, 0, len(asgs)) for _, asg := range asgs { ngs = append(ngs, &AwsNodeGroup{ - asg: asg, - awsManager: aws.awsManager, + asg: asg, + awsManager: aws.awsManager, + useCreateFleetAndAttachAPI: aws.useCreateFleetAndAttachAPI, }) } @@ -217,8 +220,9 @@ func AwsRefFromProviderId(id string) (*AwsInstanceRef, error) { // AwsNodeGroup implements NodeGroup interface. type AwsNodeGroup struct { - awsManager *AwsManager - asg *asg + awsManager *AwsManager + asg *asg + useCreateFleetAndAttachAPI bool } // MaxSize returns maximum size of the node group. @@ -277,7 +281,12 @@ func (ng *AwsNodeGroup) IncreaseSize(delta int) error { if size+delta > ng.asg.maxSize { return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.asg.maxSize) } - return ng.awsManager.SetAsgSize(ng.asg, size+delta) + + if ng.useCreateFleetAndAttachAPI { + return ng.awsManager.LaunchAndAttach(ng.asg, delta) + } else { + return ng.awsManager.SetAsgSize(ng.asg, size+delta) + } } // AtomicIncreaseSize is not implemented. @@ -460,7 +469,7 @@ func BuildAWS(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover klog.Fatalf("Failed to create AWS Manager: %v", err) } - provider, err := BuildAwsCloudProvider(manager, rl) + provider, err := BuildAwsCloudProvider(manager, rl, opts.AWSUseCreateFleetAndAttachAPI) if err != nil { klog.Fatalf("Failed to create AWS cloud provider: %v", err) } diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index 6a649a5860a3..28415b6978fe 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -21,10 +21,12 @@ package aws import ( "errors" "fmt" + "math" "math/rand" "regexp" "strconv" "strings" + "sync" "time" apiv1 "k8s.io/api/core/v1" @@ -34,6 +36,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws/awserr" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/eks" @@ -46,6 +49,7 @@ const ( operationPollInterval = 100 * time.Millisecond maxRecordsReturnedByAPI = 100 maxAsgNamesPerDescribe = 100 + maxAttachInstanceCount = 20 refreshInterval = 1 * time.Minute autoDiscovererTypeASG = "asg" asgAutoDiscovererKeyTag = "tag" @@ -156,6 +160,204 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error { return m.asgCache.SetAsgSize(asg, size) } +// LaunchAndAttach launches a fleet of instances and attaches them to the ASG +func (m *AwsManager) LaunchAndAttach(asg *asg, size int) error { + // TODO: needs locking + // TODO: needs to inform asgCache to increment its size + + spotCapacity, onDemandCapacity := m.calculateSpotCapacity(asg, size) + tags := m.getInstanceTags(asg) + launchTemplateConfigs, err := m.getFleetLaunchTemplateConfigs(asg) + if err != nil { + return fmt.Errorf("getting launch template configs, %w", err) + } + + // Call Fleet API to immediately trigger EC2 instance launch + params := &ec2.CreateFleetInput{ + Type: aws.String(ec2.FleetTypeInstant), + LaunchTemplateConfigs: launchTemplateConfigs, + TargetCapacitySpecification: &ec2.TargetCapacitySpecificationRequest{ + OnDemandTargetCapacity: aws.Int64(int64(onDemandCapacity)), + SpotTargetCapacity: aws.Int64(int64(spotCapacity)), + TotalTargetCapacity: aws.Int64(int64(size)), + DefaultTargetCapacityType: aws.String(ec2.DefaultTargetCapacityTypeOnDemand), // TODO: what should this default be, does it matter? + // TODO: support attribute-based instance type capacity selection + }, + TagSpecifications: []*ec2.TagSpecification{ + {ResourceType: aws.String(ec2.ResourceTypeInstance), Tags: tags}, + {ResourceType: aws.String(ec2.ResourceTypeVolume), Tags: tags}, + {ResourceType: aws.String(ec2.ResourceTypeFleet), Tags: tags}, + }, + SpotOptions: &ec2.SpotOptionsRequest{ + AllocationStrategy: aws.String(asg.InstancesDistribution.spotAllocationStrategy), + }, + OnDemandOptions: &ec2.OnDemandOptionsRequest{ + AllocationStrategy: aws.String(asg.InstancesDistribution.onDemandAllocationStrategy), + }, + } + fleetOutput, err := m.awsService.CreateFleet(params) + if err != nil { + return fmt.Errorf("creating fleet, %w", err) + } + + // extract created instance IDs + var instanceIDs []*string + for _, instance := range fleetOutput.Instances { + instanceIDs = append(instanceIDs, instance.InstanceIds...) + } + + // Attach the instances to the ASG in groups of 20 + var wg sync.WaitGroup + var attachErrs []error + for i := 0; i < len(instanceIDs); i += maxAttachInstanceCount { + end := i + maxAttachInstanceCount + if end > len(instanceIDs) { + end = len(instanceIDs) + } + wg.Add(1) + + go func(instanceIDs []*string) { + defer wg.Done() + + params := &autoscaling.AttachInstancesInput{ + InstanceIds: instanceIDs, + AutoScalingGroupName: aws.String(asg.Name), + } + + // TODO: add a timeout to this loop + for { + _, err := m.awsService.AttachInstances(params) + if err != nil { + // retry on pending instances ValidationError + var aerr awserr.Error + if errors.As(err, &aerr) && aerr.Code() == "ValidationError" && strings.Contains(aerr.Message(), "pending") { + time.Sleep(operationPollInterval) + continue + } + + // otherwise add to attachErrs which get raised at the end + attachErrs = append(attachErrs, err) + } + break + } + + }(instanceIDs[i:end]) + } + wg.Wait() + + // Return any errors that occurred during instance attachment + // TODO: terminate instances that failed to attach and/or fail back to ASG SetDesiredCapacity + return fmt.Errorf("attaching instances to ASG %q: %+v", asg.Name, attachErrs) + + // Calculate how many instances failed to launch, fallback to incrementing ASG's SetDesiredCapacity + failedLaunchCount := len(fleetOutput.Errors) - size + if failedLaunchCount > 0 { + klog.Warningf("failed to launch %d instances for %s via CreateFleet call - falling back to SetDesiredCapacity: %+v", + failedLaunchCount, asg.Name, fleetOutput.Errors) + return m.SetAsgSize(asg, asg.curSize+failedLaunchCount) + } + + return nil +} + +func (m *AwsManager) getInstanceTags(asg *asg) []*ec2.Tag { + tags := make([]*ec2.Tag, 0, len(asg.Tags)) + for i := range asg.Tags { + if asg.Tags[i].PropagateAtLaunch != nil && *asg.Tags[i].PropagateAtLaunch { + key := asg.Tags[i].Key + if str := aws.StringValue(key); strings.HasPrefix(str, "aws:") { + key = aws.String(fmt.Sprintf("reserved:%s", str)) + } + tags = append(tags, &ec2.Tag{Key: key, Value: asg.Tags[i].Value}) + } + } + return tags +} + +func (m *AwsManager) calculateSpotCapacity(asg *asg, size int) (spotCapacity int, onDemandCapacity int) { + for size > 0 { + if asg.curSize < asg.InstancesDistribution.onDemandBaseCapacity { + onDemandCapacity++ + size-- + } else { + // TODO: should this consider the current ratio of spot/on-demand instances? + onDemand := int(math.Floor(float64(size) * float64(asg.InstancesDistribution.onDemandPercentageAboveBaseCapacity) / 100)) + onDemandCapacity += onDemand + spotCapacity += size - onDemand + size = 0 + } + } + return +} + +func (m *AwsManager) getFleetLaunchTemplateConfigs(asg *asg) ([]*ec2.FleetLaunchTemplateConfigRequest, error) { + var launchTemplateConfigs []*ec2.FleetLaunchTemplateConfigRequest + + subnetIDOverrides := make([]*ec2.FleetLaunchTemplateOverridesRequest, len(asg.SubnetIDs)) + for i, subnetID := range asg.SubnetIDs { + subnetIDOverrides[i] = &ec2.FleetLaunchTemplateOverridesRequest{ + SubnetId: aws.String(subnetID), + } + } + + var defaultLaunchTemplateSpecification *ec2.FleetLaunchTemplateSpecificationRequest = nil + if asg.LaunchTemplate != nil { + defaultLaunchTemplateSpecification = &ec2.FleetLaunchTemplateSpecificationRequest{ + LaunchTemplateName: aws.String(asg.LaunchTemplate.name), + Version: aws.String(asg.LaunchTemplate.version), + } + } + + if asg.MixedInstancesPolicy != nil { + defaultLaunchTemplateSpecification = &ec2.FleetLaunchTemplateSpecificationRequest{ + LaunchTemplateName: aws.String(asg.MixedInstancesPolicy.launchTemplate.name), + Version: aws.String(asg.MixedInstancesPolicy.launchTemplate.version), + } + + for i := range asg.MixedInstancesPolicy.launchTemplateOverrides { + lto := asg.MixedInstancesPolicy.launchTemplateOverrides[i] + launchTemplateSpecification := defaultLaunchTemplateSpecification + if lto.LaunchTemplateSpecification != nil { + launchTemplateSpecification = &ec2.FleetLaunchTemplateSpecificationRequest{ + LaunchTemplateName: lto.LaunchTemplateSpecification.LaunchTemplateName, + Version: lto.LaunchTemplateSpecification.Version, + } + } + + overrides := make([]*ec2.FleetLaunchTemplateOverridesRequest, len(subnetIDOverrides)) + for i := range subnetIDOverrides { + overrides[i] = &ec2.FleetLaunchTemplateOverridesRequest{ + SubnetId: subnetIDOverrides[i].SubnetId, + + InstanceType: lto.InstanceType, + // TODO: support weighted capacity and instance requirements + } + if asg.InstancesDistribution.spotMaxPrice != "" { + overrides[i].MaxPrice = aws.String(asg.InstancesDistribution.spotMaxPrice) + } + } + + launchTemplateConfigs = append(launchTemplateConfigs, &ec2.FleetLaunchTemplateConfigRequest{ + LaunchTemplateSpecification: launchTemplateSpecification, + Overrides: overrides, + }) + } + } + + if len(launchTemplateConfigs) == 0 { + if defaultLaunchTemplateSpecification == nil { + return nil, fmt.Errorf("cannot find LaunchTemplate for ASG %q", asg.Name) + } + + launchTemplateConfigs = append(launchTemplateConfigs, &ec2.FleetLaunchTemplateConfigRequest{ + LaunchTemplateSpecification: defaultLaunchTemplateSpecification, + Overrides: subnetIDOverrides, + }) + } + + return launchTemplateConfigs, nil +} + // DeleteInstances deletes the given instances. All instances must be controlled by the same ASG. func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error { if err := m.asgCache.DeleteInstances(instances); err != nil { diff --git a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go index b8c1f71b661b..9d98a13a8aef 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_wrapper.go @@ -31,6 +31,7 @@ import ( // autoScalingI is the interface abstracting specific API calls of the auto-scaling service provided by AWS SDK for use in CA type autoScalingI interface { + AttachInstances(input *autoscaling.AttachInstancesInput) (*autoscaling.AttachInstancesOutput, error) DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error DescribeLaunchConfigurations(*autoscaling.DescribeLaunchConfigurationsInput) (*autoscaling.DescribeLaunchConfigurationsOutput, error) DescribeScalingActivities(*autoscaling.DescribeScalingActivitiesInput) (*autoscaling.DescribeScalingActivitiesOutput, error) @@ -40,6 +41,7 @@ type autoScalingI interface { // ec2I is the interface abstracting specific API calls of the EC2 service provided by AWS SDK for use in CA type ec2I interface { + CreateFleet(input *ec2.CreateFleetInput) (*ec2.CreateFleetOutput, error) DescribeImages(input *ec2.DescribeImagesInput) (*ec2.DescribeImagesOutput, error) DescribeLaunchTemplateVersions(input *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error) GetInstanceTypesFromInstanceRequirementsPages(input *ec2.GetInstanceTypesFromInstanceRequirementsInput, fn func(*ec2.GetInstanceTypesFromInstanceRequirementsOutput, bool) bool) error diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index a18b1c58fbc1..3359151bbab9 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -231,6 +231,9 @@ type AutoscalingOptions struct { BalancingLabels []string // AWSUseStaticInstanceList tells if AWS cloud provider use static instance type list or dynamically fetch from remote APIs. AWSUseStaticInstanceList bool + // AWSUseCreateFleetAndAttachAPI tells the AWS cloud provider to increase the size of ASGs by launching instances directly + // via the CreateFleet API and attach them to the ASG, instead of increasing the capacity to increase the speed of scale up. + AWSUseCreateFleetAndAttachAPI bool // GCEOptions contain autoscaling options specific to GCE cloud provider. GCEOptions GCEOptions // KubeClientOpts specify options for kube client diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index c5ff21aaa4e7..69bd0f5d11a1 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -208,12 +208,13 @@ var ( regional = flag.Bool("regional", false, "Cluster is regional.") newPodScaleUpDelay = flag.Duration("new-pod-scale-up-delay", 0*time.Second, "Pods less than this old will not be considered for scale-up. Can be increased for individual pods through annotation 'cluster-autoscaler.kubernetes.io/pod-scale-up-delay'.") - ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)") - startupTaintsFlag = multiStringFlag("startup-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Equivalent to ignore-taint)") - statusTaintsFlag = multiStringFlag("status-taint", "Specifies a taint to ignore in node templates when considering to scale a node group but nodes will not be treated as unready") - balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar") - balancingLabelsFlag = multiStringFlag("balancing-label", "Specifies a label to use for comparing if two node groups are similar, rather than the built in heuristics. Setting this flag disables all other comparison logic, and cannot be combined with --balancing-ignore-label.") - awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only") + ignoreTaintsFlag = multiStringFlag("ignore-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Deprecated, use startup-taints instead)") + startupTaintsFlag = multiStringFlag("startup-taint", "Specifies a taint to ignore in node templates when considering to scale a node group (Equivalent to ignore-taint)") + statusTaintsFlag = multiStringFlag("status-taint", "Specifies a taint to ignore in node templates when considering to scale a node group but nodes will not be treated as unready") + balancingIgnoreLabelsFlag = multiStringFlag("balancing-ignore-label", "Specifies a label to ignore in addition to the basic and cloud-provider set of labels when comparing if two node groups are similar") + balancingLabelsFlag = multiStringFlag("balancing-label", "Specifies a label to use for comparing if two node groups are similar, rather than the built in heuristics. Setting this flag disables all other comparison logic, and cannot be combined with --balancing-ignore-label.") + awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only") + awsUseCreateFleetAndAttachAPI = flag.Bool("aws-use-create-fleet-and-attach-api", false, "Should CA increase the size of ASGs by launching instances directly via the CreateFleet API and attach them to the ASG. AWS only") // GCE specific flags concurrentGceRefreshes = flag.Int("gce-concurrent-refreshes", 1, "Maximum number of concurrent refreshes per cloud object type.") @@ -398,8 +399,9 @@ func createAutoscalingOptions() config.AutoscalingOptions { KubeConfigPath: *kubeConfigFile, APIContentType: *kubeAPIContentType, }, - NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout, - AWSUseStaticInstanceList: *awsUseStaticInstanceList, + NodeDeletionDelayTimeout: *nodeDeletionDelayTimeout, + AWSUseStaticInstanceList: *awsUseStaticInstanceList, + AWSUseCreateFleetAndAttachAPI: *awsUseCreateFleetAndAttachAPI, GCEOptions: config.GCEOptions{ ConcurrentRefreshes: *concurrentGceRefreshes, MigInstancesMinRefreshWaitTime: *gceMigInstancesMinRefreshWaitTime, @@ -635,6 +637,7 @@ func main() { logsapi.AddFlags(loggingConfig, pflag.CommandLine) featureGate.AddFlag(pflag.CommandLine) + kube_flag.InitFlags() logs.InitLogs()