chore: Additional upstream metrics
jigisha620 committed Sep 19, 2024
1 parent e410762 commit 179a217
Showing 14 changed files with 236 additions and 15 deletions.
87 changes: 77 additions & 10 deletions pkg/controllers/metrics/pod/controller.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
@@ -71,18 +72,47 @@ var (
Objectives: metrics.SummaryObjectives(),
},
)
podBoundDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "bound_duration_seconds",
Help: "The time from pod creation until the pod is bound.",
Buckets: metrics.DurationBuckets(),
},
labelNames(),
)
podCurrentUnboundTimeSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "current_unbound_time_seconds",
Help: "The time from pod creation until the pod is bound.",
},
[]string{podName, podNameSpace},
)
podUnstartedTimeSeconds = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "karpenter",
Subsystem: metrics.PodSubsystem,
Name: "unstarted_time_seconds",
Help: "The time from pod creation until the pod is running.",
},
[]string{podName, podNameSpace},
)
)

// Controller for the resource
type Controller struct {
kubeClient client.Client
metricStore *metrics.Store

- pendingPods sets.Set[string]
+ pendingPods     sets.Set[string]
+ unscheduledPods sets.Set[string]
}

func init() {
- crmetrics.Registry.MustRegister(podState, podStartupDurationSeconds)
+ crmetrics.Registry.MustRegister(podState, podStartupDurationSeconds, podBoundDurationSeconds, podCurrentUnboundTimeSeconds, podUnstartedTimeSeconds)
}

func labelNames() []string {
@@ -103,9 +133,10 @@ func labelNames() []string {
// NewController constructs a podController instance
func NewController(kubeClient client.Client) *Controller {
return &Controller{
- kubeClient:  kubeClient,
- metricStore: metrics.NewStore(),
- pendingPods: sets.New[string](),
+ kubeClient:      kubeClient,
+ metricStore:     metrics.NewStore(),
+ pendingPods:     sets.New[string](),
+ unscheduledPods: sets.New[string](),
}
}

@@ -117,6 +148,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
if err := c.kubeClient.Get(ctx, req.NamespacedName, pod); err != nil {
if errors.IsNotFound(err) {
c.pendingPods.Delete(req.NamespacedName.String())
c.unscheduledPods.Delete(req.NamespacedName.String())
c.metricStore.Delete(req.NamespacedName.String())
}
return reconcile.Result{}, client.IgnoreNotFound(err)
@@ -132,22 +164,57 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
Labels: labels,
},
})
- c.recordPodStartupMetric(pod)
+ c.recordPodStartupMetric(pod, labels)
return reconcile.Result{}, nil
}

- func (c *Controller) recordPodStartupMetric(pod *corev1.Pod) {
+ func (c *Controller) recordPodStartupMetric(pod *corev1.Pod, labels prometheus.Labels) {
key := client.ObjectKeyFromObject(pod).String()
condScheduled, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodScheduled
})
if pod.Status.Phase == phasePending {
if ok && condScheduled.Status != corev1.ConditionTrue {
podCurrentUnboundTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
}
podUnstartedTimeSeconds.With(map[string]string{
podName: pod.Name,
podNameSpace: pod.Namespace,
}).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
c.pendingPods.Insert(key)
c.unscheduledPods.Insert(key)
return
}
- cond, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
+ if c.unscheduledPods.Has(key) && ok && condScheduled.Status == corev1.ConditionTrue {
+ // Delete the unbound metric since the pod is now bound
+ podCurrentUnboundTimeSeconds.Delete(map[string]string{
+ podName: pod.Name,
+ podNameSpace: pod.Namespace,
+ })
+ podBoundDurationSeconds.With(labels).Observe(condScheduled.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
+ c.unscheduledPods.Delete(key)
+ }
+ condReady, ok := lo.Find(pod.Status.Conditions, func(c corev1.PodCondition) bool {
return c.Type == corev1.PodReady
})
if c.pendingPods.Has(key) && ok {
- podStartupDurationSeconds.Observe(cond.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
- c.pendingPods.Delete(key)
+ if condReady.Status != corev1.ConditionTrue {
+ podUnstartedTimeSeconds.With(map[string]string{
+ podName: pod.Name,
+ podNameSpace: pod.Namespace,
+ }).Set(time.Since(pod.CreationTimestamp.Time).Seconds())
+ } else {
+ // Delete the unstarted metric since the pod is now started
+ podUnstartedTimeSeconds.Delete(map[string]string{
+ podName: pod.Name,
+ podNameSpace: pod.Namespace,
+ })
+ podStartupDurationSeconds.Observe(condReady.LastTransitionTime.Sub(pod.CreationTimestamp.Time).Seconds())
+ c.pendingPods.Delete(key)
+ }
}
}

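The pod metrics above follow a gauge-then-histogram pattern: while a pod is unscheduled (or unstarted), a per-pod gauge is refreshed with the pod's age on every reconcile; once the pod binds (or becomes ready), the duration is observed into a histogram exactly once and the gauge series is deleted so stale per-pod series stop being exported. A minimal, self-contained sketch of that pattern with the Prometheus Go client; the metric and label names here are shortened stand-ins, not the controller's real wiring:

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Stand-ins for podCurrentUnboundTimeSeconds and podBoundDurationSeconds.
	unbound := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "pods_current_unbound_time_seconds",
	}, []string{"name", "namespace"})
	bound := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name: "pods_bound_duration_seconds",
	})

	created := time.Now().Add(-30 * time.Second)
	labels := prometheus.Labels{"name": "inflate", "namespace": "default"}

	// While the pod is pending and unscheduled: refresh the gauge on each reconcile.
	unbound.With(labels).Set(time.Since(created).Seconds())
	fmt.Printf("unbound: %.0fs\n", testutil.ToFloat64(unbound.With(labels)))

	// Once PodScheduled=True: observe the bind duration once, then delete the
	// per-pod gauge series so it is no longer exported for a bound pod.
	bound.Observe(time.Since(created).Seconds())
	unbound.Delete(labels)
}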
77 changes: 77 additions & 0 deletions pkg/controllers/metrics/pod/suite_test.go
@@ -20,6 +20,8 @@ import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -84,6 +86,81 @@ var _ = Describe("Pod Metrics", func() {
})
Expect(found).To(BeTrue())
})
It("should update the pod bound and unbound time metrics", func() {
p := test.Pod()
p.Status.Phase = corev1.PodPending
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This adds the pod to the pending and unscheduled pod sets
metric, found := FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
unboundTime := metric.GetGauge().Value

// Pod is still pending but has been bound. At this step pods_current_unbound_time_seconds should not change.
p.Status.Phase = corev1.PodPending
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodScheduled, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This checks whether the pod has been scheduled
metric, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
Expect(metric.GetGauge().Value).To(Equal(unboundTime))

// Pod is now running and has been bound. At this step pods_bound_duration_seconds should be observed and pods_current_unbound_time_seconds should be deleted.
p.Status.Phase = corev1.PodRunning
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This checks whether the pod has been scheduled
_, found = FindMetricWithLabelValues("karpenter_pods_current_unbound_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeFalse())
_, found = FindMetricWithLabelValues("karpenter_pods_bound_duration_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())
})
It("should update the pod startup and unstarted time metrics", func() {
p := test.Pod()
p.Status.Phase = corev1.PodPending
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This adds the pod to the pending and unscheduled pod sets
_, found := FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())

// Pod is now running but not ready
p.Status.Phase = corev1.PodRunning
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionUnknown, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This checks whether the pod has become ready
_, found = FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeTrue())

// Pod is now running and ready. At this step pods_startup_duration_seconds should be observed and pods_unstarted_time_seconds should be deleted.
p.Status.Phase = corev1.PodRunning
p.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue, LastTransitionTime: metav1.Now()}}
ExpectApplied(ctx, env.Client, p)
ExpectReconcileSucceeded(ctx, podController, client.ObjectKeyFromObject(p)) // This checks whether the pod has become ready
_, found = FindMetricWithLabelValues("karpenter_pods_unstarted_time_seconds", map[string]string{
"name": p.GetName(),
"namespace": p.GetNamespace(),
})
Expect(found).To(BeFalse())
_, found = FindMetricWithLabelValues("karpenter_pods_startup_duration_seconds", nil)
Expect(found).To(BeTrue())
})
It("should delete the pod state metric on pod delete", func() {
p := test.Pod()
ExpectApplied(ctx, env.Client, p)
3 changes: 3 additions & 0 deletions pkg/controllers/node/termination/controller.go
@@ -129,6 +129,9 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
NodesDrainedTotal.With(prometheus.Labels{
metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
}).Inc()
// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
12 changes: 11 additions & 1 deletion pkg/controllers/node/termination/metrics.go
@@ -28,7 +28,8 @@ import (
func init() {
crmetrics.Registry.MustRegister(
TerminationDurationSeconds,
- NodeLifetimeDurationSeconds)
+ NodeLifetimeDurationSeconds,
+ NodesDrainedTotal)
}

const dayDuration = time.Hour * 24
@@ -44,6 +45,15 @@ var (
},
[]string{metrics.NodePoolLabel},
)
NodesDrainedTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodeSubsystem,
Name: "drained_total",
Help: "The total number of nodes drained by Karpenter",
},
[]string{metrics.NodePoolLabel},
)
NodeLifetimeDurationSeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
2 changes: 2 additions & 0 deletions pkg/controllers/node/termination/suite_test.go
@@ -95,6 +95,7 @@ var _ = Describe("Termination", func() {
metrics.NodesTerminatedTotal.Reset()
termination.TerminationDurationSeconds.Reset()
termination.NodeLifetimeDurationSeconds.Reset()
termination.NodesDrainedTotal.Reset()
})

Context("Reconciliation", func() {
@@ -841,6 +842,7 @@
node = ExpectNodeExists(ctx, env.Client, node.Name)
// Reconcile twice, once to set the NodeClaim to terminating, another to check the instance termination status (and delete the node).
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectMetricCounterValue(termination.NodesDrainedTotal, 1, map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
ExpectObjectReconciled(ctx, env.Client, terminationController, node)

m, ok := FindMetricWithLabelValues("karpenter_nodes_terminated_total", map[string]string{"nodepool": node.Labels[v1.NodePoolLabelKey]})
10 changes: 9 additions & 1 deletion pkg/controllers/node/termination/terminator/eviction.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"

"sigs.k8s.io/karpenter/pkg/metrics"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
@@ -182,11 +184,15 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
}); err != nil {
// status codes for the eviction API are defined here:
// https://kubernetes.io/docs/concepts/scheduling-eviction/api-eviction/#how-api-initiated-eviction-works
- if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
+ if apierrors.IsNotFound(err) {
// 404 - The pod no longer exists
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L160
return true
}
if apierrors.IsConflict(err) {
// 409 - The pod exists, but it is not the same pod that we initiated the eviction on
// https://github.com/kubernetes/kubernetes/blob/ad19beaa83363de89a7772f4d5af393b85ce5e61/pkg/registry/core/pod/storage/eviction.go#L318
metrics.NodesEvictionRequestsTotal.With(map[string]string{metrics.ResponseLabel: "conflict"}).Inc()
return true
}
if apierrors.IsTooManyRequests(err) { // 429 - PDB violation
@@ -196,9 +202,11 @@ func (q *Queue) Evict(ctx context.Context, key QueueKey) bool {
}}, fmt.Errorf("evicting pod %s/%s violates a PDB", key.Namespace, key.Name)))
return false
}
metrics.NodesEvictionRequestsTotal.With(map[string]string{metrics.ResponseLabel: "failure"}).Inc()
log.FromContext(ctx).Error(err, "failed evicting pod")
return false
}
metrics.NodesEvictionRequestsTotal.With(map[string]string{metrics.ResponseLabel: "success"}).Inc()
q.recorder.Publish(terminatorevents.EvictPod(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: key.Name, Namespace: key.Namespace}}))
return true
}
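With these changes, each eviction request resolves to a response label: success, conflict for the 409 stale-UID case, or failure for everything else; a 404 is not counted because the pod is already gone, and a 429 PDB violation is surfaced as an event rather than through this counter. A self-contained sketch of that classification; the classify helper and the shortened metric name are illustrative stand-ins, not the package's real API:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
)

// classify mirrors the branches in Evict above.
func classify(err error) string {
	switch {
	case err == nil:
		return "success"
	case apierrors.IsNotFound(err):
		return "" // 404: the pod no longer exists; nothing to count
	case apierrors.IsConflict(err):
		return "conflict" // 409: not the pod the eviction was initiated on
	case apierrors.IsTooManyRequests(err):
		return "" // 429: PDB violation, surfaced as an event instead
	default:
		return "failure"
	}
}

func main() {
	evictions := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "nodes_eviction_requests_total", // stand-in for metrics.NodesEvictionRequestsTotal
	}, []string{"response"})

	if response := classify(nil); response != "" {
		evictions.With(prometheus.Labels{"response": response}).Inc()
	}
	fmt.Println(testutil.ToFloat64(evictions.With(prometheus.Labels{"response": "success"}))) // 1
}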
6 changes: 6 additions & 0 deletions pkg/controllers/node/termination/terminator/suite_test.go
@@ -22,6 +22,8 @@ import (
"testing"
"time"

"sigs.k8s.io/karpenter/pkg/metrics"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
@@ -92,6 +94,7 @@ var _ = Describe("Eviction/Queue", func() {
Labels: testLabels,
},
})
metrics.NodesEvictionRequestsTotal.Reset()
})

Context("Eviction API", func() {
@@ -102,11 +105,13 @@
It("should succeed with no event when the pod UID conflicts", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.QueueKey{NamespacedName: client.ObjectKeyFromObject(pod), UID: uuid.NewUUID()})).To(BeTrue())
ExpectMetricCounterValue(metrics.NodesEvictionRequestsTotal, 1, map[string]string{metrics.ResponseLabel: "conflict"})
Expect(recorder.Events()).To(HaveLen(0))
})
It("should succeed with an evicted event when there are no PDBs", func() {
ExpectApplied(ctx, env.Client, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeTrue())
ExpectMetricCounterValue(metrics.NodesEvictionRequestsTotal, 1, map[string]string{metrics.ResponseLabel: "success"})
Expect(recorder.Calls("Evicted")).To(Equal(1))
})
It("should succeed with no event when there are PDBs that allow an eviction", func() {
@@ -130,6 +135,7 @@
})
ExpectApplied(ctx, env.Client, pdb, pdb2, pod)
Expect(queue.Evict(ctx, terminator.NewQueueKey(pod))).To(BeFalse())
ExpectMetricCounterValue(metrics.NodesEvictionRequestsTotal, 1, map[string]string{metrics.ResponseLabel: "failure"})
})
It("should ensure that calling Evict() is valid while making Add() calls", func() {
cancelCtx, cancel := context.WithCancel(ctx)
3 changes: 2 additions & 1 deletion pkg/controllers/provisioning/provisioner.go
@@ -159,13 +159,14 @@ func (p *Provisioner) GetPendingPods(ctx context.Context) ([]*corev1.Pod, error)
if err != nil {
return nil, fmt.Errorf("listing pods, %w", err)
}
- pods = lo.Reject(pods, func(po *corev1.Pod, _ int) bool {
+ rejectedPods, pods := lo.FilterReject(pods, func(po *corev1.Pod, _ int) bool {
if err := p.Validate(ctx, po); err != nil {
log.FromContext(ctx).WithValues("Pod", klog.KRef(po.Namespace, po.Name)).V(1).Info(fmt.Sprintf("ignoring pod, %s", err))
return true
}
return false
})
metrics.IgnoredPodCount.Set(float64(len(rejectedPods)))
p.consolidationWarnings(ctx, pods)
return pods, nil
}
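Switching from lo.Reject to lo.FilterReject keeps both partitions from a single pass, so the number of ignored pods can be published as a gauge while provisioning continues with the valid ones. Note the return order: lo.FilterReject returns the elements matching the predicate first and the remainder second, which is why rejectedPods is bound before pods. A tiny sketch of that behavior:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// The predicate plays the role of a failed Validate: true means "ignore".
	ignored, kept := lo.FilterReject([]string{"ok-1", "invalid", "ok-2"}, func(s string, _ int) bool {
		return s == "invalid"
	})
	fmt.Println(ignored, len(ignored)) // [invalid] 1  -> becomes metrics.IgnoredPodCount
	fmt.Println(kept)                  // [ok-1 ok-2]  -> the pending pods to provision
}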
