Skip to content

Commit

Permalink
change log
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed May 10, 2024
1 parent 2433f0c commit 1fe976d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 31 deletions.
4 changes: 2 additions & 2 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,9 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
// only API update the leader key to transfer the leader will meet
// only API update the leader key to transfer the primary will meet
if ev.Type == mvccpb.PUT && ls.IsLeader() {
log.Info("[LeaderWatch] current leadership is updated",
log.Info("[PrimaryWatch] current leadership is updated",
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistr
return nil, errors.Errorf("unknown service name %s", serviceName)
}

func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string) error {
// TransferPrimary transfers the primary of the specified service.
func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string, keyspaceGroupID uint32) error {
log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary))
entries, err := GetMSMembers(serviceName, client)
if err != nil {
Expand All @@ -90,22 +91,22 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar

// Do nothing when I am the only member of cluster.
if len(entries) == 1 && newPrimary == "" {
return errors.New("no valid follower to transfer primary")
return errors.New("no valid secondary to transfer primary")
}

var primaryIDs []string
var memberValues []string
var secondaryValues []string
for _, member := range entries {
if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.ServiceAddr == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
if string(member.MemberValue) == "" {
return errors.New("member value is empty")
}
memberValues = append(memberValues, string(member.MemberValue))
secondaryValues = append(secondaryValues, string(member.MemberValue))
}
}
if len(primaryIDs) == 0 {
return errors.New("no valid follower to transfer primary")
return errors.New("no valid secondary to transfer primary")
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand All @@ -122,12 +123,12 @@ func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimar
primaryKey = endpoint.SchedulingPrimaryPath(clusterID)
case utils.TSOServiceName:
tsoRootPath := endpoint.TSOSvcRootPath(clusterID)
primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID)
primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, keyspaceGroupID)
}

// update primary key to notify old primary server.
putResp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(primaryKey, memberValues[nextPrimaryID])).
Then(clientv3.OpPut(primaryKey, secondaryValues[nextPrimaryID])).
Commit()
if err != nil || !putResp.Succeeded {
return errors.Errorf("failed to write primary flag for %s", serviceName)
Expand Down
12 changes: 5 additions & 7 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,11 @@ func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {

s.participant.UnsetLeader()
defer log.Info("scheduling primary exit the primary watch loop")
for {
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
}
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,17 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err

// GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API.
func GetExpectedPrimary(client *clientv3.Client, leaderPath string) string {
leader, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))
primary, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))
if err != nil {
log.Error("get expected primary key error", errs.ZapError(err))
return ""
}

return string(leader)
return string(primary)
}

// RemoveExpectedPrimary removes the expected primary key.
// - removed when campaign new primary success
// - removed when old primary server is closed
// - removed when campaign new primary successfully
func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {
log.Info("remove expected primary key", zap.String("leaderPath", leaderPath))
// remove expected leader key
Expand Down
16 changes: 7 additions & 9 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
logutil.CondUint32("keyspace-group-id", gta.getGroupID(), gta.getGroupID() > 0))
return
case <-exitPrimary:
log.Info("no longer a primary because primary have been updated, the TSO primary/leader will step down")
log.Info("no longer a primary because primary have been updated, the TSO primary will step down")
return
}
}
Expand All @@ -694,19 +694,17 @@ func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary cha
log.Error("tso primary getting the leader meets error", errs.ZapError(err))
return
}

// only trigger by updating primary
if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) {
mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())

gta.member.UnsetLeader()
defer log.Info("tso primary exit the primary watch loop")
for {
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
}
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
}
}
}
Expand Down
15 changes: 13 additions & 2 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package handlers

import (
"net/http"
"strconv"

"github.com/gin-gonic/gin"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/apiv2/middlewares"
)
Expand Down Expand Up @@ -101,16 +103,25 @@ func TransferPrimary(c *gin.Context) {
return
}

newPrimary := ""
newPrimary, keyspaceGroupID := "", utils.DefaultKeyspaceGroupID
if v, ok := input["new_primary"]; ok {
newPrimary = v
}

if v, ok := input["keyspace_group_id"]; ok {
keyspaceGroupIDRaw, err := strconv.ParseUint(v, 10, 32)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
keyspaceGroupID = uint32(keyspaceGroupIDRaw)
}
oldPrimary, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service)
if oldPrimary == newPrimary {
c.AbortWithStatusJSON(http.StatusInternalServerError, "new primary is the same as the old one")
return
}
if err := discovery.TransferPrimary(svr.GetClient(), service, oldPrimary, newPrimary); err != nil {
if err := discovery.TransferPrimary(svr.GetClient(), service, oldPrimary, newPrimary, keyspaceGroupID); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
Expand Down

0 comments on commit 1fe976d

Please sign in to comment.