Skip to content

Commit

Permalink
ddl: watch the ddl ownerkey with the createRevision (#55692)
Browse files Browse the repository at this point in the history
close #54689
  • Loading branch information
joccau authored Sep 4, 2024
1 parent 7d9b76c commit 0720ea8
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ go_test(
],
embed = [":owner"],
flaky = True,
shard_count = 5,
shard_count = 7,
deps = [
"//pkg/ddl",
"//pkg/infoschema",
Expand Down
61 changes: 38 additions & 23 deletions pkg/owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,14 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
continue
}

ownerKey, err := GetOwnerKey(campaignContext, logCtx, m.etcdCli, m.key, m.id)
ownerKey, currRev, err := GetOwnerKeyInfo(campaignContext, logCtx, m.etcdCli, m.key, m.id)
if err != nil {
continue
}

m.toBeOwner(elec)
m.watchOwner(campaignContext, etcdSession, ownerKey)
err = m.watchOwner(campaignContext, etcdSession, ownerKey, currRev)
logutil.Logger(logCtx).Info("watch owner finished", zap.Error(err))
m.RetireOwner()

metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc()
Expand All @@ -328,17 +329,17 @@ func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) {

// GetOwnerID implements Manager.GetOwnerID interface.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
_, ownerID, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
_, ownerID, _, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
return string(ownerID), errors.Trace(err)
}

func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, error) {
func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) {
var op OpType
var resp *clientv3.GetResponse
var err error
for i := 0; i < 3; i++ {
if err = ctx.Err(); err != nil {
return "", nil, op, 0, errors.Trace(err)
return "", nil, op, 0, 0, errors.Trace(err)
}

childCtx, cancel := context.WithTimeout(ctx, util.KeyOpDefaultTimeout)
Expand All @@ -352,31 +353,35 @@ func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPa
}
if err != nil {
logutil.Logger(logCtx).Warn("etcd-cli get owner info failed", zap.Error(err))
return "", nil, op, 0, errors.Trace(err)
return "", nil, op, 0, 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return "", nil, op, 0, concurrency.ErrElectionNoLeader
return "", nil, op, 0, 0, concurrency.ErrElectionNoLeader
}

var ownerID []byte
ownerID, op = splitOwnerValues(resp.Kvs[0].Value)
logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key),
zap.ByteString("ownerID", ownerID), zap.Stringer("op", op))
return string(resp.Kvs[0].Key), ownerID, op, resp.Kvs[0].ModRevision, nil
return string(resp.Kvs[0].Key), ownerID, op, resp.Header.Revision, resp.Kvs[0].ModRevision, nil
}

// GetOwnerKey gets the owner key information.
func GetOwnerKey(ctx, logCtx context.Context, etcdCli *clientv3.Client, etcdKey, id string) (string, error) {
ownerKey, ownerID, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
// GetOwnerKeyInfo gets the owner key and current revision.
func GetOwnerKeyInfo(
ctx, logCtx context.Context,
etcdCli *clientv3.Client,
etcdKey, id string,
) (string, int64, error) {
ownerKey, ownerID, _, currRevision, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
if err != nil {
return "", errors.Trace(err)
return "", 0, errors.Trace(err)
}
if string(ownerID) != id {
logutil.Logger(logCtx).Warn("is not the owner")
return "", errors.New("ownerInfoNotMatch")
return "", 0, errors.New("ownerInfoNotMatch")
}

return ownerKey, nil
return ownerKey, currRevision, nil
}

func splitOwnerValues(val []byte) ([]byte, OpType) {
Expand All @@ -395,7 +400,7 @@ func joinOwnerValues(vals ...[]byte) []byte {
// SetOwnerOpValue implements Manager.SetOwnerOpValue interface.
func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error {
// owner don't change.
ownerKey, ownerID, currOp, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -438,42 +443,52 @@ func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, l
}

logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
_, _, op, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath)
_, _, op, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath)
return op, errors.Trace(err)
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) {
// WatchOwnerForTest is a test-only entry point to the unexported watchOwner.
//
// When m is backed by *ownerManager, the call is forwarded to its watchOwner
// method with the given session, key and revision, and that method's result is
// returned. For any other Manager implementation nothing is watched and nil is
// returned.
func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency.Session, key string, createRevison int64) error {
	impl, isOwnerManager := m.(*ownerManager)
	if !isOwnerManager {
		// Not the concrete manager: there is no watchOwner to exercise.
		return nil
	}
	return impl.watchOwner(ctx, etcdSession, key, createRevison)
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, currRev int64) error {
logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key)
logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
logutil.BgLogger().Debug(logPrefix)
watchCh := m.etcdCli.Watch(ctx, key)
// we need to watch the ownerKey since currRev + 1.
watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(currRev+1))
for {
select {
case resp, ok := <-watchCh:
if !ok {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.WatcherClosed).Inc()
logutil.Logger(logCtx).Info("watcher is closed, no owner")
return
return errors.Errorf("watcher is closed, key: %v", key)
}
if resp.Canceled {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Cancelled).Inc()
logutil.Logger(logCtx).Info("watch canceled, no owner")
return
return errors.Errorf("watch canceled, key: %v", key)
}

for _, ev := range resp.Events {
if ev.Type == mvccpb.DELETE {
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.Deleted).Inc()
logutil.Logger(logCtx).Info("watch failed, owner is deleted")
return
return nil
}
}
case <-etcdSession.Done():
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.SessionDone).Inc()
return
return nil
case <-ctx.Done():
metrics.WatchOwnerCounter.WithLabelValues(m.prompt, metrics.CtxDone).Inc()
return
return nil
}
}
}
Expand Down
106 changes: 105 additions & 1 deletion pkg/owner/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,117 @@ func TestCluster(t *testing.T) {

logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", DDLOwnerKey, "useless id")
logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
_, err = owner.GetOwnerKey(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id")
_, _, err = owner.GetOwnerKeyInfo(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id")
require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err)
op, err := owner.GetOwnerOpValue(context.Background(), cliRW, DDLOwnerKey, logPrefix)
require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err)
require.Equal(t, op, owner.OpNone)
}

// TestWatchOwner checks that watching the owner key blocks while the key
// exists, returns once the key is deleted, and also returns when the watch is
// started again afterwards with the revision captured before the deletion.
func TestWatchOwner(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)

	tInfo := newTestInfo(t)
	client, d := tInfo.client, tInfo.ddl
	defer tInfo.Close(t)
	ownerManager := d.OwnerManager()
	lis := &listener{}
	ownerManager.SetListener(lis)
	require.NoError(t, ownerManager.CampaignOwner())
	isOwner := checkOwner(d, true)
	require.True(t, isOwner)

	// Get the owner id.
	ctx := context.Background()
	id, err := ownerManager.GetOwnerID(ctx)
	require.NoError(t, err)

	// Create the etcd session handed to the watcher.
	session, err := concurrency.NewSession(client)
	require.NoError(t, err)

	// Exercise GetOwnerKeyInfo: it yields the owner key and the current revision.
	ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id)
	require.NoError(t, err)

	// The watcher reports its result over a channel so that every assertion
	// runs on the test goroutine: require.* ends in t.FailNow, which must not
	// be called from another goroutine. The channel is buffered so the watcher
	// goroutine can always finish even if the test has already failed.
	watchErrCh := make(chan error, 1)
	startWatch := func() {
		go func() {
			watchErrCh <- owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision)
		}()
	}

	// While the owner key still exists the watch must not return.
	ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*300)
	defer cancel2()
	startWatch()
	select {
	case watchErr := <-watchErrCh:
		t.Fatalf("watch returned before the owner key was deleted, err: %v", watchErr)
	case <-ctx2.Done():
	}

	// Delete the owner; the pending watch must observe the DELETE event.
	err = deleteLeader(client, DDLOwnerKey)
	require.NoError(t, err)
	require.NoError(t, <-watchErrCh)

	// The owner key has been deleted; watching again from the same revision
	// must still see the deletion and return.
	startWatch()
	require.NoError(t, <-watchErrCh)
}

// TestWatchOwnerAfterDeleteOwnerKey checks that a watch started only after
// the owner key has been deleted still returns, provided it is given the
// revision obtained before the deletion.
func TestWatchOwnerAfterDeleteOwnerKey(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
	}
	integration.BeforeTestExternal(t)

	tInfo := newTestInfo(t)
	client, d := tInfo.client, tInfo.ddl
	defer tInfo.Close(t)
	ownerManager := d.OwnerManager()
	lis := &listener{}
	ownerManager.SetListener(lis)
	require.NoError(t, ownerManager.CampaignOwner())
	isOwner := checkOwner(d, true)
	require.True(t, isOwner)

	// Get the owner id.
	ctx := context.Background()
	id, err := ownerManager.GetOwnerID(ctx)
	require.NoError(t, err)
	session, err := concurrency.NewSession(client)
	require.NoError(t, err)

	// Get the owner key information while the key still exists.
	ownerKey, currRevision, err := owner.GetOwnerKeyInfo(ctx, context.TODO(), client, DDLOwnerKey, id)
	require.NoError(t, err)

	// Delete the owner key before any watch is started.
	err = deleteLeader(client, DDLOwnerKey)
	require.NoError(t, err)

	// Watch the owner key with the revision captured above. The result is
	// sent back over a buffered channel and asserted here, on the test
	// goroutine, because require.* ends in t.FailNow and must not be called
	// from a spawned goroutine (a failure there would also leave this test
	// blocked forever waiting for the completion signal).
	watchErrCh := make(chan error, 1)
	go func() {
		watchErrCh <- owner.WatchOwnerForTest(ctx, ownerManager, session, ownerKey, currRevision)
	}()
	require.NoError(t, <-watchErrCh)
}

func checkOwner(d DDL, fbVal bool) (isOwner bool) {
manager := d.OwnerManager()
// The longest to wait for 30 seconds to
Expand Down

0 comments on commit 0720ea8

Please sign in to comment.