From e7b6513db0e44090a9d11fcff15b1fec097b3b2a Mon Sep 17 00:00:00 2001 From: huangyanfeng Date: Thu, 10 Oct 2024 15:17:43 +0800 Subject: [PATCH] HasResource should return whether resource is registered not cached Signed-off-by: huangyanfeng --- pkg/search/proxy/controller.go | 6 +- pkg/search/proxy/controller_test.go | 2 +- pkg/search/proxy/store/multi_cluster_cache.go | 43 +++++--------- .../proxy/store/multi_cluster_cache_test.go | 58 +++++++++++++++---- pkg/search/proxy/testing/mock_store.go | 6 +- 5 files changed, 66 insertions(+), 49 deletions(-) diff --git a/pkg/search/proxy/controller.go b/pkg/search/proxy/controller.go index 938a9a2447c2..460a43e08f8c 100644 --- a/pkg/search/proxy/controller.go +++ b/pkg/search/proxy/controller.go @@ -186,7 +186,7 @@ func (ctl *Controller) reconcile(util.QueueKey) error { if err != nil { return err } - + registeredResources := make(map[schema.GroupVersionResource]struct{}) resourcesByClusters := make(map[string]map[schema.GroupVersionResource]*store.MultiNamespace) for _, registry := range registries { matchedResources := make(map[schema.GroupVersionResource]*store.MultiNamespace, len(registry.Spec.ResourceSelectors)) @@ -203,8 +203,8 @@ func (ctl *Controller) reconcile(util.QueueKey) error { matchedResources[gvr] = nsSelector } nsSelector.Add(selector.Namespace) + registeredResources[gvr] = struct{}{} } - if len(matchedResources) == 0 { continue } @@ -238,7 +238,7 @@ func (ctl *Controller) reconcile(util.QueueKey) error { } } - return ctl.store.UpdateCache(resourcesByClusters) + return ctl.store.UpdateCache(resourcesByClusters, registeredResources) } type errorHTTPHandler struct { diff --git a/pkg/search/proxy/controller_test.go b/pkg/search/proxy/controller_test.go index 41eedbde96fd..26cd0dc9b257 100644 --- a/pkg/search/proxy/controller_test.go +++ b/pkg/search/proxy/controller_test.go @@ -293,7 +293,7 @@ func TestController_reconcile(t *testing.T) { clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(), registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(), store: &proxytest.MockStore{ - UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error { + UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]*store.MultiNamespace, _ map[schema.GroupVersionResource]struct{}) error { for clusterName, resources := range m { resourceNames := make([]string, 0, len(resources)) for resource := range resources { diff --git a/pkg/search/proxy/store/multi_cluster_cache.go b/pkg/search/proxy/store/multi_cluster_cache.go index 99a9de027ff5..81b9c6f62abc 100644 --- a/pkg/search/proxy/store/multi_cluster_cache.go +++ b/pkg/search/proxy/store/multi_cluster_cache.go @@ -40,7 +40,7 @@ import ( // Store is the cache for resources from multiple member clusters type Store interface { - UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error + UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error HasResource(resource schema.GroupVersionResource) bool GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) Stop() @@ -52,10 +52,10 @@ type Store interface { // MultiClusterCache caches resource from multi member clusters type MultiClusterCache struct { - lock sync.RWMutex - cache map[string]*clusterCache - cachedResources map[schema.GroupVersionResource]struct{} - restMapper meta.RESTMapper + lock sync.RWMutex + cache map[string]*clusterCache + registeredResources map[schema.GroupVersionResource]struct{} + restMapper meta.RESTMapper // newClientFunc returns a dynamic client for member cluster apiserver newClientFunc func(string) (dynamic.Interface, error) } @@ -65,15 +65,15 @@ var _ Store = &MultiClusterCache{} // NewMultiClusterCache return a cache for resources from member clusters func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache { return &MultiClusterCache{ - restMapper: restMapper, - newClientFunc: newClientFunc, - cache: map[string]*clusterCache{}, - cachedResources: map[schema.GroupVersionResource]struct{}{}, + restMapper: restMapper, + newClientFunc: newClientFunc, + cache: map[string]*clusterCache{}, + registeredResources: map[schema.GroupVersionResource]struct{}{}, } } // UpdateCache update cache for multi clusters -func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error { +func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error { if klog.V(3).Enabled() { start := time.Now() defer func() { @@ -106,24 +106,7 @@ func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema return err } } - - // update cachedResource - newCachedResources := make(map[schema.GroupVersionResource]struct{}, len(c.cachedResources)) - for _, resources := range resourcesByCluster { - for resource := range resources { - newCachedResources[resource] = struct{}{} - } - } - for resource := range c.cachedResources { - if _, exist := newCachedResources[resource]; !exist { - delete(c.cachedResources, resource) - } - } - for resource := range newCachedResources { - if _, exist := c.cachedResources[resource]; !exist { - c.cachedResources[resource] = struct{}{} - } - } + c.registeredResources = registeredResources return nil } @@ -137,11 +120,11 @@ func (c *MultiClusterCache) Stop() { } } -// HasResource return whether resource is cached. +// HasResource return whether resource is registered. func (c *MultiClusterCache) HasResource(resource schema.GroupVersionResource) bool { c.lock.RLock() defer c.lock.RUnlock() - _, ok := c.cachedResources[resource] + _, ok := c.registeredResources[resource] return ok } diff --git a/pkg/search/proxy/store/multi_cluster_cache_test.go b/pkg/search/proxy/store/multi_cluster_cache_test.go index 46241434b3ed..0ecb61970f9f 100644 --- a/pkg/search/proxy/store/multi_cluster_cache_test.go +++ b/pkg/search/proxy/store/multi_cluster_cache_test.go @@ -80,8 +80,11 @@ func TestMultiClusterCache_UpdateCache(t *testing.T) { cluster1.Name: resourceSet(podGVR, nodeGVR), cluster2.Name: resourceSet(podGVR), } - - err := cache.UpdateCache(resources) + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + nodeGVR: {}, + } + err := cache.UpdateCache(resources, registeredResources) if err != nil { t.Error(err) } @@ -93,7 +96,7 @@ func TestMultiClusterCache_UpdateCache(t *testing.T) { // Then test removing cluster2 and remove node cache for cluster1 err = cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ cluster1.Name: resourceSet(podGVR), - }) + }, registeredResources) if err != nil { t.Error(err) } @@ -115,7 +118,11 @@ func TestMultiClusterCache_HasResource(t *testing.T) { cluster1.Name: resourceSet(podGVR, nodeGVR), cluster2.Name: resourceSet(podGVR), } - err := cache.UpdateCache(resources) + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + nodeGVR: {}, + } + err := cache.UpdateCache(resources, registeredResources) if err != nil { t.Error(err) return @@ -160,6 +167,9 @@ func TestMultiClusterCache_GetResourceFromCache(t *testing.T) { cluster1.Name: resourceSet(podGVR), cluster2.Name: resourceSet(podGVR), } + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + } cluster1Client := fakedynamic.NewSimpleDynamicClient(scheme, newUnstructuredObject(podGVK, "pod11", withDefaultNamespace()), newUnstructuredObject(podGVK, "pod_conflict", withDefaultNamespace()), @@ -180,7 +190,7 @@ func TestMultiClusterCache_GetResourceFromCache(t *testing.T) { } cache := NewMultiClusterCache(newClientFunc, restMapper) defer cache.Stop() - err := cache.UpdateCache(resources) + err := cache.UpdateCache(resources, registeredResources) if err != nil { t.Error(err) return @@ -300,11 +310,15 @@ func TestMultiClusterCache_Get(t *testing.T) { return fakedynamic.NewSimpleDynamicClient(scheme), nil } cache := NewMultiClusterCache(newClientFunc, restMapper) + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + nodeGVR: {}, + } defer cache.Stop() err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ cluster1.Name: resourceSet(podGVR, nodeGVR), cluster2.Name: resourceSet(podGVR), - }) + }, registeredResources) if err != nil { t.Fatal(err) } @@ -440,6 +454,9 @@ func TestMultiClusterCache_Get_Namespaced(t *testing.T) { } return fakedynamic.NewSimpleDynamicClient(scheme), nil } + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + } cache := NewMultiClusterCache(newClientFunc, restMapper) defer cache.Stop() err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ @@ -449,7 +466,7 @@ func TestMultiClusterCache_Get_Namespaced(t *testing.T) { cluster2.Name: { podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2")}, }, - }) + }, registeredResources) if err != nil { t.Fatal(err) } @@ -569,6 +586,9 @@ func TestMultiClusterCache_List(t *testing.T) { newUnstructuredObject(podGVK, "pod24", withDefaultNamespace(), withResourceVersion("2004")), newUnstructuredObject(podGVK, "pod25", withDefaultNamespace(), withResourceVersion("2005")), ) + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + } newClientFunc := func(cluster string) (dynamic.Interface, error) { switch cluster { @@ -657,7 +677,7 @@ func TestMultiClusterCache_List(t *testing.T) { t.Run(tt.name, func(t *testing.T) { cache := NewMultiClusterCache(newClientFunc, restMapper) defer cache.Stop() - err := cache.UpdateCache(tt.resources) + err := cache.UpdateCache(tt.resources, registeredResources) if err != nil { t.Error(err) return @@ -723,12 +743,15 @@ func TestMultiClusterCache_List_CacheSourceAnnotation(t *testing.T) { } return fakedynamic.NewSimpleDynamicClient(scheme), nil } + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + } cache := NewMultiClusterCache(newClientFunc, restMapper) defer cache.Stop() err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ cluster1.Name: resourceSet(podGVR), cluster2.Name: resourceSet(podGVR), - }) + }, registeredResources) if err != nil { t.Error(err) return @@ -777,11 +800,14 @@ func TestMultiClusterCache_List_Namespaced(t *testing.T) { return fakedynamic.NewSimpleDynamicClient(scheme), nil } cache := NewMultiClusterCache(newClientFunc, restMapper) + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + } defer cache.Stop() err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ cluster1.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1")}}, cluster2.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2", "ns3")}}, - }) + }, registeredResources) if err != nil { t.Error(err) return @@ -917,11 +943,14 @@ func TestMultiClusterCache_Watch(t *testing.T) { return fakedynamic.NewSimpleDynamicClient(scheme), nil } cache := NewMultiClusterCache(newClientFunc, restMapper) + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + } defer cache.Stop() err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ cluster1.Name: resourceSet(podGVR), cluster2.Name: resourceSet(podGVR), - }) + }, registeredResources) if err != nil { t.Error(err) return @@ -1038,12 +1067,15 @@ func TestMultiClusterCache_Watch_Namespaced(t *testing.T) { } return fakedynamic.NewSimpleDynamicClient(scheme), nil } + registeredResources := map[schema.GroupVersionResource]struct{}{ + podGVR: {}, + } cache := NewMultiClusterCache(newClientFunc, restMapper) defer cache.Stop() err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ cluster1.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1")}}, cluster2.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2", "ns3")}}, - }) + }, registeredResources) if err != nil { t.Error(err) return @@ -1396,6 +1428,8 @@ func TestMultiClusterCache_fillMissingClusterResourceVersion(t *testing.T) { defer cache.Stop() err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{ cluster1.Name: resourceSet(podGVR), + }, map[schema.GroupVersionResource]struct{}{ + podGVR: {}, }) if err != nil { t.Fatal(err) diff --git a/pkg/search/proxy/testing/mock_store.go b/pkg/search/proxy/testing/mock_store.go index d73d8a6d19c3..c0fabf36f71c 100644 --- a/pkg/search/proxy/testing/mock_store.go +++ b/pkg/search/proxy/testing/mock_store.go @@ -30,7 +30,7 @@ import ( // MockStore is a mock for store.Store interface type MockStore struct { - UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error + UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error HasResourceFunc func(resource schema.GroupVersionResource) bool GetResourceFromCacheFunc func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error) StopFunc func() @@ -42,11 +42,11 @@ type MockStore struct { var _ store.Store = &MockStore{} // UpdateCache implements store.Store interface -func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error { +func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error { if c.UpdateCacheFunc == nil { panic("implement me") } - return c.UpdateCacheFunc(resourcesByCluster) + return c.UpdateCacheFunc(resourcesByCluster, registeredResources) } // HasResource implements store.Store interface