diff --git a/pkg/admin/bootstrap.go b/pkg/admin/bootstrap.go index a4daa0c89..31521ad47 100644 --- a/pkg/admin/bootstrap.go +++ b/pkg/admin/bootstrap.go @@ -103,7 +103,7 @@ func RegisterOther(rt core_runtime.Runtime) error { if err != nil { panic(err) } - config.AdminRegistry, err = registry.Registry(c.GetProtocol(), addrUrl) // TODO: support k8s + config.AdminRegistry, config.Cache, err = registry.Registry(c.GetProtocol(), addrUrl, rt.KubeClient()) if err != nil { panic(err) } diff --git a/pkg/admin/cache/README.md b/pkg/admin/cache/README.md new file mode 100644 index 000000000..7a4160b45 --- /dev/null +++ b/pkg/admin/cache/README.md @@ -0,0 +1,48 @@ +# Development Guide + +## Overview +- cache module + - cache.go: define cache interface and result model. + - registry: + - kube + - cache.go: implement Cache interface for kubernetes mode, and define some registry logic like startInformer, stopInformer, etc. + - registry.go: implement Registry interface for kubernetes mode, and refer registry logic defined in cache.go + - universal: + - cache.go: implement Cache interface for universal mode, and define some registry logic like store, delete, etc. + - registry.go: implement Registry interface for universal mode, and refer registry logic defined in cache.go + - extension.go: define Registry extension interface and use it in admin module's bootstrap process. + - selector: + - selector.go: define Selector interface and Options interface. + - application_selector.go: implement Selector interface, and define application selector logic. + - service_selector.go: implement Selector interface, and define service selector logic. + - multiple_selector.go: an implement of Selector to combine multiple selectors. + +## How to use +- After dubbo-cp setup, cache has been initialized and an instance of Cache is declared as a global var in admin/config. +- Use `config.Cache` to get cache instance. +- Call some methods of Cache to get data from cache. + +## Examples + +### Get resources by application + +```go +package service + +import ( + "github.com/apache/dubbo-kubernetes/pkg/admin/cache/selector" + "github.com/apache/dubbo-kubernetes/pkg/admin/config" +) + +func (s *XXXServiceImpl) GetXXX(application string) ([]*model.XXX, error) { + // get data from cache + xxx, err := config.Cache.GetXXXWithSelector("some-namespace", selector.NewApplicationSelector(application)) + if err != nil { + return nil, err + } + // use data to do something + + // return results + return yyy, nil +} +``` \ No newline at end of file diff --git a/pkg/admin/cache/registry/extension.go b/pkg/admin/cache/registry/extension.go index 8caed783e..e090b03a4 100644 --- a/pkg/admin/cache/registry/extension.go +++ b/pkg/admin/cache/registry/extension.go @@ -20,24 +20,26 @@ package registry import ( "dubbo.apache.org/dubbo-go/v3/common" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" + "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" ) -var registries = make(map[string]func(u *common.URL) (AdminRegistry, error)) +var registries = make(map[string]func(u *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error)) // AddRegistry sets the registry extension with @name -func AddRegistry(name string, v func(u *common.URL) (AdminRegistry, error)) { +func AddRegistry(name string, v func(u *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error)) { registries[name] = v } // Registry finds the registry extension with @name -func Registry(name string, config *common.URL) (AdminRegistry, error) { +func Registry(name string, config *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error) { if name != "kubernetes" && name != "kube" && name != "k8s" { name = "universal" } if registries[name] == nil { panic("registry for " + name + " does not exist. please make sure that you have imported the package dubbo.apache.org/dubbo-go/v3/registry/" + name + ".") } - return registries[name](config) + return registries[name](config, kc) } type AdminRegistry interface { diff --git a/pkg/admin/cache/registry/kube/registry.go b/pkg/admin/cache/registry/kube/registry.go index 74652e077..ef7ea9728 100644 --- a/pkg/admin/cache/registry/kube/registry.go +++ b/pkg/admin/cache/registry/kube/registry.go @@ -20,12 +20,23 @@ package kube import ( "dubbo.apache.org/dubbo-go/v3/common" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/constant" + "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" ) func init() { - registry.AddRegistry("kube", func(u *common.URL) (registry.AdminRegistry, error) { - return NewRegistry(true, []string{"ns1", "ns2"}) // FIXME: get fields from config + registry.AddRegistry("kube", func(u *common.URL, kc *client.KubeClient) (registry.AdminRegistry, cache.Cache, error) { + clusterScoped := false + namespaces := make([]string, 0) + if ns, ok := u.GetParams()[constant.NamespaceKey]; ok && ns[0] != constant.AnyValue { + namespaces = append(namespaces, ns...) + } else { + clusterScoped = true + } + KubernetesCacheInstance = NewKubernetesCache(kc, clusterScoped) // init cache instance before start registry + return NewRegistry(clusterScoped, namespaces), KubernetesCacheInstance, nil }) } @@ -34,11 +45,11 @@ type Registry struct { namespaces []string } -func NewRegistry(clusterScoped bool, namespaces []string) (*Registry, error) { +func NewRegistry(clusterScoped bool, namespaces []string) *Registry { return &Registry{ clusterScoped: clusterScoped, namespaces: namespaces, - }, nil + } } func (kr *Registry) Delegate() dubboRegistry.Registry { diff --git a/pkg/admin/cache/registry/universal/registry.go b/pkg/admin/cache/registry/universal/registry.go index 90b086794..b84c52dc2 100644 --- a/pkg/admin/cache/registry/universal/registry.go +++ b/pkg/admin/cache/registry/universal/registry.go @@ -24,9 +24,11 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/extension" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" "github.com/apache/dubbo-kubernetes/pkg/admin/config" "github.com/apache/dubbo-kubernetes/pkg/admin/constant" + "github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client" "github.com/apache/dubbo-kubernetes/pkg/core/logger" gxset "github.com/dubbogo/gost/container/set" ) @@ -34,11 +36,11 @@ import ( var subscribeUrl *common.URL func init() { - registry.AddRegistry("universal", func(u *common.URL) (registry.AdminRegistry, error) { + registry.AddRegistry("universal", func(u *common.URL, _ *client.KubeClient) (registry.AdminRegistry, cache.Cache, error) { delegate, err := extension.GetRegistry(u.Protocol, u) if err != nil { logger.Error("Error initialize registry instance.") - return nil, err + return nil, nil, err } sdUrl := u.Clone() @@ -47,9 +49,10 @@ func init() { sdDelegate, err := extension.GetServiceDiscovery(sdUrl) if err != nil { logger.Error("Error initialize service discovery instance.") - return nil, err + return nil, nil, err } - return NewRegistry(delegate, sdDelegate), nil + UniversalCacheInstance = NewUniversalCache() // init cache instance before start registry + return NewRegistry(delegate, sdDelegate), UniversalCacheInstance, nil }) queryParams := url.Values{ diff --git a/pkg/admin/config/config.go b/pkg/admin/config/config.go index 9ff545fff..fe447e8ed 100644 --- a/pkg/admin/config/config.go +++ b/pkg/admin/config/config.go @@ -20,6 +20,7 @@ package config import ( "dubbo.apache.org/dubbo-go/v3/metadata/report" dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry" + "github.com/apache/dubbo-kubernetes/pkg/admin/cache" "github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry" "gorm.io/gorm" @@ -33,6 +34,8 @@ var ( MetadataReportCenter report.MetadataReport DataBase *gorm.DB // for service mock + + Cache cache.Cache ) var (