Skip to content

Commit

Permalink
Remove usage of 'state node' in eventing, logging, and errors
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Jan 8, 2025
1 parent 83332db commit a065980
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 40 deletions.
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/consolidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
Expand Down Expand Up @@ -144,7 +145,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if !results.AllNonPendingPodsScheduled() {
// This method is used by multi-node consolidation as well, so we'll only report in the single node case
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, results.NonPendingPodSchedulingErrors())...)
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, pretty.Sentence(results.NonPendingPodSchedulingErrors()))...)
}
return Command{}, pscheduling.Results{}, nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/disruption/drift.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sort"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
Expand Down Expand Up @@ -100,7 +101,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
}
// Emit an event that we couldn't reschedule the pods on the node.
if !results.AllNonPendingPodsScheduled() {
d.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, results.NonPendingPodSchedulingErrors())...)
d.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, pretty.Sentence(results.NonPendingPodSchedulingErrors()))...)
continue
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/controllers/disruption/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,21 @@ func Terminating(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) []ev

// Unconsolidatable is an event that informs the user that a NodeClaim/Node combination cannot be consolidated
// due to the state of the NodeClaim/Node or due to some state of the pods that are scheduled to the NodeClaim/Node
func Unconsolidatable(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) []events.Event {
func Unconsolidatable(node *corev1.Node, nodeClaim *v1.NodeClaim, msg string) []events.Event {
return []events.Event{
{
InvolvedObject: node,
Type: corev1.EventTypeNormal,
Reason: "Unconsolidatable",
Message: reason,
Message: msg,
DedupeValues: []string{string(node.UID)},
DedupeTimeout: time.Minute * 15,
},
{
InvolvedObject: nodeClaim,
Type: corev1.EventTypeNormal,
Reason: "Unconsolidatable",
Message: reason,
Message: msg,
DedupeValues: []string{string(nodeClaim.UID)},
DedupeTimeout: time.Minute * 15,
},
Expand All @@ -92,13 +92,13 @@ func Unconsolidatable(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string)

// Blocked is an event that informs the user that a NodeClaim/Node combination is blocked on deprovisioning
// due to the state of the NodeClaim/Node or due to some state of the pods that are scheduled to the NodeClaim/Node
func Blocked(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) (evs []events.Event) {
func Blocked(node *corev1.Node, nodeClaim *v1.NodeClaim, msg string) (evs []events.Event) {
if node != nil {
evs = append(evs, events.Event{
InvolvedObject: node,
Type: corev1.EventTypeNormal,
Reason: "DisruptionBlocked",
Message: fmt.Sprintf("Cannot disrupt Node: %s", reason),
Message: msg,
DedupeValues: []string{string(node.UID)},
})
}
Expand All @@ -107,7 +107,7 @@ func Blocked(node *corev1.Node, nodeClaim *v1.NodeClaim, reason string) (evs []e
InvolvedObject: nodeClaim,
Type: corev1.EventTypeNormal,
Reason: "DisruptionBlocked",
Message: fmt.Sprintf("Cannot disrupt NodeClaim: %s", reason),
Message: msg,
DedupeValues: []string{string(nodeClaim.UID)},
})
}
Expand Down
42 changes: 21 additions & 21 deletions pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
})
It("should not consider candidates that have do-not-disrupt mirror pods scheduled", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -914,7 +914,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
})
It("should not consider candidates that have do-not-disrupt daemonset pods scheduled", func() {
daemonSet := test.DaemonSet()
Expand Down Expand Up @@ -953,7 +953,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
})
It("should consider candidates that have do-not-disrupt pods scheduled with a terminationGracePeriod set for eventual disruption", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1043,7 +1043,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
})
It("should not consider candidates that have PDB-blocked pods scheduled with a terminationGracePeriod set for graceful disruption", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1079,7 +1079,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err = disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
})
It("should not consider candidates that have do-not-disrupt pods scheduled without a terminationGracePeriod set for eventual disruption", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1107,7 +1107,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.EventualDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
})
It("should not consider candidates that have PDB-blocked pods scheduled without a terminationGracePeriod set for eventual disruption", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1142,7 +1142,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err = disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.EventualDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
})
It("should consider candidates that have do-not-disrupt terminating pods", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1233,7 +1233,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(`disruption is blocked through the "karpenter.sh/do-not-disrupt" annotation`))
Expect(recorder.DetectedEvent(`Cannot disrupt Node: disruption is blocked through the "karpenter.sh/do-not-disrupt" annotation`)).To(BeTrue())
Expect(recorder.DetectedEvent(`Disruption is blocked through the "karpenter.sh/do-not-disrupt" annotation`)).To(BeTrue())
})
It("should not consider candidates that have fully blocking PDBs", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1269,7 +1269,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err = disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
})
It("should not consider candidates that have fully blocking PDBs on daemonset pods", func() {
daemonSet := test.DaemonSet()
Expand Down Expand Up @@ -1316,7 +1316,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err = disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
})
It("should consider candidates that have fully blocking PDBs on mirror pods", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1393,7 +1393,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err = disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(pod)))).To(BeTrue())
})
It("should not consider candidates that have fully blocking PDBs without a terminationGracePeriod set for graceful disruption", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1427,7 +1427,7 @@ var _ = Describe("Candidate Filtering", func() {
_, err = disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf(`pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget))))
Expect(recorder.DetectedEvent(fmt.Sprintf(`Cannot disrupt Node: pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
Expect(recorder.DetectedEvent(fmt.Sprintf(`Pdb %q prevents pod evictions`, client.ObjectKeyFromObject(budget)))).To(BeTrue())
})
It("should consider candidates that have fully blocking PDBs on terminal pods", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down Expand Up @@ -1567,8 +1567,8 @@ var _ = Describe("Candidate Filtering", func() {
Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("state node is nominated for a pending pod"))
Expect(recorder.DetectedEvent("Cannot disrupt Node: state node is nominated for a pending pod")).To(BeTrue())
Expect(err.Error()).To(Equal("node is nominated for a pending pod"))
Expect(recorder.DetectedEvent("Node is nominated for a pending pod")).To(BeTrue())
})
It("should not consider candidates that are deleting", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand All @@ -1590,7 +1590,7 @@ var _ = Describe("Candidate Filtering", func() {
Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("state node is marked for deletion"))
Expect(err.Error()).To(Equal("node is deleting or marked for deletion"))
})
It("should not consider candidates that are MarkedForDeletion", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand All @@ -1611,7 +1611,7 @@ var _ = Describe("Candidate Filtering", func() {
Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("state node is marked for deletion"))
Expect(err.Error()).To(Equal("node is deleting or marked for deletion"))
})
It("should not consider candidates that aren't yet initialized", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand All @@ -1631,7 +1631,7 @@ var _ = Describe("Candidate Filtering", func() {
Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("state node isn't initialized"))
Expect(err.Error()).To(Equal("node isn't initialized"))
})
It("should not consider candidates that are not owned by a NodePool (no karpenter.sh/nodepool label)", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand All @@ -1649,8 +1649,8 @@ var _ = Describe("Candidate Filtering", func() {
Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(`state node doesn't have required label "karpenter.sh/nodepool"`))
Expect(recorder.DetectedEvent(`Cannot disrupt Node: state node doesn't have required label "karpenter.sh/nodepool"`)).To(BeTrue())
Expect(err.Error()).To(Equal(`node doesn't have required label "karpenter.sh/nodepool"`))
Expect(recorder.DetectedEvent(`Node doesn't have required label "karpenter.sh/nodepool"`)).To(BeTrue())
})
It("should not consider candidates that have a non-existent NodePool", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand All @@ -1674,8 +1674,8 @@ var _ = Describe("Candidate Filtering", func() {
Expect(cluster.Nodes()).To(HaveLen(1))
_, err := disruption.NewCandidate(ctx, env.Client, recorder, fakeClock, cluster.Nodes()[0], pdbLimits, nodePoolMap, nodePoolInstanceTypeMap, queue, disruption.GracefulDisruptionClass)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal(fmt.Sprintf("nodepool %q can't be resolved for state node", nodePool.Name)))
Expect(recorder.DetectedEvent(fmt.Sprintf("Cannot disrupt Node: NodePool %q not found", nodePool.Name))).To(BeTrue())
Expect(err.Error()).To(Equal(fmt.Sprintf("nodepool %q not found", nodePool.Name)))
Expect(recorder.DetectedEvent(fmt.Sprintf("NodePool %q not found", nodePool.Name))).To(BeTrue())
})
It("should consider candidates that do not have the karpenter.sh/capacity-type label", func() {
nodeClaim, node := test.NodeClaimAndNode(v1.NodeClaim{
Expand Down
Loading

0 comments on commit a065980

Please sign in to comment.