Skip to content

Commit

Permalink
Metric instrumentation framework
Browse files Browse the repository at this point in the history
Signed-off-by: Eddie Torres <[email protected]>
  • Loading branch information
torredil committed Oct 9, 2023
1 parent 27ad245 commit ae2798b
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 94 deletions.
16 changes: 3 additions & 13 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ package main

import (
"context"
"net/http"
"time"

flag "github.com/spf13/pflag"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
logsapi "k8s.io/component-base/logs/api/v1"
json "k8s.io/component-base/logs/json"
"k8s.io/component-base/metrics/legacyregistry"

"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -58,17 +56,9 @@ func main() {
}()
}

cloud.RegisterMetrics()
if options.ServerOptions.HttpEndpoint != "" {
mux := http.NewServeMux()
mux.Handle("/metrics", legacyregistry.HandlerWithReset())
go func() {
err := http.ListenAndServe(options.ServerOptions.HttpEndpoint, mux)
if err != nil {
klog.ErrorS(err, "failed to listen & serve metrics", "endpoint", options.ServerOptions.HttpEndpoint)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}()
r := metrics.NewRecorder()

Check failure on line 60 in cmd/main.go

View workflow job for this annotation

GitHub Actions / buildx (ubuntu-latest)

undefined: metrics.NewRecorder

Check failure on line 60 in cmd/main.go

View workflow job for this annotation

GitHub Actions / Generate PR Coverage

undefined: metrics.NewRecorder
r.InitializeMetricsHandler(options.ServerOptions.HttpEndpoint, "/metrics")
}

drv, err := driver.NewDriver(
Expand Down
75 changes: 0 additions & 75 deletions pkg/cloud/aws_metrics.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ var _ Cloud = &cloud{}
// NewCloud returns a new instance of AWS cloud
// It panics if session is invalid
func NewCloud(region string, awsSdkDebugLog bool, userAgentExtra string) (Cloud, error) {
RegisterMetrics()
return newEC2Cloud(region, awsSdkDebugLog, userAgentExtra)
}

Expand Down
23 changes: 18 additions & 5 deletions pkg/cloud/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,32 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws/request"

"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/metrics"
"k8s.io/klog/v2"
)

// RecordRequestsComplete is added to the Complete chain; called after any request
// RecordRequestsHandler is added to the Complete chain; called after any request
func RecordRequestsHandler(r *request.Request) {
recordAWSMetric(operationName(r), time.Since(r.Time).Seconds(), r.Error)
labels := map[string]string{
"request": operationName(r),
}

if r.Error != nil {
metrics.Recorder().IncreaseCount("cloudprovider_aws_api_request_errors", labels)
} else {
duration := time.Since(r.Time).Seconds()
metrics.Recorder().ObserveHistogram("cloudprovider_aws_api_request_duration_seconds", duration, labels, nil)
}
}

// RecordThrottlesAfterRetry is added to the AfterRetry chain; called after any error
// RecordThrottledRequestsHandler is added to the AfterRetry chain; called after any error
func RecordThrottledRequestsHandler(r *request.Request) {
labels := map[string]string{
"operation_name": operationName(r),
}

if r.IsErrorThrottle() {
recordAWSThrottlesMetric(operationName(r))
metrics.Recorder().IncreaseCount("cloudprovider_aws_api_throttled_requests_total", labels)
klog.InfoS("Got RequestLimitExceeded error on AWS request", "request", describeRequest(r))
}
}
Expand Down
149 changes: 149 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package metrics

import (
"net/http"
"sync"
"time"

"k8s.io/component-base/metrics"
"k8s.io/klog/v2"
)

var (
r *metricRecorder // singleton instance of metricRecorder
once sync.Once
)

// metricRecorder bundles a metrics registry with the metric vectors that have
// been created through it.
type metricRecorder struct {
// registry is the registry metric vectors are registered with; it is also the
// source served by InitializeMetricsHandler.
registry metrics.KubeRegistry
// metrics maps a metric name to its vector. The register helpers in this file
// store *metrics.CounterVec and *metrics.HistogramVec values here.
// NOTE(review): nothing in this file locks the map — confirm the recorder is
// never used from several goroutines at once.
metrics map[string]interface{}
}

// Recorder returns the package-level metricRecorder singleton.
// The result is nil until InitializeRecorder has run. IncreaseCount and
// ObserveHistogram return immediately on a nil receiver, so they can be called
// on the result without a nil check.
func Recorder() *metricRecorder {
return r
}

// InitializeRecorder creates the package-level metricRecorder on first use and
// returns it. Later calls skip construction (guarded by once) and return the
// same instance.
func InitializeRecorder() *metricRecorder {
	build := func() {
		rec := new(metricRecorder)
		rec.registry = metrics.NewKubeRegistry()
		rec.metrics = map[string]interface{}{}
		r = rec
	}
	once.Do(build)
	return r
}

// IncreaseCount increases the counter metric identified by name by 1 for the
// given label set.
//
// If no metric is stored under name yet, a counter vector is registered first,
// using the keys of labels as its label names. The call is a no-op when the
// recorder is nil (not initialized). When name is already taken by a metric
// that is not a counter, an error is logged and nothing is incremented; the
// previous unchecked type assertion would have panicked in that case.
func (m *metricRecorder) IncreaseCount(name string, labels map[string]string) {
	if m == nil {
		return // recorder is not initialized
	}

	if _, ok := m.metrics[name]; !ok {
		klog.V(4).InfoS("Metric not found, registering", "name", name, "labels", labels)
		m.registerCounterVec(name, "ebs_csi_aws_com metric", getLabelNames(labels))
	}

	// Checked assertion: the map holds counters and histograms side by side,
	// so the entry for name is not guaranteed to be a counter.
	counter, ok := m.metrics[name].(*metrics.CounterVec)
	if !ok {
		klog.ErrorS(nil, "Metric is not a counter, skipping increment", "name", name)
		return
	}

	counter.With(metrics.Labels(labels)).Inc()
}

// ObserveHistogram records value in the histogram metric identified by name for
// the given label set.
//
// If no metric is stored under name yet, a histogram vector is registered
// first, using the keys of labels as its label names and buckets as its bucket
// boundaries. The call is a no-op when the recorder is nil (not initialized).
// When name is already taken by a metric that is not a histogram, an error is
// logged and the observation is dropped; the previous unchecked type assertion
// would have panicked in that case.
func (m *metricRecorder) ObserveHistogram(name string, value float64, labels map[string]string, buckets []float64) {
	if m == nil {
		return // recorder is not initialized
	}

	if _, ok := m.metrics[name]; !ok {
		klog.V(4).InfoS("Metric not found, registering", "name", name, "labels", labels, "buckets", buckets)
		m.registerHistogramVec(name, "ebs_csi_aws_com metric", getLabelNames(labels), buckets)
	}

	// Checked assertion: the map holds counters and histograms side by side,
	// so the entry for name is not guaranteed to be a histogram.
	histogram, ok := m.metrics[name].(*metrics.HistogramVec)
	if !ok {
		klog.ErrorS(nil, "Metric is not a histogram, skipping observation", "name", name)
		return
	}

	histogram.With(metrics.Labels(labels)).Observe(value)
}

// InitializeMetricsHandler exposes the recorder's registry over HTTP.
//
// It mounts a metrics handler at path on a fresh mux and serves it on address
// from a background goroutine, then returns without waiting. If the server
// stops with any error other than http.ErrServerClosed, the error is logged
// and the process exits with status 1. On a nil recorder it only logs a
// message and returns.
func (m *metricRecorder) InitializeMetricsHandler(address, path string) {
	if m == nil {
		klog.InfoS("InitializeMetricsHandler: metric recorder is not initialized")
		return
	}

	handlerOpts := metrics.HandlerOpts{ErrorHandling: metrics.ContinueOnError}
	router := http.NewServeMux()
	router.Handle(path, metrics.HandlerFor(m.registry, handlerOpts))

	srv := &http.Server{
		Addr:        address,
		Handler:     router,
		ReadTimeout: 3 * time.Second,
	}

	serve := func() {
		klog.InfoS("Metric server listening", "address", address, "path", path)

		err := srv.ListenAndServe()
		if err == nil || err == http.ErrServerClosed {
			return
		}
		klog.ErrorS(err, "Failed to start metric server", "address", address, "path", path)
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	go serve()
}

// registerHistogramVec creates a histogram vector, stores it under name and
// registers it with the recorder's registry. It does nothing when an entry for
// name already exists.
func (m *metricRecorder) registerHistogramVec(name, help string, labels []string, buckets []float64) {
	_, registered := m.metrics[name]
	if !registered {
		vec := createHistogramVec(name, help, labels, buckets)
		m.metrics[name] = vec
		m.registry.MustRegister(vec)
	}
}

// registerCounterVec creates a counter vector, stores it under name and
// registers it with the recorder's registry. It does nothing when an entry for
// name already exists.
func (m *metricRecorder) registerCounterVec(name, help string, labels []string) {
	_, registered := m.metrics[name]
	if !registered {
		vec := createCounterVec(name, help, labels)
		m.metrics[name] = vec
		m.registry.MustRegister(vec)
	}
}

// createHistogramVec builds an ALPHA-stability histogram vector with the given
// name, help text, label names and bucket boundaries. It does not register it.
func createHistogramVec(name, help string, labels []string, buckets []float64) *metrics.HistogramVec {
	return metrics.NewHistogramVec(
		&metrics.HistogramOpts{
			Name:           name,
			Help:           help,
			StabilityLevel: metrics.ALPHA,
			Buckets:        buckets,
		},
		labels,
	)
}

// createCounterVec builds an ALPHA-stability counter vector with the given
// name, help text and label names. It does not register it.
func createCounterVec(name, help string, labels []string) *metrics.CounterVec {
	counterOpts := &metrics.CounterOpts{
		Name:           name,
		Help:           help,
		StabilityLevel: metrics.ALPHA,
	}
	return metrics.NewCounterVec(counterOpts, labels)
}

// getLabelNames returns the keys of labels as a slice. The result is always
// non-nil; its order follows map iteration and is therefore unspecified.
func getLabelNames(labels map[string]string) []string {
	keys := make([]string, len(labels))
	idx := 0
	for key := range labels {
		keys[idx] = key
		idx++
	}
	return keys
}
Loading

0 comments on commit ae2798b

Please sign in to comment.