diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 217d92dc1e..fd96e5869e 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -76,7 +76,7 @@ func NewControllers( informer.NewNodePoolController(kubeClient, cluster), informer.NewNodeClaimController(kubeClient, cluster), termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder), - metricspod.NewController(kubeClient), + metricspod.NewController(kubeClient, cluster), metricsnodepool.NewController(kubeClient), metricsnode.NewController(cluster), nodepoolreadiness.NewController(kubeClient, cloudProvider), diff --git a/pkg/controllers/metrics/pod/controller.go b/pkg/controllers/metrics/pod/controller.go index 1c1cdcf2ab..ade440c853 100644 --- a/pkg/controllers/metrics/pod/controller.go +++ b/pkg/controllers/metrics/pod/controller.go @@ -36,6 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/operator/injection" ) @@ -51,14 +52,13 @@ const ( podHostCapacityType = "capacity_type" podHostInstanceType = "instance_type" podPhase = "phase" - phasePending = "Pending" ) var ( PodState = opmetrics.NewPrometheusGauge( crmetrics.Registry, prometheus.GaugeOpts{ - Namespace: "karpenter", + Namespace: metrics.Namespace, Subsystem: metrics.PodSubsystem, Name: "state", Help: "Pod state is the current state of pods. This metric can be used several ways as it is labeled by the pod name, namespace, owner, node, nodepool name, zone, architecture, capacity type, instance type and pod phase.", @@ -68,7 +68,7 @@ var ( PodStartupDurationSeconds = opmetrics.NewPrometheusSummary( crmetrics.Registry, prometheus.SummaryOpts{ - Namespace: "karpenter", + Namespace: metrics.Namespace, Subsystem: metrics.PodSubsystem, Name: "startup_duration_seconds", Help: "The time from pod creation until the pod is running.", @@ -76,10 +76,20 @@ var ( }, []string{}, ) + PodUnstartedTimeSeconds = opmetrics.NewPrometheusGauge( + crmetrics.Registry, + prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.PodSubsystem, + Name: "unstarted_time_seconds", + Help: "The time from pod creation until the pod is running.", + }, + []string{podName, podNamespace}, + ) PodBoundDurationSeconds = opmetrics.NewPrometheusHistogram( crmetrics.Registry, prometheus.HistogramOpts{ - Namespace: "karpenter", + Namespace: metrics.Namespace, Subsystem: metrics.PodSubsystem, Name: "bound_duration_seconds", Help: "The time from pod creation until the pod is bound.", @@ -87,23 +97,68 @@ var ( }, []string{}, ) - PodCurrentUnboundTimeSeconds = opmetrics.NewPrometheusGauge( + PodUnboundTimeSeconds = opmetrics.NewPrometheusGauge( crmetrics.Registry, prometheus.GaugeOpts{ - Namespace: "karpenter", + Namespace: metrics.Namespace, Subsystem: metrics.PodSubsystem, - Name: "current_unbound_time_seconds", + Name: "unbound_time_seconds", Help: "The time from pod creation until the pod is bound.", }, []string{podName, podNamespace}, ) - PodUnstartedTimeSeconds = opmetrics.NewPrometheusGauge( + // Stage: alpha + PodProvisioningBoundDurationSeconds = opmetrics.NewPrometheusHistogram( + crmetrics.Registry, + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.PodSubsystem, + Name: "provisioning_bound_duration_seconds", + Help: "The time from when Karpenter first thinks the pod can 
schedule until it binds. Note: this is calculated from a point in memory, not from the pod creation timestamp.", + }, + []string{}, + ) + // Stage: alpha + PodProvisioningUnboundTimeSeconds = opmetrics.NewPrometheusGauge( crmetrics.Registry, prometheus.GaugeOpts{ - Namespace: "karpenter", + Namespace: metrics.Namespace, Subsystem: metrics.PodSubsystem, - Name: "unstarted_time_seconds", - Help: "The time from pod creation until the pod is running.", + Name: "provisioning_unbound_time_seconds", + Help: "The time from when Karpenter first thinks the pod can schedule until it binds. Note: this is calculated from a point in memory, not from the pod creation timestamp.", + }, + []string{podName, podNamespace}, + ) + // Stage: alpha + PodProvisioningStartupDurationSeconds = opmetrics.NewPrometheusHistogram( + crmetrics.Registry, + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.PodSubsystem, + Name: "provisioning_startup_duration_seconds", + Help: "The time from when Karpenter first thinks the pod can schedule until the pod is running. Note: this is calculated from a point in memory, not from the pod creation timestamp.", + }, + []string{}, + ) + // Stage: alpha + PodProvisioningUnstartedTimeSeconds = opmetrics.NewPrometheusGauge( + crmetrics.Registry, + prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.PodSubsystem, + Name: "provisioning_unstarted_time_seconds", + Help: "The time from when Karpenter first thinks the pod can schedule until the pod is running. Note: this is calculated from a point in memory, not from the pod creation timestamp.", + }, + []string{podName, podNamespace}, + ) + // Stage: alpha + PodProvisioningSchedulingUndecidedTimeSeconds = opmetrics.NewPrometheusGauge( + crmetrics.Registry, + prometheus.GaugeOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.PodSubsystem, + Name: "provisioning_scheduling_undecided_time_seconds", + Help: "The time from when Karpenter has seen a pod without making a scheduling decision for the pod. Note: this is calculated from a point in memory, not from the pod creation timestamp.",
+ }, + []string{podName, podNamespace}, + ) @@ -113,6 +168,7 @@ var ( type Controller struct { kubeClient client.Client metricStore *metrics.Store + cluster *state.Cluster pendingPods sets.Set[string] unscheduledPods sets.Set[string] @@ -134,12 +190,13 @@ func labelNames() []string { } // NewController constructs a podController instance -func NewController(kubeClient client.Client) *Controller { +func NewController(kubeClient client.Client, cluster *state.Cluster) *Controller { return &Controller{ kubeClient: kubeClient, metricStore: metrics.NewStore(), pendingPods: sets.New[string](), unscheduledPods: sets.New[string](), + cluster: cluster, } } @@ -156,9 +213,21 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco podName: req.Name, podNamespace: req.Namespace, }) + PodProvisioningUnstartedTimeSeconds.Delete(map[string]string{ + podName: req.Name, + podNamespace: req.Namespace, + }) c.unscheduledPods.Delete(req.NamespacedName.String()) // Delete the unbound metric since the pod is deleted - PodCurrentUnboundTimeSeconds.Delete(map[string]string{ + PodUnboundTimeSeconds.Delete(map[string]string{ + podName: req.Name, + podNamespace: req.Namespace, + }) + PodProvisioningUnboundTimeSeconds.Delete(map[string]string{ + podName: req.Name, + podNamespace: req.Namespace, + }) + PodProvisioningSchedulingUndecidedTimeSeconds.Delete(map[string]string{ podName: req.Name, podNamespace: req.Namespace, }) @@ -177,18 +246,48 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco Labels: labels, }, }) - c.recordPodStartupMetric(pod) - c.recordPodBoundMetric(pod) - return reconcile.Result{}, nil + c.recordPodSchedulingUndecidedMetric(pod) + // Get the time when Karpenter first thought the pod was schedulable. This should be zero if we didn't simulate for this pod.
+ schedulableTime := c.cluster.PodSchedulingSuccessTime(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}) + c.recordPodStartupMetric(pod, schedulableTime) + c.recordPodBoundMetric(pod, schedulableTime) + // Requeue every 30s for pods that are stuck without a state change + return reconcile.Result{RequeueAfter: 30 * time.Second}, nil } -func (c *Controller) recordPodStartupMetric(pod *corev1.Pod) { +func (c *Controller) recordPodSchedulingUndecidedMetric(pod *corev1.Pod) { + nn := client.ObjectKeyFromObject(pod) + // If we've made a decision on this pod, delete the metric idempotently and return + if decisionTime := c.cluster.PodSchedulingDecisionTime(nn); !decisionTime.IsZero() { + PodProvisioningSchedulingUndecidedTimeSeconds.Delete(map[string]string{ + podName: pod.Name, + podNamespace: pod.Namespace, + }) + return + } + // If we haven't made a decision, get the time that we ACK'd the pod and emit the metric based on that + if podAckTime := c.cluster.PodAckTime(nn); !podAckTime.IsZero() { + PodProvisioningSchedulingUndecidedTimeSeconds.Set(time.Since(podAckTime).Seconds(), map[string]string{ + podName: pod.Name, + podNamespace: pod.Namespace, + }) + return + } +} + +func (c *Controller) recordPodStartupMetric(pod *corev1.Pod, schedulableTime time.Time) { key := client.ObjectKeyFromObject(pod).String() - if pod.Status.Phase == phasePending { + if pod.Status.Phase == corev1.PodPending { PodUnstartedTimeSeconds.Set(time.Since(pod.CreationTimestamp.Time).Seconds(), map[string]string{ podName: pod.Name, podNamespace: pod.Namespace, }) + if !schedulableTime.IsZero() { + PodProvisioningUnstartedTimeSeconds.Set(time.Since(schedulableTime).Seconds(), map[string]string{ + podName: pod.Name, + podNamespace: pod.Namespace, + }) + } c.pendingPods.Insert(key) return } @@ -201,40 +300,69 @@ func (c *Controller) recordPodStartupMetric(pod *corev1.Pod) { podName: pod.Name, podNamespace: pod.Namespace, }) + if !schedulableTime.IsZero() { + PodProvisioningUnstartedTimeSeconds.Set(time.Since(schedulableTime).Seconds(), map[string]string{ + podName: pod.Name, + podNamespace: pod.Namespace, + }) + } } else { // Delete the unstarted metric since the pod is now started PodUnstartedTimeSeconds.Delete(map[string]string{ podName: pod.Name, podNamespace: pod.Namespace, }) + PodProvisioningUnstartedTimeSeconds.Delete(map[string]string{ + podName: pod.Name, + podNamespace: pod.Namespace, + }) PodStartupDurationSeconds.Observe(cond.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds(), nil) + if !schedulableTime.IsZero() { + PodProvisioningStartupDurationSeconds.Observe(cond.LastTransitionTime.Sub(schedulableTime).Seconds(), nil) + } c.pendingPods.Delete(key) + // Clear cluster state's representation of these pods as we don't need to keep track of them anymore + c.cluster.ClearPodSchedulingMappings(client.ObjectKeyFromObject(pod)) } } } -func (c *Controller) recordPodBoundMetric(pod *corev1.Pod) { +func (c *Controller) recordPodBoundMetric(pod *corev1.Pod, schedulableTime time.Time) { key := client.ObjectKeyFromObject(pod).String() - condScheduled, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool { + cond, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool { return c.Type == corev1.PodScheduled }) - if pod.Status.Phase == phasePending { - // If the podScheduled condition does not exist, or it exists and is not set to true, we emit pod_current_unbound_time_seconds metric. 
- if !ok || condScheduled.Status != corev1.ConditionTrue { - PodCurrentUnboundTimeSeconds.Set(time.Since(pod.CreationTimestamp.Time).Seconds(), map[string]string{ + if pod.Status.Phase == corev1.PodPending { + if !ok || cond.Status != corev1.ConditionTrue { + // If the podScheduled condition does not exist, or it exists and is not set to true, we emit pod_current_unbound_time_seconds metric. + PodUnboundTimeSeconds.Set(time.Since(pod.CreationTimestamp.Time).Seconds(), map[string]string{ podName: pod.Name, podNamespace: pod.Namespace, }) + if !schedulableTime.IsZero() { + PodProvisioningUnboundTimeSeconds.Set(time.Since(schedulableTime).Seconds(), map[string]string{ + podName: pod.Name, + podNamespace: pod.Namespace, + }) + } } c.unscheduledPods.Insert(key) return } - if c.unscheduledPods.Has(key) && ok && condScheduled.Status == corev1.ConditionTrue { + if c.unscheduledPods.Has(key) && ok && cond.Status == corev1.ConditionTrue { // Delete the unbound metric since the pod is now bound - PodCurrentUnboundTimeSeconds.Delete(map[string]string{ + PodUnboundTimeSeconds.Delete(map[string]string{ + podName: pod.Name, + podNamespace: pod.Namespace, + }) + PodProvisioningUnboundTimeSeconds.Delete(map[string]string{ podName: pod.Name, podNamespace: pod.Namespace, }) - PodBoundDurationSeconds.Observe(condScheduled.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds(), nil) + + PodBoundDurationSeconds.Observe(cond.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds(), nil) + if !schedulableTime.IsZero() { + PodProvisioningBoundDurationSeconds.Observe(cond.LastTransitionTime.Sub(schedulableTime).Seconds(), nil) + } c.unscheduledPods.Delete(key) } } diff --git a/pkg/controllers/metrics/pod/suite_test.go b/pkg/controllers/metrics/pod/suite_test.go index 8ffdbe5de5..984626ecc0 100644 --- a/pkg/controllers/metrics/pod/suite_test.go +++ b/pkg/controllers/metrics/pod/suite_test.go @@ -19,15 +19,18 @@ package pod_test import ( "context" "testing" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/controllers/metrics/pod" + "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/test" . "sigs.k8s.io/karpenter/pkg/test/expectations" . 
"sigs.k8s.io/karpenter/pkg/utils/testing" @@ -36,6 +39,8 @@ import ( var podController *pod.Controller var ctx context.Context var env *test.Environment +var cluster *state.Cluster +var fakeClock *clock.FakeClock func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -45,7 +50,13 @@ func TestAPIs(t *testing.T) { var _ = BeforeSuite(func() { env = test.NewEnvironment() - podController = pod.NewController(env.Client) + fakeClock = clock.NewFakeClock(time.Now()) + cluster = state.NewCluster(fakeClock, env.Client) + podController = pod.NewController(env.Client, cluster) +}) + +var _ = AfterEach(func() { + cluster.Reset() }) var _ = AfterSuite(func() { @@ -90,10 +101,18 @@ var _ = Describe("Pod Metrics", func() { p := test.Pod() p.Status.Phase = corev1.PodPending - // PodScheduled condition does not exist, emit pods_current_unbound_time_seconds metric + fakeClock.Step(1 * time.Hour) + cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) + + // PodScheduled condition does not exist, emit pods_unbound_time_seconds metric ExpectApplied(ctx, env.Client, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will add pod to pending pods and unscheduled pods set - _, found := FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{ + _, found := FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unbound_time_seconds", map[string]string{ "name": p.GetName(), "namespace": p.GetNamespace(), }) @@ -102,40 +121,65 @@ var _ = Describe("Pod Metrics", func() { p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}} ExpectApplied(ctx, env.Client, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will add pod to pending pods and unscheduled pods set - metric, found := FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{ + metric, found := FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{ "name": p.GetName(), "namespace": p.GetNamespace(), }) - Expect(found).To(BeTrue()) unboundTime := metric.GetGauge().Value + Expect(found).To(BeTrue()) + metric, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unbound_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) + provisioningUnboundTime := metric.GetGauge().Value // Pod is still pending but has bound. At this step pods_unbound_duration should not change. p.Status.Phase = corev1.PodPending p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()}} ExpectApplied(ctx, env.Client, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will check if the pod was scheduled or not - metric, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{ + metric, found = FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{ "name": p.GetName(), "namespace": p.GetNamespace(), }) Expect(found).To(BeTrue()) Expect(metric.GetGauge().Value).To(Equal(unboundTime)) - // Pod is still running and has bound. 
At this step pods_bound_duration should be fired and pods_current_unbound_time_seconds should be deleted + metric, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unbound_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) + Expect(metric.GetGauge().Value).To(Equal(provisioningUnboundTime)) + + // Pod is still running and has bound. At this step pods_bound_duration should be fired and pods_unbound_time_seconds should be deleted p.Status.Phase = corev1.PodRunning ExpectApplied(ctx, env.Client, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will check if the pod was scheduled or not - _, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{ + _, found = FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) + + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unbound_time_seconds", map[string]string{ "name": p.GetName(), "namespace": p.GetNamespace(), }) Expect(found).To(BeFalse()) + _, found = FindMetricWithLabelValues("karpenter_pods_bound_duration_seconds", map[string]string{}) Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_bound_duration_seconds", map[string]string{}) + Expect(found).To(BeTrue()) }) It("should update the pod startup and unstarted time metrics", func() { p := test.Pod() p.Status.Phase = corev1.PodPending + + fakeClock.Step(1 * time.Hour) + cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) ExpectApplied(ctx, env.Client, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) //This will add pod to pending pods and unscheduled pods set _, found := FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{ @@ -143,6 +187,11 @@ var _ = Describe("Pod Metrics", func() { "namespace": p.GetNamespace(), }) Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unstarted_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) // Pod is now running but readiness condition is not set p.Status.Phase = corev1.PodRunning @@ -153,6 +202,11 @@ var _ = Describe("Pod Metrics", func() { "namespace": p.GetNamespace(), }) Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unstarted_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) // Pod is now running but readiness is unknown p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}} @@ -163,6 +217,11 @@ var _ = Describe("Pod Metrics", func() { "namespace": p.GetNamespace(), }) Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unstarted_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) // Pod is now running and ready. 
At this step pods_startup_duration should be fired and pods_unstarted_time should be deleted p.Status.Phase = corev1.PodRunning @@ -174,15 +233,92 @@ var _ = Describe("Pod Metrics", func() { "namespace": p.GetNamespace(), }) Expect(found).To(BeFalse()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unstarted_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) + _, found = FindMetricWithLabelValues("karpenter_pods_startup_duration_seconds", nil) Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_startup_duration_seconds", nil) + Expect(found).To(BeTrue()) }) - It("should delete pod unstarted time and pod unbound duration metric on pod delete", func() { + It("should create and delete provisioning undecided metrics based on scheduling simulatinos", func() { p := test.Pod() p.Status.Phase = corev1.PodPending ExpectApplied(ctx, env.Client, p) + + ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) + fakeClock.Step(1 * time.Hour) + _, found := FindMetricWithLabelValues("karpenter_pods_provisioning_scheduling_undecided_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) + + // Expect the metric to exist now that we've ack'd the pod + cluster.AckPods(p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) - _, found := FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{ + fakeClock.Step(1 * time.Hour) + + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_scheduling_undecided_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) + + cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) + ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) + + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_scheduling_undecided_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) + }) + It("should delete provisioning undecided metrics based on pod deletes", func() { + p := test.Pod() + p.Status.Phase = corev1.PodPending + ExpectApplied(ctx, env.Client, p) + + ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) + fakeClock.Step(1 * time.Hour) + _, found := FindMetricWithLabelValues("karpenter_pods_provisioning_scheduling_undecided_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) + + // Expect the metric to exist now that we've ack'd the pod + cluster.AckPods(p) + ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) + fakeClock.Step(1 * time.Hour) + + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_scheduling_undecided_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) + + ExpectDeleted(ctx, env.Client, p) + ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) + + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_scheduling_undecided_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) + }) + It("should delete pod unbound and unstarted time metrics on pod delete", func() { + p := test.Pod() + 
p.Status.Phase = corev1.PodPending + ExpectApplied(ctx, env.Client, p) + + cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p) + ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) + + _, found := FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{ "name": p.GetName(), "namespace": p.GetNamespace(), }) @@ -192,10 +328,20 @@ var _ = Describe("Pod Metrics", func() { "namespace": p.GetNamespace(), }) Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unbound_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unstarted_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeTrue()) ExpectDeleted(ctx, env.Client, p) ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) - _, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{ + _, found = FindMetricWithLabelValues("karpenter_pods_unbound_time_seconds", map[string]string{ "name": p.GetName(), "namespace": p.GetNamespace(), }) @@ -205,6 +351,16 @@ var _ = Describe("Pod Metrics", func() { "namespace": p.GetNamespace(), }) Expect(found).To(BeFalse()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unbound_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) + _, found = FindMetricWithLabelValues("karpenter_pods_provisioning_unstarted_time_seconds", map[string]string{ + "name": p.GetName(), + "namespace": p.GetNamespace(), + }) + Expect(found).To(BeFalse()) }) It("should delete the pod state metric on pod delete", func() { p := test.Pod() diff --git a/pkg/controllers/nodeclaim/disruption/drift_test.go b/pkg/controllers/nodeclaim/disruption/drift_test.go index e33f940466..0137607ce3 100644 --- a/pkg/controllers/nodeclaim/disruption/drift_test.go +++ b/pkg/controllers/nodeclaim/disruption/drift_test.go @@ -56,6 +56,7 @@ var _ = Describe("Drift", func() { }) // NodeClaims are required to be launched before they can be evaluated for drift nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeLaunched) + Expect(nodeClaim.StatusConditions().Clear(v1.ConditionTypeDrifted)).To(Succeed()) }) It("should detect drift", func() { cp.Drifted = "drifted" diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 447fb1ab9b..093a562e87 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -324,6 +324,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { if err != nil { return scheduler.Results{}, err } + // Get pods from nodes that are preparing for deletion // We do this after getting the pending pods so that we undershoot if pods are // actively migrating from a node that is being deleted @@ -345,11 +346,15 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { } return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err) } + // ACK the pending pods at the start of the scheduling loop so that we can emit metrics on when we actually first try to schedule it. + p.cluster.AckPods(pendingPods...) 
results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes) scheduler.UnschedulablePodsCount.Set(float64(len(results.PodErrors)), map[string]string{scheduler.ControllerLabel: injection.GetControllerName(ctx)}) if len(results.NewNodeClaims) > 0 { log.FromContext(ctx).WithValues("Pods", pretty.Slice(lo.Map(pods, func(p *corev1.Pod, _ int) string { return klog.KRef(p.Namespace, p.Name).String() }), 5), "duration", time.Since(start)).Info("found provisionable pod(s)") } + // Mark in memory when these pods were marked as schedulable or when we made a decision on the pods + p.cluster.MarkPodSchedulingDecisions(results.PodErrors, pendingPods...) results.Record(ctx, p.recorder, p.cluster) return results, nil } diff --git a/pkg/controllers/provisioning/scheduling/suite_test.go b/pkg/controllers/provisioning/scheduling/suite_test.go index f548ed6ecf..1f0b19037f 100644 --- a/pkg/controllers/provisioning/scheduling/suite_test.go +++ b/pkg/controllers/provisioning/scheduling/suite_test.go @@ -3643,7 +3643,7 @@ var _ = Context("Scheduling", func() { Describe("Metrics", func() { It("should surface the queueDepth metric while executing the scheduling loop", func() { - nodePool := test.NodePool() + nodePool = test.NodePool() ExpectApplied(ctx, env.Client, nodePool) // all of these pods have anti-affinity to each other labels := map[string]string{ @@ -3678,7 +3678,7 @@ var _ = Context("Scheduling", func() { wg.Wait() }) It("should surface the UnschedulablePodsCount metric while executing the scheduling loop", func() { - nodePool := test.NodePool(v1.NodePool{ + nodePool = test.NodePool(v1.NodePool{ Spec: v1.NodePoolSpec{ Template: v1.NodeClaimTemplate{ Spec: v1.NodeClaimTemplateSpec{ @@ -3698,7 +3698,7 @@ var _ = Context("Scheduling", func() { }, }) ExpectApplied(ctx, env.Client, nodePool) - //Creates 15 pods, 5 schedulable and 10 unschedulable + // Creates 15 pods, 5 schedulable and 10 unschedulable podsUnschedulable := test.UnschedulablePods(test.PodOptions{NodeSelector: map[string]string{corev1.LabelInstanceTypeStable: "unknown"}}, 10) podsSchedulable := test.UnschedulablePods(test.PodOptions{NodeSelector: map[string]string{corev1.LabelInstanceTypeStable: "default-instance-type"}}, 5) pods := append(podsUnschedulable, podsSchedulable...) 
@@ -3715,7 +3715,7 @@ var _ = Context("Scheduling", func() { Expect(err).To(BeNil()) }) It("should surface the schedulingDuration metric after executing a scheduling loop", func() { - nodePool := test.NodePool() + nodePool = test.NodePool() ExpectApplied(ctx, env.Client, nodePool) // all of these pods have anti-affinity to each other labels := map[string]string{ @@ -3742,6 +3742,31 @@ var _ = Context("Scheduling", func() { _, ok = lo.Find(m.Histogram.Bucket, func(b *io_prometheus_client.Bucket) bool { return lo.FromPtr(b.CumulativeCount) > 0 }) Expect(ok).To(BeTrue()) }) + It("should set the PodSchedulerDecisionSeconds metric after a scheduling loop", func() { + // Find the starting point since the metric is shared across test suites + m, _ := FindMetricWithLabelValues("karpenter_pods_scheduling_decision_duration_seconds", nil) + val := uint64(0) + if m != nil { + val = lo.FromPtr(m.Histogram.SampleCount) + } + + nodePool = test.NodePool() + ExpectApplied(ctx, env.Client, nodePool) + podsUnschedulable := test.UnschedulablePods(test.PodOptions{}, 3) + for _, p := range podsUnschedulable { + ExpectApplied(ctx, env.Client, p) + cluster.AckPods(p) + } + + // step the clock so the metric isn't 0 + fakeClock.Step(1 * time.Minute) + _, err := prov.Schedule(ctx) + Expect(err).To(BeNil()) + + m, ok := FindMetricWithLabelValues("karpenter_pods_scheduling_decision_duration_seconds", nil) + Expect(ok).To(BeTrue()) + Expect(lo.FromPtr(m.Histogram.SampleCount)).To(BeNumerically("==", val+3)) + }) }) }) diff --git a/pkg/controllers/state/cluster.go b/pkg/controllers/state/cluster.go index 28b51ea3fb..e008d18778 100644 --- a/pkg/controllers/state/cluster.go +++ b/pkg/controllers/state/cluster.go @@ -53,6 +53,10 @@ type Cluster struct { nodeClaimNameToProviderID map[string]string // node claim name -> provider id daemonSetPods sync.Map // daemonSet -> existing pod + podAcks sync.Map // pod namespaced name -> time when Karpenter first saw the pod as pending + podsSchedulingAttempted sync.Map // pod namespaced name -> time when Karpenter tried to schedule a pod + podsSchedulableTimes sync.Map // pod namespaced name -> time when it was first marked as able to fit to a node + clusterStateMu sync.RWMutex // Separate mutex as this is called in some places that mu is held // A monotonically increasing timestamp representing the time state of the // cluster with respect to consolidation. This increases when something has @@ -73,6 +77,9 @@ func NewCluster(clk clock.Clock, client client.Client) *Cluster { daemonSetPods: sync.Map{}, nodeNameToProviderID: map[string]string{}, nodeClaimNameToProviderID: map[string]string{}, + podAcks: sync.Map{}, + podsSchedulableTimes: sync.Map{}, + podsSchedulingAttempted: sync.Map{}, } } @@ -307,15 +314,79 @@ func (c *Cluster) UpdatePod(ctx context.Context, pod *corev1.Pod) error { return err } +// AckPods marks the pod as acknowledged for scheduling from the provisioner. This is only done once per-pod. +func (c *Cluster) AckPods(pods ...*corev1.Pod) { + now := c.clock.Now() + for _, pod := range pods { + // store the value as now only if it doesn't exist. + c.podAcks.LoadOrStore(types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}, now) + } +} + +// PodAckTime will return the time the pod was first seen in our cache. 
+func (c *Cluster) PodAckTime(podKey types.NamespacedName) time.Time { + if ackTime, ok := c.podAcks.Load(podKey); ok { + return ackTime.(time.Time) + } + return time.Time{} +} + +// MarkPodSchedulingDecisions keeps track of when we first tried to schedule a pod to a node. +// This also marks when the pod is first seen as schedulable for pod metrics. +// We'll only emit a metric for a pod if we haven't done it before. +func (c *Cluster) MarkPodSchedulingDecisions(podErrors map[*corev1.Pod]error, pods ...*corev1.Pod) { + now := c.clock.Now() + for _, p := range pods { + nn := client.ObjectKeyFromObject(p) + // If there's no error for the pod, then we mark it as schedulable + if err, ok := podErrors[p]; !ok || err == nil { + c.podsSchedulableTimes.LoadOrStore(nn, now) + } + _, alreadyExists := c.podsSchedulingAttempted.LoadOrStore(nn, now) + // If we already attempted this, we don't need to emit another metric. + if !alreadyExists { + // We should have ACK'd the pod. + if ackTime := c.PodAckTime(nn); !ackTime.IsZero() { + PodSchedulingDecisionSeconds.Observe(c.clock.Since(ackTime).Seconds(), nil) + } + } + } +} + +// PodSchedulingDecisionTime returns when Karpenter first made a scheduling decision for the pod in its scheduling simulations. +// This returns the zero time if Karpenter never made a decision on the pod. +func (c *Cluster) PodSchedulingDecisionTime(podKey types.NamespacedName) time.Time { + if val, found := c.podsSchedulingAttempted.Load(podKey); found { + return val.(time.Time) + } + return time.Time{} +} + +// PodSchedulingSuccessTime returns when Karpenter first thought it could schedule a pod in its scheduling simulation. +// This returns the zero time if the pod was never considered as a pending pod in scheduling. +func (c *Cluster) PodSchedulingSuccessTime(podKey types.NamespacedName) time.Time { + if val, found := c.podsSchedulableTimes.Load(podKey); found { + return val.(time.Time) + } + return time.Time{} +} + func (c *Cluster) DeletePod(podKey types.NamespacedName) { c.mu.Lock() defer c.mu.Unlock() c.antiAffinityPods.Delete(podKey) c.updateNodeUsageFromPodCompletion(podKey) + c.ClearPodSchedulingMappings(podKey) c.MarkUnconsolidated() } +func (c *Cluster) ClearPodSchedulingMappings(podKey types.NamespacedName) { + c.podAcks.Delete(podKey) + c.podsSchedulableTimes.Delete(podKey) + c.podsSchedulingAttempted.Delete(podKey) +} + // MarkUnconsolidated marks the cluster state as being unconsolidated. This should be called in any situation where // something in the cluster has changed such that the cluster may have moved from a non-consolidatable to a consolidatable // state.
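Reviewer note (not part of the diff): the sketch below walks through the intended lifecycle of the pod-scheduling bookkeeping added to state.Cluster above — ack when the provisioner first picks the pod up, decision after a scheduling simulation, and cleanup once the pod is ready or deleted. It uses only the functions introduced in this diff plus existing test helpers (test.NewEnvironment, test.Pod, state.NewCluster); treat it as an illustrative sketch, not code from the change.

package main

// Sketch only: illustrates how the new Cluster tracking is meant to be driven.
import (
	"time"

	corev1 "k8s.io/api/core/v1"
	clock "k8s.io/utils/clock/testing"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/karpenter/pkg/controllers/state"
	"sigs.k8s.io/karpenter/pkg/test"
)

func main() {
	env := test.NewEnvironment()
	fakeClock := clock.NewFakeClock(time.Now())
	cluster := state.NewCluster(fakeClock, env.Client)

	p := test.Pod()
	nn := client.ObjectKeyFromObject(p)

	// 1. The provisioner ACKs the pod when it first considers it for scheduling.
	cluster.AckPods(p)
	_ = cluster.PodAckTime(nn) // non-zero from here on

	// 2. After a scheduling simulation, the provisioner records its decision.
	//    An empty podErrors map means the pod was deemed schedulable.
	fakeClock.Step(30 * time.Second)
	cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, p)
	_ = cluster.PodSchedulingDecisionTime(nn) // non-zero: a decision was made
	_ = cluster.PodSchedulingSuccessTime(nn)  // non-zero: the pod was schedulable

	// 3. Once the pod is ready (or deleted), the pod metrics controller clears the state.
	cluster.ClearPodSchedulingMappings(nn)
}

Because AckPods and MarkPodSchedulingDecisions both use sync.Map.LoadOrStore, repeated calls on later scheduling loops do not reset the recorded timestamps, which is what keeps the ack and decision times stable per pod.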
diff --git a/pkg/controllers/state/metrics.go b/pkg/controllers/state/metrics.go index 172378b601..1c09d52654 100644 --- a/pkg/controllers/state/metrics.go +++ b/pkg/controllers/state/metrics.go @@ -39,7 +39,6 @@ var ( }, []string{}, ) - ClusterStateSynced = opmetrics.NewPrometheusGauge( crmetrics.Registry, prometheus.GaugeOpts{ @@ -60,4 +59,14 @@ var ( }, []string{}, ) + PodSchedulingDecisionSeconds = opmetrics.NewPrometheusHistogram( + crmetrics.Registry, + prometheus.HistogramOpts{ + Namespace: metrics.Namespace, + Subsystem: metrics.PodSubsystem, + Name: "scheduling_decision_duration_seconds", + Help: "The time it takes for Karpenter to first try to schedule a pod after it's been seen.", + }, + []string{}, + ) ) diff --git a/pkg/controllers/state/suite_test.go b/pkg/controllers/state/suite_test.go index 7e3612441b..391e001607 100644 --- a/pkg/controllers/state/suite_test.go +++ b/pkg/controllers/state/suite_test.go @@ -99,6 +99,24 @@ var _ = AfterEach(func() { cloudProvider.Reset() }) +var _ = Describe("Pod Ack", func() { + It("should only mark pods as schedulable once", func() { + pod := test.Pod() + ExpectApplied(ctx, env.Client, pod) + nn := client.ObjectKeyFromObject(pod) + + setTime := cluster.PodSchedulingSuccessTime(nn) + Expect(setTime.IsZero()).To(BeTrue()) + + cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, pod) + setTime = cluster.PodSchedulingSuccessTime(nn) + Expect(setTime.IsZero()).To(BeFalse()) + + newTime := cluster.PodSchedulingSuccessTime(nn) + Expect(newTime.Compare(setTime)).To(Equal(0)) + }) +}) + var _ = Describe("Volume Usage/Limits", func() { var nodeClaim *v1.NodeClaim var node *corev1.Node
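Reviewer suggestion (not part of the diff): since MarkPodSchedulingDecisions is documented as emitting the decision metric only once per pod, a case along the lines of the sketch below could pin that down in pkg/controllers/state/suite_test.go. It assumes the suite's existing ctx, env, and cluster variables and a github.com/samber/lo import; adjust names if they differ.

It("should only observe the scheduling decision duration once per pod", func() {
	// Baseline sample count, since the histogram is shared across the registry
	m, _ := FindMetricWithLabelValues("karpenter_pods_scheduling_decision_duration_seconds", nil)
	base := uint64(0)
	if m != nil {
		base = lo.FromPtr(m.Histogram.SampleCount) // assumes the suite imports github.com/samber/lo
	}

	pod := test.Pod()
	ExpectApplied(ctx, env.Client, pod)
	cluster.AckPods(pod)

	// The first decision should observe the histogram exactly once...
	cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, pod)
	// ...and a repeat decision for the same pod should not observe it again
	cluster.MarkPodSchedulingDecisions(map[*corev1.Pod]error{}, pod)

	m, ok := FindMetricWithLabelValues("karpenter_pods_scheduling_decision_duration_seconds", nil)
	Expect(ok).To(BeTrue())
	Expect(lo.FromPtr(m.Histogram.SampleCount)).To(BeNumerically("==", base+1))
})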