diff --git a/pkg/controllers/disruption/consolidation_test.go b/pkg/controllers/disruption/consolidation_test.go
index b30ff54b7d..30cb2df47a 100644
--- a/pkg/controllers/disruption/consolidation_test.go
+++ b/pkg/controllers/disruption/consolidation_test.go
@@ -3788,6 +3788,8 @@ var _ = Describe("Consolidation", func() {
 			Eventually(finished.Load, 10*time.Second).Should(BeTrue())
 			wg.Wait()
 
+			ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})
+
 			// nothing should be removed since the node is no longer empty
 			Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
 			Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
diff --git a/pkg/controllers/disruption/emptynodeconsolidation.go b/pkg/controllers/disruption/emptynodeconsolidation.go
index 2a9ccf31fc..d04dbcf092 100644
--- a/pkg/controllers/disruption/emptynodeconsolidation.go
+++ b/pkg/controllers/disruption/emptynodeconsolidation.go
@@ -19,8 +19,8 @@ package disruption
 import (
 	"context"
 	"errors"
-	"fmt"
 
+	"github.com/samber/lo"
 	"knative.dev/pkg/logging"
 
 	"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
@@ -88,31 +88,25 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
 		return Command{}, scheduling.Results{}, errors.New("interrupted")
 	case <-c.clock.After(consolidationTTL):
 	}
 
-	validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, c.ShouldDisrupt, c.queue)
-	if err != nil {
-		logging.FromContext(ctx).Errorf("computing validation candidates %s", err)
-		return Command{}, scheduling.Results{}, err
-	}
-	// Get the current representation of the proposed candidates from before the validation timeout
-	// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
-	candidatesToDelete := mapCandidates(cmd.candidates, validationCandidates)
-	postValidationMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
+	v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder, c.queue)
+	validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
 	if err != nil {
-		return Command{}, scheduling.Results{}, fmt.Errorf("building disruption budgets, %w", err)
-	}
-
-	// The deletion of empty NodeClaims is easy to validate, we just ensure that:
-	// 1. All the candidatesToDelete are still empty
-	// 2. The node isn't a target of a recent scheduling simulation
-	// 3. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
-	for _, n := range candidatesToDelete {
-		if len(n.reschedulablePods) != 0 || c.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
+		if IsValidationError(err) {
 			logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
 			return Command{}, scheduling.Results{}, nil
 		}
-		postValidationMapping[n.nodePool.Name]--
+		return Command{}, scheduling.Results{}, err
 	}
+
+	// TODO (jmdeal@): better encapsulate within validation
+	if lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
+		return len(c.reschedulablePods) != 0
+	}) {
+		logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
+		return Command{}, scheduling.Results{}, nil
+	}
+
 	return cmd, scheduling.Results{}, nil
 }
 
diff --git a/pkg/controllers/disruption/multinodeconsolidation.go b/pkg/controllers/disruption/multinodeconsolidation.go
index 66472f35f5..29470c14a9 100644
--- a/pkg/controllers/disruption/multinodeconsolidation.go
+++ b/pkg/controllers/disruption/multinodeconsolidation.go
@@ -91,15 +91,12 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
 		return cmd, scheduling.Results{}, nil
 	}
 
-	v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue)
-	isValid, err := v.IsValid(ctx, cmd)
-	if err != nil {
-		return Command{}, scheduling.Results{}, fmt.Errorf("validating, %w", err)
-	}
-
-	if !isValid {
-		logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
-		return Command{}, scheduling.Results{}, nil
+	if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil {
+		if IsValidationError(err) {
+			logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
+			return Command{}, scheduling.Results{}, nil
+		}
+		return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
 	}
 	return cmd, results, nil
 }
diff --git a/pkg/controllers/disruption/singlenodeconsolidation.go b/pkg/controllers/disruption/singlenodeconsolidation.go
index 9bb4cd9b69..3e49c45e67 100644
--- a/pkg/controllers/disruption/singlenodeconsolidation.go
+++ b/pkg/controllers/disruption/singlenodeconsolidation.go
@@ -50,7 +50,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
 		consolidationTypeLabel: s.ConsolidationType(),
 	}).Set(float64(len(candidates)))
 
-	v := NewValidation(consolidationTTL, s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)
+	v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)
 
 	// Set a timeout
 	timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
@@ -78,14 +78,13 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
 		if cmd.Action() == NoOpAction {
 			continue
 		}
-		isValid, err := v.IsValid(ctx, cmd)
-		if err != nil {
+		if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil {
+			if IsValidationError(err) {
+				logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
+				return Command{}, scheduling.Results{}, nil
+			}
 			return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
 		}
-		if !isValid {
-			logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
-			return Command{}, scheduling.Results{}, nil
-		}
 		return cmd, results, nil
 	}
 	if !constrainedByBudgets {
diff --git a/pkg/controllers/disruption/validation.go b/pkg/controllers/disruption/validation.go
index 32d712ce42..6b06bb69db 100644
--- a/pkg/controllers/disruption/validation.go
+++ b/pkg/controllers/disruption/validation.go
@@ -35,82 +35,114 @@ import (
 	"sigs.k8s.io/karpenter/pkg/events"
 )
 
+type ValidationError struct {
+	error
+}
+
+func NewValidationError(err error) *ValidationError {
+	return &ValidationError{error: err}
+}
+
+func IsValidationError(err error) bool {
+	if err == nil {
+		return false
+	}
+	var validationError *ValidationError
+	return errors.As(err, &validationError)
+}
+
 // Validation is used to perform validation on a consolidation command. It makes an assumption that when re-used, all
 // of the commands passed to IsValid were constructed based off of the same consolidation state. This allows it to
 // skip the validation TTL for all but the first command.
 type Validation struct {
-	validationPeriod time.Duration
-	start            time.Time
-	clock            clock.Clock
-	cluster          *state.Cluster
-	kubeClient       client.Client
-	cloudProvider    cloudprovider.CloudProvider
-	provisioner      *provisioning.Provisioner
-	once             sync.Once
-	recorder         events.Recorder
-	queue            *orchestration.Queue
+	start         time.Time
+	clock         clock.Clock
+	cluster       *state.Cluster
+	kubeClient    client.Client
+	cloudProvider cloudprovider.CloudProvider
+	provisioner   *provisioning.Provisioner
+	once          sync.Once
+	recorder      events.Recorder
+	queue         *orchestration.Queue
 }
 
-func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
+func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
 	cp cloudprovider.CloudProvider, recorder events.Recorder, queue *orchestration.Queue) *Validation {
 	return &Validation{
-		validationPeriod: validationPeriod,
-		clock:            clk,
-		cluster:          cluster,
-		kubeClient:       kubeClient,
-		provisioner:      provisioner,
-		cloudProvider:    cp,
-		recorder:         recorder,
-		queue:            queue,
+		clock:         clk,
+		cluster:       cluster,
+		kubeClient:    kubeClient,
+		provisioner:   provisioner,
+		cloudProvider: cp,
+		recorder:      recorder,
+		queue:         queue,
 	}
 }
 
-//nolint:gocyclo
-func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
+func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod time.Duration) error {
 	var err error
 	v.once.Do(func() {
 		v.start = v.clock.Now()
 	})
 
-	waitDuration := v.validationPeriod - v.clock.Since(v.start)
+	waitDuration := validationPeriod - v.clock.Since(v.start)
 	if waitDuration > 0 {
 		select {
 		case <-ctx.Done():
-			return false, errors.New("context canceled")
+			return errors.New("context canceled")
 		case <-v.clock.After(waitDuration):
 		}
 	}
-	// Get the current representation of the proposed candidates from before the validation timeout
-	// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
-	// We perform filtering here to ensure that none of the proposed candidates have blocking PDBs or do-not-evict/do-not-disrupt pods scheduled to them
-	validationCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
+	validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
 	if err != nil {
-		return false, fmt.Errorf("constructing validation candidates, %w", err)
+		return err
 	}
-	validationCandidates = mapCandidates(cmd.candidates, validationCandidates)
-	// If we filtered out any candidates, return false as some NodeClaims in the consolidation decision have changed.
-	if len(validationCandidates) != len(cmd.candidates) {
-		return false, nil
+	if err := v.ValidateCommand(ctx, cmd, validatedCandidates); err != nil {
+		return err
 	}
-	// Rebuild the disruption budget mapping to see if any budgets have changed since validation.
-	postValidationMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
+	// Revalidate candidates after validating the command. This mitigates the chance of a race condition outlined in
+	// the following GitHub issue: https://github.com/kubernetes-sigs/karpenter/issues/1167.
+	if _, err = v.ValidateCandidates(ctx, validatedCandidates...); err != nil {
+		return err
+	}
+	return nil
+}
+
+// ValidateCandidates gets the current representation of the provided candidates and ensures that they are all still valid.
+// For a candidate to still be valid, the following conditions must be met:
+//
+// a. It must pass the global candidate filtering logic (no blocking PDBs, no do-not-disrupt annotation, etc)
+// b. It must not have any pods nominated for it
+// c. It must still be disruptable without violating node disruption budgets
+//
+// If these conditions are met for all candidates, ValidateCandidates returns a slice with the updated representations.
+func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Candidate) ([]*Candidate, error) {
+	validatedCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
 	if err != nil {
-		return false, fmt.Errorf("building disruption budgets, %w", err)
-	}
-	// 1. a candidate we are about to delete is a target of a currently pending pod, wait for that to settle
-	// before continuing consolidation
-	// 2. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
-	for _, n := range validationCandidates {
-		if v.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
-			return false, nil
-		}
-		postValidationMapping[n.nodePool.Name]--
+		return nil, fmt.Errorf("constructing validation candidates, %w", err)
+	}
+	validatedCandidates = mapCandidates(candidates, validatedCandidates)
+	// If we filtered out any candidates, return nil as some NodeClaims in the consolidation decision have changed.
+	if len(validatedCandidates) != len(candidates) {
+		return nil, NewValidationError(fmt.Errorf("%d candidates are no longer valid", len(candidates)-len(validatedCandidates)))
+	}
+	disruptionBudgetMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
+	if err != nil {
+		return nil, fmt.Errorf("building disruption budgets, %w", err)
+	}
+	// Return nil if any candidate meets either of the following conditions:
+	// a. A pod was nominated to the candidate
+	// b. Disrupting the candidate would violate node disruption budgets
+	for _, vc := range validatedCandidates {
+		if v.cluster.IsNodeNominated(vc.ProviderID()) {
+			return nil, NewValidationError(fmt.Errorf("a candidate was nominated during validation"))
+		}
+		if disruptionBudgetMapping[vc.nodePool.Name] == 0 {
+			return nil, NewValidationError(fmt.Errorf("a candidate can no longer be disrupted without violating budgets"))
+		}
+		disruptionBudgetMapping[vc.nodePool.Name]--
 	}
-	isValid, err := v.ValidateCommand(ctx, cmd, validationCandidates)
-	if err != nil {
-		return false, fmt.Errorf("validating command, %w", err)
-	}
-	return isValid, nil
+	return validatedCandidates, nil
 }
 
 // ShouldDisrupt is a predicate used to filter candidates
@@ -123,17 +155,17 @@ func (v *Validation) ShouldDisrupt(_ context.Context, c *Candidate) bool {
 }
 
 // ValidateCommand validates a command for a Method
-func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) (bool, error) {
+func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) error {
 	// None of the chosen candidate are valid for execution, so retry
 	if len(candidates) == 0 {
-		return false, nil
+		return NewValidationError(fmt.Errorf("no candidates"))
 	}
 	results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
 	if err != nil {
-		return false, fmt.Errorf("simluating scheduling, %w", err)
+		return fmt.Errorf("simluating scheduling, %w", err)
 	}
 	if !results.AllNonPendingPodsScheduled() {
-		return false, nil
+		return NewValidationError(fmt.Errorf("all pending pods could not be scheduled"))
 	}
 
 	// We want to ensure that the re-simulated scheduling using the current cluster state produces the same result.
@@ -145,22 +177,22 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
 	if len(results.NewNodeClaims) == 0 {
 		if len(cmd.replacements) == 0 {
 			// scheduling produced zero new NodeClaims and we weren't expecting any, so this is valid.
-			return true, nil
+			return nil
 		}
 		// if it produced no new NodeClaims, but we were expecting one we should re-simulate as there is likely a better
 		// consolidation option now
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// we need more than one replacement node which is never valid currently (all of our node replacement is m->1, never m->n)
 	if len(results.NewNodeClaims) > 1 {
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// we now know that scheduling simulation wants to create one new node
 	if len(cmd.replacements) == 0 {
 		// but we weren't expecting any new NodeClaims, so this is invalid
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// We know that the scheduling simulation wants to create a new node and that the command we are verifying wants
@@ -175,11 +207,11 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
 	// now says that we need to launch a 4xlarge. It's still launching the correct number of NodeClaims, but it's just
 	// as expensive or possibly more so we shouldn't validate.
 	if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, results.NewNodeClaims[0].InstanceTypeOptions) {
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// Now we know:
 	// - current scheduling simulation says to create a new node with types T = {T_0, T_1, ..., T_n}
 	// - our lifecycle command says to create a node with types {U_0, U_1, ..., U_n} where U is a subset of T
-	return true, nil
+	return nil
 }
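For reviewers unfamiliar with the error-marking pattern introduced in validation.go, here is a minimal, self-contained sketch (not part of the diff) of how a `ValidationError`-style wrapper behaves under `errors.As`; the lowercase identifiers are illustrative stand-ins for the exported `ValidationError`, `NewValidationError`, and `IsValidationError` helpers above:

```go
package main

import (
	"errors"
	"fmt"
)

// validationError mirrors the ValidationError type added in validation.go: a thin
// wrapper that marks an error as a retryable validation failure (pod churn, budget
// changes) rather than a hard failure.
type validationError struct {
	error
}

func newValidationError(err error) *validationError {
	return &validationError{error: err}
}

func isValidationError(err error) bool {
	if err == nil {
		return false
	}
	var ve *validationError
	return errors.As(err, &ve)
}

func main() {
	// A validation failure signals churn: callers retry instead of surfacing the error.
	churn := newValidationError(fmt.Errorf("a candidate was nominated during validation"))
	fmt.Println(isValidationError(churn)) // true

	// The marker survives further %w wrapping, so it can still be detected higher up the stack.
	wrapped := fmt.Errorf("validating consolidation, %w", churn)
	fmt.Println(isValidationError(wrapped)) // true

	// Ordinary errors are not validation errors and are propagated as-is.
	fmt.Println(isValidationError(errors.New("building disruption budgets"))) // false
}
```

This is what lets ValidateCandidates and ValidateCommand return NewValidationError(...) deep in the call chain while each ComputeCommand implementation still distinguishes pod churn (log and abandon the attempt) from real failures (propagate the error).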