chore: Additional upstream metrics
jigisha620 committed Sep 20, 2024
1 parent e410762 commit ac04b26
Showing 15 changed files with 270 additions and 11 deletions.
90 changes: 83 additions & 7 deletions pkg/controllers/metrics/pod/controller.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
@@ -71,18 +72,47 @@ var (
Objectives: metrics.SummaryObjectives(),
},
)
podBoundDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "bound_duration_seconds",
Help: "The time from pod creation until the pod is bound.",
Buckets: metrics.DurationBuckets(),
},
labelNames(),
)
podCurrentUnboundTimeSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "current_unbound_time_seconds",
Help: "The time from pod creation until the pod is bound.",
},
[]string{podName, podNameSpace},
)
podUnstartedTimeSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "unstarted_time_seconds",
Help: "The time from pod creation until the pod is running.",
},
[]string{podName, podNameSpace},
)
)

// Controller for the resource
type Controller struct {
kubeClient client.Client
metricStore *metrics.Store

pendingPods sets.Set[string]
pendingPods sets.Set[string]
unscheduledPods sets.Set[string]
}

func init() {
crmetrics.Registry.MustRegister(podState, podStartupDurationSeconds)
crmetrics.Registry.MustRegister(podState, podStartupDurationSeconds, podBoundDurationSeconds, podCurrentUnboundTimeSeconds, podUnstartedTimeSeconds)
}

func labelNames() []string {
@@ -103,9 +133,10 @@ func labelNames() []string {
// NewController constructs a podController instance
func NewController(kubeClient client.Client) *Controller {
return &Controller{
kubeClient: kubeClient,
metricStore: metrics.NewStore(),
pendingPods: sets.New[string](),
kubeClient: kubeClient,
metricStore: metrics.NewStore(),
pendingPods: sets.New[string](),
unscheduledPods: sets.New[string](),
}
}

@@ -117,6 +148,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
if err := c.kubeClient.Get(ctx, req.NamespacedName, pod); err != nil {
if errors.IsNotFound(err) {
c.pendingPods.Delete(req.NamespacedName.String())
c.unscheduledPods.Delete(req.NamespacedName.String())
c.metricStore.Delete(req.NamespacedName.String())
}
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -133,21 +165,65 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
},
})
c.recordPodStartupMetric(pod)
c.recordPodBoundMetric(pod, labels)
return reconcile.Result{}, nil
}

func (c *Controller) recordPodStartupMetric(pod *corev1.Pod) {
key := client.ObjectKeyFromObject(pod).String()
if pod.Status.Phase == phasePending {
podUnstartedTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
c.pendingPods.Insert(key)
return
}
cond, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodReady
})
if c.pendingPods.Has(key) && ok {
podStartupDurationSeconds.Observe(cond.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
c.pendingPods.Delete(key)
if cond.Status != corev1.ConditionTrue {
podUnstartedTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
} else {
// Delete the unstarted metric since the pod is now started
podUnstartedTimeSeconds.Delete(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
})
podStartupDurationSeconds.Observe(cond.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
c.pendingPods.Delete(key)
}
}
}
func (c *Controller) recordPodBoundMetric(pod *corev1.Pod, labels prometheus.Labels) {
key := client.ObjectKeyFromObject(pod).String()
condScheduled, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodScheduled
})
if pod.Status.Phase == phasePending {
// The PodScheduled condition may be True or Unknown while a pod is in the pending phase. We only want to emit
// the pod_current_unbound_time_seconds metric while the pod is not yet bound, i.e. while the PodScheduled condition is not True.
if ok && condScheduled.Status != corev1.ConditionTrue {
podCurrentUnboundTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
}
c.unscheduledPods.Insert(key)
return
}
if c.unscheduledPods.Has(key) && ok {
// Delete the unbound metric since the pod is now bound
podCurrentUnboundTimeSeconds.Delete(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
})
podBoundDurationSeconds.With(labels).Observe(condScheduled.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
c.unscheduledPods.Delete(key)
}
}
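
For reference, a minimal runnable sketch (not part of this commit) of the set-then-delete gauge lifecycle that recordPodStartupMetric and recordPodBoundMetric follow above: the per-pod gauge is refreshed on every reconcile while the pod is in the transient state, and its series is deleted once the terminal condition is reached so stale per-pod series do not accumulate. The metric and label names here are illustrative stand-ins.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Stand-in for podUnstartedTimeSeconds / podCurrentUnboundTimeSeconds.
var unstarted = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "pods_unstarted_time_seconds_example"},
	[]string{"name", "namespace"},
)

// track mimics one reconcile pass for a single pod.
func track(name, namespace string, created time.Time, ready bool) {
	labels := prometheus.Labels{"name": name, "namespace": namespace}
	if !ready {
		// Pod not ready yet: publish how long it has been waiting so far.
		unstarted.With(labels).Set(time.Since(created).Seconds())
		return
	}
	// Pod reached the terminal condition: remove the per-pod series.
	unstarted.Delete(labels)
}

func main() {
	created := time.Now().Add(-30 * time.Second)
	track("example", "default", created, false) // gauge set to ~30s
	track("example", "default", created, true)  // series deleted
	fmt.Println("done")
}
```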

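And a sketch of how one of the new series could render in Prometheus exposition format, checked with testutil.CollectAndCompare. The gauge below is a stand-in constructed with the same options as podUnstartedTimeSeconds; the pod name inflate-abc is hypothetical.

```go
package main

import (
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	unstarted := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "karpenter",
			Subsystem: "pods",
			Name:      "unstarted_time_seconds",
			Help:      "The time from pod creation until the pod is running.",
		},
		[]string{"name", "namespace"},
	)
	unstarted.With(prometheus.Labels{"name": "inflate-abc", "namespace": "default"}).Set(12.5)

	expected := `
# HELP karpenter_pods_unstarted_time_seconds The time from pod creation until the pod is running.
# TYPE karpenter_pods_unstarted_time_seconds gauge
karpenter_pods_unstarted_time_seconds{name="inflate-abc",namespace="default"} 12.5
`
	// CollectAndCompare parses both sides into metric families and diffs them.
	if err := testutil.CollectAndCompare(unstarted, strings.NewReader(expected)); err != nil {
		panic(err)
	}
}
```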
77 changes: 77 additions & 0 deletions pkg/controllers/metrics/pod/suite_test.go
@@ -20,6 +20,8 @@ import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -84,6 +86,81 @@ var _ = Describe("Pod Metrics", func() {
})
Expect(found).To(BeTrue())
})
It("should update the pod bound and unbound time metrics", func() {
p := test.Pod()
p.Status.Phase = corev1.PodPending
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This adds the pod to the pendingPods and unscheduledPods sets
metric, found := FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
unboundTime := metric.GetGauge().Value

// Pod is still pending but has been bound. At this step karpenter_pods_current_unbound_time_seconds should not change.
p.Status.Phase = corev1.PodPending
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile checks whether the pod has been scheduled
metric, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
Expect(metric.GetGauge().Value).To(Equal(unboundTime))

// Pod is now running and bound. At this step karpenter_pods_bound_duration_seconds should be observed and karpenter_pods_current_unbound_time_seconds should be deleted
p.Status.Phase = corev1.PodRunning
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile records the bound duration and deletes the unbound gauge
_, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeFalse())
_, found = FindMetricWithLabelValues("karpenter_pods_bound_duration_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
})
It("should update the pod startup and unstarted time metrics", func() {
p := test.Pod()
p.Status.Phase = corev1.PodPending
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This adds the pod to the pendingPods and unscheduledPods sets
_, found := FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())

// Pod is now running but not ready
p.Status.Phase = corev1.PodRunning
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile checks whether the pod has become ready
_, found = FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())

// Pod is now running and ready. At this step karpenter_pods_startup_duration_seconds should be observed and karpenter_pods_unstarted_time_seconds should be deleted
p.Status.Phase = corev1.PodRunning
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This reconcile checks whether the pod has become ready
_, found = FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeFalse())
_, found = FindMetricWithLabelValues("karpenter_pods_startup_duration_seconds", nil)
Expect(found).To(BeTrue())
})
It("should delete the pod state metric on pod delete", func() {
p := test.Pod()
ExpectApplied(ctx, env.Client, p)
3 changes: 3 additions & 0 deletions pkg/controllers/node/termination/controller.go
@@ -129,6 +129,9 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
NodesDrainedTotal.With(prometheus.Labels{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
}).Inc()
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
12 changes: 11 additions & 1 deletion pkg/controllers/node/termination/metrics.go
@@ -28,7 +28,8 @@ import (
func init() {
crmetrics.Registry.MustRegister(
TerminationDurationSeconds,
NodeLifetimeDurationSeconds)
NodeLifetimeDurationSeconds,
NodesDrainedTotal)
}

const dayDuration = time.Hour * 24
@@ -44,6 +45,15 @@ var (
},
[]string{metrics.NodePoolLabel},
)
NodesDrainedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "drained_total",
Help: "The total number of nodes drained by Karpenter",
},
[]string{metrics.NodePoolLabel},
)
NodeLifetimeDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
2 changes: 2 additions & 0 deletions pkg/controllers/node/termination/suite_test.go
@@ -95,6 +95,7 @@ var _ = Describe("Termination", func() {
metrics.NodesTerminatedTotal.Reset()
termination.TerminationDurationSeconds.Reset()
termination.NodeLifetimeDurationSeconds.Reset()
termination.NodesDrainedTotal.Reset()
})

Context("Reconciliation", func() {
@@ -841,6 +842,7 @@ var _ = Describe("Termination", func() {
node = ExpectNodeExists(ctx, env.Client, node.Name)
// Reconcile twice, once to set the NodeClaim to terminating, another to check the instance termination status (and delete the node).
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectMetricCounterValue(termination.NodesDrainedTotal, 1, map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
ExpectObjectReconciled(ctx, env.Client, terminationController, node)

m, ok := FindMetricWithLabelValues("karpenter_nodes_terminated_total", map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/node/termination/terminator/eviction.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"

"sigs.k8s.io/karpenter/pkg/metrics"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
@@ -180,6 +182,10 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
},
},
}); err != nil {
if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
code := status.Status().Code
NodesEvictionRequestsTotal.With(map[string]string{metrics.CodeLabel: fmt.Sprintf("%d", code)}).Inc()
}
// status codes for the eviction API are defined here:
// https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
@@ -199,6 +205,7 @@
log.FromContext(ctx).Error(err, "failed evicting pod")
return false
}
NodesEvictionRequestsTotal.With(map[string]string{metrics.CodeLabel: "200"}).Inc()
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
return true
}
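
For reference, the status-code extraction above relies on Kubernetes API errors implementing the apierrors.APIStatus interface, either directly or through error wrapping, so the HTTP code can be recovered with a type assertion plus errors.As. A minimal sketch of that pattern, using only apimachinery's public API:

```go
package main

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// codeFor returns the HTTP status code carried by a Kubernetes API error.
func codeFor(err error) (int32, bool) {
	if status, ok := err.(apierrors.APIStatus); ok || errors.As(err, &status) {
		return status.Status().Code, true
	}
	return 0, false
}

func main() {
	// A conflict error, as the eviction API returns on a UID mismatch.
	err := apierrors.NewConflict(schema.GroupResource{Resource: "pods"}, "example", errors.New("uid mismatch"))
	if code, ok := codeFor(err); ok {
		fmt.Println(code) // 409
	}
}
```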
38 changes: 38 additions & 0 deletions pkg/controllers/node/termination/terminator/metrics.go
@@ -0,0 +1,38 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terminator

import (
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

"sigs.k8s.io/karpenter/pkg/metrics"
)

var NodesEvictionRequestsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "eviction_requests_total",
Help: "The total number of eviction requests made by Karpenter",
},
[]string{metrics.CodeLabel},
)

func init() {
crmetrics.Registry.MustRegister(NodesEvictionRequestsTotal)
}
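
For reference, a test-style sketch of reading the new counter back with client_golang's testutil, which is roughly what the ExpectMetricCounterValue helper in the suites below does. The literal "code" label string stands in for metrics.CodeLabel, and the metric name is illustrative.

```go
package terminator_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCounterByCode(t *testing.T) {
	requests := prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "eviction_requests_total_example"},
		[]string{"code"},
	)
	requests.With(prometheus.Labels{"code": "200"}).Inc()

	// ToFloat64 collects the single child series and returns its value.
	if got := testutil.ToFloat64(requests.With(prometheus.Labels{"code": "200"})); got != 1 {
		t.Fatalf("expected 1, got %v", got)
	}
}
```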
6 changes: 6 additions & 0 deletions pkg/controllers/node/termination/terminator/suite_test.go
@@ -22,6 +22,8 @@ import (
"testing"
"time"

"sigs.k8s.io/karpenter/pkg/metrics"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
@@ -92,6 +94,7 @@ var _ = Describe("Eviction/Queue", func() {
Labels: testLabels,
},
})
terminator.NodesEvictionRequestsTotal.Reset()
})

Context("Eviction API", func() {
@@ -102,11 +105,13 @@
It("should succeed with no event when the pod UID conflicts", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue())
ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{metrics.CodeLabel: "409"})
Expect(recorder.Events()).To(HaveLen(0))
})
It("should succeed with an evicted event when there are no PDBs", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue())
ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{metrics.CodeLabel: "200"})
Expect(recorder.Calls("Evicted")).To(Equal(1))
})
It("should succeed with no event when there are PDBs that allow an eviction", func() {
@@ -130,6 +135,7 @@
})
ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
ExpectMetricCounterValue(terminator.NodesEvictionRequestsTotal, 1, map[string]string{metrics.CodeLabel: "500"})
})
It("should ensure that calling Evict() is valid while making Add() calls", func() {
cancelCtx, cancel := context.WithCancel(ctx)