Skip to content

Commit

Permalink
Merge pull request #2 from micro-stacks/feature-customKvFormat-xvrzhao
Browse files Browse the repository at this point in the history
add: EtcdKvBuilder and EtcdKvResolver interfaces
  • Loading branch information
Xavier Zhao authored Feb 7, 2021
2 parents 5b622f5 + 66b9b6e commit 15bf446
Show file tree
Hide file tree
Showing 14 changed files with 223 additions and 99 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Discov is a generic gRPC service registration and discovery / load balancing com

## Documentation

[pkg.go.dev](https://pkg.go.dev/github.com/xvrzhao/discov)
[pkg.go.dev](https://pkg.go.dev/github.com/micro-stacks/discov)

## License

Expand Down
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Discov 是通用的 gRPC 服务注册发现/负载均衡组件。

## 文档

[pkg.go.dev](https://pkg.go.dev/github.com/xvrzhao/discov)
[pkg.go.dev](https://pkg.go.dev/github.com/micro-stacks/discov)

## 开源协议

Expand Down
48 changes: 39 additions & 9 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,28 @@ import (
"time"
)

// builder implements the gRPC interface of resolver.Builder.
type builder struct {
// Builder implements the gRPC resolver.Builder interface.
// It Builds a discov Resolver that implements the gRPC
// resolver.Resolver interface.
type Builder struct {
options *builderOptions
}

type builderOptions struct {
// etcd
etcdClient *clientv3.Client
kvResolver EtcdKvResolver
// dns
dnsPollingInterval time.Duration
}

// BuilderOption is the option that passed to NewBuilder function.
type BuilderOption struct {
applyTo func(*builderOptions)
}

// WithEtcdClient injects an etcd client. The option is required
// when the authority of discov is etcd.
func WithEtcdClient(cli *clientv3.Client) BuilderOption {
return BuilderOption{
applyTo: func(options *builderOptions) {
Expand All @@ -35,6 +41,18 @@ func WithEtcdClient(cli *clientv3.Client) BuilderOption {
}
}

// WithEtcdKvResolver injects a custom EtcdKvResolver implementation
// to determine how the internal etcdResolver watches keys and retrieves addrs.
func WithEtcdKvResolver(r EtcdKvResolver) BuilderOption {
return BuilderOption{
applyTo: func(options *builderOptions) {
options.kvResolver = r
},
}
}

// WithDNSPollingInterval customizes the polling frequency of
// the internal dnsResolver.
func WithDNSPollingInterval(d time.Duration) BuilderOption {
return BuilderOption{
applyTo: func(options *builderOptions) {
Expand All @@ -43,19 +61,28 @@ func WithDNSPollingInterval(d time.Duration) BuilderOption {
}
}

func NewBuilder(opts ...BuilderOption) *builder {
// NewBuilder returns a Builder that contain the passed options.
func NewBuilder(opts ...BuilderOption) *Builder {
options := new(builderOptions)
for _, option := range opts {
option.applyTo(options)
}
return &builder{options: options}
return &Builder{options: options}
}

func (b *builder) Scheme() string {
// Scheme returns the scheme "discov" correspond to the Builder.
func (b *Builder) Scheme() string {
return "discov"
}

func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (r resolver.Resolver, err error) {
// Build creates a new resolver for the given target.
//
// If the authority of target that passed to grpc dial method
// is etcd, Build returns an internal etcdResolver, if the
// authority is dns, returns an internal dnsResolver.
//
// The options passed to NewBuilder will be used in this method.
func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (r resolver.Resolver, err error) {
_, authority, endpoint, err := parseTarget(target)
if err != nil {
err = fmt.Errorf("parse target: %v", err)
Expand All @@ -74,11 +101,16 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
return
}

if b.options.kvResolver == nil {
b.options.kvResolver = new(DefaultEtcdKvResolver)
}

ctx, cancel := context.WithCancel(context.Background())

er := &etcdResolver{
cli: b.options.etcdClient,
srv: endpoint,
kvResolver: b.options.kvResolver,
cc: cc,
ctx: ctx,
cancel: cancel,
Expand All @@ -105,9 +137,7 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
}

if b.options.dnsPollingInterval == 0 {
err = fmt.Errorf("the authority in target is %q but missing WithDNSPollingInterval option when NewBuilder",
authority)
return
b.options.dnsPollingInterval = time.Second * 30
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
3 changes: 3 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package discov provides the APIs for service registration and
// discovery of gRPC.
package discov
52 changes: 52 additions & 0 deletions etcd_kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package discov

import "fmt"

// EtcdKvBuilder builds the key and the value to store to etcd.
type EtcdKvBuilder interface {
// BuildKey builds the etcd key according to the name and
// the address of the service.
BuildKey(srvName, srvAddr string) (key string)
// BuildValue builds the etcd value according to the name
// and the address of the service.
BuildValue(srvName, srvAddr string) (value string)
}

// DefaultEtcdKvBuilder will be used if the user does not RegisterKvBuilder.
type DefaultEtcdKvBuilder struct{}

// BuildKey returns the key of "/srv/${srvName}/${srvAddr}".
func (*DefaultEtcdKvBuilder) BuildKey(srvName, srvAddr string) string {
return fmt.Sprintf("/srv/%s/%s", srvName, srvAddr)
}

// BuildValue returns the value as the same as srvAddr.
func (*DefaultEtcdKvBuilder) BuildValue(_, srvAddr string) string {
return srvAddr
}

// EtcdKvResolver determines how the internal etcdResolver
// watches keys and retrieves addrs.
type EtcdKvResolver interface {
// GetKeyPrefixForSrv generates the key prefix for etcdResolver to watch.
// srvName is the endpoint in the target that passed to the grpc Dial method.
GetKeyPrefixForSrv(srvName string) (prefix string)
// ResolveSrvAddr resolves the service address from the value corresponding
// to the etcd key that be watched.
ResolveSrvAddr(value []byte) (srvAddr string)
}

// DefaultEtcdKvResolver will be used if the user doesn't set the EtcdKvResolver.
// DefaultEtcdKvResolver watches keys with prefix of "/srv/${srvName}", and
// treats the value of the key directly as an address.
type DefaultEtcdKvResolver struct{}

// GetKeyPrefixForSrv returns the prefix of "/srv/${srvName}".
func (*DefaultEtcdKvResolver) GetKeyPrefixForSrv(srvName string) string {
return fmt.Sprintf("/srv/%s", srvName)
}

// ResolveSrvAddr returns the string content of value.
func (r *DefaultEtcdKvResolver) ResolveSrvAddr(value []byte) string {
return string(value)
}
136 changes: 76 additions & 60 deletions etcd_register.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"context"
"errors"
"fmt"
"go.etcd.io/etcd/clientv3"
"sync"
"time"

"go.etcd.io/etcd/clientv3"
)

// EtcdSrvRecord is a service record to be registered to or
Expand All @@ -22,9 +23,12 @@ type EtcdSrvRecord interface {
// Since the EtcdSrvRecord spawns goroutines to keep heartbeat
// to etcd after calling Register, some errors may occur at runtime.
CatchRuntimeErrors() <-chan error
// RegisterKvBuilder injects an EtcdKvBuilder to custom the
// format of key and value to store.
RegisterKvBuilder(EtcdKvBuilder)
}

// NewEtcdSrvRecord construct a service record.
// NewEtcdSrvRecord creates a service record.
func NewEtcdSrvRecord(cli *clientv3.Client, srvName, srvAddr string) (EtcdSrvRecord, error) {
if cli == nil {
return nil, errors.New("nil etcd client")
Expand All @@ -41,22 +45,26 @@ func NewEtcdSrvRecord(cli *clientv3.Client, srvName, srvAddr string) (EtcdSrvRec
}

r := &etcdSrvRecord{
cli: cli,
ctx: nil,
cancel: nil,
wg: sync.WaitGroup{},
errs: nil,
errsLock: sync.Mutex{},
srvName: srvName,
srvAddr: srvAddr,
srvTTL: time.Second * 3,
cli: cli,
ctx: nil,
cancel: nil,
wg: sync.WaitGroup{},
errs: nil,
errsLock: sync.Mutex{},
srvName: srvName,
srvAddr: srvAddr,
kvBuilder: nil,
srvTTL: time.Second * 3,
}

return r, nil
}

type etcdSrvRecord struct {
cli *clientv3.Client
cli *clientv3.Client
srvTTL time.Duration
kvBuilder EtcdKvBuilder
srvName, srvAddr string

// used as revoking checker, aliveKeeper and consumer
ctx context.Context
Expand All @@ -65,16 +73,16 @@ type etcdSrvRecord struct {

errs chan error // note: never close after initialization
errsLock sync.Mutex

srvName string
srvAddr string
srvTTL time.Duration
}

func (r *etcdSrvRecord) Register() {
r.ctx, r.cancel = context.WithCancel(context.Background())
r.initErrorChannel()

if r.kvBuilder == nil {
r.RegisterKvBuilder(new(DefaultEtcdKvBuilder))
}

r.wg.Add(1)
go r.checker()
}
Expand All @@ -85,7 +93,11 @@ func (r *etcdSrvRecord) Unregister(ctx context.Context) error {
r.wg.Wait()
}

key := fmt.Sprintf("%s/%v", getEtcdKeyPrefix(r.srvName), r.srvAddr)
if r.kvBuilder == nil {
r.RegisterKvBuilder(new(DefaultEtcdKvBuilder))
}

key := r.kvBuilder.BuildKey(r.srvName, r.srvAddr)
_, err := r.cli.Delete(ctx, key)
if err != nil {
return fmt.Errorf("etcd client delete key(%s): %v", key, err)
Expand All @@ -94,56 +106,66 @@ func (r *etcdSrvRecord) Unregister(ctx context.Context) error {
return nil
}

func (r *etcdSrvRecord) CatchRuntimeErrors() <-chan error {
r.initErrorChannel()

return r.errs
}

// RegisterKvBuilder should be called before Register and Unregister.
func (r *etcdSrvRecord) RegisterKvBuilder(b EtcdKvBuilder) {
r.kvBuilder = b
}

func (r *etcdSrvRecord) checker() {
defer r.wg.Done()

key := fmt.Sprintf("%s/%v", getEtcdKeyPrefix(r.srvName), r.srvAddr)
key := r.kvBuilder.BuildKey(r.srvName, r.srvAddr)
ticker := time.NewTicker(r.srvTTL)

for {
select {
case <-r.ctx.Done():
// revoke this goroutine
ticker.Stop()
return
case <-ticker.C:
}

ctx, cancel := context.WithTimeout(context.Background(), r.srvTTL)
resp, err := r.cli.Get(ctx, key)
if err != nil {
cancel()
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to get key: %v", err))
continue
}

if resp.Count <= 0 {
lease, err := r.cli.Grant(ctx, r.srvTTL.Milliseconds()/1000)
if err != nil {
cancel()
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to grant lease: %v", err))
continue
}
func() {
ctx, cancel := context.WithTimeout(r.ctx, r.srvTTL)
defer cancel()

_, err = r.cli.Put(ctx, key, r.srvAddr, clientv3.WithLease(lease.ID))
resp, err := r.cli.Get(ctx, key)
if err != nil {
cancel()
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to put key: %v", err))
continue
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to get key: %v", err))
return
}

ch, err := r.cli.KeepAlive(r.ctx, lease.ID) // aliveKeeper
if err != nil {
cancel()
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to keep alive: %v", err))
continue
if resp.Count <= 0 {
lease, err := r.cli.Grant(ctx, r.srvTTL.Milliseconds()/1000)
if err != nil {
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to grant lease: %v", err))
return
}

val := r.kvBuilder.BuildValue(r.srvName, r.srvAddr)
_, err = r.cli.Put(ctx, key, val, clientv3.WithLease(lease.ID))
if err != nil {
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to put key: %v", err))
return
}

ch, err := r.cli.KeepAlive(r.ctx, lease.ID) // aliveKeeper
if err != nil {
r.pushErr(fmt.Errorf("CHECKER_ERROR: failed to keep alive: %v", err))
return
}

r.wg.Add(1)
go r.consumer(ch)
}
}()

r.wg.Add(1)
go r.consumer(ch)
select {
case <-r.ctx.Done():
// revoke this goroutine
ticker.Stop()
return
case <-ticker.C:
}

cancel()
}
}

Expand Down Expand Up @@ -172,12 +194,6 @@ func (r *etcdSrvRecord) pushErr(err error) {
}
}

func (r *etcdSrvRecord) CatchRuntimeErrors() <-chan error {
r.initErrorChannel()

return r.errs
}

func (r *etcdSrvRecord) initErrorChannel() {
r.errsLock.Lock()
defer r.errsLock.Unlock()
Expand Down
Loading

0 comments on commit 15bf446

Please sign in to comment.