Skip to content

Commit

Permalink
Use an informer to watch and approve CSRs
Browse files Browse the repository at this point in the history
Uses an Informer to watch and approve CSRs, instead of polling the API server and calling the `List` API every few seconds.
Additionally, the existing `TestBasicCRSApprover` test was inaccurate in that it wasn't actually testing the condition that the certificate has been approved.
The call to `approveCSR` was failing the validation in `ValidateKubeletServingCSR`, but since the error is only being logged and not returned,
the test was still passing. The actual certificate status check [here](https://github.com/k0sproject/k0s/blob/7d9532a9c51039ba8a2b1ddd6351ee440d9a31bd/pkg/component/controller/csrapprover_test.go#L84)
was never reached because `status.Conditions` was empty.

This changelist also fixes the existing test by generating a certificate request that complies with the validation in `ValidateKubeletServingCSR`,
and by mocking the fake `Clientset` to authorize all operations (by responding to creation of `SubjectAccessReviews` with `status.Allowed = true`).
A new test is also added to cover the case where the CSRs already exist before `CSRApprover.Start()` is called.

This may slightly increase the memory footprint as the CSRs will need to be held in the informer's cache, but it will be limited to CSRs with
`spec.signerName` set to `kubernetes.io/kubelet-serving`. This is a minor trade-off compared to polling which is slower to respond to newly-issued CSRs,
makes more requests to the API server and fetches a full list of kubelet-serving CSRs in each request.

A TODO is also added for the future to share the informer factory with other components.

Signed-off-by: cpu1 <[email protected]>
  • Loading branch information
cPu1 committed Dec 18, 2023
1 parent 7d9532a commit efb568d
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 102 deletions.
4 changes: 3 additions & 1 deletion cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ func (c *command) start(ctx context.Context) error {
if !slices.Contains(c.DisableComponents, constant.CsrApproverComponentName) {
nodeComponents.Add(ctx, controller.NewCSRApprover(nodeConfig,
leaderElector,
adminClientFactory))
adminClientFactory,
30*time.Minute),
)
}

if c.EnableK0sCloudProvider {
Expand Down
152 changes: 89 additions & 63 deletions pkg/component/controller/csrapprover.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,26 @@ import (
"context"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/avast/retry-go"
"github.com/sirupsen/logrus"
authorization "k8s.io/api/authorization/v1"
v1 "k8s.io/api/certificates/v1"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/controller/leaderelector"
"github.com/k0sproject/k0s/pkg/component/manager"
kubeutil "github.com/k0sproject/k0s/pkg/kubernetes"
certificates "k8s.io/kubernetes/pkg/apis/certificates"
"k8s.io/kubernetes/pkg/apis/certificates"
)

type CSRApprover struct {
Expand All @@ -47,18 +50,18 @@ type CSRApprover struct {
KubeClientFactory kubeutil.ClientFactoryInterface
leaderElector leaderelector.Interface
clientset clientset.Interface
resyncPeriod time.Duration
}

var _ manager.Component = (*CSRApprover)(nil)

// NewCSRApprover creates the CSRApprover component
func NewCSRApprover(c *v1beta1.ClusterConfig, leaderElector leaderelector.Interface, kubeClientFactory kubeutil.ClientFactoryInterface) *CSRApprover {
d := atomic.Value{}
d.Store(true)
func NewCSRApprover(c *v1beta1.ClusterConfig, leaderElector leaderelector.Interface, kubeClientFactory kubeutil.ClientFactoryInterface, cacheResyncPeriod time.Duration) *CSRApprover {
return &CSRApprover{
ClusterConfig: c,
leaderElector: leaderElector,
KubeClientFactory: kubeClientFactory,
resyncPeriod: cacheResyncPeriod,
log: logrus.WithFields(logrus.Fields{"component": "csrapprover"}),
}
}
Expand All @@ -80,85 +83,108 @@ func (a *CSRApprover) Init(_ context.Context) error {
return nil
}

// Run every 10 seconds checks for newly issued CSRs and approves them
// Start watches for newly issued CSRs and approves them.
func (a *CSRApprover) Start(ctx context.Context) error {
ctx, a.stop = context.WithCancel(ctx)
go func() {
defer a.stop()
ticker := time.NewTicker(10 * time.Second) // TODO: sometimes this should be refactored so it watches instead of polls for CSRs
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := a.approveCSR(ctx)
if err != nil {
a.log.WithError(err).Warn("CSR approval failed")
}
case <-ctx.Done():
a.log.Info("CSR Approver context done")
return
}
}
}()

// TODO: share informer factory with other components.
factory := informers.NewSharedInformerFactoryWithOptions(a.clientset, a.resyncPeriod, informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.FieldSelector = "spec.signerName=kubernetes.io/kubelet-serving"
}))
_, err := factory.Certificates().V1().CertificateSigningRequests().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
a.retryApproveCSR(ctx, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
a.retryApproveCSR(ctx, newObj)
},
})
if err != nil {
return fmt.Errorf("failed to add event handler to shared informer: %w", err)
}

factory.Start(ctx.Done())
synced := factory.WaitForCacheSync(ctx.Done())
for _, ok := range synced {
if !ok {
return errors.New("caches failed to sync")
}
}
return nil
}

func (a *CSRApprover) retryApproveCSR(ctx context.Context, obj interface{}) {
csr, ok := obj.(*v1.CertificateSigningRequest)
if !ok {
a.log.Errorf("expected resource to be of type %T; got %T", &v1.CertificateSigningRequest{}, obj)
return
}

const maxAttempts = 10
logger := a.log.WithField("csrName", csr.Name)

err := retry.Do(func() error {
if err := a.approveCSR(ctx, csr); err != nil {
logger.WithError(err).Warn("CSR approval failed")
return err
}
return nil
},
retry.Context(ctx),
retry.Attempts(maxAttempts),
retry.OnRetry(func(attempts uint, err error) {
logger.WithField("attempts", attempts).WithError(err).Info("retrying CSR approval")
}),
)

if err != nil {
logger.WithError(err).Errorf("Failed to approve CSR after %d attempts", maxAttempts)
} else {
logger.Info("CSR approved successfully")
}
}

// Majority of this code has been adapted from https://github.com/kontena/kubelet-rubber-stamp
func (a *CSRApprover) approveCSR(ctx context.Context) error {
func (a *CSRApprover) approveCSR(ctx context.Context, csr *v1.CertificateSigningRequest) error {
if !a.leaderElector.IsLeader() {
a.log.Debug("not the leader, can't approve certificates")
return nil
}

opts := metav1.ListOptions{
FieldSelector: "spec.signerName=kubernetes.io/kubelet-serving",
if approved, denied := getCertApprovalCondition(&csr.Status); approved || denied {
a.log.Debugf("CSR %s is approved=%t || denied=%t. Carry on", csr.Name, approved, denied)
return nil
}

csrs, err := a.clientset.CertificatesV1().CertificateSigningRequests().List(ctx, opts)
x509cr, err := parseCSR(csr)
if err != nil {
return fmt.Errorf("can't fetch CSRs: %w", err)
return retry.Unrecoverable(fmt.Errorf("unable to parse CSR %q: %w", csr.Name, err))
}

for _, csr := range csrs.Items {
if approved, denied := getCertApprovalCondition(&csr.Status); approved || denied {
a.log.Debugf("CSR %s is approved=%t || denied=%t. Carry on", csr.Name, approved, denied)
continue
}

x509cr, err := parseCSR(&csr)
if err != nil {
return fmt.Errorf("unable to parse csr %q: %w", csr.Name, err)
}

if err := a.ensureKubeletServingCert(&csr, x509cr); err != nil {
a.log.WithError(err).Infof("Not approving CSR %q as it is not recognized as a kubelet-serving certificate", csr.Name)
continue
}

approved, err := a.authorize(ctx, &csr, authorization.ResourceAttributes{
Group: "certificates.k8s.io",
Resource: "certificatesigningrequests",
Verb: "create",
})
if err != nil {
return fmt.Errorf("SubjectAccessReview failed for CSR %q: %w", csr.Name, err)
}

if !approved {
return fmt.Errorf("failed to perform SubjectAccessReview for CSR %q", csr.Name)
}
if err := a.ensureKubeletServingCert(csr, x509cr); err != nil {
a.log.WithError(err).Infof("Not approving CSR %q as it is not recognized as a kubelet-serving certificate", csr.Name)
return nil
}

a.log.Infof("approving csr %s with SANs: %s, IP Addresses:%s", csr.ObjectMeta.Name, x509cr.DNSNames, x509cr.IPAddresses)
appendApprovalCondition(&csr, "Auto approving kubelet serving certificate after SubjectAccessReview.")
_, err = a.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, &csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating approval for CSR %q: %w", csr.Name, err)
}
approved, err := a.authorize(ctx, csr, authorization.ResourceAttributes{
Group: v1.GroupName,
Resource: "certificatesigningrequests",
Verb: "create",
})
if err != nil {
return fmt.Errorf("SubjectAccessReview failed for CSR %q: %w", csr.Name, err)
}

return nil
if !approved {
return fmt.Errorf("failed to perform SubjectAccessReview for CSR %q", csr.Name)
}

a.log.Infof("approving CSR %s with SANs: %s, IP Addresses:%s", csr.ObjectMeta.Name, x509cr.DNSNames, x509cr.IPAddresses)
appendApprovalCondition(csr, "Auto approving kubelet serving certificate after SubjectAccessReview.")
_, err = a.clientset.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating approval for CSR %q: %w", csr.Name, err)
}
return nil
}

Expand Down
132 changes: 94 additions & 38 deletions pkg/component/controller/csrapprover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,73 +24,129 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"testing"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"

"github.com/k0sproject/k0s/internal/testutil"
"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
"github.com/k0sproject/k0s/pkg/component/controller/leaderelector"
"github.com/stretchr/testify/assert"

authorizationv1 "k8s.io/api/authorization/v1"
certv1 "k8s.io/api/certificates/v1"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestBasicCRSApprover(t *testing.T) {
fakeFactory := testutil.NewFakeClientFactory()

client, err := fakeFactory.GetClient()
assert.NoError(t, err)

ctx := context.TODO()

privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatal(err)
}

req := pemWithPrivateKey(privateKey)

csrReq := &certv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "csrapprover_test",
for i, test := range []struct {
name string
startControllerBeforeCreatingCSR bool
}{
{
name: "existing CSRs are approved",
startControllerBeforeCreatingCSR: false,
},
Spec: certv1.CertificateSigningRequestSpec{
Request: req,
SignerName: "kubernetes.io/kubelet-serving",
{
name: "newly-created CSRs are approved",
startControllerBeforeCreatingCSR: true,
},
} {
t.Run(test.name, func(t *testing.T) {
fakeFactory := testutil.NewFakeClientFactory()
client, err := fakeFactory.GetClient()
assert.NoError(t, err)

config := &v1beta1.ClusterConfig{
Spec: &v1beta1.ClusterSpec{
API: &v1beta1.APISpec{
Address: "1.2.3.4",
ExternalAddress: "get.k0s.sh",
},
},
}
ctx := context.TODO()

c := NewCSRApprover(config, &leaderelector.Dummy{Leader: true}, fakeFactory, 10*time.Minute)
assert.NoError(t, c.Init(ctx))

csrReq := &certv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("csrapprover-test-%d", i+1),
},
Spec: certv1.CertificateSigningRequestSpec{
Request: req,
SignerName: "kubernetes.io/kubelet-serving",
Usages: []certv1.KeyUsage{"digital signature", "key encipherment", "server auth"},
},
}

fakeClient, ok := client.(*fake.Clientset)
assert.True(t, ok, "expected Clientset to be of type %T; got %T", &fake.Clientset{}, client)
fakeClient.PrependReactor("create", "subjectaccessreviews", func(action k8stesting.Action) (bool, runtime.Object, error) {
createAction, ok := action.(k8stesting.CreateActionImpl)
if !ok {
return false, nil, fmt.Errorf("expected action to be of type %T; got %T", &k8stesting.CreateActionImpl{}, action)
}
sar, ok := createAction.Object.(*authorizationv1.SubjectAccessReview)
if !ok {
return false, nil, fmt.Errorf("expected resource to be of type %T; got %T", &authorizationv1.SubjectAccessReview{}, createAction.Object)
}
sar.Status.Allowed = true
return true, sar, nil
})

var newCSR *certv1.CertificateSigningRequest
createCSR := func() {
t.Helper()
newCSR, err = client.CertificatesV1().CertificateSigningRequests().Create(ctx, csrReq, metav1.CreateOptions{})
assert.NoError(t, err)
}
if test.startControllerBeforeCreatingCSR {
assert.NoError(t, c.Start(ctx))
createCSR()
} else {
createCSR()
assert.NoError(t, c.Start(ctx))
}

assert.EventuallyWithT(t, func(c *assert.CollectT) {
csr, err := client.CertificatesV1().CertificateSigningRequests().Get(ctx, newCSR.Name, metav1.GetOptions{})
assert.NoError(c, err)
assert.NotNil(c, csr, "could not find CSR")
assert.NotEmpty(c, csr.Status.Conditions, "expected to find at least one element in status.conditions")

for _, condition := range csr.Status.Conditions {
assert.True(c, condition.Type == certv1.CertificateApproved && condition.Reason == "Autoapproved by K0s CSRApprover" && condition.Status == core.ConditionTrue,
"expected CSR to be approved")
}
}, 2*time.Second, 1*time.Millisecond)

assert.NoError(t, c.Stop())
})
}

newCsr, err := client.CertificatesV1().CertificateSigningRequests().Create(ctx, csrReq, metav1.CreateOptions{})
assert.NoError(t, err)

config := &v1beta1.ClusterConfig{
Spec: &v1beta1.ClusterSpec{
API: &v1beta1.APISpec{
Address: "1.2.3.4",
ExternalAddress: "get.k0s.sh",
},
},
}
c := NewCSRApprover(config, &leaderelector.Dummy{Leader: true}, fakeFactory)

assert.NoError(t, c.Init(ctx))
assert.NoError(t, c.approveCSR(ctx))

csr, err := client.CertificatesV1().CertificateSigningRequests().Get(ctx, newCsr.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, csr)
assert.True(t, csr.Name == newCsr.Name)
for _, c := range csr.Status.Conditions {
assert.True(t, c.Type == certv1.CertificateApproved && c.Reason == "Autoapproved by K0S CSRApprover" && c.Status == core.ConditionTrue)
}
}

func pemWithPrivateKey(pk crypto.PrivateKey) []byte {
template := &x509.CertificateRequest{
Subject: pkix.Name{
CommonName: "something",
Organization: []string{"test"},
CommonName: "system:node:worker",
Organization: []string{"system:nodes"},
},
DNSNames: []string{"worker-1"},
}
return pemWithTemplate(template, pk)
}
Expand Down

0 comments on commit efb568d

Please sign in to comment.