Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add keepalive #14

Merged
merged 12 commits into from
Aug 22, 2023
52 changes: 47 additions & 5 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ type zookeeperRegistry struct {
conn *zk.Conn
authOpen bool
user, password string
sessionTimeout time.Duration
}

func NewZookeeperRegistry(servers []string, sessionTimeout time.Duration) (registry.Registry, error) {
day253 marked this conversation as resolved.
Show resolved Hide resolved
conn, _, err := zk.Connect(servers, sessionTimeout)
if err != nil {
return nil, err
}
return &zookeeperRegistry{conn: conn}, nil
return &zookeeperRegistry{conn: conn, sessionTimeout: sessionTimeout}, nil
}

func NewZookeeperRegistryWithAuth(servers []string, sessionTimeout time.Duration, user, password string) (registry.Registry, error) {
Expand All @@ -54,7 +55,7 @@ func NewZookeeperRegistryWithAuth(servers []string, sessionTimeout time.Duration
if err != nil {
return nil, err
}
return &zookeeperRegistry{conn: conn, authOpen: true, user: user, password: password}, nil
return &zookeeperRegistry{conn: conn, authOpen: true, user: user, password: password, sessionTimeout: sessionTimeout}, nil
}

func (z *zookeeperRegistry) Register(info *registry.Info) error {
Expand All @@ -66,11 +67,13 @@ func (z *zookeeperRegistry) Register(info *registry.Info) error {
if err != nil {
return err
}
return z.createNode(path, content, true)
err = z.createNode(path, content, true)
go z.keepalive(path, content)
return err
}

// path format as follows:
// /{serviceName}/{ip}:{port}
// path format as follows:
// /{serviceName}/{ip}:{port}
func buildPath(info *registry.Info) (string, error) {
var path string
if info == nil {
Expand Down Expand Up @@ -150,3 +153,42 @@ func (z *zookeeperRegistry) deleteNode(path string) error {
}
return nil
}

func (z *zookeeperRegistry) keepalive(path string, content []byte) {
sessionID := z.conn.SessionID()
ticker := time.NewTicker(z.sessionTimeout / 2)
defer ticker.Stop()
for range ticker.C {
cur := z.conn.SessionID()
if cur > 0 && sessionID != cur {
if err := z.ensureName(path, content, zk.FlagEphemeral); err == nil {
sessionID = cur
}
}
}
}

func (z *zookeeperRegistry) ensureName(path string, data []byte, flags int32) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function's name doesn't express what it wants to ensure. Maybe a different name would be better.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

exists, stat, err := z.conn.Exists(path)
if err != nil {
return err
}
if exists && flags&zk.FlagEphemeral == zk.FlagEphemeral {
day253 marked this conversation as resolved.
Show resolved Hide resolved
err = z.conn.Delete(path, stat.Version)
if err != nil && err != zk.ErrNoNode {
return err
}
exists = false
}
if !exists {
if z.authOpen {
_, err = z.conn.Create(path, data, flags, zk.DigestACL(zk.PermAll, z.user, z.password))
} else {
_, err = z.conn.Create(path, data, flags, zk.WorldACL(zk.PermAll))
}
if err != nil {
return err
}
}
return nil
}