diff --git a/pkg/election/leadership.go b/pkg/election/leadership.go index 63f8e8ea54fd..b9a5d8ac1c22 100644 --- a/pkg/election/leadership.go +++ b/pkg/election/leadership.go @@ -124,8 +124,8 @@ func (ls *Leadership) SetLeaderWatch(val bool) { ls.leaderWatch.Store(val) } -// GetLeaderWatch gets the leader watch flag. -func (ls *Leadership) GetLeaderWatch() bool { +// IsLeader gets the leader watch flag. +func (ls *Leadership) IsLeader() bool { return ls.leaderWatch.Load() } @@ -392,7 +392,7 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { return } // only API update the leader key to transfer the leader will meet - if ev.Type == mvccpb.PUT && ls.GetLeaderWatch() { + if ev.Type == mvccpb.PUT && ls.IsLeader() { log.Info("[LeaderWatch] current leadership is updated", zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) return diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index a6180ca4c3a0..f86bd0b10075 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -17,7 +17,6 @@ package server import ( "context" "fmt" - "github.com/tikv/pd/pkg/utils/etcdutil" "net/http" "os" "os/signal" @@ -249,7 +248,7 @@ func (s *Server) primaryElectionLoop() { } // To make sure the expected leader(if exist) and primary are on the same server. - expectedPrimary := utils.GetExpectedPrimary(s.participant.GetLeaderPath(), s.GetClient()) + expectedPrimary := utils.GetExpectedPrimary(s.GetClient(), s.participant.GetLeaderPath()) if expectedPrimary != "" && expectedPrimary != s.participant.MemberValue() { log.Info("skip campaigning of scheduling primary and check later", zap.String("server-name", s.Name()), @@ -332,7 +331,7 @@ func (s *Server) campaignLeader() { } func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { - _, revision, err := s.participant.GetPersistentLeader() + oldPrimary, revision, err := s.participant.GetPersistentLeader() if err != nil { log.Error("[primary] getting the leader meets error", errs.ZapError(err)) return @@ -345,26 +344,25 @@ func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { // only API update primary will set the expected leader // check leader key whether deleted - leaderRaw, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath()) + curPrimary, _, err := s.participant.GetPersistentLeader() if err != nil { log.Error("[primary] get primary key error", zap.Error(err)) return } - if leaderRaw == nil { - log.Info("[primary] leader key is deleted, the primary will step down") - return - } - - utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath()) + // only trigger by updating primary + if curPrimary != nil && oldPrimary != nil && + curPrimary.(*schedulingpb.Participant).Name != oldPrimary.(*schedulingpb.Participant).Name { + utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath()) - s.participant.UnsetLeader() - defer log.Info("[primary] exit the primary watch loop") - for { - select { - case <-ctx.Done(): - return - case exitPrimary <- struct{}{}: - return + s.participant.UnsetLeader() + defer log.Info("[primary] exit the primary watch loop") + for { + select { + case <-ctx.Done(): + return + case exitPrimary <- struct{}{}: + return + } } } } diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index b4d737c99e38..62bf8b320ad3 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -75,8 +75,8 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err } // GetExpectedPrimary indicates API has changed the primary, ONLY SET VALUE BY API. -func GetExpectedPrimary(keyPath string, client *clientv3.Client) string { - leader, err := etcdutil.GetValue(client, strings.Join([]string{keyPath, ExpectedPrimary}, "/")) +func GetExpectedPrimary(client *clientv3.Client, leaderPath string) string { + leader, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimary}, "/")) if err != nil { log.Error("get expected primary key error", errs.ZapError(err)) return "" @@ -86,7 +86,10 @@ func GetExpectedPrimary(keyPath string, client *clientv3.Client) string { } // RemoveExpectedPrimary removes the expected primary key. +// - removed when campaign success +// - removed when server is closed func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { + log.Info("remove expected primary key", zap.String("leaderPath", leaderPath)) // remove expected leader key resp, err := kv.NewSlowLogTxn(client). Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))). @@ -99,6 +102,7 @@ func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) { // SetExpectedPrimary sets the expected primary key when the current primary has exited. func SetExpectedPrimary(client *clientv3.Client, leaderPath string) { + log.Info("set expected primary key", zap.String("leaderPath", leaderPath)) leaderRaw, err := etcdutil.GetValue(client, leaderPath) if err != nil { log.Error("[primary] get primary key error", zap.Error(err)) diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 495c5ae67440..478af240bf54 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "github.com/tikv/pd/pkg/utils/etcdutil" "runtime/trace" "sync" "sync/atomic" @@ -562,7 +561,7 @@ func (gta *GlobalTSOAllocator) primaryElectionLoop() { } // To make sure the expected leader(if exist) and primary are on the same server. - targetPrimary := mcsutils.GetExpectedPrimary(gta.member.GetLeaderPath(), gta.member.Client()) + targetPrimary := mcsutils.GetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) if targetPrimary != "" && targetPrimary != gta.member.MemberValue() { log.Info("skip campaigning of scheduling primary and check later", zap.String("server-name", gta.member.Name()), @@ -675,7 +674,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() { } func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary chan struct{}) { - _, revision, err := gta.member.GetPersistentLeader() + oldPrimary, revision, err := gta.member.GetPersistentLeader() if err != nil { log.Error("[primary] getting the leader meets error", errs.ZapError(err)) return @@ -690,26 +689,24 @@ func (gta *GlobalTSOAllocator) primaryWatch(ctx context.Context, exitPrimary cha // only API update primary will set the expected leader // check leader key whether deleted - leaderRaw, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath()) + curPrimary, _, err := gta.member.GetPersistentLeader() if err != nil { - log.Error("[primary] get primary key error", zap.Error(err)) - return - } - if leaderRaw == nil { - log.Info("[primary] leader key is deleted, the primary will step down") + log.Error("[primary] getting the leader meets error", errs.ZapError(err)) return } - - mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) - - gta.member.UnsetLeader() - for { - select { - case <-ctx.Done(): - log.Info("[primary] exit the primary watch loop") - return - case exitPrimary <- struct{}{}: - return + if curPrimary != nil && oldPrimary != nil && + curPrimary.(ElectionMember).Name() != oldPrimary.(ElectionMember).Name() { + mcsutils.SetExpectedPrimary(gta.member.Client(), gta.member.GetLeaderPath()) + + gta.member.UnsetLeader() + for { + select { + case <-ctx.Done(): + log.Info("[primary] exit the primary watch loop") + return + case exitPrimary <- struct{}{}: + return + } } } } diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 96179c41b0a3..954313dbd758 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -84,8 +84,8 @@ func GetPrimary(c *gin.Context) { // @Summary Transfer the primary member of the specified service. // @Produce json // @Param service path string true "service name" -// @Param new_primary query string false "new primary address" -// @Success 200 {object} string +// @Param new_primary body string false "new primary address" +// @Success 200 string string // @Router /ms/primary/transfer/{service} [post] func TransferPrimary(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)