Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-0.16] 🐛 Avoid extra calls for not found resource #2677

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions pkg/client/apiutil/restmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/http"
"sync"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -179,23 +180,28 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
Group: metav1.APIGroup{Name: groupName},
VersionedResources: make(map[string][]metav1.APIResource),
}
if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

// Update information for group resources about versioned resources.
// The number of API calls is equal to the number of versions: /apis/<group>/<version>.
groupVersionResources, err := m.fetchGroupVersionResources(groupName, versions...)
// If we encounter a missing API version (NotFound error), we will remove the group from
// the m.apiGroups and m.knownGroups caches.
// If this happens, in the next call the group will be added back to apiGroups
// and only the existing versions will be loaded in knownGroups.
groupVersionResources, err := m.fetchGroupVersionResourcesLocked(groupName, versions...)
if err != nil {
return fmt.Errorf("failed to get API group resources: %w", err)
}
for version, resources := range groupVersionResources {
groupResources.VersionedResources[version.Version] = resources.APIResources

if _, ok := m.knownGroups[groupName]; ok {
groupResources = m.knownGroups[groupName]
}

// Update information for group resources about the API group by adding new versions.
// Ignore the versions that are already registered.
for _, version := range versions {
for groupVersion, resources := range groupVersionResources {
version := groupVersion.Version

groupResources.VersionedResources[version] = resources.APIResources
found := false
for _, v := range groupResources.Group.Versions {
if v.Version == version {
Expand All @@ -216,12 +222,7 @@ func (m *mapper) addKnownGroupAndReload(groupName string, versions ...string) er
m.knownGroups[groupName] = groupResources

// Finally, update the group with received information and regenerate the mapper.
updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups))
for _, agr := range m.knownGroups {
updatedGroupResources = append(updatedGroupResources, agr)
}

m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources)
m.refreshMapper()
return nil
}

Expand Down Expand Up @@ -267,18 +268,31 @@ func (m *mapper) findAPIGroupByName(groupName string) (*metav1.APIGroup, error)
return nil, fmt.Errorf("failed to find API group %q", groupName)
}

// fetchGroupVersionResources fetches the resources for the specified group and its versions.
func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
// fetchGroupVersionResourcesLocked fetches the resources for the specified group and its versions.
// This method might modify the cache so it needs to be called under the lock.
func (m *mapper) fetchGroupVersionResourcesLocked(groupName string, versions ...string) (map[schema.GroupVersion]*metav1.APIResourceList, error) {
groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
failedGroups := make(map[schema.GroupVersion]error)

for _, version := range versions {
groupVersion := schema.GroupVersion{Group: groupName, Version: version}

apiResourceList, err := m.client.ServerResourcesForGroupVersion(groupVersion.String())
if apierrors.IsNotFound(err) && m.isGroupVersionCached(groupVersion) {
// If the version is not found, we remove the group from the cache
// so it gets refreshed on the next call.
delete(m.apiGroups, groupName)
delete(m.knownGroups, groupName)
// It's important to refresh the mapper after invalidating the cache, since returning an error
// aborts the call and leaves the underlying mapper unchanged. If not refreshed, the next call
// will still return a match for the NotFound version.
m.refreshMapper()
}

if err != nil {
failedGroups[groupVersion] = err
}

if apiResourceList != nil {
// even in case of error, some fallback might have been returned.
groupVersionResources[groupVersion] = apiResourceList
Expand All @@ -292,3 +306,21 @@ func (m *mapper) fetchGroupVersionResources(groupName string, versions ...string

return groupVersionResources, nil
}

// isGroupVersionCached checks if a version for a group is cached in the known groups cache.
func (m *mapper) isGroupVersionCached(gv schema.GroupVersion) bool {
if cachedGroup, ok := m.knownGroups[gv.Group]; ok {
_, cached := cachedGroup.VersionedResources[gv.Version]
return cached
}

return false
}

func (m *mapper) refreshMapper() {
updatedGroupResources := make([]*restmapper.APIGroupResources, 0, len(m.knownGroups))
for _, agr := range m.knownGroups {
updatedGroupResources = append(updatedGroupResources, agr)
}
m.mapper = restmapper.NewDiscoveryRESTMapper(updatedGroupResources)
}
213 changes: 193 additions & 20 deletions pkg/client/apiutil/restmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ package apiutil_test

import (
"context"
"fmt"
"net/http"
"testing"

"k8s.io/apimachinery/pkg/api/meta"

_ "github.com/onsi/ginkgo/v2"
gmg "github.com/onsi/gomega"

"github.com/onsi/gomega/format"
gomegatypes "github.com/onsi/gomega/types"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -493,23 +496,7 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register another CRD in runtime - "riders.crew.example.com".

crd := &apiextensionsv1.CustomResourceDefinition{}
err = c.Get(context.TODO(), types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Name = "riders.crew.example.com"
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: "Rider",
Plural: "riders",
}
newCRD.ResourceVersion = ""

// Create the new CRD.
g.Expect(c.Create(context.TODO(), newCRD)).To(gmg.Succeed())
createNewCRD(context.TODO(), g, c, "crew.example.com", "Rider", "riders")

// Wait a bit until the CRD is registered.
g.Eventually(func() error {
Expand All @@ -528,4 +515,190 @@ func TestLazyRestMapperProvider(t *testing.T) {
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal("rider"))
})

t.Run("LazyRESTMapper should invalidate the group cache if a version is not found", func(t *testing.T) {
g := gmg.NewWithT(t)
ctx := context.Background()

httpClient, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())

crt := newCountingRoundTripper(httpClient.Transport)
httpClient.Transport = crt

lazyRestMapper, err := apiutil.NewDynamicRESTMapper(restCfg, httpClient)
g.Expect(err).NotTo(gmg.HaveOccurred())

s := scheme.Scheme
err = apiextensionsv1.AddToScheme(s)
g.Expect(err).NotTo(gmg.HaveOccurred())

c, err := client.New(restCfg, client.Options{Scheme: s})
g.Expect(err).NotTo(gmg.HaveOccurred())

// Register a new CRD in a new group to avoid collisions when deleting versions - "taxis.inventory.example.com".
group := "inventory.example.com"
kind := "Taxi"
plural := "taxis"
crdName := plural + "." + group
// Create a CRD with two versions: v1alpha1 and v1 where both are served and
// v1 is the storage version so we can easily remove v1alpha1 later.
crd := newCRD(ctx, g, c, group, kind, plural)
v1alpha1 := crd.Spec.Versions[0]
v1alpha1.Name = "v1alpha1"
v1alpha1.Storage = false
v1alpha1.Served = true
v1 := crd.Spec.Versions[0]
v1.Name = "v1"
v1.Storage = true
v1.Served = true
crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1alpha1, v1}
g.Expect(c.Create(ctx, crd)).To(gmg.Succeed())
t.Cleanup(func() {
g.Expect(c.Delete(ctx, crd)).To(gmg.Succeed())
})

// Wait until the CRD is registered.
discHTTP, err := rest.HTTPClientFor(restCfg)
g.Expect(err).NotTo(gmg.HaveOccurred())
discClient, err := discovery.NewDiscoveryClientForConfigAndClient(restCfg, discHTTP)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v1")
g.Expect(err).NotTo(gmg.HaveOccurred())
}).Should(gmg.Succeed(), "v1 should be available")

// There are no requests before any call
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for all available versions:
// #3: GET https://host/apis/inventory.example.com/v1alpha1
// #4: GET https://host/apis/inventory.example.com/v1
// This should fill the cache for apiGroups and versions.
mapping, err := lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(4))
crt.Reset() // We reset the counter to check how many additional requests are made later.

// At this point v1alpha1 should be cached
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We update the CRD to only have v1 version.
g.Expect(c.Get(ctx, types.NamespacedName{Name: crdName}, crd)).To(gmg.Succeed())
for _, version := range crd.Spec.Versions {
if version.Name == "v1" {
v1 = version
break
}
}
crd.Spec.Versions = []apiextensionsv1.CustomResourceDefinitionVersion{v1}
g.Expect(c.Update(ctx, crd)).To(gmg.Succeed())

// We wait until v1alpha1 is not available anymore.
g.Eventually(func(g gmg.Gomega) {
_, err = discClient.ServerResourcesForGroupVersion(group + "/v1alpha1")
g.Expect(apierrors.IsNotFound(err)).To(gmg.BeTrue(), "v1alpha1 should not be available anymore")
}).Should(gmg.Succeed())

// Although v1alpha1 is not available anymore, the cache is not invalidated yet so it should return a mapping.
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(0))

// We request Limo, which is not in the mapper because it doesn't exist.
// This will trigger a reload of the lazy mapper cache.
// Reloading the cache will read v2 again and since it's not available anymore, it should invalidate the cache.
// #1: GET https://host/apis/inventory.example.com/v1alpha1
// #2: GET https://host/apis/inventory.example.com/v1
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: "Limo"})
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(2))
crt.Reset()

// Now we request v1alpha1 again and it should return an error since the cache was invalidated.
// #1: GET https://host/apis/inventory.example.com/v1alpha1
_, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind}, "v1alpha1")
g.Expect(err).To(beNoMatchError())
g.Expect(crt.GetRequestCount()).To(gmg.Equal(1))
crt.Reset()

// Since we don't specify what version we expect, restmapper will fetch them all and search there.
// To fetch a list of available versions
// #1: GET https://host/api
// #2: GET https://host/apis
// Then, for all available versions:
// #3: GET https://host/apis/inventory.example.com/v1
mapping, err = lazyRestMapper.RESTMapping(schema.GroupKind{Group: group, Kind: kind})
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(mapping.GroupVersionKind.Kind).To(gmg.Equal(kind))
g.Expect(crt.GetRequestCount()).To(gmg.Equal(3))
})
}

// createNewCRD creates a new CRD with the given group, kind, and plural and returns it.
func createNewCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition {
newCRD := newCRD(ctx, g, c, group, kind, plural)
g.Expect(c.Create(ctx, newCRD)).To(gmg.Succeed())

return newCRD
}

// newCRD returns a new CRD with the given group, kind, and plural.
func newCRD(ctx context.Context, g gmg.Gomega, c client.Client, group, kind, plural string) *apiextensionsv1.CustomResourceDefinition {
crd := &apiextensionsv1.CustomResourceDefinition{}
err := c.Get(ctx, types.NamespacedName{Name: "drivers.crew.example.com"}, crd)
g.Expect(err).NotTo(gmg.HaveOccurred())
g.Expect(crd.Spec.Names.Kind).To(gmg.Equal("Driver"))

newCRD := &apiextensionsv1.CustomResourceDefinition{}
crd.DeepCopyInto(newCRD)
newCRD.Spec.Group = group
newCRD.Name = plural + "." + group
newCRD.Spec.Names = apiextensionsv1.CustomResourceDefinitionNames{
Kind: kind,
Plural: plural,
}
newCRD.ResourceVersion = ""

return newCRD
}

func beNoMatchError() gomegatypes.GomegaMatcher {
return &errorMatcher{
checkFunc: meta.IsNoMatchError,
message: "NoMatch",
}
}

type errorMatcher struct {
checkFunc func(error) bool
message string
}

func (e *errorMatcher) Match(actual interface{}) (success bool, err error) {
if actual == nil {
return false, nil
}

actualErr, actualOk := actual.(error)
if !actualOk {
return false, fmt.Errorf("expected an error-type. got:\n%s", format.Object(actual, 1))
}

return e.checkFunc(actualErr), nil
}

func (e *errorMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("to be %s error", e.message))
}

func (e *errorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, fmt.Sprintf("not to be %s error", e.message))
}