diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go index 364ffd1fc2a93..0f194737886be 100644 --- a/pkg/executor/infoschema_reader_test.go +++ b/pkg/executor/infoschema_reader_test.go @@ -608,7 +608,7 @@ func TestColumnTable(t *testing.T) { testkit.RowsWithSep("|", "test|tbl1|col_2")) tk.MustQuery(`select count(*) from information_schema.columns;`).Check( - testkit.RowsWithSep("|", "4937")) + testkit.RowsWithSep("|", "4939")) } func TestIndexUsageTable(t *testing.T) { diff --git a/pkg/executor/internal/querywatch/query_watch_test.go b/pkg/executor/internal/querywatch/query_watch_test.go index 72381f52c523e..fa461083270e5 100644 --- a/pkg/executor/internal/querywatch/query_watch_test.go +++ b/pkg/executor/internal/querywatch/query_watch_test.go @@ -29,6 +29,10 @@ import ( ) func TestQueryWatch(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC")) + }() store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) if variable.SchemaCacheSize.Load() != 0 { @@ -117,7 +121,7 @@ func TestQueryWatch(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest", fmt.Sprintf("return(%d)", 60))) err = tk.QueryToErr("select /*+ resource_group(rg1) */ * from t3") require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query") - tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil, + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, match_type from mysql.tidb_runaway_queries", nil, testkit.Rows( "rg1 select /*+ resource_group(rg1) */ * from t3 watch", "rg1 select /*+ resource_group(rg1) */ * from t3 
identify", diff --git a/pkg/resourcegroup/runaway/BUILD.bazel b/pkg/resourcegroup/runaway/BUILD.bazel index 9dcbbe78c898e..269fea00c3937 100644 --- a/pkg/resourcegroup/runaway/BUILD.bazel +++ b/pkg/resourcegroup/runaway/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "runaway", @@ -38,3 +38,12 @@ go_library( "@org_uber_go_zap//:zap", ], ) + +go_test( + name = "runaway_test", + timeout = "short", + srcs = ["record_test.go"], + embed = [":runaway"], + flaky = True, + deps = ["@com_github_stretchr_testify//assert"], +) diff --git a/pkg/resourcegroup/runaway/checker.go b/pkg/resourcegroup/runaway/checker.go index 4592f2a338405..e397a411972a4 100644 --- a/pkg/resourcegroup/runaway/checker.go +++ b/pkg/resourcegroup/runaway/checker.go @@ -300,7 +300,7 @@ func (r *Checker) markRunaway(matchType string, action rmpb.RunawayAction, switc } actionStr = strings.ToLower(actionStr) metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType, actionStr).Inc() - r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now) + r.manager.markRunaway(r, actionStr, matchType, now) } func (r *Checker) getSettingConvictIdentifier() string { diff --git a/pkg/resourcegroup/runaway/manager.go b/pkg/resourcegroup/runaway/manager.go index 831c3db285197..5b9dee83c8231 100644 --- a/pkg/resourcegroup/runaway/manager.go +++ b/pkg/resourcegroup/runaway/manager.go @@ -43,7 +43,7 @@ const ( maxWatchListCap = 10000 maxWatchRecordChannelSize = 1024 - runawayRecordFlushInterval = time.Second + runawayRecordFlushInterval = 30 * time.Second runawayRecordGCInterval = time.Hour * 24 runawayRecordExpiredDuration = time.Hour * 24 * 7 @@ -155,17 +155,19 @@ func (rm *Manager) RunawayRecordFlushLoop() { quarantineRecordCh := rm.quarantineRecordChan() staleQuarantineRecordCh := rm.staleQuarantineRecordChan() flushThreshold := flushThreshold() 
- records := make([]*Record, 0, flushThreshold) + // recordMap is used to deduplicate records. + recordMap = make(map[recordKey]*Record, flushThreshold) flushRunawayRecords := func() { - if len(records) == 0 { + if len(recordMap) == 0 { return } - sql, params := genRunawayQueriesStmt(records) + sql, params := genRunawayQueriesStmt(recordMap) if _, err := ExecRCRestrictedSQL(rm.sysSessionPool, sql, params); err != nil { - logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records))) + logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(recordMap))) } - records = records[:0] + // reset the map. + recordMap = make(map[recordKey]*Record, flushThreshold) } for { @@ -176,11 +178,21 @@ func (rm *Manager) RunawayRecordFlushLoop() { flushRunawayRecords() fired = true case r := <-recordCh: - records = append(records, r) + key := recordKey{ + ResourceGroupName: r.ResourceGroupName, + SQLDigest: r.SQLDigest, + PlanDigest: r.PlanDigest, + Match: r.Match, + } + if _, exists := recordMap[key]; exists { + recordMap[key].Repeats++ + } else { + recordMap[key] = r + } failpoint.Inject("FastRunawayGC", func() { flushRunawayRecords() }) - if len(records) >= flushThreshold { + if len(recordMap) >= flushThreshold { flushRunawayRecords() } else if fired { fired = false @@ -321,7 +333,7 @@ func (rm *Manager) getWatchFromWatchList(key string) *QuarantineRecord { return nil } -func (rm *Manager) markRunaway(resourceGroupName, originalSQL, planDigest, action, matchType string, now *time.Time) { +func (rm *Manager) markRunaway(checker *Checker, action, matchType string, now *time.Time) { source := rm.serverID if !rm.syncerInitialized.Load() { rm.logOnce.Do(func() { @@ -331,13 +343,16 @@ func (rm *Manager) markRunaway(resourceGroupName, originalSQL, planDigest, actio } select { case rm.runawayQueriesChan <- &Record{ - ResourceGroupName: resourceGroupName, - Time: *now, + ResourceGroupName: 
checker.resourceGroupName, + StartTime: *now, Match: matchType, Action: action, - SQLText: originalSQL, - PlanDigest: planDigest, + SampleText: checker.originalSQL, + SQLDigest: checker.sqlDigest, + PlanDigest: checker.planDigest, Source: source, + // default value for Repeats + Repeats: 1, }: default: // TODO: add warning for discard flush records diff --git a/pkg/resourcegroup/runaway/record.go b/pkg/resourcegroup/runaway/record.go index d8762417e22c1..7e9ce1cab51f2 100644 --- a/pkg/resourcegroup/runaway/record.go +++ b/pkg/resourcegroup/runaway/record.go @@ -17,6 +17,7 @@ package runaway import ( "context" "fmt" + "hash/fnv" "strings" "time" @@ -52,36 +53,62 @@ var NullTime time.Time // Record is used to save records which will be inserted into mysql.tidb_runaway_queries. type Record struct { ResourceGroupName string - Time time.Time + StartTime time.Time Match string Action string - SQLText string + SampleText string + SQLDigest string PlanDigest string Source string + // Repeats is used to avoid inserting the same record multiple times. + // It counts how many identical records were merged into this one before it is flushed to the table, which happens every flush interval (30s) or when len(map) reaches the threshold(1024). + // We only consider `resource_group_name`, `sql_digest`, `plan_digest` and `match_type` when comparing records. + // The default value is 1. + Repeats int +} + +// recordMap is used to save records which will be inserted into `mysql.tidb_runaway_queries` by function `flushRunawayRecords`. +var recordMap map[recordKey]*Record + +// recordKey represents the composite key used to deduplicate records of `tidb_runaway_queries`. +type recordKey struct { + ResourceGroupName string + SQLDigest string + PlanDigest string + Match string +} + +// Hash generates a hash for the recordKey. +// Because `tidb_runaway_queries` is informational and not performance-critical, +// we can trade some accuracy for the performance of other components. 
+func (k recordKey) Hash() uint64 { + h := fnv.New64a() + h.Write([]byte(k.ResourceGroupName)) + h.Write([]byte(k.SQLDigest)) + h.Write([]byte(k.PlanDigest)) + h.Write([]byte(k.Match)) + return h.Sum64() } // genRunawayQueriesStmt generates statement with given RunawayRecords. -func genRunawayQueriesStmt(records []*Record) (string, []any) { +func genRunawayQueriesStmt(recordMap map[recordKey]*Record) (string, []any) { var builder strings.Builder - params := make([]any, 0, len(records)*7) - builder.WriteString("insert into mysql.tidb_runaway_queries VALUES ") - for count, r := range records { - if count > 0 { + params := make([]any, 0, len(recordMap)*9) + builder.WriteString("INSERT INTO mysql.tidb_runaway_queries " + + "(resource_group_name, start_time, match_type, action, sample_sql, sql_digest, plan_digest, tidb_server, repeats) VALUES ") + firstRecord := true + for _, r := range recordMap { + if !firstRecord { builder.WriteByte(',') } - builder.WriteString("(%?, %?, %?, %?, %?, %?, %?)") - params = append(params, r.ResourceGroupName) - params = append(params, r.Time) - params = append(params, r.Match) - params = append(params, r.Action) - params = append(params, r.SQLText) - params = append(params, r.PlanDigest) - params = append(params, r.Source) + firstRecord = false + builder.WriteString("(%?, %?, %?, %?, %?, %?, %?, %?, %?)") + params = append(params, r.ResourceGroupName, r.StartTime, r.Match, r.Action, r.SampleText, r.SQLDigest, r.PlanDigest, r.Source, r.Repeats) } return builder.String(), params } -// QuarantineRecord is used to save records which will be insert into mysql.tidb_runaway_watch. +// QuarantineRecord is used to save records which will be inserted into mysql.tidb_runaway_watch. 
type QuarantineRecord struct { ID int64 ResourceGroupName string @@ -183,7 +210,7 @@ func (r *QuarantineRecord) genDeletionStmt() (string, []any) { func (rm *Manager) deleteExpiredRows(expiredDuration time.Duration) { const ( tableName = "tidb_runaway_queries" - colName = "time" + colName = "start_time" ) var systemSchemaCIStr = model.NewCIStr("mysql") diff --git a/pkg/resourcegroup/runaway/record_test.go b/pkg/resourcegroup/runaway/record_test.go new file mode 100644 index 0000000000000..8e51f97e4e24b --- /dev/null +++ b/pkg/resourcegroup/runaway/record_test.go @@ -0,0 +1,81 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package runaway + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestRecordKey(t *testing.T) { + // Initialize test data + key1 := recordKey{ + ResourceGroupName: "group1", + SQLDigest: "digest1", + PlanDigest: "plan1", + } + // key2 is identical to key1 + key2 := recordKey{ + ResourceGroupName: "group1", + SQLDigest: "digest1", + PlanDigest: "plan1", + } + key3 := recordKey{ + ResourceGroupName: "group2", + } + // Test Hash method + hash1 := key1.Hash() + hash2 := key2.Hash() + hash3 := key3.Hash() + + assert.Equal(t, hash1, hash2, "Hashes should be equal for identical keys") + assert.NotEqual(t, hash1, hash3, "Hashes should not be equal for different keys") + + // Test MapKey method + recordMap = make(map[recordKey]*Record) + record1 := &Record{ + ResourceGroupName: "group1", + SQLDigest: "digest1", + PlanDigest: "plan1", + } + // put key1 into recordMap + recordMap[key1] = record1 + assert.Len(t, recordMap, 1, "recordMap should have 1 element") + assert.Equal(t, "group1", recordMap[key1].ResourceGroupName, "Repeats should not be updated") + assert.Equal(t, 0, recordMap[key1].Repeats, "Repeats should be incremented") + // key2 is identical to key1, so we can use key2 to get the record + assert.NotNil(t, recordMap[key1], "key1 should exist in recordMap") + assert.NotNil(t, recordMap[key2], "key2 should exist in recordMap") + assert.Nil(t, recordMap[key3], "key3 should not exist in recordMap") + + // put key2 into recordMap and update Repeats + record2 := &Record{ + ResourceGroupName: "group1", + Repeats: 1, + } + recordMap[key2] = record2 + assert.Len(t, recordMap, 1, "recordMap should have 1 element") + assert.Equal(t, 1, recordMap[key1].Repeats, "Repeats should be updated") + // change ResourceGroupName of key2 will not affect key1 + key2.ResourceGroupName = "group2" + record3 := &Record{ + ResourceGroupName: "group2", + } + recordMap[key2] = record3 + assert.Len(t, recordMap, 2, "recordMap should have 1 element") + assert.Equal(t, 
"group1", recordMap[key1].ResourceGroupName, "Repeats should not be updated") + assert.Equal(t, "group2", recordMap[key2].ResourceGroupName, "ResourceGroupName should be updated") +} diff --git a/pkg/resourcegroup/tests/BUILD.bazel b/pkg/resourcegroup/tests/BUILD.bazel index 9ef1e1a79bbd2..ebc61d5cba148 100644 --- a/pkg/resourcegroup/tests/BUILD.bazel +++ b/pkg/resourcegroup/tests/BUILD.bazel @@ -6,7 +6,7 @@ go_test( srcs = ["resource_group_test.go"], flaky = True, race = "on", - shard_count = 6, + shard_count = 7, deps = [ "//pkg/ddl/resourcegroup", "//pkg/domain", diff --git a/pkg/resourcegroup/tests/resource_group_test.go b/pkg/resourcegroup/tests/resource_group_test.go index 4b7e57dd7d9ac..532bc3cc7bb00 100644 --- a/pkg/resourcegroup/tests/resource_group_test.go +++ b/pkg/resourcegroup/tests/resource_group_test.go @@ -322,9 +322,9 @@ func TestResourceGroupRunaway(t *testing.T) { tryInterval := time.Millisecond * 100 maxWaitDuration := time.Second * 5 - tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil, + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, match_type from mysql.tidb_runaway_queries", nil, testkit.Rows("rg1 select /*+ resource_group(rg1) */ * from t identify"), maxWaitDuration, tryInterval) - tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries", nil, + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, start_time from mysql.tidb_runaway_queries", nil, nil, maxWaitDuration, tryInterval) tk.MustExec("alter resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='100ms' ACTION=COOLDOWN)") tk.MustQuery("select /*+ resource_group(rg1) */ * from t").Check(testkit.Rows("1")) @@ -335,7 +335,7 @@ func TestResourceGroupRunaway(t *testing.T) { err = tk.QueryToErr("select /*+ resource_group(rg2) */ * from t") 
require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") tk.MustGetErrCode("select /*+ resource_group(rg2) */ * from t", mysql.ErrResourceGroupQueryRunawayQuarantine) - tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil, + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, match_type from mysql.tidb_runaway_queries", nil, testkit.Rows("rg2 select /*+ resource_group(rg2) */ * from t identify", "rg2 select /*+ resource_group(rg2) */ * from t watch"), maxWaitDuration, tryInterval) tk.MustQuery("select SQL_NO_CACHE resource_group_name, watch_text from mysql.tidb_runaway_watch"). @@ -345,7 +345,7 @@ func TestResourceGroupRunaway(t *testing.T) { err = tk.QueryToErr("select /*+ resource_group(rg2) */ * from t") require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") - tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries", nil, + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, start_time from mysql.tidb_runaway_queries", nil, nil, maxWaitDuration, tryInterval) tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, watch_text, end_time from mysql.tidb_runaway_watch", nil, nil, maxWaitDuration, tryInterval) @@ -407,9 +407,61 @@ func TestResourceGroupRunawayExceedTiDBSide(t *testing.T) { tryInterval := time.Millisecond * 100 maxWaitDuration := time.Second * 5 - tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil, + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, match_type from mysql.tidb_runaway_queries", nil, testkit.Rows("rg1 select /*+ resource_group(rg1) */ sleep(0.5) from t identify"), maxWaitDuration, tryInterval) - 
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, time from mysql.tidb_runaway_queries", nil, + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, start_time from mysql.tidb_runaway_queries", nil, + nil, maxWaitDuration, tryInterval) +} + +func TestResourceGroupRunawayFlood(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC")) + }() + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + require.NoError(t, tk.Session().Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost"}, nil, nil, nil)) + + tk.MustExec("use test") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values(1)") + tk.MustExec("set global tidb_enable_resource_control='on'") + tk.MustExec("create resource group rg1 RU_PER_SEC=1000 QUERY_LIMIT=(EXEC_ELAPSED='50ms' ACTION=KILL)") + tk.MustQuery("select /*+ resource_group(rg1) */ * from t").Check(testkit.Rows("1")) + require.Eventually(t, func() bool { + return dom.RunawayManager().IsSyncerInitialized() + }, 20*time.Second, 300*time.Millisecond) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest", fmt.Sprintf("return(%d)", 60))) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest")) + }() + err := tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.1) from t") + require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") + + tryInterval := time.Millisecond * 100 + maxWaitDuration := time.Second * 5 + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats, match_type from mysql.tidb_runaway_queries", nil, + testkit.Rows("rg1 select /*+ 
resource_group(rg1) */ sleep(0.1) from t 1 identify"), maxWaitDuration, tryInterval) + // wait for the runaway query records to be cleaned up + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats from mysql.tidb_runaway_queries", nil, + nil, maxWaitDuration, tryInterval) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC")) + + // run the query three times to make sure it is regarded as a repeated runaway query. + err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.2) from t") + require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") + err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.3) from t") + require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") + // using FastRunawayGC to trigger flush + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`)) + err = tk.QueryToErr("select /*+ resource_group(rg1) */ sleep(0.4) from t") + require.ErrorContains(t, err, "Query execution was interrupted, identified as runaway query") + // only one runaway query record should exist, with repeats = 3 + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats, match_type from mysql.tidb_runaway_queries", nil, + testkit.Rows("rg1 select /*+ resource_group(rg1) */ sleep(0.2) from t 3 identify"), maxWaitDuration, tryInterval) + // wait for the runaway query records to be cleaned up + tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, repeats from mysql.tidb_runaway_queries", nil, nil, maxWaitDuration, tryInterval) } diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 37321950f8e96..8b30966b2b085 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -621,14 +621,16 @@ const ( // CreateRunawayTable stores the query which is identified as runaway or quarantined because of in watch 
list. CreateRunawayTable = `CREATE TABLE IF NOT EXISTS mysql.tidb_runaway_queries ( resource_group_name varchar(32) not null, - time TIMESTAMP NOT NULL, + start_time TIMESTAMP NOT NULL, + repeats int default 1, match_type varchar(12) NOT NULL, action varchar(12) NOT NULL, - original_sql TEXT NOT NULL, - plan_digest TEXT NOT NULL, + sample_sql TEXT NOT NULL, + sql_digest varchar(64) NOT NULL, + plan_digest varchar(64) NOT NULL, tidb_server varchar(512), INDEX plan_index(plan_digest(64)) COMMENT "accelerate the speed when select runaway query", - INDEX time_index(time) COMMENT "accelerate the speed when querying with active watch" + INDEX time_index(start_time) COMMENT "accelerate the speed when querying with active watch" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` // CreateRunawayWatchTable stores the condition which is used to check whether query should be quarantined. @@ -1113,7 +1115,10 @@ const ( // version211 add column `summary` to `mysql.tidb_background_subtask_history`. version211 = 211 - // version212 add column `switch_group_name` to `mysql.tidb_runaway_watch` and `mysql.tidb_runaway_watch_done`. + // version212 changes several runaway-related tables. + // 1. switchGroup: add column `switch_group_name` to `mysql.tidb_runaway_watch` and `mysql.tidb_runaway_watch_done`. + // 2. modify column `plan_digest` type, rename column `time` to `start_time`, + // rename column `original_sql` to `sample_sql`, add columns `sql_digest` and `repeats` in `mysql.tidb_runaway_queries`. version212 = 212 ) @@ -3090,8 +3095,28 @@ func upgradeToVer212(s sessiontypes.Session, ver int64) { if ver >= version212 { return } + // need to ensure curVersion has the column before rename. + // version169 created `tidb_runaway_queries` table + // version172 created `tidb_runaway_watch` and `tidb_runaway_watch_done` tables + if ver < version172 { + return + } + // version212 changes several runaway-related tables. + // 1. 
switchGroup: add column `switch_group_name` to `mysql.tidb_runaway_watch` and `mysql.tidb_runaway_watch_done`. doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch ADD COLUMN `switch_group_name` VARCHAR(32) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch_done ADD COLUMN `switch_group_name` VARCHAR(32) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) + // 2. modify column `plan_digest` type, rename column `time` to `start_time`, + // rename column `original_sql` to `sample_sql` in `mysql.tidb_runaway_queries`. + // add column `sql_digest`. + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `sql_digest` varchar(64) DEFAULT '' AFTER `original_sql`;", infoschema.ErrColumnExists) + // add column `repeats`. + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `repeats` int DEFAULT 1 AFTER `time`;", infoschema.ErrColumnExists) + // rename column name from `time` to `start_time`, will auto rebuild the index. + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries RENAME COLUMN `time` TO `start_time`") + // rename column `original_sql` to `sample_sql`. + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries RENAME COLUMN `original_sql` TO `sample_sql`") + // modify column type of `plan_digest`. + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries MODIFY COLUMN `plan_digest` varchar(64) DEFAULT '';", infoschema.ErrColumnExists) } // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist. 
diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index 0987adb382ddd..c0150461388e0 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -270,6 +270,12 @@ func revertVersionAndVariables(t *testing.T, se sessiontypes.Session, ver int) { // for version <= version195, tidb_enable_dist_task should be disabled before upgrade MustExec(t, se, "update mysql.global_variables set variable_value='off' where variable_name='tidb_enable_dist_task'") } + if ver < version212 && ver >= version172 { + // for version < version212, revert column changes related to function `upgradeToVer212`. + // related tables created after version172. + MustExec(t, se, "ALTER TABLE mysql.tidb_runaway_queries RENAME COLUMN `start_time` TO `time`") + MustExec(t, se, "ALTER TABLE mysql.tidb_runaway_queries RENAME COLUMN `sample_sql` TO `original_sql`") + } } // TestUpgrade tests upgrading @@ -312,6 +318,7 @@ func TestUpgrade(t *testing.T) { MustExec(t, se1, fmt.Sprintf(`delete from mysql.global_variables where VARIABLE_NAME="%s"`, variable.TiDBDistSQLScanConcurrency)) MustExec(t, se1, `commit`) unsetStoreBootstrapped(store.UUID()) + revertVersionAndVariables(t, se1, 0) // Make sure the version is downgraded. r = MustExecToRecodeSet(t, se1, `SELECT VARIABLE_VALUE from mysql.TiDB where VARIABLE_NAME="tidb_server_version"`) req = r.NewChunk(nil) @@ -2412,3 +2419,35 @@ func TestTiDBHistoryTableConsistent(t *testing.T) { dom.Close() } + +func TestTiDBUpgradeToVer212(t *testing.T) { + store, dom := CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + // bootstrap as version198, version 199~208 is reserved for v8.1.x bugfix patch. 
+ ver198 := version198 + seV198 := CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(int64(ver198)) + require.NoError(t, err) + revertVersionAndVariables(t, seV198, ver198) + // simulate a real ver198 where `mysql.tidb_runaway_queries` doesn't have `start_time`/`sample_sql` columns yet. + MustExec(t, seV198, "select original_sql, time from mysql.tidb_runaway_queries") + err = txn.Commit(context.Background()) + require.NoError(t, err) + unsetStoreBootstrapped(store.UUID()) + + // upgrade to ver212 + dom.Close() + domCurVer, err := BootstrapSession(store) + require.NoError(t, err) + defer domCurVer.Close() + seCurVer := CreateSessionAndSetID(t, store) + ver, err := getBootstrapVersion(seCurVer) + require.NoError(t, err) + require.Equal(t, currentBootstrapVersion, ver) + // the columns are renamed automatically by the upgrade + MustExec(t, seCurVer, "select sample_sql, start_time, plan_digest from mysql.tidb_runaway_queries") +} diff --git a/pkg/session/bootstraptest/bootstrap_upgrade_test.go b/pkg/session/bootstraptest/bootstrap_upgrade_test.go index 29107c9739675..dd1e5a2aea677 100644 --- a/pkg/session/bootstraptest/bootstrap_upgrade_test.go +++ b/pkg/session/bootstraptest/bootstrap_upgrade_test.go @@ -103,6 +103,12 @@ func revertVersionAndVariables(t *testing.T, se sessiontypes.Session, ver int) { // for version <= version195, tidb_enable_dist_task should be disabled before upgrade session.MustExec(t, se, "update mysql.global_variables set variable_value='off' where variable_name='tidb_enable_dist_task'") } + if ver < 212 && ver >= 172 { + // for version < version212, revert column changes related to function `upgradeToVer212`. + // related tables created after version172. 
+ session.MustExec(t, se, "ALTER TABLE mysql.tidb_runaway_queries RENAME COLUMN `start_time` TO `time`") + session.MustExec(t, se, "ALTER TABLE mysql.tidb_runaway_queries RENAME COLUMN `sample_sql` TO `original_sql`") + } } func TestUpgradeVersion66(t *testing.T) { @@ -618,7 +624,8 @@ func TestUpgradeVersionForResumeJob(t *testing.T) { idxFinishTS = runJob.BinlogInfo.FinishedTS } else { // The second add index op. - if strings.Contains(runJob.TableName, "upgrade_tbl") { + // Note: the `tidb_runaway_queries` table is upgraded in the `upgradeToVer212` function, which runs before the second add index op. + if strings.Contains(runJob.TableName, "upgrade_tbl") || strings.Contains(runJob.TableName, "tidb_runaway_queries") { require.Greater(t, runJob.BinlogInfo.FinishedTS, idxFinishTS) } else { // The upgrade DDL ops. These jobs' finishedTS must less than add index ops.