Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdms: support primary/transfer api for scheduling and tso #8157

Merged
merged 37 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7fa19d3
check primary
HuSharp May 9, 2024
1f13fa2
make test happy
HuSharp May 9, 2024
af995cc
address comment and add test
HuSharp May 9, 2024
8d36be5
only trigger by updating
HuSharp May 9, 2024
2433f0c
change log
HuSharp May 10, 2024
dd72b9c
address comment
HuSharp May 10, 2024
a39300e
change to name
HuSharp May 13, 2024
51708b5
make test happy
HuSharp May 13, 2024
c6d2bc3
address comment and change some comments
HuSharp May 14, 2024
a4c5c29
add more test
HuSharp May 14, 2024
4d0598f
merge master
HuSharp May 21, 2024
6ac311f
Merge branch 'master' into support_transfer_primary2
HuSharp May 30, 2024
510b92a
Merge branch 'master' into support_transfer_primary2
HuSharp Jun 13, 2024
b235bc1
address comment and add more comment
HuSharp Jul 1, 2024
f659782
add more comment
HuSharp Jul 1, 2024
32b0b5f
Merge branch 'master' into support_transfer_primary2
HuSharp Jul 2, 2024
dbc5447
Merge branch 'master' into support_transfer_primary2
HuSharp Jul 2, 2024
204ffd5
address comment
HuSharp Jul 3, 2024
ec8e737
remove redundant wait
HuSharp Jul 4, 2024
9e3b798
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 8, 2024
e53844e
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 8, 2024
cc82e7b
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 9, 2024
19ce9d8
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 9, 2024
4c7f8ac
changed by name
HuSharp Jul 25, 2024
36b5a82
refine code
HuSharp Jul 25, 2024
ea8d9e3
address comment
HuSharp Jul 30, 2024
ffb7b1b
refine code
HuSharp Jul 31, 2024
379b1f6
merge master
HuSharp Aug 5, 2024
d9bffb8
Merge branch 'master' into support_transfer_primary2
HuSharp Aug 12, 2024
d037a6a
remove delete
HuSharp Aug 12, 2024
e711fd9
refine purpose and lease
HuSharp Aug 12, 2024
d999c7f
merge master
HuSharp Aug 12, 2024
7f0a426
refine code
HuSharp Aug 12, 2024
d810ed1
address comment
HuSharp Aug 12, 2024
43830ec
non-essential exported
HuSharp Aug 13, 2024
2d9a3b0
refine check name
HuSharp Aug 13, 2024
c1da5b5
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Leadership struct {
// campaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`.
// It is ordered by time to prevent the leader from campaigning too frequently.
campaignTimes []time.Time
// primaryWatch is for the primary watch only,
// which is used to reuse `Watch` interface in `Leadership`.
primaryWatch atomic.Bool
}

// NewLeadership creates a new Leadership.
Expand All @@ -84,17 +87,18 @@ func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadersh
return leadership
}

// getLease gets the lease of leadership, only if leadership is valid,
// GetLease gets the lease of leadership, only if leadership is valid,
// i.e. the owner is a true leader, the lease is not nil.
func (ls *Leadership) getLease() *lease {
func (ls *Leadership) GetLease() *Lease {
l := ls.lease.Load()
if l == nil {
return nil
}
return l.(*lease)
return l.(*Lease)
}

func (ls *Leadership) setLease(lease *lease) {
// SetLease sets the lease of leadership.
func (ls *Leadership) SetLease(lease *Lease) {
ls.lease.Store(lease)
}

Expand All @@ -114,6 +118,16 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

// SetPrimaryWatch sets the primary watch flag.
func (ls *Leadership) SetPrimaryWatch(val bool) {
ls.primaryWatch.Store(val)
}

// IsPrimary gets the primary watch flag.
func (ls *Leadership) IsPrimary() bool {
return ls.primaryWatch.Load()
}

// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
// Need to make sure `AddCampaignTimes` is called before this function.
func (ls *Leadership) GetCampaignTimesNum() int {
Expand Down Expand Up @@ -152,18 +166,19 @@ func (ls *Leadership) AddCampaignTimes() {
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error {
ls.leaderValue = leaderData
// Create a new lease to campaign
newLease := &lease{
Purpose: ls.purpose,
client: ls.client,
lease: clientv3.NewLease(ls.client),
}
ls.setLease(newLease)
newLease := NewLease(ls.client, ls.purpose)
ls.SetLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
name, ok := val.(string)
if len(name) == 0 {
// return directly when not set the name
failpoint.Return(errors.Errorf("failed to grant lease"))
}
var member pdpb.Member
_ = member.Unmarshal([]byte(leaderData))
name, ok := val.(string)
if ok && member.Name == name {
// only return when the name is set and the name is equal to the leader name
failpoint.Return(errors.Errorf("failed to grant lease"))
}
})
Expand Down Expand Up @@ -200,12 +215,12 @@ func (ls *Leadership) Keep(ctx context.Context) {
ls.keepAliveCancelFuncLock.Lock()
ls.keepAliveCtx, ls.keepAliveCancelFunc = context.WithCancel(ctx)
ls.keepAliveCancelFuncLock.Unlock()
go ls.getLease().KeepAlive(ls.keepAliveCtx)
go ls.GetLease().KeepAlive(ls.keepAliveCtx)
}

// Check returns whether the leadership is still available.
func (ls *Leadership) Check() bool {
return ls != nil && ls.getLease() != nil && !ls.getLease().IsExpired()
return ls != nil && ls.GetLease() != nil && !ls.GetLease().IsExpired()
}

// LeaderTxn returns txn() with a leader comparison to guarantee that
Expand Down Expand Up @@ -376,6 +391,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
// ONLY `{service}/primary/transfer` API update primary will meet this condition.
if ev.Type == mvccpb.PUT && ls.IsPrimary() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we can optimize the case that if the updated primary is still itself, no need to return to campaign again.

Copy link
Member Author

@HuSharp HuSharp Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/HuSharp/pd/blob/43830ec9ea80d92008be663ec5c26b3d137373a9/pkg/mcs/utils/expected_primary.go#L141-L146

I skipped the member on the outer layer(in transfer primary) which is not the same as oldPrimary, do you think I still need to add the restriction inside the watch?

Maybe the expected primary flag should not be modified when the leader is itself, because this flag must keep the lease alive after campaigning new leader, which means that it requires the Primary to quit the current watch.

log.Info("current leadership is updated", zap.Int64("revision", wresp.Header.Revision),
zap.String("leader-key", ls.leaderKey), zap.ByteString("cur-value", ev.Kv.Value),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check if the kv has the value?

zap.String("purpose", ls.purpose))
return
}
}
revision = wresp.Header.Revision + 1
}
Expand All @@ -385,13 +407,14 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {

// Reset does some defer jobs such as closing lease, resetting lease etc.
func (ls *Leadership) Reset() {
if ls == nil || ls.getLease() == nil {
if ls == nil || ls.GetLease() == nil {
return
}
ls.keepAliveCancelFuncLock.Lock()
if ls.keepAliveCancelFunc != nil {
ls.keepAliveCancelFunc()
}
ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
ls.GetLease().Close()
ls.SetPrimaryWatch(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to skip it in non-ms mode?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's no problem that this code makes false negatives for non-ms mode.

}
4 changes: 2 additions & 2 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ func TestLeadership(t *testing.T) {
leadership2.Keep(ctx)

// Check the lease.
lease1 := leadership1.getLease()
lease1 := leadership1.GetLease()
re.NotNil(lease1)
lease2 := leadership2.getLease()
lease2 := leadership2.GetLease()
re.NotNil(lease2)

re.True(lease1.IsExpired())
Expand Down
24 changes: 17 additions & 7 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ const (
slowRequestTime = etcdutil.DefaultSlowRequestTime
)

// lease is used as the low-level mechanism for campaigning and renewing elected leadership.
// Lease is used as the low-level mechanism for campaigning and renewing elected leadership.
// The way to gain and maintain leadership is to update and keep the lease alive continuously.
type lease struct {
type Lease struct {
// purpose is used to show what this election for
Purpose string
// etcd client and lease
Expand All @@ -48,8 +48,17 @@ type lease struct {
expireTime atomic.Value
}

// NewLease creates a new Lease instance.
func NewLease(client *clientv3.Client, purpose string) *Lease {
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
return &Lease{
Purpose: purpose,
client: client,
lease: clientv3.NewLease(client),
}
}

// Grant uses `lease.Grant` to initialize the lease and expireTime.
func (l *lease) Grant(leaseTimeout int64) error {
func (l *Lease) Grant(leaseTimeout int64) error {
if l == nil {
return errs.ErrEtcdGrantLease.GenWithStackByCause("lease is nil")
}
Expand All @@ -71,7 +80,7 @@ func (l *lease) Grant(leaseTimeout int64) error {
}

// Close releases the lease.
func (l *lease) Close() error {
func (l *Lease) Close() error {
if l == nil {
return nil
}
Expand All @@ -92,15 +101,15 @@ func (l *lease) Close() error {

// IsExpired checks if the lease is expired. If it returns true,
// current leader should step down and try to re-elect again.
func (l *lease) IsExpired() bool {
func (l *Lease) IsExpired() bool {
if l == nil || l.expireTime.Load() == nil {
return true
}
return time.Now().After(l.expireTime.Load().(time.Time))
}

// KeepAlive auto renews the lease and update expireTime.
func (l *lease) KeepAlive(ctx context.Context) {
func (l *Lease) KeepAlive(ctx context.Context) {
defer logutil.LogPanic()

if l == nil {
Expand All @@ -109,6 +118,7 @@ func (l *lease) KeepAlive(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
defer log.Info("lease keep alive stopped", zap.String("purpose", l.Purpose))

var maxExpire time.Time
timer := time.NewTimer(l.leaseTimeout)
Expand Down Expand Up @@ -146,7 +156,7 @@ func (l *lease) KeepAlive(ctx context.Context) {
}

// Periodically call `lease.KeepAliveOnce` and post back latest received expire time into the channel.
func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
func (l *Lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
ch := make(chan time.Time)

go func() {
Expand Down
19 changes: 3 additions & 16 deletions pkg/election/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
)

func TestLease(t *testing.T) {
Expand All @@ -30,16 +29,8 @@ func TestLease(t *testing.T) {
defer clean()

// Create the lease.
lease1 := &lease{
Purpose: "test_lease_1",
client: client,
lease: clientv3.NewLease(client),
}
lease2 := &lease{
Purpose: "test_lease_2",
client: client,
lease: clientv3.NewLease(client),
}
lease1 := NewLease(client, "test_lease_1")
lease2 := NewLease(client, "test_lease_2")
re.True(lease1.IsExpired())
re.True(lease2.IsExpired())
re.NoError(lease1.Close())
Expand Down Expand Up @@ -95,11 +86,7 @@ func TestLeaseKeepAlive(t *testing.T) {
defer clean()

// Create the lease.
lease := &lease{
Purpose: "test_lease",
client: client,
lease: clientv3.NewLease(client),
}
lease := NewLease(client, "test_lease")

re.NoError(lease.Grant(defaultLeaseTimeout))
ch := lease.keepAliveWorker(context.Background(), 2*time.Second)
Expand Down
36 changes: 36 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/response"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/handler"
Expand Down Expand Up @@ -120,6 +121,7 @@ func NewService(srv *scheserver.Service) *Service {
s.RegisterHotspotRouter()
s.RegisterRegionsRouter()
s.RegisterStoresRouter()
s.RegisterPrimaryRouter()
return s
}

Expand Down Expand Up @@ -226,6 +228,12 @@ func (s *Service) RegisterConfigRouter() {
regions.GET("/:id/labels", getRegionLabels)
}

// RegisterPrimaryRouter registers the router of the primary handler.
func (s *Service) RegisterPrimaryRouter() {
router := s.root.Group("primary")
router.POST("transfer", transferPrimary)
}

// @Tags admin
// @Summary Change the log level.
// @Produce json
Expand Down Expand Up @@ -1478,3 +1486,31 @@ func getRegionByID(c *gin.Context) {
}
c.Data(http.StatusOK, "application/json", b)
}

// TransferPrimary transfers the primary member to `new_primary`.
// @Tags primary
// @Summary Transfer the primary member to `new_primary`.
// @Produce json
// @Param new_primary body string false "new primary name"
// @Success 200 string string
// @Router /primary/transfer [post]
func transferPrimary(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var input map[string]string
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

newPrimary := ""
if v, ok := input["new_primary"]; ok {
newPrimary = v
}

if err := mcsutils.TransferPrimary(svr.GetClient(), svr.GetParticipant().GetExpectedPrimaryLease(),
constant.SchedulingServiceName, svr.Name(), newPrimary, 0); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.IndentedJSON(http.StatusOK, "success")
}
33 changes: 33 additions & 0 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"os/signal"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -132,6 +133,11 @@ func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

// GetParticipant returns the participant.
func (s *Server) GetParticipant() *member.Participant {
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
return s.participant
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
Expand Down Expand Up @@ -242,6 +248,20 @@ func (s *Server) primaryElectionLoop() {
log.Info("the scheduling primary has changed, try to re-campaign a primary")
}

// To make sure the expected primary(if existed) and new primary are on the same server.
expectedPrimary := utils.GetExpectedPrimaryFlag(s.GetClient(), s.participant.GetLeaderPath())
// skip campaign the primary if the expected primary is not empty and not equal to the current memberValue.
// expected primary ONLY SET BY `{service}/primary/transfer` API.
if len(expectedPrimary) > 0 && !strings.Contains(s.participant.MemberValue(), expectedPrimary) {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", s.Name()),
zap.String("expected-primary-id", expectedPrimary),
zap.Uint64("member-id", s.participant.ID()),
zap.String("cur-member-value", s.participant.MemberValue()))
time.Sleep(200 * time.Millisecond)
continue
}

s.campaignLeader()
}
}
Expand Down Expand Up @@ -285,7 +305,17 @@ func (s *Server) campaignLeader() {
cb()
}
}()
// check expected primary and watch the primary.
exitPrimary := make(chan struct{})
lease, err := utils.KeepExpectedPrimaryAlive(ctx, s.GetClient(), exitPrimary,
s.cfg.LeaderLease, s.participant.GetLeaderPath(), s.participant.MemberValue(), constant.SchedulingServiceName)
if err != nil {
log.Error("prepare scheduling primary watch error", errs.ZapError(err))
return
}
s.participant.SetExpectedPrimaryLease(lease)
s.participant.EnableLeader()

member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

Expand All @@ -303,6 +333,9 @@ func (s *Server) campaignLeader() {
// Server is closed and it should return nil.
log.Info("server is closed")
return
case <-exitPrimary:
log.Info("no longer be primary because primary have been updated, the scheduling primary will step down")
return
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/mcs/scheduling/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Conf
// GenerateConfig generates a new config with the given options.
func GenerateConfig(c *config.Config) (*config.Config, error) {
arguments := []string{
"--name=" + c.Name,
"--listen-addr=" + c.ListenAddr,
"--advertise-listen-addr=" + c.AdvertiseListenAddr,
"--backend-endpoints=" + c.BackendEndpoints,
}

flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError)
flagSet.StringP("name", "", "", "human-readable name for this scheduling member")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we need a default name?

Copy link
Member Author

@HuSharp HuSharp May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default name set by this code, which is like TSO-localhost

func (c *Config) Adjust(meta *toml.MetaData) error {
configMetaData := configutil.NewConfigMetadata(meta)
if err := configMetaData.CheckUndecoded(); err != nil {
c.WarningMsgs = append(c.WarningMsgs, err.Error())
}
if c.Name == "" {
hostname, err := os.Hostname()
if err != nil {
return err
}
configutil.AdjustString(&c.Name, fmt.Sprintf("%s-%s", defaultName, hostname))

And your commented snippet is for testing to avoid using the same name for the same machine for local testing, I used addr here

pd/tests/testutil.go

Lines 87 to 88 in 51708b5

cfg.Name = cfg.ListenAddr
cfg, err := tso.GenerateConfig(cfg)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got

flagSet.BoolP("version", "V", false, "print version information and exit")
flagSet.StringP("config", "", "", "config file")
flagSet.StringP("backend-endpoints", "", "", "url for etcd client")
Expand Down
Loading