Skip to content

Commit 7b62669

Browse files
Consume aws-sdk-go-v2
1 parent c509bb2 commit 7b62669

16 files changed

+1319
-1058
lines changed

cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go

Lines changed: 51 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@ limitations under the License.
1717
package aws
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"reflect"
2223
"strings"
2324
"sync"
2425
"time"
2526

26-
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
27-
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
28-
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
27+
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/aws"
28+
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/service/autoscaling"
29+
autoscalingtypes "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/service/autoscaling/types"
30+
ec2types "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/service/ec2/types"
2931
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
3032
klog "k8s.io/klog/v2"
3133
)
@@ -41,7 +43,7 @@ type asgCache struct {
4143
asgToInstances map[AwsRef][]AwsInstanceRef
4244
instanceToAsg map[AwsInstanceRef]*asg
4345
instanceStatus map[AwsInstanceRef]*string
44-
instanceLifecycle map[AwsInstanceRef]*string
46+
instanceLifecycle map[AwsInstanceRef]autoscalingtypes.LifecycleState
4547
asgInstanceTypeCache *instanceTypeExpirationStore
4648
mutex sync.Mutex
4749
awsService *awsWrapper
@@ -60,8 +62,8 @@ type launchTemplate struct {
6062
type mixedInstancesPolicy struct {
6163
launchTemplate *launchTemplate
6264
instanceTypesOverrides []string
63-
instanceRequirementsOverrides *autoscaling.InstanceRequirements
64-
instanceRequirements *ec2.InstanceRequirements
65+
instanceRequirementsOverrides *autoscalingtypes.InstanceRequirements
66+
instanceRequirements *ec2types.InstanceRequirements
6567
}
6668

6769
type asg struct {
@@ -76,7 +78,7 @@ type asg struct {
7678
LaunchConfigurationName string
7779
LaunchTemplate *launchTemplate
7880
MixedInstancesPolicy *mixedInstancesPolicy
79-
Tags []*autoscaling.TagDescription
81+
Tags []autoscalingtypes.TagDescription
8082
}
8183

8284
func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) {
@@ -86,7 +88,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp
8688
asgToInstances: make(map[AwsRef][]AwsInstanceRef),
8789
instanceToAsg: make(map[AwsInstanceRef]*asg),
8890
instanceStatus: make(map[AwsInstanceRef]*string),
89-
instanceLifecycle: make(map[AwsInstanceRef]*string),
91+
instanceLifecycle: make(map[AwsInstanceRef]autoscalingtypes.LifecycleState),
9092
asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
9193
interrupt: make(chan struct{}),
9294
asgAutoDiscoverySpecs: autoDiscoverySpecs,
@@ -243,12 +245,12 @@ func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
243245
return nil, fmt.Errorf("could not find instance %v", ref)
244246
}
245247

246-
func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (*string, error) {
248+
func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (autoscalingtypes.LifecycleState, error) {
247249
if lifecycle, found := m.instanceLifecycle[ref]; found {
248250
return lifecycle, nil
249251
}
250252

251-
return nil, fmt.Errorf("could not find instance %v", ref)
253+
return "", fmt.Errorf("could not find instance %v", ref)
252254
}
253255

254256
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
@@ -261,12 +263,12 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error {
261263
func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
262264
params := &autoscaling.SetDesiredCapacityInput{
263265
AutoScalingGroupName: aws.String(asg.Name),
264-
DesiredCapacity: aws.Int64(int64(size)),
266+
DesiredCapacity: aws.Int32(int32(size)),
265267
HonorCooldown: aws.Bool(false),
266268
}
267269
klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
268270
start := time.Now()
269-
_, err := m.awsService.SetDesiredCapacity(params)
271+
_, err := m.awsService.SetDesiredCapacity(context.Background(), params)
270272
observeAWSRequest("SetDesiredCapacity", err, start)
271273
if err != nil {
272274
return err
@@ -356,12 +358,11 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
356358
return err
357359
}
358360

359-
if lifecycle != nil &&
360-
*lifecycle == autoscaling.LifecycleStateTerminated ||
361-
*lifecycle == autoscaling.LifecycleStateTerminating ||
362-
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
363-
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
364-
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
361+
if lifecycle == autoscalingtypes.LifecycleStateTerminated ||
362+
lifecycle == autoscalingtypes.LifecycleStateTerminating ||
363+
lifecycle == autoscalingtypes.LifecycleStateTerminatingWait ||
364+
lifecycle == autoscalingtypes.LifecycleStateTerminatingProceed {
365+
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, lifecycle)
365366
continue
366367
}
367368

@@ -370,12 +371,12 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
370371
ShouldDecrementDesiredCapacity: aws.Bool(true),
371372
}
372373
start := time.Now()
373-
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
374+
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(context.Background(), params)
374375
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
375376
if err != nil {
376377
return err
377378
}
378-
klog.V(4).Infof(*resp.Activity.Description)
379+
klog.V(4).Infof("Terminated instance %s in autoscaling group: %s", instance.Name, aws.ToString(resp.Activity.Description))
379380

380381
// Proactively decrement the size so autoscaler makes better decisions
381382
commonAsg.curSize--
@@ -421,7 +422,7 @@ func (m *asgCache) regenerate() error {
421422
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
422423
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
423424
newInstanceStatusMap := make(map[AwsInstanceRef]*string)
424-
newInstanceLifecycleMap := make(map[AwsInstanceRef]*string)
425+
newInstanceLifecycleMap := make(map[AwsInstanceRef]autoscalingtypes.LifecycleState)
425426

426427
// Fetch details of all ASGs
427428
refreshNames := m.buildAsgNames()
@@ -448,7 +449,7 @@ func (m *asgCache) regenerate() error {
448449
// Register or update ASGs
449450
exists := make(map[AwsRef]bool)
450451
for _, group := range groups {
451-
asg, err := m.buildAsgFromAWS(group)
452+
asg, err := m.buildAsgFromAWS(&group)
452453
if err != nil {
453454
return err
454455
}
@@ -497,19 +498,21 @@ func (m *asgCache) regenerate() error {
497498
return nil
498499
}
499500

500-
func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
501+
func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []autoscalingtypes.AutoScalingGroup) []autoscalingtypes.AutoScalingGroup {
502+
var updatedGroups []autoscalingtypes.AutoScalingGroup
501503
for _, g := range groups {
502504
desired := *g.DesiredCapacity
503-
realInstances := int64(len(g.Instances))
505+
realInstances := int32(len(g.Instances))
504506
if desired <= realInstances {
507+
updatedGroups = append(updatedGroups, g)
505508
continue
506509
}
507510

508511
klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
509512
"Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired)
510513

511514
healthStatus := ""
512-
isAvailable, err := m.isNodeGroupAvailable(g)
515+
isAvailable, err := m.isNodeGroupAvailable(&g)
513516
if err != nil {
514517
klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err)
515518
} else if !isAvailable {
@@ -519,23 +522,24 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut
519522

520523
for i := realInstances; i < desired; i++ {
521524
id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
522-
g.Instances = append(g.Instances, &autoscaling.Instance{
525+
g.Instances = append(g.Instances, autoscalingtypes.Instance{
523526
InstanceId: &id,
524-
AvailabilityZone: g.AvailabilityZones[0],
527+
AvailabilityZone: aws.String(g.AvailabilityZones[0]),
525528
HealthStatus: &healthStatus,
526529
})
527530
}
531+
updatedGroups = append(updatedGroups, g)
528532
}
529-
return groups
533+
return updatedGroups
530534
}
531535

532-
func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
536+
func (m *asgCache) isNodeGroupAvailable(group *autoscalingtypes.AutoScalingGroup) (bool, error) {
533537
input := &autoscaling.DescribeScalingActivitiesInput{
534538
AutoScalingGroupName: group.AutoScalingGroupName,
535539
}
536540

537541
start := time.Now()
538-
response, err := m.awsService.DescribeScalingActivities(input)
542+
response, err := m.awsService.DescribeScalingActivities(context.Background(), input)
539543
observeAWSRequest("DescribeScalingActivities", err, start)
540544
if err != nil {
541545
return true, err // If we can't describe the scaling activities we assume the node group is available
@@ -547,8 +551,8 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error)
547551
lut := a.lastUpdateTime
548552
if activity.StartTime.Before(lut) {
549553
break
550-
} else if *activity.StatusCode == "Failed" {
551-
klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity)
554+
} else if activity.StatusCode == autoscalingtypes.ScalingActivityStatusCodeFailed {
555+
klog.Warningf("ASG %s scaling failed with description: %s", asgRef.Name, aws.ToString(activity.Description))
552556
return false, nil
553557
}
554558
} else {
@@ -558,11 +562,11 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error)
558562
return true, nil
559563
}
560564

561-
func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
565+
func (m *asgCache) buildAsgFromAWS(g *autoscalingtypes.AutoScalingGroup) (*asg, error) {
562566
spec := dynamic.NodeGroupSpec{
563-
Name: aws.StringValue(g.AutoScalingGroupName),
564-
MinSize: int(aws.Int64Value(g.MinSize)),
565-
MaxSize: int(aws.Int64Value(g.MaxSize)),
567+
Name: aws.ToString(g.AutoScalingGroupName),
568+
MinSize: int(aws.ToInt32(g.MinSize)),
569+
MaxSize: int(aws.ToInt32(g.MaxSize)),
566570
SupportScaleToZero: scaleToZeroSupported,
567571
}
568572

@@ -575,9 +579,9 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
575579
minSize: spec.MinSize,
576580
maxSize: spec.MaxSize,
577581

578-
curSize: int(aws.Int64Value(g.DesiredCapacity)),
579-
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
580-
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
582+
curSize: int(aws.ToInt32(g.DesiredCapacity)),
583+
AvailabilityZones: g.AvailabilityZones,
584+
LaunchConfigurationName: aws.ToString(g.LaunchConfigurationName),
581585
Tags: g.Tags,
582586
}
583587

@@ -586,8 +590,8 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
586590
}
587591

588592
if g.MixedInstancesPolicy != nil {
589-
getInstanceTypes := func(overrides []*autoscaling.LaunchTemplateOverrides) []string {
590-
res := []string{}
593+
getInstanceTypes := func(overrides []autoscalingtypes.LaunchTemplateOverrides) []string {
594+
var res []string
591595
for _, override := range overrides {
592596
if override.InstanceType != nil {
593597
res = append(res, *override.InstanceType)
@@ -596,7 +600,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
596600
return res
597601
}
598602

599-
getInstanceTypeRequirements := func(overrides []*autoscaling.LaunchTemplateOverrides) *autoscaling.InstanceRequirements {
603+
getInstanceTypeRequirements := func(overrides []autoscalingtypes.LaunchTemplateOverrides) *autoscalingtypes.InstanceRequirements {
600604
if len(overrides) == 1 && overrides[0].InstanceRequirements != nil {
601605
return overrides[0].InstanceRequirements
602606
}
@@ -625,8 +629,8 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
625629
return asg, nil
626630
}
627631

628-
func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixedInstancesPolicy) (*ec2.InstanceRequirements, error) {
629-
instanceRequirements := &ec2.InstanceRequirements{}
632+
func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixedInstancesPolicy) (*ec2types.InstanceRequirements, error) {
633+
instanceRequirements := &ec2types.InstanceRequirements{}
630634
if policy.instanceRequirementsOverrides != nil {
631635
var err error
632636
instanceRequirements, err = m.awsService.getEC2RequirementsFromAutoscaling(policy.instanceRequirementsOverrides)
@@ -646,11 +650,11 @@ func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixed
646650
return instanceRequirements, nil
647651
}
648652

649-
func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsInstanceRef {
650-
providerID := fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.AvailabilityZone), aws.StringValue(instance.InstanceId))
653+
func (m *asgCache) buildInstanceRefFromAWS(instance autoscalingtypes.Instance) AwsInstanceRef {
654+
providerID := fmt.Sprintf("aws:///%s/%s", aws.ToString(instance.AvailabilityZone), aws.ToString(instance.InstanceId))
651655
return AwsInstanceRef{
652656
ProviderID: providerID,
653-
Name: aws.StringValue(instance.InstanceId),
657+
Name: aws.ToString(instance.InstanceId),
654658
}
655659
}
656660

0 commit comments

Comments
 (0)