From efb568dfe51da1435137a5fae2f75ef7d050feea Mon Sep 17 00:00:00 2001 From: cpu1 Date: Tue, 19 Dec 2023 01:19:30 +0530 Subject: [PATCH] Use an informer to watch and approve CSRs Uses an Informer to watch and approve CSRs, instead of polling the API server and calling the `List` API every few seconds. Additionally, the existing `TestBasicCRSApprover` test was inaccurate in that it wasn't actually testing the condition that the certificate has been approved. The call to `approveCSR` was failing the validation in `ValidateKubeletServingCSR`, but since the error is only being logged and not returned, the test was still passing. The actual certificate status check [here](https://github.com/k0sproject/k0s/blob/7d9532a9c51039ba8a2b1ddd6351ee440d9a31bd/pkg/component/controller/csrapprover_test.go#L84) was never reached because `status.Conditions` was empty. This changelist also fixes the existing test by generating a certificate request that complies with the validation in `ValidateKubeletServingCSR`, and by mocking the fake `Clientset` to authorize all operations (by responding to creation of `SubjectAccessReviews` with `status.Allowed = true`). A new test is also added to cover the case where the CSRs already exist before `CSRApprover.Start()` is called. This may slightly increase the memory footprint as the CSRs will need to be held in the informer's cache, but it will be limited to CSRs with `spec.signerName` set to `kubernetes.io/kubelet-serving`. This is a minor trade-off compared to polling which is slower to respond to newly-issued CSRs, makes more requests to the API server and fetches a full list of kubelet-serving CSRs in each request. A TODO is also added for the future to share the informer factory with other components. Signed-off-by: cpu1 --- cmd/controller/controller.go | 4 +- pkg/component/controller/csrapprover.go | 152 +++++++++++-------- pkg/component/controller/csrapprover_test.go | 132 +++++++++++----- 3 files changed, 186 insertions(+), 102 deletions(-) diff --git a/cmd/controller/controller.go b/cmd/controller/controller.go index 85cbd1850033..68d808b04171 100644 --- a/cmd/controller/controller.go +++ b/cmd/controller/controller.go @@ -279,7 +279,9 @@ func (c *command) start(ctx context.Context) error { if !slices.Contains(c.DisableComponents, constant.CsrApproverComponentName) { nodeComponents.Add(ctx, controller.NewCSRApprover(nodeConfig, leaderElector, - adminClientFactory)) + adminClientFactory, + 30*time.Minute), + ) } if c.EnableK0sCloudProvider { diff --git a/pkg/component/controller/csrapprover.go b/pkg/component/controller/csrapprover.go index fe4e447aa420..a8f735236570 100644 --- a/pkg/component/controller/csrapprover.go +++ b/pkg/component/controller/csrapprover.go @@ -20,23 +20,26 @@ import ( "context" "crypto/x509" "encoding/pem" + "errors" "fmt" - "sync/atomic" "time" + "github.com/avast/retry-go" "github.com/sirupsen/logrus" authorization "k8s.io/api/authorization/v1" v1 "k8s.io/api/certificates/v1" core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" "github.com/k0sproject/k0s/pkg/component/manager" kubeutil "github.com/k0sproject/k0s/pkg/kubernetes" - certificates "k8s.io/kubernetes/pkg/apis/certificates" + "k8s.io/kubernetes/pkg/apis/certificates" ) type CSRApprover struct { @@ -47,18 +50,18 @@ type CSRApprover struct { KubeClientFactory kubeutil.ClientFactoryInterface leaderElector leaderelector.Interface clientset clientset.Interface + resyncPeriod time.Duration } var _ manager.Component = (*CSRApprover)(nil) // NewCSRApprover creates the CSRApprover component -func NewCSRApprover(c *v1beta1.ClusterConfig, leaderElector leaderelector.Interface, kubeClientFactory kubeutil.ClientFactoryInterface) *CSRApprover { - d := atomic.Value{} - d.Store(true) +func NewCSRApprover(c *v1beta1.ClusterConfig, leaderElector leaderelector.Interface, kubeClientFactory kubeutil.ClientFactoryInterface, cacheResyncPeriod time.Duration) *CSRApprover { return &CSRApprover{ ClusterConfig: c, leaderElector: leaderElector, KubeClientFactory: kubeClientFactory, + resyncPeriod: cacheResyncPeriod, log: logrus.WithFields(logrus.Fields{"component": "csrapprover"}), } } @@ -80,85 +83,108 @@ func (a *CSRApprover) Init(_ context.Context) error { return nil } -// Run every 10 seconds checks for newly issued CSRs and approves them +// Start watches for newly issued CSRs and approves them. func (a *CSRApprover) Start(ctx context.Context) error { ctx, a.stop = context.WithCancel(ctx) - go func() { - defer a.stop() - ticker := time.NewTicker(10 * time.Second) // TODO: sometimes this should be refactored so it watches instead of polls for CSRs - defer ticker.Stop() - for { - select { - case <-ticker.C: - err := a.approveCSR(ctx) - if err != nil { - a.log.WithError(err).Warn("CSR approval failed") - } - case <-ctx.Done(): - a.log.Info("CSR Approver context done") - return - } - } - }() + // TODO: share informer factory with other components. + factory := informers.NewSharedInformerFactoryWithOptions(a.clientset, a.resyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = "spec.signerName=kubernetes.io/kubelet-serving" + })) + _, err := factory.Certificates().V1().CertificateSigningRequests().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + a.retryApproveCSR(ctx, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + a.retryApproveCSR(ctx, newObj) + }, + }) + if err != nil { + return fmt.Errorf("failed to add event handler to shared informer: %w", err) + } + + factory.Start(ctx.Done()) + synced := factory.WaitForCacheSync(ctx.Done()) + for _, ok := range synced { + if !ok { + return errors.New("caches failed to sync") + } + } return nil } +func (a *CSRApprover) retryApproveCSR(ctx context.Context, obj interface{}) { + csr, ok := obj.(*v1.CertificateSigningRequest) + if !ok { + a.log.Errorf("expected resource to be of type %T; got %T", &v1.CertificateSigningRequest{}, obj) + return + } + + const maxAttempts = 10 + logger := a.log.WithField("csrName", csr.Name) + + err := retry.Do(func() error { + if err := a.approveCSR(ctx, csr); err != nil { + logger.WithError(err).Warn("CSR approval failed") + return err + } + return nil + }, + retry.Context(ctx), + retry.Attempts(maxAttempts), + retry.OnRetry(func(attempts uint, err error) { + logger.WithField("attempts", attempts).WithError(err).Info("retrying CSR approval") + }), + ) + + if err != nil { + logger.WithError(err).Errorf("Failed to approve CSR after %d attempts", maxAttempts) + } else { + logger.Info("CSR approved successfully") + } +} + // Majority of this code has been adapted from https://github.com/kontena/kubelet-rubber-stamp -func (a *CSRApprover) approveCSR(ctx context.Context) error { +func (a *CSRApprover) approveCSR(ctx context.Context, csr *v1.CertificateSigningRequest) error { if !a.leaderElector.IsLeader() { a.log.Debug("not the leader, can't approve certificates") return nil } - opts := metav1.ListOptions{ - FieldSelector: "spec.signerName=kubernetes.io/kubelet-serving", + if approved, denied := getCertApprovalCondition(&csr.Status); approved || denied { + a.log.Debugf("CSR %s is approved=%t || denied=%t. Carry on", csr.Name, approved, denied) + return nil } - csrs, err := a.clientset.CertificatesV1().CertificateSigningRequests().List(ctx, opts) + x509cr, err := parseCSR(csr) if err != nil { - return fmt.Errorf("can't fetch CSRs: %w", err) + return retry.Unrecoverable(fmt.Errorf("unable to parse CSR %q: %w", csr.Name, err)) } - for _, csr := range csrs.Items { - if approved, denied := getCertApprovalCondition(&csr.Status); approved || denied { - a.log.Debugf("CSR %s is approved=%t || denied=%t. Carry on", csr.Name, approved, denied) - continue - } - - x509cr, err := parseCSR(&csr) - if err != nil { - return fmt.Errorf("unable to parse csr %q: %w", csr.Name, err) - } - - if err := a.ensureKubeletServingCert(&csr, x509cr); err != nil { - a.log.WithError(err).Infof("Not approving CSR %q as it is not recognized as a kubelet-serving certificate", csr.Name) - continue - } - - approved, err := a.authorize(ctx, &csr, authorization.ResourceAttributes{ - Group: "certificates.k8s.io", - Resource: "certificatesigningrequests", - Verb: "create", - }) - if err != nil { - return fmt.Errorf("SubjectAccessReview failed for CSR %q: %w", csr.Name, err) - } - - if !approved { - return fmt.Errorf("failed to perform SubjectAccessReview for CSR %q", csr.Name) - } + if err := a.ensureKubeletServingCert(csr, x509cr); err != nil { + a.log.WithError(err).Infof("Not approving CSR %q as it is not recognized as a kubelet-serving certificate", csr.Name) + return nil + } - a.log.Infof("approving csr %s with SANs: %s, IP Addresses:%s", csr.ObjectMeta.Name, x509cr.DNSNames, x509cr.IPAddresses) - appendApprovalCondition(&csr, "Auto approving kubelet serving certificate after SubjectAccessReview.") - _, err = a.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, &csr, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("error updating approval for CSR %q: %w", csr.Name, err) - } + approved, err := a.authorize(ctx, csr, authorization.ResourceAttributes{ + Group: v1.GroupName, + Resource: "certificatesigningrequests", + Verb: "create", + }) + if err != nil { + return fmt.Errorf("SubjectAccessReview failed for CSR %q: %w", csr.Name, err) + } - return nil + if !approved { + return fmt.Errorf("failed to perform SubjectAccessReview for CSR %q", csr.Name) } + a.log.Infof("approving CSR %s with SANs: %s, IP Addresses:%s", csr.ObjectMeta.Name, x509cr.DNSNames, x509cr.IPAddresses) + appendApprovalCondition(csr, "Auto approving kubelet serving certificate after SubjectAccessReview.") + _, err = a.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("error updating approval for CSR %q: %w", csr.Name, err) + } return nil } diff --git a/pkg/component/controller/csrapprover_test.go b/pkg/component/controller/csrapprover_test.go index 20d154260ae4..406eaf804f9f 100644 --- a/pkg/component/controller/csrapprover_test.go +++ b/pkg/component/controller/csrapprover_test.go @@ -24,25 +24,26 @@ import ( "crypto/x509" "crypto/x509/pkix" "encoding/pem" + "fmt" "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" "github.com/k0sproject/k0s/internal/testutil" "github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1" "github.com/k0sproject/k0s/pkg/component/controller/leaderelector" "github.com/stretchr/testify/assert" + + authorizationv1 "k8s.io/api/authorization/v1" certv1 "k8s.io/api/certificates/v1" core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestBasicCRSApprover(t *testing.T) { - fakeFactory := testutil.NewFakeClientFactory() - - client, err := fakeFactory.GetClient() - assert.NoError(t, err) - - ctx := context.TODO() - privateKey, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { t.Fatal(err) @@ -50,47 +51,102 @@ func TestBasicCRSApprover(t *testing.T) { req := pemWithPrivateKey(privateKey) - csrReq := &certv1.CertificateSigningRequest{ - ObjectMeta: metav1.ObjectMeta{ - Name: "csrapprover_test", + for i, test := range []struct { + name string + startControllerBeforeCreatingCSR bool + }{ + { + name: "existing CSRs are approved", + startControllerBeforeCreatingCSR: false, }, - Spec: certv1.CertificateSigningRequestSpec{ - Request: req, - SignerName: "kubernetes.io/kubelet-serving", + { + name: "newly-created CSRs are approved", + startControllerBeforeCreatingCSR: true, }, + } { + t.Run(test.name, func(t *testing.T) { + fakeFactory := testutil.NewFakeClientFactory() + client, err := fakeFactory.GetClient() + assert.NoError(t, err) + + config := &v1beta1.ClusterConfig{ + Spec: &v1beta1.ClusterSpec{ + API: &v1beta1.APISpec{ + Address: "1.2.3.4", + ExternalAddress: "get.k0s.sh", + }, + }, + } + ctx := context.TODO() + + c := NewCSRApprover(config, &leaderelector.Dummy{Leader: true}, fakeFactory, 10*time.Minute) + assert.NoError(t, c.Init(ctx)) + + csrReq := &certv1.CertificateSigningRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("csrapprover-test-%d", i+1), + }, + Spec: certv1.CertificateSigningRequestSpec{ + Request: req, + SignerName: "kubernetes.io/kubelet-serving", + Usages: []certv1.KeyUsage{"digital signature", "key encipherment", "server auth"}, + }, + } + + fakeClient, ok := client.(*fake.Clientset) + assert.True(t, ok, "expected Clientset to be of type %T; got %T", &fake.Clientset{}, client) + fakeClient.PrependReactor("create", "subjectaccessreviews", func(action k8stesting.Action) (bool, runtime.Object, error) { + createAction, ok := action.(k8stesting.CreateActionImpl) + if !ok { + return false, nil, fmt.Errorf("expected action to be of type %T; got %T", &k8stesting.CreateActionImpl{}, action) + } + sar, ok := createAction.Object.(*authorizationv1.SubjectAccessReview) + if !ok { + return false, nil, fmt.Errorf("expected resource to be of type %T; got %T", &authorizationv1.SubjectAccessReview{}, createAction.Object) + } + sar.Status.Allowed = true + return true, sar, nil + }) + + var newCSR *certv1.CertificateSigningRequest + createCSR := func() { + t.Helper() + newCSR, err = client.CertificatesV1().CertificateSigningRequests().Create(ctx, csrReq, metav1.CreateOptions{}) + assert.NoError(t, err) + } + if test.startControllerBeforeCreatingCSR { + assert.NoError(t, c.Start(ctx)) + createCSR() + } else { + createCSR() + assert.NoError(t, c.Start(ctx)) + } + + assert.EventuallyWithT(t, func(c *assert.CollectT) { + csr, err := client.CertificatesV1().CertificateSigningRequests().Get(ctx, newCSR.Name, metav1.GetOptions{}) + assert.NoError(c, err) + assert.NotNil(c, csr, "could not find CSR") + assert.NotEmpty(c, csr.Status.Conditions, "expected to find at least one element in status.conditions") + + for _, condition := range csr.Status.Conditions { + assert.True(c, condition.Type == certv1.CertificateApproved && condition.Reason == "Autoapproved by K0s CSRApprover" && condition.Status == core.ConditionTrue, + "expected CSR to be approved") + } + }, 2*time.Second, 1*time.Millisecond) + + assert.NoError(t, c.Stop()) + }) } - newCsr, err := client.CertificatesV1().CertificateSigningRequests().Create(ctx, csrReq, metav1.CreateOptions{}) - assert.NoError(t, err) - - config := &v1beta1.ClusterConfig{ - Spec: &v1beta1.ClusterSpec{ - API: &v1beta1.APISpec{ - Address: "1.2.3.4", - ExternalAddress: "get.k0s.sh", - }, - }, - } - c := NewCSRApprover(config, &leaderelector.Dummy{Leader: true}, fakeFactory) - - assert.NoError(t, c.Init(ctx)) - assert.NoError(t, c.approveCSR(ctx)) - - csr, err := client.CertificatesV1().CertificateSigningRequests().Get(ctx, newCsr.Name, metav1.GetOptions{}) - assert.NoError(t, err) - assert.NotNil(t, csr) - assert.True(t, csr.Name == newCsr.Name) - for _, c := range csr.Status.Conditions { - assert.True(t, c.Type == certv1.CertificateApproved && c.Reason == "Autoapproved by K0S CSRApprover" && c.Status == core.ConditionTrue) - } } func pemWithPrivateKey(pk crypto.PrivateKey) []byte { template := &x509.CertificateRequest{ Subject: pkix.Name{ - CommonName: "something", - Organization: []string{"test"}, + CommonName: "system:node:worker", + Organization: []string{"system:nodes"}, }, + DNSNames: []string{"worker-1"}, } return pemWithTemplate(template, pk) }