Skip to content

Commit

Permalink
Merge pull request #14 from day253/keepalive
Browse files Browse the repository at this point in the history
add keepalive
  • Loading branch information
bodhisatan committed Aug 22, 2023
2 parents 9f43717 + 961b726 commit bddfec4
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18

- uses: actions/cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/push-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18

- uses: actions/cache@v2
with:
Expand Down
62 changes: 62 additions & 0 deletions entity/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,69 @@

package entity

import (
"encoding/json"
"fmt"
"net"
"strings"

"github.com/cloudwego/kitex/pkg/registry"
"github.com/kitex-contrib/registry-zookeeper/utils"
)

type RegistryEntity struct {
Weight int
Tags map[string]string
}

type NodeEntity struct {
*registry.Info
}

func (n *NodeEntity) Path() (string, error) {
return buildPath(n.Info)
}

func (n *NodeEntity) Content() ([]byte, error) {
return json.Marshal(&RegistryEntity{Weight: n.Weight, Tags: n.Tags})
}

func MustNewNodeEntity(ri *registry.Info) *NodeEntity {
return &NodeEntity{ri}
}

// path format as follows:
// /{serviceName}/{ip}:{port}
func buildPath(info *registry.Info) (string, error) {
var path string
if info == nil {
return "", fmt.Errorf("registry info can't be nil")
}
if info.ServiceName == "" {
return "", fmt.Errorf("registry info service name can't be empty")
}
if info.Addr == nil {
return "", fmt.Errorf("registry info addr can't be nil")
}
if !strings.HasPrefix(info.ServiceName, utils.Separator) {
path = utils.Separator + info.ServiceName
}

if host, port, err := net.SplitHostPort(info.Addr.String()); err == nil {
if port == "" {
return "", fmt.Errorf("registry info addr missing port")
}
if host == "" {
ipv4, err := utils.GetLocalIPv4Address()
if err != nil {
return "", fmt.Errorf("get local ipv4 error, cause %w", err)
}
path = path + utils.Separator + ipv4 + ":" + port
} else {
path = path + utils.Separator + host + ":" + port
}
} else {
return "", fmt.Errorf("parse registry info addr error")
}
return path, nil
}
189 changes: 109 additions & 80 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package registry

import (
"encoding/json"
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/cloudwego/kitex/pkg/registry"
Expand All @@ -27,126 +27,155 @@ import (
"github.com/kitex-contrib/registry-zookeeper/utils"
)

var (
ErrorZkConnectedTimedOut = errors.New("timed out waiting for zk connected")
ErrorNilRegistryInfo = errors.New("registry info can't be nil")
)

type zookeeperRegistry struct {
sync.RWMutex
conn *zk.Conn
authOpen bool
user, password string
user string
password string
sessionTimeout time.Duration
canceler map[string]context.CancelFunc
}

func NewZookeeperRegistry(servers []string, sessionTimeout time.Duration) (registry.Registry, error) {
conn, _, err := zk.Connect(servers, sessionTimeout)
if err != nil {
return nil, err
}
return &zookeeperRegistry{conn: conn}, nil
return NewZookeeperRegistryWithAuth(servers, sessionTimeout, "", "")
}

func NewZookeeperRegistryWithAuth(servers []string, sessionTimeout time.Duration, user, password string) (registry.Registry, error) {
if user == "" || password == "" {
return nil, fmt.Errorf("user or password can't be empty")
}
conn, _, err := zk.Connect(servers, sessionTimeout)
conn, event, err := zk.Connect(servers, sessionTimeout)
if err != nil {
return nil, err
}
auth := []byte(fmt.Sprintf("%s:%s", user, password))
err = conn.AddAuth(utils.Scheme, auth)
if err != nil {
return nil, err
if user != "" && password != "" {
if err := conn.AddAuth(utils.Scheme, []byte(fmt.Sprintf("%s:%s", user, password))); err != nil {
return nil, err
}
}
ticker := time.NewTimer(sessionTimeout / 2)
for {
select {
case e := <-event:
if e.State == zk.StateConnected {
return &zookeeperRegistry{
user: user,
password: password,
sessionTimeout: sessionTimeout,
conn: conn,
canceler: make(map[string]context.CancelFunc),
}, nil
}
case <-ticker.C:
return nil, ErrorZkConnectedTimedOut
}
}
return &zookeeperRegistry{conn: conn, authOpen: true, user: user, password: password}, nil
}

func (z *zookeeperRegistry) Register(info *registry.Info) error {
path, err := buildPath(info)
if info == nil {
return ErrorNilRegistryInfo
}
ne := entity.MustNewNodeEntity(info)
path, err := ne.Path()
if err != nil {
return err
}
content, err := json.Marshal(&entity.RegistryEntity{Weight: info.Weight, Tags: info.Tags})
content, err := ne.Content()
if err != nil {
return err
}
return z.createNode(path, content, true)
}

// path format as follows:
// /{serviceName}/{ip}:{port}
func buildPath(info *registry.Info) (string, error) {
var path string
if info == nil {
return "", fmt.Errorf("registry info can't be nil")
}
if info.ServiceName == "" {
return "", fmt.Errorf("registry info service name can't be empty")
}
if info.Addr == nil {
return "", fmt.Errorf("registry info addr can't be nil")
}
if !strings.HasPrefix(info.ServiceName, utils.Separator) {
path = utils.Separator + info.ServiceName
}

if host, port, err := net.SplitHostPort(info.Addr.String()); err == nil {
if port == "" {
return "", fmt.Errorf("registry info addr missing port")
}
if host == "" {
ipv4, err := utils.GetLocalIPv4Address()
if err != nil {
return "", fmt.Errorf("get local ipv4 error, cause %w", err)
}
path = path + utils.Separator + ipv4 + ":" + port
} else {
path = path + utils.Separator + host + ":" + port
}
} else {
return "", fmt.Errorf("parse registry info addr error")
err = z.createNode(path, content, true)
if err != nil {
return err
}
return path, nil
ctx, cancel := context.WithCancel(context.Background())
z.Lock()
defer z.Unlock()
z.canceler[path] = cancel
go z.keepalive(ctx, path, content)
return nil
}

func (z *zookeeperRegistry) Deregister(info *registry.Info) error {
if info == nil {
return fmt.Errorf("registry info can't be nil")
return ErrorNilRegistryInfo
}
path, err := buildPath(info)
ne := entity.MustNewNodeEntity(info)
path, err := ne.Path()
if err != nil {
return err
}
z.Lock()
defer z.Unlock()
cancel, ok := z.canceler[path]
if ok {
cancel()
delete(z.canceler, path)
}
return z.deleteNode(path)
}

func (z *zookeeperRegistry) createNode(path string, content []byte, ephemeral bool) error {
i := strings.LastIndex(path, utils.Separator)
if i > 0 {
err := z.createNode(path[0:i], nil, false)
if err != nil && !errors.Is(err, zk.ErrNodeExists) {
exists, stat, err := z.conn.Exists(path)
if err != nil {
return err
}
// ephemeral nodes handling after restart
// fixes a race condition if the server crashes without using CreateProtectedEphemeralSequential()
// https://github.com/go-kratos/kratos/blob/main/contrib/registry/zookeeper/register.go
if exists && ephemeral {
err = z.conn.Delete(path, stat.Version)
if err != nil && err != zk.ErrNoNode {
return err
}
}
var flag int32
if ephemeral {
flag = zk.FlagEphemeral
}
if z.authOpen {
_, err := z.conn.Create(path, content, flag, zk.DigestACL(zk.PermAll, z.user, z.password))
if err != nil {
return fmt.Errorf("create node [%s] with auth error, cause %w", path, err)
exists = false
}
if !exists {
i := strings.LastIndex(path, utils.Separator)
if i > 0 {
err := z.createNode(path[0:i], nil, false)
if err != nil && !errors.Is(err, zk.ErrNodeExists) {
return err
}
}
var flag int32
if ephemeral {
flag = zk.FlagEphemeral
}
if z.user != "" && z.password != "" {
_, err = z.conn.Create(path, content, flag, zk.DigestACL(zk.PermAll, z.user, z.password))
} else {
_, err = z.conn.Create(path, content, flag, zk.WorldACL(zk.PermAll))
}
return nil
} else {
_, err := z.conn.Create(path, content, flag, zk.WorldACL(zk.PermAll))
if err != nil {
return fmt.Errorf("create node [%s] error, cause %w", path, err)
return err
}
return nil
}
return nil
}

func (z *zookeeperRegistry) deleteNode(path string) error {
err := z.conn.Delete(path, -1)
if err != nil {
return fmt.Errorf("delete node [%s] error, cause %w", path, err)
return z.conn.Delete(path, -1)
}

func (z *zookeeperRegistry) keepalive(ctx context.Context, path string, content []byte) {
sessionID := z.conn.SessionID()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cur := z.conn.SessionID()
if cur != 0 && sessionID != cur {
if err := z.createNode(path, content, true); err == nil {
sessionID = cur
}
}
}
}
return nil
}

0 comments on commit bddfec4

Please sign in to comment.