Skip to content

Commit

Permalink
cluster: return error when found multiple nodes with the same ip:port (
Browse files Browse the repository at this point in the history
  • Loading branch information
srstack authored and AstroProfundis committed May 10, 2022
1 parent 6aff5f1 commit 9b8b988
Showing 1 changed file with 66 additions and 33 deletions.
99 changes: 66 additions & 33 deletions pkg/cluster/api/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -103,48 +104,53 @@ func (c *BinlogClient) IsDrainerTombstone(ctx context.Context, addr string) (boo
}

func (c *BinlogClient) isTombstone(ctx context.Context, ty string, nodeID string) (bool, error) {
status, err := c.nodeStatus(ctx, ty)
s, err := c.nodeStatus(ctx, ty, nodeID)
if err != nil {
return false, err
}

for _, s := range status {
if s.NodeID == nodeID {
if s.State == "offline" {
return true, nil
}
return false, nil
}
if s.State == "offline" {
return true, nil
}

return false, errors.Errorf("node not exist: %s", nodeID)
return false, nil
}

// nolint (unused)
func (c *BinlogClient) pumpNodeStatus(ctx context.Context) (status []*NodeStatus, err error) {
return c.nodeStatus(ctx, "pumps")
return c.nodesStatus(ctx, "pumps")
}

// nolint (unused)
func (c *BinlogClient) drainerNodeStatus(ctx context.Context) (status []*NodeStatus, err error) {
return c.nodeStatus(ctx, "drainers")
return c.nodesStatus(ctx, "drainers")
}

func (c *BinlogClient) nodeID(ctx context.Context, addr, ty string) (string, error) {
nodes, err := c.nodeStatus(ctx, ty)
// the number of nodes with the same ip:port
targetNodes := []string{}

nodes, err := c.nodesStatus(ctx, ty)
if err != nil {
return "", err
}

addrs := []string{}
for _, node := range nodes {
if addr == node.Addr {
return node.NodeID, nil
targetNodes = append(targetNodes, node.NodeID)
continue
}
addrs = append(addrs, addr)
}

return "", errors.Errorf("%s node id for address %s not found, found address: %s", ty, addr, addrs)
switch len(targetNodes) {
case 0:
return "", errors.Errorf("%s node id for address %s not found, found address: %s", ty, addr, addrs)
case 1:
return targetNodes[0], nil
default:
return "", errors.Errorf("found multiple %s nodes with the same host, found nodes: %s", ty, strings.Join(targetNodes, ","))
}
}

// UpdateDrainerState update the specify state as the specified state.
Expand All @@ -167,37 +173,25 @@ func (c *BinlogClient) UpdatePumpState(ctx context.Context, addr string, state s

// updateStatus update the specify state as the specified state.
func (c *BinlogClient) updateStatus(ctx context.Context, ty string, nodeID string, state string) error {
key := fmt.Sprintf("/tidb-binlog/v1/%s/%s", ty, nodeID)

// set timeout, otherwise it will keep retrying
ctx, f := context.WithTimeout(ctx, c.httpClient.Timeout)
defer f()
resp, err := c.etcdClient.KV.Get(ctx, key)
if err != nil {
return errors.AddStack(err)
}

if len(resp.Kvs) == 0 {
return errors.Errorf("no pump with node id: %v", nodeID)
}

var nodeStatus NodeStatus
err = json.Unmarshal(resp.Kvs[0].Value, &nodeStatus)
s, err := c.nodeStatus(ctx, ty, nodeID)
if err != nil {
return errors.AddStack(err)
}

if nodeStatus.State == state {
if s.State == state {
return nil
}

nodeStatus.State = state
s.State = state

data, err := json.Marshal(&nodeStatus)
data, err := json.Marshal(&s)
if err != nil {
return errors.AddStack(err)
}

key := fmt.Sprintf("/tidb-binlog/v1/%s/%s", ty, nodeID)
_, err = c.etcdClient.Put(ctx, key, string(data))
if err != nil {
return errors.AddStack(err)
Expand All @@ -206,7 +200,7 @@ func (c *BinlogClient) updateStatus(ctx context.Context, ty string, nodeID strin
return nil
}

func (c *BinlogClient) nodeStatus(ctx context.Context, ty string) (status []*NodeStatus, err error) {
func (c *BinlogClient) nodesStatus(ctx context.Context, ty string) (status []*NodeStatus, err error) {
key := fmt.Sprintf("/tidb-binlog/v1/%s", ty)

// set timeout, otherwise it will keep retrying
Expand All @@ -230,6 +224,25 @@ func (c *BinlogClient) nodeStatus(ctx context.Context, ty string) (status []*Nod
return
}

// nodeStatus get nodeStatus with nodeID
func (c *BinlogClient) nodeStatus(ctx context.Context, ty string, nodeID string) (node *NodeStatus, err error) {
key := fmt.Sprintf("/tidb-binlog/v1/%s/%s", ty, nodeID)

resp, err := c.etcdClient.KV.Get(ctx, key)
if err != nil {
return nil, errors.AddStack(err)
}

if len(resp.Kvs) > 0 {
err = json.Unmarshal(resp.Kvs[0].Value, &node)
if err != nil {
return nil, errors.Annotatef(err, "key: %s,data: %s", string(resp.Kvs[0].Key), string(resp.Kvs[0].Value))
}
return
}
return nil, errors.Errorf("%s node-id: %s not found, found address: %s", ty, nodeID, key)
}

func (c *BinlogClient) offline(addr string, nodeID string) error {
url := c.getOfflineURL(addr, nodeID)
req, err := http.NewRequest("PUT", url, nil)
Expand Down Expand Up @@ -272,6 +285,16 @@ func (c *BinlogClient) OfflinePump(ctx context.Context, addr string) error {
if err != nil {
return err
}

s, err := c.nodeStatus(ctx, "pumps", nodeID)
if err != nil {
return err
}

if s.State == "offline" {
return nil
}

return c.offline(addr, nodeID)
}

Expand All @@ -281,5 +304,15 @@ func (c *BinlogClient) OfflineDrainer(ctx context.Context, addr string) error {
if err != nil {
return err
}

s, err := c.nodeStatus(ctx, "drainers", nodeID)
if err != nil {
return err
}

if s.State == "offline" {
return nil
}

return c.offline(addr, nodeID)
}

0 comments on commit 9b8b988

Please sign in to comment.