From afdd5c2ecd5cec5905b314878d89099d26fec954 Mon Sep 17 00:00:00 2001 From: Xiaoyuan Jin Date: Fri, 27 Sep 2024 20:03:42 +0800 Subject: [PATCH] owner: fix data race on ownerManager.campaignCancel (#56362) close pingcap/tidb#56053 --- pkg/owner/BUILD.bazel | 2 +- pkg/owner/manager.go | 8 ++++---- pkg/owner/manager_test.go | 17 +++++++++++++++++ 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/owner/BUILD.bazel b/pkg/owner/BUILD.bazel index ea9a4cfd94d66..62dbe260e3127 100644 --- a/pkg/owner/BUILD.bazel +++ b/pkg/owner/BUILD.bazel @@ -37,7 +37,7 @@ go_test( ], embed = [":owner"], flaky = True, - shard_count = 8, + shard_count = 9, deps = [ "//pkg/ddl", "//pkg/infoschema", diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 6e3c185b22861..ea989b75f4856 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -196,7 +196,9 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error { } m.sessionLease.Store(int64(session.Lease())) m.wg.Add(1) - go m.campaignLoop(session) + var campaignContext context.Context + campaignContext, m.campaignCancel = context.WithCancel(m.ctx) + go m.campaignLoop(campaignContext, session) return nil } @@ -241,9 +243,7 @@ func (m *ownerManager) CampaignCancel() { m.wg.Wait() } -func (m *ownerManager) campaignLoop(etcdSession *concurrency.Session) { - var campaignContext context.Context - campaignContext, m.campaignCancel = context.WithCancel(m.ctx) +func (m *ownerManager) campaignLoop(campaignContext context.Context, etcdSession *concurrency.Session) { defer func() { m.campaignCancel() if r := recover(); r != nil { diff --git a/pkg/owner/manager_test.go b/pkg/owner/manager_test.go index a3dfea821c91f..907ebe79de896 100644 --- a/pkg/owner/manager_test.go +++ b/pkg/owner/manager_test.go @@ -439,6 +439,23 @@ func deleteLeader(cli *clientv3.Client, prefixKey string) error { return errors.Trace(err) } +func TestImmediatelyCancel(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows") + } + integration.BeforeTestExternal(t) + + tInfo := newTestInfo(t) + d := tInfo.ddl + defer tInfo.Close(t) + ownerManager := d.OwnerManager() + for i := 0; i < 10; i++ { + err := ownerManager.CampaignOwner() + require.NoError(t, err) + ownerManager.CampaignCancel() + } +} + func TestAcquireDistributedLock(t *testing.T) { const addrFmt = "http://127.0.0.1:%d" cfg := embed.NewConfig()