Skip to content

Commit

Permalink
Merge remote-tracking branch 'jcftang-pr/master' into feature/zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
崔有省 committed Nov 16, 2018
2 parents b2862e3 + b0502a1 commit e0c20b8
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 0 deletions.
166 changes: 166 additions & 0 deletions backend/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package zookeeper

import (
"errors"
"fmt"
zk "github.com/samuel/go-zookeeper/zk"
"github.com/xordataexchange/crypt/backend"
"strings"
"time"
)

type Client struct {
client *zk.Conn
waitIndex uint64
}

func New(machines []string) (*Client, error) {
zkclient, _, err := zk.Connect(machines, time.Second)
if err != nil {
return nil, err
}
return &Client{zkclient, 0}, nil
}

func (c *Client) Get(key string) ([]byte, error) {
resp, _, err := c.client.Get(key)
if err != nil {
return nil, err
}
return []byte(resp), nil
}

func nodeWalk(prefix string, c *Client, vars map[string]string) error {
l, stat, err := c.client.Children(prefix)
if err != nil {
return err
}

if stat.NumChildren == 0 {
b, _, err := c.client.Get(prefix)
if err != nil {
return err
}
vars[prefix] = string(b)

} else {
for _, key := range l {
s := prefix + "/" + key
_, stat, err := c.client.Exists(s)
if err != nil {
return err
}
if stat.NumChildren == 0 {
b, _, err := c.client.Get(s)
if err != nil {
return err
}
vars[s] = string(b)
} else {
nodeWalk(s, c, vars)
}
}
}
return nil
}

func (c *Client) GetValues(key string, keys []string) (map[string]string, error) {
vars := make(map[string]string)
for _, v := range keys {
v = fmt.Sprintf("%s/%s", key, v)
v = strings.Replace(v, "/*", "", -1)
_, _, err := c.client.Exists(v)
if err != nil {
return vars, err
}
if v == "/" {
v = ""
}
err = nodeWalk(v, c, vars)
if err != nil {
return vars, err
}
}
return vars, nil
}

func (c *Client) List(key string) (backend.KVPairs, error) {
var list backend.KVPairs
resp, stat, err := c.client.Children(key)
if err != nil {
return nil, err
}

if stat.NumChildren == 0 {
return list, nil
}

entries, err := c.GetValues(key, resp)
if err != nil {
return nil, err
}

for k, v := range entries {
list = append(list, &backend.KVPair{Key: k, Value: []byte(v)})
}
return list, nil
}

func (c *Client) createParents(key string) error {
flags := int32(0)
acl := zk.WorldACL(zk.PermAll)

if key[0] != '/' {
return errors.New("Invalid path")
}

payload := []byte("")
pathString := ""
pathNodes := strings.Split(key, "/")
for i := 1; i < len(pathNodes); i++ {
pathString += "/" + pathNodes[i]
_, err := c.client.Create(pathString, payload, flags, acl)
// not being able to create the node because it exists or not having
// sufficient rights is not an issue. It is ok for the node to already
// exist and/or us to only have read rights
if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
return err
}
}
return nil
}

func (c *Client) Set(key string, value []byte) error {
err := c.createParents(key)
if err != nil {
return err
}
_, err = c.client.Set(key, []byte(value), -1)
return err
}

func (c *Client) Watch(key string, stop chan bool) <-chan *backend.Response {
respChan := make(chan *backend.Response, 0)
go func() {
for {
resp, _, watch, err := c.client.GetW(key)
if err != nil {
respChan <- &backend.Response{nil, err}
time.Sleep(time.Second * 5)
}

select {
case e := <-watch:
if e.Type == zk.EventNodeDataChanged {
resp, _, err = c.client.Get(key)
if err != nil {
respChan <- &backend.Response{nil, err}
}
c.waitIndex = 0
respChan <- &backend.Response{[]byte(resp), nil}
}
}
}
}()
return respChan
}
5 changes: 5 additions & 0 deletions bin/crypt/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/xordataexchange/crypt/backend"
"github.com/xordataexchange/crypt/backend/consul"
"github.com/xordataexchange/crypt/backend/etcd"
"github.com/xordataexchange/crypt/backend/zookeeper"
"github.com/xordataexchange/crypt/encoding/secconf"
)

Expand Down Expand Up @@ -207,6 +208,8 @@ func getBackendStore(provider string, endpoint string) (backend.Store, error) {
endpoint = "127.0.0.1:8500"
case "etcd":
endpoint = "http://127.0.0.1:4001"
case "zookeeper":
endpoint = "127.0.0.1:2181"
}
}
machines := []string{endpoint}
Expand All @@ -215,6 +218,8 @@ func getBackendStore(provider string, endpoint string) (backend.Store, error) {
return etcd.New(machines)
case "consul":
return consul.New(machines)
case "zookeeper":
return zookeeper.New(machines)
default:
return nil, errors.New("invalid backend " + provider)
}
Expand Down
20 changes: 20 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/xordataexchange/crypt/backend"
"github.com/xordataexchange/crypt/backend/consul"
"github.com/xordataexchange/crypt/backend/etcd"
"github.com/xordataexchange/crypt/backend/zookeeper"
"github.com/xordataexchange/crypt/encoding/secconf"
)

Expand Down Expand Up @@ -65,6 +66,15 @@ func NewStandardConsulConfigManager(machines []string) (ConfigManager, error) {
return NewStandardConfigManager(store)
}

// NewStandardZookeeperConfigManager returns a new ConfigManager backed by zookeeper.
func NewStandardZookeeperConfigManager(machines []string) (ConfigManager, error) {
store, err := zookeeper.New(machines)
if err != nil {
return nil, err
}
return NewStandardConfigManager(store)
}

// NewEtcdConfigManager returns a new ConfigManager backed by etcd.
// Data will be encrypted.
func NewEtcdConfigManager(machines []string, keystore io.Reader) (ConfigManager, error) {
Expand All @@ -85,6 +95,16 @@ func NewConsulConfigManager(machines []string, keystore io.Reader) (ConfigManage
return NewConfigManager(store, keystore)
}

// NewZookeeperConfigManager returns a new ConfigManager backed by zookeeper.
// Data will be encrypted.
func NewZookeeperConfigManager(machines []string, keystore io.Reader) (ConfigManager, error) {
store, err := zookeeper.New(machines)
if err != nil {
return nil, err
}
return NewConfigManager(store, keystore)
}

// Get retrieves and decodes a secconf value stored at key.
func (c configManager) Get(key string) ([]byte, error) {
value, err := c.store.Get(key)
Expand Down

0 comments on commit e0c20b8

Please sign in to comment.