Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hasResource should return whether resource is matched not cached #5662

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/search/proxy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
if err != nil {
return err
}

registeredResources := make(map[schema.GroupVersionResource]struct{})
resourcesByClusters := make(map[string]map[schema.GroupVersionResource]*store.MultiNamespace)
for _, registry := range registries {
matchedResources := make(map[schema.GroupVersionResource]*store.MultiNamespace, len(registry.Spec.ResourceSelectors))
Expand All @@ -203,8 +203,8 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
matchedResources[gvr] = nsSelector
}
nsSelector.Add(selector.Namespace)
registeredResources[gvr] = struct{}{}
}

if len(matchedResources) == 0 {
continue
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (ctl *Controller) reconcile(util.QueueKey) error {
}
}

return ctl.store.UpdateCache(resourcesByClusters)
return ctl.store.UpdateCache(resourcesByClusters, registeredResources)
}

type errorHTTPHandler struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/search/proxy/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func TestController_reconcile(t *testing.T) {
clusterLister: karmadaFactory.Cluster().V1alpha1().Clusters().Lister(),
registryLister: karmadaFactory.Search().V1alpha1().ResourceRegistries().Lister(),
store: &proxytest.MockStore{
UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error {
UpdateCacheFunc: func(m map[string]map[schema.GroupVersionResource]*store.MultiNamespace, _ map[schema.GroupVersionResource]struct{}) error {
for clusterName, resources := range m {
resourceNames := make([]string, 0, len(resources))
for resource := range resources {
Expand Down
43 changes: 13 additions & 30 deletions pkg/search/proxy/store/multi_cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (

// Store is the cache for resources from multiple member clusters
type Store interface {
UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error
UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error
HasResource(resource schema.GroupVersionResource) bool
GetResourceFromCache(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
Stop()
Expand All @@ -52,10 +52,10 @@ type Store interface {

// MultiClusterCache caches resource from multi member clusters
type MultiClusterCache struct {
lock sync.RWMutex
cache map[string]*clusterCache
cachedResources map[schema.GroupVersionResource]struct{}
restMapper meta.RESTMapper
lock sync.RWMutex
cache map[string]*clusterCache
registeredResources map[schema.GroupVersionResource]struct{}
restMapper meta.RESTMapper
// newClientFunc returns a dynamic client for member cluster apiserver
newClientFunc func(string) (dynamic.Interface, error)
}
Expand All @@ -65,15 +65,15 @@ var _ Store = &MultiClusterCache{}
// NewMultiClusterCache return a cache for resources from member clusters
func NewMultiClusterCache(newClientFunc func(string) (dynamic.Interface, error), restMapper meta.RESTMapper) *MultiClusterCache {
return &MultiClusterCache{
restMapper: restMapper,
newClientFunc: newClientFunc,
cache: map[string]*clusterCache{},
cachedResources: map[schema.GroupVersionResource]struct{}{},
restMapper: restMapper,
newClientFunc: newClientFunc,
cache: map[string]*clusterCache{},
registeredResources: map[schema.GroupVersionResource]struct{}{},
}
}

// UpdateCache update cache for multi clusters
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace) error {
func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error {
if klog.V(3).Enabled() {
start := time.Now()
defer func() {
Expand Down Expand Up @@ -106,24 +106,7 @@ func (c *MultiClusterCache) UpdateCache(resourcesByCluster map[string]map[schema
return err
}
}

// update cachedResource
newCachedResources := make(map[schema.GroupVersionResource]struct{}, len(c.cachedResources))
for _, resources := range resourcesByCluster {
for resource := range resources {
newCachedResources[resource] = struct{}{}
}
}
for resource := range c.cachedResources {
if _, exist := newCachedResources[resource]; !exist {
delete(c.cachedResources, resource)
}
}
for resource := range newCachedResources {
if _, exist := c.cachedResources[resource]; !exist {
c.cachedResources[resource] = struct{}{}
}
}
c.registeredResources = registeredResources
return nil
}

Expand All @@ -137,11 +120,11 @@ func (c *MultiClusterCache) Stop() {
}
}

// HasResource return whether resource is cached.
// HasResource return whether resource is registered.
func (c *MultiClusterCache) HasResource(resource schema.GroupVersionResource) bool {
c.lock.RLock()
defer c.lock.RUnlock()
_, ok := c.cachedResources[resource]
_, ok := c.registeredResources[resource]
return ok
}

Expand Down
58 changes: 46 additions & 12 deletions pkg/search/proxy/store/multi_cluster_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ func TestMultiClusterCache_UpdateCache(t *testing.T) {
cluster1.Name: resourceSet(podGVR, nodeGVR),
cluster2.Name: resourceSet(podGVR),
}

err := cache.UpdateCache(resources)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
nodeGVR: {},
}
err := cache.UpdateCache(resources, registeredResources)
if err != nil {
t.Error(err)
}
Expand All @@ -93,7 +96,7 @@ func TestMultiClusterCache_UpdateCache(t *testing.T) {
// Then test removing cluster2 and remove node cache for cluster1
err = cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Error(err)
}
Expand All @@ -115,7 +118,11 @@ func TestMultiClusterCache_HasResource(t *testing.T) {
cluster1.Name: resourceSet(podGVR, nodeGVR),
cluster2.Name: resourceSet(podGVR),
}
err := cache.UpdateCache(resources)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
nodeGVR: {},
}
err := cache.UpdateCache(resources, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -160,6 +167,9 @@ func TestMultiClusterCache_GetResourceFromCache(t *testing.T) {
cluster1.Name: resourceSet(podGVR),
cluster2.Name: resourceSet(podGVR),
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cluster1Client := fakedynamic.NewSimpleDynamicClient(scheme,
newUnstructuredObject(podGVK, "pod11", withDefaultNamespace()),
newUnstructuredObject(podGVK, "pod_conflict", withDefaultNamespace()),
Expand All @@ -180,7 +190,7 @@ func TestMultiClusterCache_GetResourceFromCache(t *testing.T) {
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(resources)
err := cache.UpdateCache(resources, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -300,11 +310,15 @@ func TestMultiClusterCache_Get(t *testing.T) {
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
nodeGVR: {},
}
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR, nodeGVR),
cluster2.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -440,6 +454,9 @@ func TestMultiClusterCache_Get_Namespaced(t *testing.T) {
}
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
Expand All @@ -449,7 +466,7 @@ func TestMultiClusterCache_Get_Namespaced(t *testing.T) {
cluster2.Name: {
podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2")},
},
})
}, registeredResources)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -569,6 +586,9 @@ func TestMultiClusterCache_List(t *testing.T) {
newUnstructuredObject(podGVK, "pod24", withDefaultNamespace(), withResourceVersion("2004")),
newUnstructuredObject(podGVK, "pod25", withDefaultNamespace(), withResourceVersion("2005")),
)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}

newClientFunc := func(cluster string) (dynamic.Interface, error) {
switch cluster {
Expand Down Expand Up @@ -657,7 +677,7 @@ func TestMultiClusterCache_List(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(tt.resources)
err := cache.UpdateCache(tt.resources, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -723,12 +743,15 @@ func TestMultiClusterCache_List_CacheSourceAnnotation(t *testing.T) {
}
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
cluster2.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -777,11 +800,14 @@ func TestMultiClusterCache_List_Namespaced(t *testing.T) {
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1")}},
cluster2.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2", "ns3")}},
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -917,11 +943,14 @@ func TestMultiClusterCache_Watch(t *testing.T) {
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
cluster2.Name: resourceSet(podGVR),
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -1038,12 +1067,15 @@ func TestMultiClusterCache_Watch_Namespaced(t *testing.T) {
}
return fakedynamic.NewSimpleDynamicClient(scheme), nil
}
registeredResources := map[schema.GroupVersionResource]struct{}{
podGVR: {},
}
cache := NewMultiClusterCache(newClientFunc, restMapper)
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1")}},
cluster2.Name: {podGVR: &MultiNamespace{namespaces: sets.New[string]("ns1", "ns2", "ns3")}},
})
}, registeredResources)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -1396,6 +1428,8 @@ func TestMultiClusterCache_fillMissingClusterResourceVersion(t *testing.T) {
defer cache.Stop()
err := cache.UpdateCache(map[string]map[schema.GroupVersionResource]*MultiNamespace{
cluster1.Name: resourceSet(podGVR),
}, map[schema.GroupVersionResource]struct{}{
podGVR: {},
})
if err != nil {
t.Fatal(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/search/proxy/testing/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// MockStore is a mock for store.Store interface
type MockStore struct {
UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error
UpdateCacheFunc func(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error
HasResourceFunc func(resource schema.GroupVersionResource) bool
GetResourceFromCacheFunc func(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (runtime.Object, string, error)
StopFunc func()
Expand All @@ -42,11 +42,11 @@ type MockStore struct {
var _ store.Store = &MockStore{}

// UpdateCache implements store.Store interface
func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace) error {
func (c *MockStore) UpdateCache(resourcesByCluster map[string]map[schema.GroupVersionResource]*store.MultiNamespace, registeredResources map[schema.GroupVersionResource]struct{}) error {
if c.UpdateCacheFunc == nil {
panic("implement me")
}
return c.UpdateCacheFunc(resourcesByCluster)
return c.UpdateCacheFunc(resourcesByCluster, registeredResources)
}

// HasResource implements store.Store interface
Expand Down