Skip to content

Commit

Permalink
next iteration
Browse files Browse the repository at this point in the history
  • Loading branch information
sbueringer committed Aug 1, 2023
1 parent 73a535d commit dbcc6fb
Show file tree
Hide file tree
Showing 13 changed files with 538 additions and 345 deletions.
6 changes: 3 additions & 3 deletions pkg/builder/builder_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/internal/testing/addr"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

Expand All @@ -57,7 +57,7 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())

// Prevent the metrics listener being created
metrics.DefaultBindAddress = "0"
metricsserver.DefaultBindAddress = "0"

webhook.DefaultPort, _, err = addr.Suggest("")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -67,7 +67,7 @@ var _ = AfterSuite(func() {
Expect(testenv.Stop()).To(Succeed())

// Put the DefaultBindAddress back
metrics.DefaultBindAddress = ":8080"
metricsserver.DefaultBindAddress = ":8080"

// Change the webhook.DefaultPort back to the original default.
webhook.DefaultPort = 9443
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
crscheme "sigs.k8s.io/controller-runtime/pkg/scheme"
)

Expand Down Expand Up @@ -79,12 +79,12 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())

// Prevent the metrics listener being created
metrics.DefaultBindAddress = "0"
metricsserver.DefaultBindAddress = "0"
})

var _ = AfterSuite(func() {
Expect(testenv.Stop()).To(Succeed())

// Put the DefaultBindAddress back
metrics.DefaultBindAddress = ":8080"
metricsserver.DefaultBindAddress = ":8080"
})
83 changes: 7 additions & 76 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
Expand All @@ -44,7 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/internal/httpserver"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)

Expand All @@ -57,7 +56,6 @@ const (

defaultReadinessEndpoint = "/readyz"
defaultLivenessEndpoint = "/healthz"
defaultMetricsEndpoint = "/metrics"
)

var _ Runnable = &controllerManager{}
Expand All @@ -84,16 +82,8 @@ type controllerManager struct {
// on shutdown
leaderElectionReleaseOnCancel bool

// metricsListener is used to serve prometheus metrics
metricsListener net.Listener

// metricsFilter is a func that is added before the metrics handler on the metrics server.
// This can be e.g. used to enforce authentication and authorization on the metrics
// endpoint.
metricsFilter metrics.Filter

// metricsExtraHandlers contains extra handlers to register on http server that serves metrics.
metricsExtraHandlers map[string]http.Handler
// metricsServer is used to serve prometheus metrics
metricsServer metricsserver.Server

// healthProbeListener is used to serve liveness probe
healthProbeListener net.Listener
Expand Down Expand Up @@ -189,28 +179,6 @@ func (cm *controllerManager) add(r Runnable) error {
return cm.runnables.Add(r)
}

// AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics.
func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
cm.Lock()
defer cm.Unlock()

if cm.started {
return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created")
}

if path == defaultMetricsEndpoint {
return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
}

if _, found := cm.metricsExtraHandlers[path]; found {
return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
}

cm.metricsExtraHandlers[path] = handler
cm.logger.V(2).Info("Registering metrics http server extra handler", "path", path)
return nil
}

// AddHealthzCheck allows you to add Healthz checker.
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
cm.Lock()
Expand Down Expand Up @@ -301,45 +269,6 @@ func (cm *controllerManager) GetControllerOptions() config.Controller {
return cm.controllerConfig
}

func (cm *controllerManager) addMetricsServer() error {
mux := http.NewServeMux()
srv := httpserver.New(mux)

handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
})

log := cm.logger.WithValues("path", defaultMetricsEndpoint)

if cm.metricsFilter != nil {
var err error
handler, err = cm.metricsFilter(log, handler)
if err != nil {
return fmt.Errorf("failed to add metrics server: failed to add metrics filter %w", err)
}
}
// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
mux.Handle(defaultMetricsEndpoint, handler)

for path, extraHandler := range cm.metricsExtraHandlers {
if cm.metricsFilter != nil {
var err error
extraHandler, err = cm.metricsFilter(log, extraHandler)
if err != nil {
return fmt.Errorf("failed to add metrics server: failed to add metrics filter to extra handler %w", err)
}
}
mux.Handle(path, extraHandler)
}

return cm.add(&server{
Kind: "metrics",
Log: log,
Server: srv,
Listener: cm.metricsListener,
})
}

func (cm *controllerManager) addHealthProbeServer() error {
mux := http.NewServeMux()
srv := httpserver.New(mux)
Expand Down Expand Up @@ -433,8 +362,10 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
// Metrics should be served whether the controller is leader or not.
// (If we don't serve metrics for non-leaders, prometheus will still scrape
// the pod but will get a connection refused).
if cm.metricsListener != nil {
if err := cm.addMetricsServer(); err != nil {
if cm.metricsServer != nil {
// Note: We are adding the metrics server directly to HTTPServers here as matching on the
// metricsserver.Server interface in cm.runnables.Add would be very brittle.
if err := cm.runnables.HTTPServers.Add(cm.metricsServer, nil); err != nil {
return fmt.Errorf("failed to add metrics server: %w", err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/manager/internal/integration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -145,7 +146,7 @@ var _ = Describe("manger.Manager Start", func() {
Scheme: scheme,
HealthProbeBindAddress: ":0",
// Disable metrics to avoid port conflicts.
MetricsBindAddress: "0",
Metrics: metricsserver.Options{BindAddress: "0"},
WebhookServer: webhook.NewServer(webhook.Options{
Port: env.WebhookInstallOptions.LocalServingPort,
Host: env.WebhookInstallOptions.LocalServingHost,
Expand Down
61 changes: 13 additions & 48 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -42,7 +43,6 @@ import (
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/leaderelection"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/recorder"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
Expand All @@ -64,13 +64,6 @@ type Manager interface {
// election was configured.
Elected() <-chan struct{}

// AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics.
// Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be
// sensitive and shouldn't be exposed publicly.
// If the simple path -> handler mapping offered here is not enough, a new http server/listener should be added as
// Runnable to the manager via Add method.
AddMetricsExtraHandler(path string, handler http.Handler) error

// AddHealthzCheck allows you to add Healthz checker
AddHealthzCheck(name string, check healthz.Checker) error

Expand Down Expand Up @@ -218,22 +211,8 @@ type Options struct {
// between tries of actions. Default is 2 seconds.
RetryPeriod *time.Duration

// MetricsBindAddress is the TCP address that the controller should bind to
// for serving prometheus metrics.
// It can be set to "0" to disable the metrics serving.
//
// Per default metrics will be served via http. If MetricsSecureServing is enabled
// metrics will be served via https.
MetricsBindAddress string

// MetricsSecureServing enables serving metrics via https.
MetricsSecureServing bool

// MetricsFilterProvider provides a metrics filter which is a func that is added before
// the metrics handler on the metrics server.
// This can be e.g. used to enforce authentication and authorization on the metrics
// endpoint by setting this field to filters.WithAuthenticationAndAuthorization.
MetricsFilterProvider func(c *rest.Config, httpClient *http.Client) (metrics.Filter, error)
// Metrics are the metrics.Options that will be used to create the metrics.Server.
Metrics metricsserver.Options

// HealthProbeBindAddress is the TCP address that the controller should bind to
// for serving health probes
Expand All @@ -254,8 +233,8 @@ type Options struct {
PprofBindAddress string

// WebhookServer is an externally configured webhook.Server. By default,
// a Manager will create a default server using Port, Host, and CertDir;
// if this is set, the Manager will use this server instead.
// a Manager will create a server via webhook.NewServer with default settings.
// If this is set, the Manager will use this server instead.
WebhookServer webhook.Server

// BaseContext is the function that provides Context values to Runnables
Expand Down Expand Up @@ -290,7 +269,7 @@ type Options struct {
// Dependency injection for testing
newRecorderProvider func(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error)
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
newMetricsListener func(addr string, secureServing bool) (net.Listener, error)
newMetricsServer func(options metricsserver.Options, config *rest.Config, httpClient *http.Client) (metricsserver.Server, error)
newHealthProbeListener func(addr string) (net.Listener, error)
newPprofListener func(addr string) (net.Listener, error)
}
Expand Down Expand Up @@ -386,24 +365,12 @@ func New(config *rest.Config, options Options) (Manager, error) {
}
}

var metricsFilter metrics.Filter
if options.MetricsFilterProvider != nil {
metricsFilter, err = options.MetricsFilterProvider(config, cluster.GetHTTPClient())
if err != nil {
return nil, err
}
}

// Create the metrics listener. This will throw an error if the metrics bind
// address is invalid or already in use.
metricsListener, err := options.newMetricsListener(options.MetricsBindAddress, options.MetricsSecureServing)
// Create the metrics server.
metricsServer, err := options.newMetricsServer(options.Metrics, config, cluster.GetHTTPClient())
if err != nil {
return nil, err
}

// By default we have no extra endpoints to expose on metrics http server.
metricsExtraHandlers := make(map[string]http.Handler)

// Create health probes listener. This will throw an error if the bind
// address is invalid or already in use.
healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
Expand All @@ -428,9 +395,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
errChan: errChan,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsFilter: metricsFilter,
metricsExtraHandlers: metricsExtraHandlers,
metricsServer: metricsServer,
controllerConfig: options.Controller,
logger: options.Logger,
elected: make(chan struct{}),
Expand Down Expand Up @@ -476,8 +441,8 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options,
o.Cache.DefaultNamespaces = map[string]cache.Config{newObj.CacheNamespace: {}}
}

if o.MetricsBindAddress == "" && newObj.Metrics.BindAddress != "" {
o.MetricsBindAddress = newObj.Metrics.BindAddress
if o.Metrics.BindAddress == "" && newObj.Metrics.BindAddress != "" {
o.Metrics.BindAddress = newObj.Metrics.BindAddress
}

if o.HealthProbeBindAddress == "" && newObj.Health.HealthProbeBindAddress != "" {
Expand Down Expand Up @@ -628,8 +593,8 @@ func setOptionsDefaults(options Options) Options {
}
}

if options.newMetricsListener == nil {
options.newMetricsListener = metrics.NewListener
if options.newMetricsServer == nil {
options.newMetricsServer = metricsserver.NewServer
}
leaseDuration, renewDeadline, retryPeriod := defaultLeaseDuration, defaultRenewDeadline, defaultRetryPeriod
if options.LeaseDuration == nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/manager/manager_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

func TestSource(t *testing.T) {
Expand Down Expand Up @@ -69,12 +69,12 @@ var _ = BeforeSuite(func() {
Expect(err).NotTo(HaveOccurred())

// Prevent the metrics listener being created
metrics.DefaultBindAddress = "0"
metricsserver.DefaultBindAddress = "0"
})

var _ = AfterSuite(func() {
Expect(testenv.Stop()).To(Succeed())

// Put the DefaultBindAddress back
metrics.DefaultBindAddress = ":8080"
metricsserver.DefaultBindAddress = ":8080"
})
Loading

0 comments on commit dbcc6fb

Please sign in to comment.