From fa65f4176f4c6de97b3299f66764de943e7e17c2 Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Tue, 9 Jan 2024 13:51:14 +0800 Subject: [PATCH 1/4] feat: cache bootstrap --- pkg/admin/bootstrap.go | 2 +- pkg/admin/cache/registry/extension.go | 9 +++++---- pkg/admin/cache/registry/kube/registry.go | 14 ++++++++++++-- pkg/admin/cache/registry/universal/registry.go | 4 +++- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/pkg/admin/bootstrap.go b/pkg/admin/bootstrap.go index a4daa0c89..107bc9000 100644 --- a/pkg/admin/bootstrap.go +++ b/pkg/admin/bootstrap.go @@ -103,7 +103,7 @@ func RegisterOther(rt core_runtime.Runtime) error { if err != nil { panic(err) } - config.AdminRegistry, err = registry.Registry(c.GetProtocol(), addrUrl) // TODO: support k8s + config.AdminRegistry, err = registry.Registry(c.GetProtocol(), addrUrl, rt.KubeClient()) if err != nil { panic(err) } diff --git a/pkg/admin/cache/registry/extension.go b/pkg/admin/cache/registry/extension.go index 8caed783e..f26fdce93 100644 --- a/pkg/admin/cache/registry/extension.go +++ b/pkg/admin/cache/registry/extension.go @@ -20,24 +20,25 @@ package registry import ( "dubbo.apache.org/dubbo-go/v3/common" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" + "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" ) -var registries = make(map[string]func(u *common.URL) (AdminRegistry, error)) +var registries = make(map[string]func(u *common.URL, kc *client.KubeClient) (AdminRegistry, error)) // AddRegistry sets the registry extension with @name -func AddRegistry(name string, v func(u *common.URL) (AdminRegistry, error)) { +func AddRegistry(name string, v func(u *common.URL, kc *client.KubeClient) (AdminRegistry, error)) { registries[name] = v } // Registry finds the registry extension with @name -func Registry(name string, config *common.URL) (AdminRegistry, error) { +func Registry(name string, config *common.URL, kc *client.KubeClient) (AdminRegistry, error) { if name != "kubernetes" && name != "kube" && name != "k8s" { name = "universal" } if registries[name] == nil { panic("registry for " + name + " does not exist. please make sure that you have imported the package dubbo.apache.org/dubbo-go/v3/registry/" + name + ".") } - return registries[name](config) + return registries[name](config, kc) } type AdminRegistry interface { diff --git a/pkg/admin/cache/registry/kube/registry.go b/pkg/admin/cache/registry/kube/registry.go index 74652e077..a7c948411 100644 --- a/pkg/admin/cache/registry/kube/registry.go +++ b/pkg/admin/cache/registry/kube/registry.go @@ -21,11 +21,21 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/constant" + "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" ) func init() { - registry.AddRegistry("kube", func(u *common.URL) (registry.AdminRegistry, error) { - return NewRegistry(true, []string{"ns1", "ns2"}) // FIXME: get fields from config + registry.AddRegistry("kube", func(u *common.URL, kc *client.KubeClient) (registry.AdminRegistry, error) { + clusterScoped := false + namespaces := make([]string, 0) + if ns, ok := u.GetParams()[constant.NamespaceKey]; ok && ns[0] != constant.AnyValue { + namespaces = append(namespaces, ns...) + } else { + clusterScoped = true + } + KubernetesCacheInstance = NewKubernetesCache(kc, clusterScoped) // init cache instance before start registry + return NewRegistry(clusterScoped, []string{}) }) } diff --git a/pkg/admin/cache/registry/universal/registry.go b/pkg/admin/cache/registry/universal/registry.go index 90b086794..fb7fb1cec 100644 --- a/pkg/admin/cache/registry/universal/registry.go +++ b/pkg/admin/cache/registry/universal/registry.go @@ -27,6 +27,7 @@ import ( "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" "github.com/apache/dubbo-kubernetes/pkg/admin/config" "github.com/apache/dubbo-kubernetes/pkg/admin/constant" + "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" "github.com/apache/dubbo-kubernetes/pkg/core/logger" gxset "github.com/dubbogo/gost/container/set" ) @@ -34,7 +35,7 @@ import ( var subscribeUrl *common.URL func init() { - registry.AddRegistry("universal", func(u *common.URL) (registry.AdminRegistry, error) { + registry.AddRegistry("universal", func(u *common.URL, _ *client.KubeClient) (registry.AdminRegistry, error) { delegate, err := extension.GetRegistry(u.Protocol, u) if err != nil { logger.Error("Error initialize registry instance.") @@ -49,6 +50,7 @@ func init() { logger.Error("Error initialize service discovery instance.") return nil, err } + UniversalCacheInstance = NewUniversalCache() // init cache instance before start registry return NewRegistry(delegate, sdDelegate), nil }) From 05607e5a15171c414ebd42fbfcdac7664bf187bb Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Tue, 9 Jan 2024 15:01:37 +0800 Subject: [PATCH 2/4] feat: add cache instance in config --- pkg/admin/bootstrap.go | 2 +- pkg/admin/cache/registry/extension.go | 7 ++++--- pkg/admin/cache/registry/kube/registry.go | 9 +++++---- pkg/admin/cache/registry/universal/registry.go | 9 +++++---- pkg/admin/config/config.go | 3 +++ 5 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/admin/bootstrap.go b/pkg/admin/bootstrap.go index 107bc9000..31521ad47 100644 --- a/pkg/admin/bootstrap.go +++ b/pkg/admin/bootstrap.go @@ -103,7 +103,7 @@ func RegisterOther(rt core_runtime.Runtime) error { if err != nil { panic(err) } - config.AdminRegistry, err = registry.Registry(c.GetProtocol(), addrUrl, rt.KubeClient()) + config.AdminRegistry, config.Cache, err = registry.Registry(c.GetProtocol(), addrUrl, rt.KubeClient()) if err != nil { panic(err) } diff --git a/pkg/admin/cache/registry/extension.go b/pkg/admin/cache/registry/extension.go index f26fdce93..e090b03a4 100644 --- a/pkg/admin/cache/registry/extension.go +++ b/pkg/admin/cache/registry/extension.go @@ -20,18 +20,19 @@ package registry import ( "dubbo.apache.org/dubbo-go/v3/common" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" ) -var registries = make(map[string]func(u *common.URL, kc *client.KubeClient) (AdminRegistry, error)) +var registries = make(map[string]func(u *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error)) // AddRegistry sets the registry extension with @name -func AddRegistry(name string, v func(u *common.URL, kc *client.KubeClient) (AdminRegistry, error)) { +func AddRegistry(name string, v func(u *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error)) { registries[name] = v } // Registry finds the registry extension with @name -func Registry(name string, config *common.URL, kc *client.KubeClient) (AdminRegistry, error) { +func Registry(name string, config *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error) { if name != "kubernetes" && name != "kube" && name != "k8s" { name = "universal" } diff --git a/pkg/admin/cache/registry/kube/registry.go b/pkg/admin/cache/registry/kube/registry.go index a7c948411..5a525b7ef 100644 --- a/pkg/admin/cache/registry/kube/registry.go +++ b/pkg/admin/cache/registry/kube/registry.go @@ -20,13 +20,14 @@ package kube import ( "dubbo.apache.org/dubbo-go/v3/common" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" "github.com/apache/dubbo-kubernetes/pkg/admin/constant" "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" ) func init() { - registry.AddRegistry("kube", func(u *common.URL, kc *client.KubeClient) (registry.AdminRegistry, error) { + registry.AddRegistry("kube", func(u *common.URL, kc *client.KubeClient) (registry.AdminRegistry, cache.Cache, error) { clusterScoped := false namespaces := make([]string, 0) if ns, ok := u.GetParams()[constant.NamespaceKey]; ok && ns[0] != constant.AnyValue { @@ -35,7 +36,7 @@ func init() { clusterScoped = true } KubernetesCacheInstance = NewKubernetesCache(kc, clusterScoped) // init cache instance before start registry - return NewRegistry(clusterScoped, []string{}) + return NewRegistry(clusterScoped, []string{}), KubernetesCacheInstance, nil }) } @@ -44,11 +45,11 @@ type Registry struct { namespaces []string } -func NewRegistry(clusterScoped bool, namespaces []string) (*Registry, error) { +func NewRegistry(clusterScoped bool, namespaces []string) *Registry { return &Registry{ clusterScoped: clusterScoped, namespaces: namespaces, - }, nil + } } func (kr *Registry) Delegate() dubboRegistry.Registry { diff --git a/pkg/admin/cache/registry/universal/registry.go b/pkg/admin/cache/registry/universal/registry.go index fb7fb1cec..b84c52dc2 100644 --- a/pkg/admin/cache/registry/universal/registry.go +++ b/pkg/admin/cache/registry/universal/registry.go @@ -24,6 +24,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/extension" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" "github.com/apache/dubbo-kubernetes/pkg/admin/config" "github.com/apache/dubbo-kubernetes/pkg/admin/constant" @@ -35,11 +36,11 @@ import ( var subscribeUrl *common.URL func init() { - registry.AddRegistry("universal", func(u *common.URL, _ *client.KubeClient) (registry.AdminRegistry, error) { + registry.AddRegistry("universal", func(u *common.URL, _ *client.KubeClient) (registry.AdminRegistry, cache.Cache, error) { delegate, err := extension.GetRegistry(u.Protocol, u) if err != nil { logger.Error("Error initialize registry instance.") - return nil, err + return nil, nil, err } sdUrl := u.Clone() @@ -48,10 +49,10 @@ func init() { sdDelegate, err := extension.GetServiceDiscovery(sdUrl) if err != nil { logger.Error("Error initialize service discovery instance.") - return nil, err + return nil, nil, err } UniversalCacheInstance = NewUniversalCache() // init cache instance before start registry - return NewRegistry(delegate, sdDelegate), nil + return NewRegistry(delegate, sdDelegate), UniversalCacheInstance, nil }) queryParams := url.Values{ diff --git a/pkg/admin/config/config.go b/pkg/admin/config/config.go index 9ff545fff..fe447e8ed 100644 --- a/pkg/admin/config/config.go +++ b/pkg/admin/config/config.go @@ -20,6 +20,7 @@ package config import ( "dubbo.apache.org/dubbo-go/v3/metadata/report" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" "gorm.io/gorm" @@ -33,6 +34,8 @@ var ( MetadataReportCenter report.MetadataReport DataBase *gorm.DB // for service mock + + Cache cache.Cache ) var ( From 949406c9cf843c470895b2a72503a81244e95832 Mon Sep 17 00:00:00 2001 From: Wang Guan Date: Tue, 9 Jan 2024 15:01:59 +0800 Subject: [PATCH 3/4] doc(cache): add cache dev document --- pkg/admin/cache/README.md | 48 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 pkg/admin/cache/README.md diff --git a/pkg/admin/cache/README.md b/pkg/admin/cache/README.md new file mode 100644 index 000000000..7a4160b45 --- /dev/null +++ b/pkg/admin/cache/README.md @@ -0,0 +1,48 @@ +# Development Guide + +## Overview +- cache module + - cache.go: define cache interface and result model. + - registry: + - kube + - cache.go: implement Cache interface for kubernetes mode, and define some registry logic like startInformer, stopInformer, etc. + - registry.go: implement Registry interface for kubernetes mode, and refer registry logic defined in cache.go + - universal: + - cache.go: implement Cache interface for universal mode, and define some registry logic like store, delete, etc. + - registry.go: implement Registry interface for universal mode, and refer registry logic defined in cache.go + - extension.go: define Registry extension interface and use it in admin module's bootstrap process. + - selector: + - selector.go: define Selector interface and Options interface. + - application_selector.go: implement Selector interface, and define application selector logic. + - service_selector.go: implement Selector interface, and define service selector logic. + - multiple_selector.go: an implement of Selector to combine multiple selectors. + +## How to use +- After dubbo-cp setup, cache has been initialized and an instance of Cache is declared as a global var in admin/config. +- Use `config.Cache` to get cache instance. +- Call some methods of Cache to get data from cache. + +## Examples + +### Get resources by application + +```go +package service + +import ( + "github.com/apache/dubbo-kubernetes/pkg/admin/cache/selector" + "github.com/apache/dubbo-kubernetes/pkg/admin/config" +) + +func (s *XXXServiceImpl) GetXXX(application string) ([]*model.XXX, error) { + // get data from cache + xxx, err := config.Cache.GetXXXWithSelector("some-namespace", selector.NewApplicationSelector(application)) + if err != nil { + return nil, err + } + // use data to do something + + // return results + return yyy, nil +} +``` \ No newline at end of file From 72b6fbea8834d0f3815be260539a820982eebeec Mon Sep 17 00:00:00 2001 From: wangguan Date: Tue, 9 Jan 2024 23:16:22 +0800 Subject: [PATCH 4/4] fix: lint --- pkg/admin/cache/registry/kube/registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/admin/cache/registry/kube/registry.go b/pkg/admin/cache/registry/kube/registry.go index 5a525b7ef..ef7ea9728 100644 --- a/pkg/admin/cache/registry/kube/registry.go +++ b/pkg/admin/cache/registry/kube/registry.go @@ -36,7 +36,7 @@ func init() { clusterScoped = true } KubernetesCacheInstance = NewKubernetesCache(kc, clusterScoped) // init cache instance before start registry - return NewRegistry(clusterScoped, []string{}), KubernetesCacheInstance, nil + return NewRegistry(clusterScoped, namespaces), KubernetesCacheInstance, nil }) }