Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add keepalive #14

Merged
merged 12 commits into from
Aug 22, 2023
2 changes: 1 addition & 1 deletion .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18

- uses: actions/cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/push-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.18

- uses: actions/cache@v2
with:
Expand Down
62 changes: 62 additions & 0 deletions entity/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,69 @@

package entity

import (
"encoding/json"
"fmt"
"net"
"strings"

"github.com/cloudwego/kitex/pkg/registry"
"github.com/kitex-contrib/registry-zookeeper/utils"
)

type RegistryEntity struct {
Weight int
Tags map[string]string
}

type NodeEntity struct {
*registry.Info
}

func (n *NodeEntity) Path() (string, error) {
return buildPath(n.Info)
}

func (n *NodeEntity) Content() ([]byte, error) {
return json.Marshal(&RegistryEntity{Weight: n.Weight, Tags: n.Tags})
}

func MustNewNodeEntity(ri *registry.Info) *NodeEntity {
return &NodeEntity{ri}
}

// path format as follows:
// /{serviceName}/{ip}:{port}
func buildPath(info *registry.Info) (string, error) {
var path string
if info == nil {
return "", fmt.Errorf("registry info can't be nil")
}
if info.ServiceName == "" {
return "", fmt.Errorf("registry info service name can't be empty")
}
if info.Addr == nil {
return "", fmt.Errorf("registry info addr can't be nil")
}
if !strings.HasPrefix(info.ServiceName, utils.Separator) {
path = utils.Separator + info.ServiceName
}

if host, port, err := net.SplitHostPort(info.Addr.String()); err == nil {
if port == "" {
return "", fmt.Errorf("registry info addr missing port")
}
if host == "" {
ipv4, err := utils.GetLocalIPv4Address()
if err != nil {
return "", fmt.Errorf("get local ipv4 error, cause %w", err)
}
path = path + utils.Separator + ipv4 + ":" + port
} else {
path = path + utils.Separator + host + ":" + port
}
} else {
return "", fmt.Errorf("parse registry info addr error")
}
return path, nil
}
189 changes: 109 additions & 80 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package registry

import (
"encoding/json"
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/cloudwego/kitex/pkg/registry"
Expand All @@ -27,126 +27,155 @@ import (
"github.com/kitex-contrib/registry-zookeeper/utils"
)

var (
ErrorZkConnectedTimedOut = errors.New("timed out waiting for zk connected")
ErrorNilRegistryInfo = errors.New("registry info can't be nil")
)

type zookeeperRegistry struct {
sync.RWMutex
conn *zk.Conn
authOpen bool
user, password string
user string
password string
sessionTimeout time.Duration
canceler map[string]context.CancelFunc
}

func NewZookeeperRegistry(servers []string, sessionTimeout time.Duration) (registry.Registry, error) {
day253 marked this conversation as resolved.
Show resolved Hide resolved
conn, _, err := zk.Connect(servers, sessionTimeout)
if err != nil {
return nil, err
}
return &zookeeperRegistry{conn: conn}, nil
return NewZookeeperRegistryWithAuth(servers, sessionTimeout, "", "")
}

func NewZookeeperRegistryWithAuth(servers []string, sessionTimeout time.Duration, user, password string) (registry.Registry, error) {
if user == "" || password == "" {
return nil, fmt.Errorf("user or password can't be empty")
}
conn, _, err := zk.Connect(servers, sessionTimeout)
conn, event, err := zk.Connect(servers, sessionTimeout)
if err != nil {
return nil, err
}
auth := []byte(fmt.Sprintf("%s:%s", user, password))
err = conn.AddAuth(utils.Scheme, auth)
if err != nil {
return nil, err
if user != "" && password != "" {
if err := conn.AddAuth(utils.Scheme, []byte(fmt.Sprintf("%s:%s", user, password))); err != nil {
return nil, err
}
}
ticker := time.NewTimer(sessionTimeout / 2)
for {
select {
case e := <-event:
if e.State == zk.StateConnected {
return &zookeeperRegistry{
user: user,
password: password,
sessionTimeout: sessionTimeout,
conn: conn,
canceler: make(map[string]context.CancelFunc),
}, nil
}
case <-ticker.C:
return nil, ErrorZkConnectedTimedOut
}
}
return &zookeeperRegistry{conn: conn, authOpen: true, user: user, password: password}, nil
}

func (z *zookeeperRegistry) Register(info *registry.Info) error {
path, err := buildPath(info)
if info == nil {
return ErrorNilRegistryInfo
}
ne := entity.MustNewNodeEntity(info)
path, err := ne.Path()
if err != nil {
return err
}
content, err := json.Marshal(&entity.RegistryEntity{Weight: info.Weight, Tags: info.Tags})
content, err := ne.Content()
if err != nil {
return err
}
return z.createNode(path, content, true)
}

// path format as follows:
// /{serviceName}/{ip}:{port}
func buildPath(info *registry.Info) (string, error) {
var path string
if info == nil {
return "", fmt.Errorf("registry info can't be nil")
}
if info.ServiceName == "" {
return "", fmt.Errorf("registry info service name can't be empty")
}
if info.Addr == nil {
return "", fmt.Errorf("registry info addr can't be nil")
}
if !strings.HasPrefix(info.ServiceName, utils.Separator) {
path = utils.Separator + info.ServiceName
}

if host, port, err := net.SplitHostPort(info.Addr.String()); err == nil {
if port == "" {
return "", fmt.Errorf("registry info addr missing port")
}
if host == "" {
ipv4, err := utils.GetLocalIPv4Address()
if err != nil {
return "", fmt.Errorf("get local ipv4 error, cause %w", err)
}
path = path + utils.Separator + ipv4 + ":" + port
} else {
path = path + utils.Separator + host + ":" + port
}
} else {
return "", fmt.Errorf("parse registry info addr error")
err = z.createNode(path, content, true)
if err != nil {
return err
}
return path, nil
ctx, cancel := context.WithCancel(context.Background())
z.Lock()
defer z.Unlock()
z.canceler[path] = cancel
go z.keepalive(ctx, path, content)
return nil
}

func (z *zookeeperRegistry) Deregister(info *registry.Info) error {
if info == nil {
return fmt.Errorf("registry info can't be nil")
return ErrorNilRegistryInfo
}
path, err := buildPath(info)
ne := entity.MustNewNodeEntity(info)
path, err := ne.Path()
if err != nil {
return err
}
z.Lock()
defer z.Unlock()
cancel, ok := z.canceler[path]
if ok {
cancel()
delete(z.canceler, path)
}
return z.deleteNode(path)
}

func (z *zookeeperRegistry) createNode(path string, content []byte, ephemeral bool) error {
i := strings.LastIndex(path, utils.Separator)
if i > 0 {
err := z.createNode(path[0:i], nil, false)
if err != nil && !errors.Is(err, zk.ErrNodeExists) {
exists, stat, err := z.conn.Exists(path)
if err != nil {
return err
}
// ephemeral nodes handling after restart
// fixes a race condition if the server crashes without using CreateProtectedEphemeralSequential()
// https://github.com/go-kratos/kratos/blob/main/contrib/registry/zookeeper/register.go
if exists && ephemeral {
err = z.conn.Delete(path, stat.Version)
if err != nil && err != zk.ErrNoNode {
return err
}
}
var flag int32
if ephemeral {
flag = zk.FlagEphemeral
}
if z.authOpen {
_, err := z.conn.Create(path, content, flag, zk.DigestACL(zk.PermAll, z.user, z.password))
if err != nil {
return fmt.Errorf("create node [%s] with auth error, cause %w", path, err)
exists = false
}
if !exists {
i := strings.LastIndex(path, utils.Separator)
if i > 0 {
err := z.createNode(path[0:i], nil, false)
if err != nil && !errors.Is(err, zk.ErrNodeExists) {
return err
}
}
var flag int32
if ephemeral {
flag = zk.FlagEphemeral
}
if z.user != "" && z.password != "" {
_, err = z.conn.Create(path, content, flag, zk.DigestACL(zk.PermAll, z.user, z.password))
} else {
_, err = z.conn.Create(path, content, flag, zk.WorldACL(zk.PermAll))
}
return nil
} else {
_, err := z.conn.Create(path, content, flag, zk.WorldACL(zk.PermAll))
if err != nil {
return fmt.Errorf("create node [%s] error, cause %w", path, err)
return err
}
return nil
}
return nil
}

func (z *zookeeperRegistry) deleteNode(path string) error {
err := z.conn.Delete(path, -1)
if err != nil {
return fmt.Errorf("delete node [%s] error, cause %w", path, err)
return z.conn.Delete(path, -1)
}

func (z *zookeeperRegistry) keepalive(ctx context.Context, path string, content []byte) {
sessionID := z.conn.SessionID()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cur := z.conn.SessionID()
if cur != 0 && sessionID != cur {
if err := z.createNode(path, content, true); err == nil {
sessionID = cur
}
}
}
}
return nil
}