Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: watch the ddl ownerkey with the createRevision #55692

Merged
merged 9 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/owner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ go_test(
],
embed = [":owner"],
flaky = True,
shard_count = 5,
shard_count = 6,
deps = [
"//pkg/ddl",
"//pkg/infoschema",
Expand Down
48 changes: 30 additions & 18 deletions pkg/owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,13 +302,13 @@ func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) {
continue
}

ownerKey, err := GetOwnerKey(campaignContext, logCtx, m.etcdCli, m.key, m.id)
ownerKey, createRevision, err := GetOwnerKeyWithCreateRevision(campaignContext, logCtx, m.etcdCli, m.key, m.id)
if err != nil {
continue
}

m.toBeOwner(elec)
m.watchOwner(campaignContext, etcdSession, ownerKey)
m.watchOwner(campaignContext, etcdSession, ownerKey, createRevision)
m.RetireOwner()

metrics.CampaignOwnerCounter.WithLabelValues(m.prompt, metrics.NoLongerOwner).Inc()
Expand All @@ -328,17 +328,17 @@ func (m *ownerManager) revokeSession(_ string, leaseID clientv3.LeaseID) {

// GetOwnerID implements Manager.GetOwnerID interface.
func (m *ownerManager) GetOwnerID(ctx context.Context) (string, error) {
_, ownerID, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
_, ownerID, _, _, _, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
return string(ownerID), errors.Trace(err)
}

func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, error) {
func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPath string) (string, []byte, OpType, int64, int64, error) {
var op OpType
var resp *clientv3.GetResponse
var err error
for i := 0; i < 3; i++ {
if err = ctx.Err(); err != nil {
return "", nil, op, 0, errors.Trace(err)
return "", nil, op, 0, 0, errors.Trace(err)
}

childCtx, cancel := context.WithTimeout(ctx, util.KeyOpDefaultTimeout)
Expand All @@ -352,31 +352,35 @@ func getOwnerInfo(ctx, logCtx context.Context, etcdCli *clientv3.Client, ownerPa
}
if err != nil {
logutil.Logger(logCtx).Warn("etcd-cli get owner info failed", zap.Error(err))
return "", nil, op, 0, errors.Trace(err)
return "", nil, op, 0, 0, errors.Trace(err)
}
if len(resp.Kvs) == 0 {
return "", nil, op, 0, concurrency.ErrElectionNoLeader
return "", nil, op, 0, 0, concurrency.ErrElectionNoLeader
}

var ownerID []byte
ownerID, op = splitOwnerValues(resp.Kvs[0].Value)
logutil.Logger(logCtx).Info("get owner", zap.ByteString("owner key", resp.Kvs[0].Key),
zap.ByteString("ownerID", ownerID), zap.Stringer("op", op))
return string(resp.Kvs[0].Key), ownerID, op, resp.Kvs[0].ModRevision, nil
return string(resp.Kvs[0].Key), ownerID, op, resp.Kvs[0].CreateRevision, resp.Kvs[0].ModRevision, nil
joccau marked this conversation as resolved.
Show resolved Hide resolved
joccau marked this conversation as resolved.
Show resolved Hide resolved
}

// GetOwnerKey gets the owner key information.
func GetOwnerKey(ctx, logCtx context.Context, etcdCli *clientv3.Client, etcdKey, id string) (string, error) {
ownerKey, ownerID, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
// GetOwnerKeyWithCreateRevision gets the owner key and createRevision information.
func GetOwnerKeyWithCreateRevision(
joccau marked this conversation as resolved.
Show resolved Hide resolved
ctx, logCtx context.Context,
etcdCli *clientv3.Client,
etcdKey, id string,
) (string, int64, error) {
ownerKey, ownerID, _, createRevision, _, err := getOwnerInfo(ctx, logCtx, etcdCli, etcdKey)
if err != nil {
return "", errors.Trace(err)
return "", 0, errors.Trace(err)
}
if string(ownerID) != id {
logutil.Logger(logCtx).Warn("is not the owner")
return "", errors.New("ownerInfoNotMatch")
return "", 0, errors.New("ownerInfoNotMatch")
}

return ownerKey, nil
return ownerKey, createRevision, nil
}

func splitOwnerValues(val []byte) ([]byte, OpType) {
Expand All @@ -395,7 +399,7 @@ func joinOwnerValues(vals ...[]byte) []byte {
// SetOwnerOpValue implements Manager.SetOwnerOpValue interface.
func (m *ownerManager) SetOwnerOpValue(ctx context.Context, op OpType) error {
// owner don't change.
ownerKey, ownerID, currOp, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
ownerKey, ownerID, currOp, _, modRevision, err := getOwnerInfo(ctx, m.logCtx, m.etcdCli, m.key)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -438,15 +442,23 @@ func GetOwnerOpValue(ctx context.Context, etcdCli *clientv3.Client, ownerPath, l
}

logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
_, _, op, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath)
_, _, op, _, _, err := getOwnerInfo(ctx, logCtx, etcdCli, ownerPath)
return op, errors.Trace(err)
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string) {
// WatchOwnerForTest watches the ownerKey.
// This function is used to test watchOwner().
func WatchOwnerForTest(ctx context.Context, m Manager, etcdSession *concurrency.Session, key string, ownerRevision int64) {
if ownerManager, ok := m.(*ownerManager); ok {
ownerManager.watchOwner(ctx, etcdSession, key, ownerRevision)
}
}

func (m *ownerManager) watchOwner(ctx context.Context, etcdSession *concurrency.Session, key string, ownerRevision int64) {
logPrefix := fmt.Sprintf("[%s] ownerManager %s watch owner key %v", m.prompt, m.id, key)
logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
logutil.BgLogger().Debug(logPrefix)
watchCh := m.etcdCli.Watch(ctx, key)
watchCh := m.etcdCli.Watch(ctx, key, clientv3.WithRev(ownerRevision))
for {
select {
case resp, ok := <-watchCh:
Expand Down
65 changes: 64 additions & 1 deletion pkg/owner/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,13 +293,76 @@ func TestCluster(t *testing.T) {

logPrefix := fmt.Sprintf("[ddl] %s ownerManager %s", DDLOwnerKey, "useless id")
logCtx := logutil.WithKeyValue(context.Background(), "owner info", logPrefix)
_, err = owner.GetOwnerKey(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id")
_, _, err = owner.GetOwnerKeyWithCreateRevision(context.Background(), logCtx, cliRW, DDLOwnerKey, "useless id")
require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err)
op, err := owner.GetOwnerOpValue(context.Background(), cliRW, DDLOwnerKey, logPrefix)
require.Truef(t, terror.ErrorEqual(err, concurrency.ErrElectionNoLeader), "get owner info result don't match, err %v", err)
require.Equal(t, op, owner.OpNone)
}

func TestWatchOwner(t *testing.T) {
joccau marked this conversation as resolved.
Show resolved Hide resolved
if runtime.GOOS == "windows" {
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
}
integration.BeforeTestExternal(t)

tInfo := newTestInfo(t)
client, d := tInfo.client, tInfo.ddl
defer tInfo.Close(t)
ownerManager := d.OwnerManager()
lis := &listener{}
ownerManager.SetListener(lis)
require.NoError(t, ownerManager.CampaignOwner())
isOwner := checkOwner(d, true)
require.True(t, isOwner)

// get the owner id.
ctx := context.Background()
id, err := ownerManager.GetOwnerID(ctx)
require.NoError(t, err)

// create etcd session.
session, err := concurrency.NewSession(client)
require.NoError(t, err)

// test the GetOwnerKeyWithCreateRevision()
ownerKey, createRevision, err := owner.GetOwnerKeyWithCreateRevision(ctx, context.TODO(), client, DDLOwnerKey, id)
require.NoError(t, err)

// watch the ownerKey with the `DELETE` event.
ctx2, cancel2 := context.WithTimeout(ctx, time.Millisecond*300)
defer cancel2()
watchDone := make(chan bool)
watched := false
go func() {
owner.WatchOwnerForTest(ctx2, ownerManager, session, ownerKey, createRevision)
watchDone <- true
}()

select {
case watched = <-watchDone:
case <-ctx2.Done():
}
require.False(t, watched)

// delete the owner, and can watch the event.
deleteLeader(client, DDLOwnerKey)
require.NoError(t, err)
watched = <-watchDone
require.True(t, watched)

// the ownerKey has been deleted, watch ownerKey again, it can be watched.
ctx3, cancel3 := context.WithTimeout(ctx, time.Millisecond*300)
defer cancel3()
go func() {
owner.WatchOwnerForTest(ctx3, ownerManager, session, ownerKey, createRevision)
watchDone <- true
}()

watched = <-watchDone
require.True(t, watched)
}

func checkOwner(d DDL, fbVal bool) (isOwner bool) {
manager := d.OwnerManager()
// The longest to wait for 30 seconds to
Expand Down