
Commit

checkpointing
jmdeal committed Nov 18, 2024
1 parent 4071259 commit 88e8d86
Showing 31 changed files with 129 additions and 95 deletions.
2 changes: 0 additions & 2 deletions pkg/apis/v1/labels.go
@@ -37,8 +37,6 @@ const (
 // Karpenter specific domains and labels
 const (
 	NodePoolLabelKey = apis.Group + "/nodepool"
-	NodeClassGroup = apis.Group + "/nodeclass-group"
-	NodeClassKind = apis.Group + "/nodeclass-kind"
 	NodeInitializedLabelKey = apis.Group + "/initialized"
 	NodeRegisteredLabelKey = apis.Group + "/registered"
 	CapacityTypeLabelKey = apis.Group + "/capacity-type"
4 changes: 1 addition & 3 deletions pkg/apis/v1/nodeclaim.go
@@ -17,8 +17,6 @@ limitations under the License.
 package v1

 import (
-	"fmt"
-
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -114,7 +112,7 @@ type NodeClassReference struct {
 func (ncr *NodeClassReference) GroupKind() schema.GroupKind {
 	return schema.GroupKind{
 		Group: ncr.Group,
-		Kind: ncr.Kind,
+		Kind: ncr.Kind,
 	}
 }

1 change: 0 additions & 1 deletion pkg/apis/v1/nodepool.go
@@ -19,7 +19,6 @@ package v1
 import (
 	"fmt"
 	"math"
-	"sort"
 	"strconv"

 	"github.com/mitchellh/hashstructure/v2"
6 changes: 0 additions & 6 deletions pkg/apis/v1/suite_test.go
@@ -18,15 +18,12 @@ package v1_test

 import (
 	"context"
-	"math/rand"
 	"testing"

 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
-	"github.com/samber/lo"

 	"sigs.k8s.io/karpenter/pkg/apis"
-	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 	"sigs.k8s.io/karpenter/pkg/test"
 	. "sigs.k8s.io/karpenter/pkg/test/expectations"
 	"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
@@ -53,6 +50,3 @@ var _ = AfterEach(func() {
 var _ = AfterSuite(func() {
 	Expect(env.Stop()).To(Succeed(), "Failed to stop environment")
 })
-
-var _ = Describe("OrderByWeight", func() {
-})
3 changes: 1 addition & 2 deletions pkg/controllers/controllers.go
@@ -57,8 +57,7 @@ func NewControllers(
 	recorder events.Recorder,
 	cloudProvider cloudprovider.CloudProvider,
 ) []controller.Controller {
-
-	cluster := state.NewCluster(clock, kubeClient)
+	cluster := state.NewCluster(clock, kubeClient, cloudProvider)
 	p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
 	evictionQueue := terminator.NewQueue(kubeClient, recorder)
 	disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)
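Note: state.NewCluster now receives the CloudProvider alongside the clock and client. The sketch below shows roughly what a cloud-provider-aware constructor could look like; the field names and struct layout are assumptions for illustration, not the actual Karpenter implementation.

// Sketch only (assumed field names): a cluster-state constructor that keeps a
// reference to the cloud provider so later state lookups can tell which
// NodeClaims/NodePools it manages.
package state

import (
	"k8s.io/utils/clock"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

type Cluster struct {
	clock         clock.Clock
	kubeClient    client.Client
	cloudProvider cloudprovider.CloudProvider
	// ... existing tracked node/pod state elided ...
}

func NewCluster(clk clock.Clock, kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Cluster {
	return &Cluster{clock: clk, kubeClient: kubeClient, cloudProvider: cloudProvider}
}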
12 changes: 6 additions & 6 deletions pkg/controllers/disruption/consolidation_test.go
@@ -616,7 +616,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

 			emptyConsolidation := disruption.NewEmptiness(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, emptyConsolidation.Reason())
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, emptyConsolidation.Reason())
 			Expect(err).To(Succeed())

 			candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, emptyConsolidation.Class(), queue)
@@ -679,7 +679,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

 			emptyConsolidation := disruption.NewEmptiness(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, emptyConsolidation.Reason())
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, emptyConsolidation.Reason())
 			Expect(err).To(Succeed())

 			candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, emptyConsolidation.ShouldDisrupt, emptyConsolidation.Class(), queue)
@@ -703,7 +703,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

 			multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, multiConsolidation.Reason())
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, multiConsolidation.Reason())
 			Expect(err).To(Succeed())

 			candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, multiConsolidation.Class(), queue)
@@ -766,7 +766,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

 			multiConsolidation := disruption.NewMultiNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, multiConsolidation.Reason())
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, multiConsolidation.Reason())
 			Expect(err).To(Succeed())

 			candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, multiConsolidation.ShouldDisrupt, multiConsolidation.Class(), queue)
@@ -790,7 +790,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

 			singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, singleConsolidation.Reason())
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, singleConsolidation.Reason())
 			Expect(err).To(Succeed())

 			candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, singleConsolidation.Class(), queue)
@@ -853,7 +853,7 @@ var _ = Describe("Consolidation", func() {
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, nodes, nodeClaims)

 			singleConsolidation := disruption.NewSingleNodeConsolidation(disruption.MakeConsolidation(fakeClock, cluster, env.Client, prov, cloudProvider, recorder, queue))
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, singleConsolidation.Reason())
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, singleConsolidation.Reason())
 			Expect(err).To(Succeed())

 			candidates, err := disruption.GetCandidates(ctx, cluster, env.Client, recorder, fakeClock, cloudProvider, singleConsolidation.ShouldDisrupt, singleConsolidation.Class(), queue)
6 changes: 3 additions & 3 deletions pkg/controllers/disruption/controller.go
@@ -177,7 +177,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
 	if len(candidates) == 0 {
 		return false, nil
 	}
-	disruptionBudgetMapping, err := BuildDisruptionBudgetMapping(ctx, c.cluster, c.clock, c.kubeClient, c.recorder, disruption.Reason())
+	disruptionBudgetMapping, err := BuildDisruptionBudgetMapping(ctx, c.cluster, c.clock, c.kubeClient, c.cloudProvider, c.recorder, disruption.Reason())
 	if err != nil {
 		return false, fmt.Errorf("building disruption budgets, %w", err)
 	}
@@ -285,13 +285,13 @@ func (c *Controller) logAbnormalRuns(ctx context.Context) {

 // logInvalidBudgets will log if there are any invalid schedules detected
 func (c *Controller) logInvalidBudgets(ctx context.Context) {
-	nodePools, err := nodepool.ListManagedNodePools(ctx, c.kubeClient, c.cloudProvider)
+	nps, err := nodepool.List(ctx, c.kubeClient, nodepool.WithManagedNodePoolFilter(c.cloudProvider))
 	if err != nil {
 		log.FromContext(ctx).Error(err, "failed listing nodepools")
 		return
 	}
 	var buf bytes.Buffer
-	for _, np := range nodePools {
+	for _, np := range nps {
 		// Use a dummy value of 100 since we only care if this errors.
 		for _, method := range c.methods {
 			if _, err := np.GetAllowedDisruptionsByReason(c.clock, 100, method.Reason()); err != nil {
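Note: the controller now lists NodePools through nodepool.List with a WithManagedNodePoolFilter option instead of a dedicated ListManagedNodePools helper. The sketch below illustrates the functional-options listing pattern that call implies; the option type, the isManaged predicate, and the filtering criterion are assumptions for illustration, not the actual helper.

// Sketch of a functional-options list helper (assumed shapes throughout).
package nodepool

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
	"sigs.k8s.io/karpenter/pkg/cloudprovider"
)

// ListOption transforms the listed NodePools; WithManagedNodePoolFilter keeps only
// those whose nodeClassRef points at a NodeClass the given cloud provider manages.
type ListOption func([]v1.NodePool) []v1.NodePool

func WithManagedNodePoolFilter(cp cloudprovider.CloudProvider) ListOption {
	return func(pools []v1.NodePool) []v1.NodePool {
		out := make([]v1.NodePool, 0, len(pools))
		for _, np := range pools {
			if isManaged(np.Spec.Template.Spec.NodeClassRef, cp) {
				out = append(out, np)
			}
		}
		return out
	}
}

// isManaged is a stand-in predicate: the real check would compare the reference's
// group/kind against the NodeClasses the cloud provider reports as supported.
func isManaged(ref *v1.NodeClassReference, cp cloudprovider.CloudProvider) bool {
	_ = cp // consulted in the real implementation; elided in this sketch
	return ref != nil && ref.Group != "" && ref.Kind != ""
}

func List(ctx context.Context, kubeClient client.Client, opts ...ListOption) ([]v1.NodePool, error) {
	nodePoolList := &v1.NodePoolList{}
	if err := kubeClient.List(ctx, nodePoolList); err != nil {
		return nil, err
	}
	nodePools := nodePoolList.Items
	for _, opt := range opts {
		nodePools = opt(nodePools)
	}
	return nodePools, nil
}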
6 changes: 3 additions & 3 deletions pkg/controllers/disruption/orchestration/suite_test.go
@@ -77,13 +77,13 @@ func TestAPIs(t *testing.T) {
 }

 var _ = BeforeSuite(func() {
-	env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...))
+	env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.NodePoolFieldIndexer(ctx)))
 	ctx = options.ToContext(ctx, test.Options())
 	fakeClock = clock.NewFakeClock(time.Now())
 	cloudProvider = fake.NewCloudProvider()
-	cluster = state.NewCluster(fakeClock, env.Client)
+	cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
 	nodeStateController = informer.NewNodeController(env.Client, cluster)
-	nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
+	nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster)
 	recorder = test.NewEventRecorder()
 	prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
 	queue = NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
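Note: the test environments now pass test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.NodePoolFieldIndexer(ctx)) so list calls can select objects by an indexed field. The snippet below is a rough sketch of the underlying controller-runtime mechanism such an option would wrap; the index key ("spec.nodeClassRef.group") and the extractor are assumptions chosen for illustration.

// Sketch: registering a field index via controller-runtime's client.FieldIndexer.
// Only IndexField and MatchingFields are standard controller-runtime APIs here.
package indexes

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

func registerNodeClaimIndex(ctx context.Context, indexer client.FieldIndexer) error {
	return indexer.IndexField(ctx, &v1.NodeClaim{}, "spec.nodeClassRef.group", func(o client.Object) []string {
		nc := o.(*v1.NodeClaim)
		if nc.Spec.NodeClassRef == nil {
			return nil
		}
		return []string{nc.Spec.NodeClassRef.Group}
	})
}

// With the index registered (e.g. via mgr.GetFieldIndexer() or the cache backing the
// test environment), lists can filter on it:
//
//	nodeClaims := &v1.NodeClaimList{}
//	err := kubeClient.List(ctx, nodeClaims, client.MatchingFields{"spec.nodeClassRef.group": "example.group"})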
18 changes: 9 additions & 9 deletions pkg/controllers/disruption/suite_test.go
@@ -88,13 +88,13 @@ func TestAPIs(t *testing.T) {
 }

 var _ = BeforeSuite(func() {
-	env = test.NewEnvironment(test.WithCRDs(coreapis.CRDs...), test.WithCRDs(v1alpha1.CRDs...))
+	env = test.NewEnvironment(test.WithCRDs(coreapis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.NodePoolFieldIndexer(ctx)))
 	ctx = options.ToContext(ctx, test.Options())
 	cloudProvider = fake.NewCloudProvider()
 	fakeClock = clock.NewFakeClock(time.Now())
-	cluster = state.NewCluster(fakeClock, env.Client)
+	cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
 	nodeStateController = informer.NewNodeController(env.Client, cluster)
-	nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
+	nodeClaimStateController = informer.NewNodeClaimController(env.Client, cloudProvider, cluster)
 	recorder = test.NewEventRecorder()
 	prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
 	queue = NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
@@ -638,7 +638,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		ExpectApplied(ctx, env.Client, unmanaged)
 		ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{unmanaged}, []*v1.NodeClaim{})
 		for _, reason := range allKnownDisruptionReasons {
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, reason)
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, reason)
 			Expect(err).To(Succeed())
 			// This should not bring in the unmanaged node.
 			Expect(budgets[nodePool.Name]).To(Equal(10))
@@ -669,7 +669,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		ExpectReconcileSucceeded(ctx, nodeClaimStateController, client.ObjectKeyFromObject(nodeClaim))

 		for _, reason := range allKnownDisruptionReasons {
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, reason)
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, reason)
 			Expect(err).To(Succeed())
 			// This should not bring in the uninitialized node.
 			Expect(budgets[nodePool.Name]).To(Equal(10))
@@ -701,7 +701,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		ExpectReconcileSucceeded(ctx, nodeClaimStateController, client.ObjectKeyFromObject(nodeClaim))

 		for _, reason := range allKnownDisruptionReasons {
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, reason)
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, reason)
 			Expect(err).To(Succeed())
 			// This should not bring in the terminating node.
 			Expect(budgets[nodePool.Name]).To(Equal(10))
@@ -723,7 +723,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		}

 		for _, reason := range allKnownDisruptionReasons {
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, reason)
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, reason)
 			Expect(err).To(Succeed())
 			Expect(budgets[nodePool.Name]).To(Equal(0))
 		}
@@ -747,7 +747,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		}

 		for _, reason := range allKnownDisruptionReasons {
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, reason)
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, reason)
 			Expect(err).To(Succeed())
 			Expect(budgets[nodePool.Name]).To(Equal(8))
 		}
@@ -768,7 +768,7 @@ var _ = Describe("BuildDisruptionBudgetMapping", func() {
 		}

 		for _, reason := range allKnownDisruptionReasons {
-			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, recorder, reason)
+			budgets, err := disruption.BuildDisruptionBudgetMapping(ctx, cluster, fakeClock, env.Client, cloudProvider, recorder, reason)
 			Expect(err).To(Succeed())
 			Expect(budgets[nodePool.Name]).To(Equal(8))
 		}
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/validation.go
@@ -128,7 +128,7 @@ func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Cand
 	if len(validatedCandidates) != len(candidates) {
 		return nil, NewValidationError(fmt.Errorf("%d candidates are no longer valid", len(candidates)-len(validatedCandidates)))
 	}
-	disruptionBudgetMapping, err := BuildDisruptionBudgetMapping(ctx, v.cluster, v.clock, v.kubeClient, v.recorder, v.reason)
+	disruptionBudgetMapping, err := BuildDisruptionBudgetMapping(ctx, v.cluster, v.clock, v.kubeClient, v.cloudProvider, v.recorder, v.reason)
 	if err != nil {
 		return nil, fmt.Errorf("building disruption budgets, %w", err)
 	}
4 changes: 2 additions & 2 deletions pkg/controllers/metrics/node/suite_test.go
@@ -55,13 +55,13 @@ func TestAPIs(t *testing.T) {
 }

 var _ = BeforeSuite(func() {
-	env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...))
+	env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.NodePoolFieldIndexer(ctx)))

 	ctx = options.ToContext(ctx, test.Options())
 	cloudProvider = fake.NewCloudProvider()
 	cloudProvider.InstanceTypes = fake.InstanceTypesAssorted()
 	fakeClock = clock.NewFakeClock(time.Now())
-	cluster = state.NewCluster(fakeClock, env.Client)
+	cluster = state.NewCluster(fakeClock, env.Client, cloudProvider)
 	nodeController = informer.NewNodeController(env.Client, cluster)
 	metricsStateController = node.NewController(cluster)
 })
7 changes: 5 additions & 2 deletions pkg/controllers/metrics/nodepool/suite_test.go
@@ -29,6 +29,7 @@ import (

 	"sigs.k8s.io/karpenter/pkg/apis"
 	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
+	"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
 	"sigs.k8s.io/karpenter/pkg/controllers/metrics/nodepool"
 	"sigs.k8s.io/karpenter/pkg/test"
 	. "sigs.k8s.io/karpenter/pkg/test/expectations"
@@ -39,6 +40,7 @@ import (
 var nodePoolController *nodepool.Controller
 var ctx context.Context
 var env *test.Environment
+var cp *fake.CloudProvider

 func TestAPIs(t *testing.T) {
 	ctx = TestContextWithLogger(t)
@@ -47,8 +49,9 @@ func TestAPIs(t *testing.T) {
 }

 var _ = BeforeSuite(func() {
-	env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...))
-	nodePoolController = nodepool.NewController(env.Client)
+	env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.NodePoolFieldIndexer(ctx)))
+	cp = fake.NewCloudProvider()
+	nodePoolController = nodepool.NewController(env.Client, cp)
 })

 var _ = AfterSuite(func() {
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/consistency/suite_test.go
@@ -66,7 +66,7 @@ var _ = BeforeSuite(func() {
 	ctx = options.ToContext(ctx, test.Options())
 	cp = &fake.CloudProvider{}
 	recorder = test.NewEventRecorder()
-	nodeClaimConsistencyController = consistency.NewController(fakeClock, env.Client, recorder)
+	nodeClaimConsistencyController = consistency.NewController(fakeClock, env.Client, cp, recorder)
 })

 var _ = AfterSuite(func() {
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/disruption/drift_test.go
@@ -392,7 +392,7 @@ var _ = Describe("Drift", func() {
 		var nodePoolController *hash.Controller
 		BeforeEach(func() {
 			cp.Drifted = ""
-			nodePoolController = hash.NewController(env.Client)
+			nodePoolController = hash.NewController(env.Client, cp)
 			nodePool = &v1.NodePool{
 				ObjectMeta: nodePool.ObjectMeta,
 				Spec: v1.NodePoolSpec{
5 changes: 4 additions & 1 deletion pkg/controllers/nodeclaim/expiration/suite_test.go
@@ -31,6 +31,7 @@ import (

 	"sigs.k8s.io/karpenter/pkg/apis"
 	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
+	"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
 	"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/expiration"
 	"sigs.k8s.io/karpenter/pkg/metrics"
 	"sigs.k8s.io/karpenter/pkg/operator/options"
@@ -43,6 +44,7 @@ import (
 var ctx context.Context
 var expirationController *expiration.Controller
 var env *test.Environment
+var cp *fake.CloudProvider
 var fakeClock *clock.FakeClock

 func TestAPIs(t *testing.T) {
@@ -59,7 +61,8 @@ var _ = BeforeSuite(func() {
 		})
 	}))
 	ctx = options.ToContext(ctx, test.Options())
-	expirationController = expiration.NewController(fakeClock, env.Client)
+	cp = fake.NewCloudProvider()
+	expirationController = expiration.NewController(fakeClock, env.Client, cp)
 })

 var _ = AfterSuite(func() {
9 changes: 4 additions & 5 deletions pkg/controllers/nodeclaim/garbagecollection/controller.go
@@ -39,8 +39,7 @@ import (
 	"sigs.k8s.io/karpenter/pkg/metrics"
 	"sigs.k8s.io/karpenter/pkg/operator/injection"
 	nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
-	"sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
-	nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
+	nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
 )

 type Controller struct {
@@ -60,7 +59,7 @@ func NewController(c clock.Clock, kubeClient client.Client, cloudProvider cloudp
 func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
 	ctx = injection.WithControllerName(ctx, "nodeclaim.garbagecollection")

-	nodeClaims, err := nodeclaimutil.List(ctx, c.kubeClient, nodeclaim.WithManagedNodeClaimFilter(c.cloudProvider))
+	nodeClaims, err := nodeclaimutils.List(ctx, c.kubeClient, nodeclaimutils.WithManagedNodeClaimFilter(c.cloudProvider))
 	if err != nil {
 		return reconcile.Result{}, err
 	}
@@ -84,10 +83,10 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {

 	errs := make([]error, len(nodeClaims))
 	workqueue.ParallelizeUntil(ctx, 20, len(nodeClaims), func(i int) {
-		node, err := nodeclaimutil.NodeForNodeClaim(ctx, c.kubeClient, nodeClaims[i])
+		node, err := nodeclaimutils.NodeForNodeClaim(ctx, c.kubeClient, nodeClaims[i])
 		// Ignore these errors since a registered NodeClaim should only have a NotFound node when
 		// the Node was deleted out from under us and a Duplicate Node is an invalid state
-		if nodeclaimutil.IgnoreDuplicateNodeError(nodeclaimutil.IgnoreNodeNotFoundError(err)) != nil {
+		if nodeclaimutils.IgnoreDuplicateNodeError(nodeclaimutils.IgnoreNodeNotFoundError(err)) != nil {
 			errs[i] = err
 		}
 		// We do a check on the Ready condition of the node since, even though the CloudProvider says the instance
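Note: the reconcile loop above filters expected failure modes with nodeclaimutils.IgnoreNodeNotFoundError / IgnoreDuplicateNodeError before recording an error. The sketch below shows that ignore-helper pattern; the error type definition is an assumption, the shape (errors.As against a dedicated type, returning nil so callers can chain filters) is what matters.

// Sketch of an "ignore this error kind" helper (assumed type definition).
package nodeclaim

import "errors"

// NodeNotFoundError is a hypothetical error type describing a NodeClaim whose Node
// does not (or no longer) exists.
type NodeNotFoundError struct {
	Err error
}

func (e *NodeNotFoundError) Error() string { return e.Err.Error() }
func (e *NodeNotFoundError) Unwrap() error { return e.Err }

// IgnoreNodeNotFoundError returns nil when err is (or wraps) a *NodeNotFoundError and
// returns err unchanged otherwise, so callers can chain several filters and only
// treat whatever is left over as a real failure:
//
//	if IgnoreDuplicateNodeError(IgnoreNodeNotFoundError(err)) != nil {
//		errs[i] = err
//	}
func IgnoreNodeNotFoundError(err error) error {
	nnfErr := &NodeNotFoundError{}
	if errors.As(err, &nnfErr) {
		return nil
	}
	return err
}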
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/podevents/suite_test.go
@@ -61,7 +61,7 @@ var _ = BeforeSuite(func() {
 	}))
 	ctx = options.ToContext(ctx, test.Options())
 	cp = fake.NewCloudProvider()
-	podEventsController = podevents.NewController(fakeClock, env.Client)
+	podEventsController = podevents.NewController(fakeClock, env.Client, cp)
 })

 var _ = AfterSuite(func() {
