Skip to content

Commit

Permalink
Merge pull request apache#138 from ev1lQuark/cache-bootstrap
Browse files Browse the repository at this point in the history
feat: impl cache bootstrap
  • Loading branch information
chickenlj authored Jan 11, 2024
2 parents 2925c3c + 72b6fbe commit 3e060ac
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/admin/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func RegisterOther(rt core_runtime.Runtime) error {
if err != nil {
panic(err)
}
config.AdminRegistry, err = registry.Registry(c.GetProtocol(), addrUrl) // TODO: support k8s
config.AdminRegistry, config.Cache, err = registry.Registry(c.GetProtocol(), addrUrl, rt.KubeClient())
if err != nil {
panic(err)
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/admin/cache/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Development Guide

## Overview
- cache module
- cache.go: define cache interface and result model.
- registry:
- kube
- cache.go: implement Cache interface for kubernetes mode, and define some registry logic like startInformer, stopInformer, etc.
- registry.go: implement Registry interface for kubernetes mode, and refer registry logic defined in cache.go
- universal:
- cache.go: implement Cache interface for universal mode, and define some registry logic like store, delete, etc.
- registry.go: implement Registry interface for universal mode, and refer registry logic defined in cache.go
- extension.go: define Registry extension interface and use it in admin module's bootstrap process.
- selector:
- selector.go: define Selector interface and Options interface.
- application_selector.go: implement Selector interface, and define application selector logic.
- service_selector.go: implement Selector interface, and define service selector logic.
- multiple_selector.go: an implement of Selector to combine multiple selectors.

## How to use
- After dubbo-cp setup, cache has been initialized and an instance of Cache is declared as a global var in admin/config.
- Use `config.Cache` to get cache instance.
- Call some methods of Cache to get data from cache.

## Examples

### Get resources by application

```go
package service

import (
"github.com/apache/dubbo-kubernetes/pkg/admin/cache/selector"
"github.com/apache/dubbo-kubernetes/pkg/admin/config"
)

func (s *XXXServiceImpl) GetXXX(application string) ([]*model.XXX, error) {
// get data from cache
xxx, err := config.Cache.GetXXXWithSelector("some-namespace", selector.NewApplicationSelector(application))
if err != nil {
return nil, err
}
// use data to do something

// return results
return yyy, nil
}
```
10 changes: 6 additions & 4 deletions pkg/admin/cache/registry/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ package registry
import (
"dubbo.apache.org/dubbo-go/v3/common"
dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache"
"github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client"
)

var registries = make(map[string]func(u *common.URL) (AdminRegistry, error))
var registries = make(map[string]func(u *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error))

// AddRegistry sets the registry extension with @name
func AddRegistry(name string, v func(u *common.URL) (AdminRegistry, error)) {
func AddRegistry(name string, v func(u *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error)) {
registries[name] = v
}

// Registry finds the registry extension with @name
func Registry(name string, config *common.URL) (AdminRegistry, error) {
func Registry(name string, config *common.URL, kc *client.KubeClient) (AdminRegistry, cache.Cache, error) {
if name != "kubernetes" && name != "kube" && name != "k8s" {
name = "universal"
}
if registries[name] == nil {
panic("registry for " + name + " does not exist. please make sure that you have imported the package dubbo.apache.org/dubbo-go/v3/registry/" + name + ".")
}
return registries[name](config)
return registries[name](config, kc)
}

type AdminRegistry interface {
Expand Down
19 changes: 15 additions & 4 deletions pkg/admin/cache/registry/kube/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@ package kube
import (
"dubbo.apache.org/dubbo-go/v3/common"
dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry"
"github.com/apache/dubbo-kubernetes/pkg/admin/constant"
"github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client"
)

func init() {
registry.AddRegistry("kube", func(u *common.URL) (registry.AdminRegistry, error) {
return NewRegistry(true, []string{"ns1", "ns2"}) // FIXME: get fields from config
registry.AddRegistry("kube", func(u *common.URL, kc *client.KubeClient) (registry.AdminRegistry, cache.Cache, error) {
clusterScoped := false
namespaces := make([]string, 0)
if ns, ok := u.GetParams()[constant.NamespaceKey]; ok && ns[0] != constant.AnyValue {
namespaces = append(namespaces, ns...)
} else {
clusterScoped = true
}
KubernetesCacheInstance = NewKubernetesCache(kc, clusterScoped) // init cache instance before start registry
return NewRegistry(clusterScoped, namespaces), KubernetesCacheInstance, nil
})
}

Expand All @@ -34,11 +45,11 @@ type Registry struct {
namespaces []string
}

func NewRegistry(clusterScoped bool, namespaces []string) (*Registry, error) {
func NewRegistry(clusterScoped bool, namespaces []string) *Registry {
return &Registry{
clusterScoped: clusterScoped,
namespaces: namespaces,
}, nil
}
}

func (kr *Registry) Delegate() dubboRegistry.Registry {
Expand Down
11 changes: 7 additions & 4 deletions pkg/admin/cache/registry/universal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,23 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry"
"github.com/apache/dubbo-kubernetes/pkg/admin/config"
"github.com/apache/dubbo-kubernetes/pkg/admin/constant"
"github.com/apache/dubbo-kubernetes/pkg/core/kubeclient/client"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
gxset "github.com/dubbogo/gost/container/set"
)

var subscribeUrl *common.URL

func init() {
registry.AddRegistry("universal", func(u *common.URL) (registry.AdminRegistry, error) {
registry.AddRegistry("universal", func(u *common.URL, _ *client.KubeClient) (registry.AdminRegistry, cache.Cache, error) {
delegate, err := extension.GetRegistry(u.Protocol, u)
if err != nil {
logger.Error("Error initialize registry instance.")
return nil, err
return nil, nil, err
}

sdUrl := u.Clone()
Expand All @@ -47,9 +49,10 @@ func init() {
sdDelegate, err := extension.GetServiceDiscovery(sdUrl)
if err != nil {
logger.Error("Error initialize service discovery instance.")
return nil, err
return nil, nil, err
}
return NewRegistry(delegate, sdDelegate), nil
UniversalCacheInstance = NewUniversalCache() // init cache instance before start registry
return NewRegistry(delegate, sdDelegate), UniversalCacheInstance, nil
})

queryParams := url.Values{
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package config
import (
"dubbo.apache.org/dubbo-go/v3/metadata/report"
dubboRegistry "dubbo.apache.org/dubbo-go/v3/registry"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache"
"github.com/apache/dubbo-kubernetes/pkg/admin/cache/registry"
"gorm.io/gorm"

Expand All @@ -33,6 +34,8 @@ var (
MetadataReportCenter report.MetadataReport

DataBase *gorm.DB // for service mock

Cache cache.Cache
)

var (
Expand Down

0 comments on commit 3e060ac

Please sign in to comment.