Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions cluster-autoscaler/cloudprovider/azure/azure_scale_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package azure

import (
"context"
"errors"
"fmt"
"math/rand"
"strings"
Expand All @@ -39,6 +41,7 @@ import (
var (
defaultVmssInstancesRefreshPeriod = 5 * time.Minute
vmssContextTimeout = 3 * time.Minute
syncContextTimeout = 1 * time.Minute
asyncContextTimeout = 30 * time.Minute
vmssSizeMutex sync.Mutex
)
Expand Down Expand Up @@ -268,10 +271,15 @@ func (scaleSet *ScaleSet) getScaleSetSize() (int64, error) {
}

// waitForCreateOrUpdate waits for the outcome of VMSS capacity update initiated via CreateOrUpdateAsync.
func (scaleSet *ScaleSet) waitForCreateOrUpdateInstances(future *azure.Future) {
func (scaleSet *ScaleSet) waitForCreateOrUpdateInstances(ctx context.Context, async bool, future *azure.Future) error {
var err error

defer func() {
// Don't invalidate caches in sync mode for context deadline exceeded error, expected to continue in async mode.
if !async && errors.Is(err, context.DeadlineExceeded) {
return
}

// Invalidate instanceCache on success and failure. Failure might have created a few instances, but it is very rare.
scaleSet.invalidateInstanceCache()
if err != nil {
Expand All @@ -282,19 +290,12 @@ func (scaleSet *ScaleSet) waitForCreateOrUpdateInstances(future *azure.Future) {
}
}()

ctx, cancel := getContextWithTimeout(asyncContextTimeout)
defer cancel()

klog.V(3).Infof("Calling virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(%s)", scaleSet.Name)
httpResponse, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.WaitForCreateOrUpdateResult(ctx, future, scaleSet.manager.config.ResourceGroup)

isSuccess, err := isSuccessHTTPResponse(httpResponse, err)
if isSuccess {
klog.V(3).Infof("waitForCreateOrUpdateInstances(%s) success", scaleSet.Name)
return
}
_, rerr := isSuccessHTTPResponse(httpResponse, err)

klog.Errorf("waitForCreateOrUpdateInstances(%s) failed, err: %v", scaleSet.Name, err)
return rerr
}

// setScaleSetSize sets ScaleSet size.
Expand All @@ -312,11 +313,37 @@ func (scaleSet *ScaleSet) setScaleSetSize(size int64, delta int) error {
if requiredInstances > 0 {
klog.V(3).Infof("Remaining unsatisfied count is %d. Attempting to increase scale set %q "+
"capacity", requiredInstances, scaleSet.Name)
err := scaleSet.createOrUpdateInstances(&vmssInfo, size)
future, err := scaleSet.createOrUpdateInstances(&vmssInfo, size)
if err != nil {
klog.Errorf("Failed to request capacity for scale set %q to %d: %v", scaleSet.Name, requiredInstances, err)
return err
}

ctx, cancel := getContextWithTimeout(syncContextTimeout)
defer cancel()
err = scaleSet.waitForCreateOrUpdateInstances(ctx, false, future)
if err == nil {
klog.V(3).Infof("Increased capacity for scale set %q to %d", scaleSet.Name, requiredInstances)
return nil
}

if !errors.Is(err, context.DeadlineExceeded) {
klog.Errorf("Failed to increase capacity for scale set %q to %d: %v", scaleSet.Name, requiredInstances, err)
return err
}

// Switch to async wait for the outcome of VMSS capacity update.
go func() {
ctx, cancel := getContextWithTimeout(asyncContextTimeout)
defer cancel()

err = scaleSet.waitForCreateOrUpdateInstances(ctx, true, future)
if err != nil {
klog.Errorf("Failed to increase capacity for scale set %q to %d: %v", scaleSet.Name, requiredInstances, err)
} else {
klog.V(3).Infof("Increased capacity for scale set %q to %d", scaleSet.Name, requiredInstances)
}
}()
}
return nil
}
Expand Down Expand Up @@ -435,9 +462,9 @@ func (scaleSet *ScaleSet) Belongs(node *apiv1.Node) (bool, error) {
return true, nil
}

func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachineScaleSet, newSize int64) error {
func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachineScaleSet, newSize int64) (*azure.Future, error) {
if vmssInfo == nil {
return fmt.Errorf("vmssInfo cannot be nil while increating scaleSet capacity")
return nil, fmt.Errorf("vmssInfo cannot be nil while increating scaleSet capacity")
}

scaleSet.sizeMutex.Lock()
Expand Down Expand Up @@ -470,15 +497,14 @@ func (scaleSet *ScaleSet) createOrUpdateInstances(vmssInfo *compute.VirtualMachi
future, rerr := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdateAsync(ctx, scaleSet.manager.config.ResourceGroup, scaleSet.Name, op)
if rerr != nil {
klog.Errorf("virtualMachineScaleSetsClient.CreateOrUpdate for scale set %q failed: %+v", scaleSet.Name, rerr)
return rerr.Error()
return nil, rerr.Error()
}

// Proactively set the VMSS size so autoscaler makes better decisions.
scaleSet.curSize = newSize
scaleSet.lastSizeRefresh = time.Now()

go scaleSet.waitForCreateOrUpdateInstances(future)
return nil
return future, nil
}

// DeleteInstances deletes the given instances. All instances must be controlled by the same nodegroup.
Expand Down
Loading