Skip to content

Commit

Permalink
only trigger by updating
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed May 9, 2024
1 parent af995cc commit 5490d10
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 45 deletions.
6 changes: 3 additions & 3 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ func (ls *Leadership) SetLeaderWatch(val bool) {
ls.leaderWatch.Store(val)
}

// GetLeaderWatch gets the leader watch flag.
func (ls *Leadership) GetLeaderWatch() bool {
// IsLeader gets the leader watch flag.
func (ls *Leadership) IsLeader() bool {
return ls.leaderWatch.Load()
}

Expand Down Expand Up @@ -392,7 +392,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
return
}
// only API update the leader key to transfer the leader will meet
if ev.Type == mvccpb.PUT && ls.GetLeaderWatch() {
if ev.Type == mvccpb.PUT && ls.IsLeader() {
log.Info("[LeaderWatch] current leadership is updated",
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
Expand Down
34 changes: 16 additions & 18 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package server
import (
"context"
"fmt"
"github.com/tikv/pd/pkg/utils/etcdutil"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -249,7 +248,7 @@ func (s *Server) primaryElectionLoop() {
}

// To make sure the expected leader(if exist) and primary are on the same server.
expectedPrimary := utils.GetExpectedPrimary(s.participant.GetLeaderPath(), s.GetClient())
expectedPrimary := utils.GetExpectedPrimary(s.GetClient(), s.participant.GetLeaderPath())
if expectedPrimary != "" && expectedPrimary != s.participant.MemberValue() {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", s.Name()),
Expand Down Expand Up @@ -332,7 +331,7 @@ func (s *Server) campaignLeader() {
}

func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
_, revision, err := s.participant.GetPersistentLeader()
oldPrimary, revision, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
Expand All @@ -345,26 +344,25 @@ func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {

// only API update primary will set the expected leader
// check leader key whether deleted
leaderRaw, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath())
curPrimary, _, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
if leaderRaw == nil {
log.Info("[primary] leader key is deleted, the primary will step down")
return
}

utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())
// only trigger by updating primary
if curPrimary != nil && oldPrimary != nil &&
curPrimary.(*schedulingpb.Participant).Name != oldPrimary.(*schedulingpb.Participant).Name {
utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())

s.participant.UnsetLeader()
defer log.Info("[primary] exit the primary watch loop")
for {
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
s.participant.UnsetLeader()
defer log.Info("[primary] exit the primary watch loop")
for {
select {
case <-ctx.Done():
return
case exitPrimary <- struct{}{}:
return
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/mcs/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
}

// GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API.
func GetExpectedPrimary(keyPath string, client *clientv3.Client) string {
leader, err := etcdutil.GetValue(client, strings.Join([]string{keyPath, ExpectedPrimary}, "/"))
func GetExpectedPrimary(client *clientv3.Client, leaderPath string) string {
leader, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))
if err != nil {
log.Error("get expected primary key error", errs.ZapError(err))
return ""
Expand All @@ -86,7 +86,10 @@ func GetExpectedPrimary(keyPath string, client *clientv3.Client) string {
}

// RemoveExpectedPrimary removes the expected primary key.
// - removed when campaign success
// - removed when server is closed
func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {
log.Info("remove expected primary key", zap.String("leaderPath", leaderPath))
// remove expected leader key
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))).
Expand All @@ -99,6 +102,7 @@ func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {

// SetExpectedPrimary sets the expected primary key when the current primary has exited.
func SetExpectedPrimary(client *clientv3.Client, leaderPath string) {
log.Info("set expected primary key", zap.String("leaderPath", leaderPath))
leaderRaw, err := etcdutil.GetValue(client, leaderPath)
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
Expand Down
37 changes: 17 additions & 20 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"github.com/tikv/pd/pkg/utils/etcdutil"
"runtime/trace"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -562,7 +561,7 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() {
}

// To make sure the expected leader(if exist) and primary are on the same server.
targetPrimary := mcsutils.GetExpectedPrimary(gta.member.GetLeaderPath(), gta.member.Client())
targetPrimary := mcsutils.GetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())
if targetPrimary != "" && targetPrimary != gta.member.MemberValue() {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", gta.member.Name()),
Expand Down Expand Up @@ -675,7 +674,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
}

func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
_, revision, err := gta.member.GetPersistentLeader()
oldPrimary, revision, err := gta.member.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
Expand All @@ -690,26 +689,24 @@ func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary cha

// only API update primary will set the expected leader
// check leader key whether deleted
leaderRaw, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath())
curPrimary, _, err := gta.member.GetPersistentLeader()
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
if leaderRaw == nil {
log.Info("[primary] leader key is deleted, the primary will step down")
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
}

mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())

gta.member.UnsetLeader()
for {
select {
case <-ctx.Done():
log.Info("[primary] exit the primary watch loop")
return
case exitPrimary <- struct{}{}:
return
if curPrimary != nil && oldPrimary != nil &&
curPrimary.(ElectionMember).Name() != oldPrimary.(ElectionMember).Name() {
mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath())

gta.member.UnsetLeader()
for {
select {
case <-ctx.Done():
log.Info("[primary] exit the primary watch loop")
return
case exitPrimary <- struct{}{}:
return
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func GetPrimary(c *gin.Context) {
// @Summary Transfer the primary member of the specified service.
// @Produce json
// @Param service path string true "service name"
// @Param new_primary query string false "new primary address"
// @Success 200 {object} string
// @Param new_primary body string false "new primary address"
// @Success 200 string string
// @Router /ms/primary/transfer/{service} [post]
func TransferPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
Expand Down

0 comments on commit 5490d10

Please sign in to comment.