Skip to content

Commit

Permalink
runaway: Mitigation tidb_runaway_queries flooding (#55664)
Browse files Browse the repository at this point in the history
ref #54434
  • Loading branch information
HuSharp authored Sep 13, 2024
1 parent b3095dc commit bb3456b
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func TestColumnTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|tbl1|col_2"))
tk.MustQuery(`select count(*) from information_schema.columns;`).Check(
testkit.RowsWithSep("|", "4937"))
testkit.RowsWithSep("|", "4939"))
}

func TestIndexUsageTable(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/executor/internal/querywatch/query_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
)

func TestQueryWatch(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/resourcegroup/runaway/FastRunawayGC"))
}()
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
if variable.SchemaCacheSize.Load() != 0 {
Expand Down Expand Up @@ -117,7 +121,7 @@ func TestQueryWatch(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/copr/sleepCoprRequest", fmt.Sprintf("return(%d)", 60)))
err = tk.QueryToErr("select /*+ resource_group(rg1) */ * from t3")
require.ErrorContains(t, err, "[executor:8253]Query execution was interrupted, identified as runaway query")
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, original_sql, match_type from mysql.tidb_runaway_queries", nil,
tk.EventuallyMustQueryAndCheck("select SQL_NO_CACHE resource_group_name, sample_sql, match_type from mysql.tidb_runaway_queries", nil,
testkit.Rows(
"rg1 select /*+ resource_group(rg1) */ * from t3 watch",
"rg1 select /*+ resource_group(rg1) */ * from t3 identify",
Expand Down
11 changes: 10 additions & 1 deletion pkg/resourcegroup/runaway/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "runaway",
Expand Down Expand Up @@ -38,3 +38,12 @@ go_library(
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "runaway_test",
timeout = "short",
srcs = ["record_test.go"],
embed = [":runaway"],
flaky = True,
deps = ["@com_github_stretchr_testify//assert"],
)
2 changes: 1 addition & 1 deletion pkg/resourcegroup/runaway/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (r *Checker) markRunaway(matchType string, action rmpb.RunawayAction, switc
}
actionStr = strings.ToLower(actionStr)
metrics.RunawayCheckerCounter.WithLabelValues(r.resourceGroupName, matchType, actionStr).Inc()
r.manager.markRunaway(r.resourceGroupName, r.originalSQL, r.planDigest, actionStr, matchType, now)
r.manager.markRunaway(r, actionStr, matchType, now)
}

func (r *Checker) getSettingConvictIdentifier() string {
Expand Down
41 changes: 28 additions & 13 deletions pkg/resourcegroup/runaway/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
maxWatchListCap = 10000
maxWatchRecordChannelSize = 1024

runawayRecordFlushInterval = time.Second
runawayRecordFlushInterval = 30 * time.Second
runawayRecordGCInterval = time.Hour * 24
runawayRecordExpiredDuration = time.Hour * 24 * 7

Expand Down Expand Up @@ -155,17 +155,19 @@ func (rm *Manager) RunawayRecordFlushLoop() {
quarantineRecordCh := rm.quarantineRecordChan()
staleQuarantineRecordCh := rm.staleQuarantineRecordChan()
flushThreshold := flushThreshold()
records := make([]*Record, 0, flushThreshold)
// recordMap is used to deduplicate records.
recordMap = make(map[recordKey]*Record, flushThreshold)

flushRunawayRecords := func() {
if len(records) == 0 {
if len(recordMap) == 0 {
return
}
sql, params := genRunawayQueriesStmt(records)
sql, params := genRunawayQueriesStmt(recordMap)
if _, err := ExecRCRestrictedSQL(rm.sysSessionPool, sql, params); err != nil {
logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(records)))
logutil.BgLogger().Error("flush runaway records failed", zap.Error(err), zap.Int("count", len(recordMap)))
}
records = records[:0]
// reset the map.
recordMap = make(map[recordKey]*Record, flushThreshold)
}

for {
Expand All @@ -176,11 +178,21 @@ func (rm *Manager) RunawayRecordFlushLoop() {
flushRunawayRecords()
fired = true
case r := <-recordCh:
records = append(records, r)
key := recordKey{
ResourceGroupName: r.ResourceGroupName,
SQLDigest: r.SQLDigest,
PlanDigest: r.PlanDigest,
Match: r.Match,
}
if _, exists := recordMap[key]; exists {
recordMap[key].Repeats++
} else {
recordMap[key] = r
}
failpoint.Inject("FastRunawayGC", func() {
flushRunawayRecords()
})
if len(records) >= flushThreshold {
if len(recordMap) >= flushThreshold {
flushRunawayRecords()
} else if fired {
fired = false
Expand Down Expand Up @@ -321,7 +333,7 @@ func (rm *Manager) getWatchFromWatchList(key string) *QuarantineRecord {
return nil
}

func (rm *Manager) markRunaway(resourceGroupName, originalSQL, planDigest, action, matchType string, now *time.Time) {
func (rm *Manager) markRunaway(checker *Checker, action, matchType string, now *time.Time) {
source := rm.serverID
if !rm.syncerInitialized.Load() {
rm.logOnce.Do(func() {
Expand All @@ -331,13 +343,16 @@ func (rm *Manager) markRunaway(resourceGroupName, originalSQL, planDigest, actio
}
select {
case rm.runawayQueriesChan <- &Record{
ResourceGroupName: resourceGroupName,
Time: *now,
ResourceGroupName: checker.resourceGroupName,
StartTime: *now,
Match: matchType,
Action: action,
SQLText: originalSQL,
PlanDigest: planDigest,
SampleText: checker.originalSQL,
SQLDigest: checker.sqlDigest,
PlanDigest: checker.planDigest,
Source: source,
// default value for Repeats
Repeats: 1,
}:
default:
// TODO: add warning for discard flush records
Expand Down
61 changes: 44 additions & 17 deletions pkg/resourcegroup/runaway/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package runaway
import (
"context"
"fmt"
"hash/fnv"
"strings"
"time"

Expand Down Expand Up @@ -52,36 +53,62 @@ var NullTime time.Time
// Record is used to save records which will be inserted into mysql.tidb_runaway_queries.
type Record struct {
ResourceGroupName string
Time time.Time
StartTime time.Time
Match string
Action string
SQLText string
SampleText string
SQLDigest string
PlanDigest string
Source string
// Repeats is used to avoid inserting the same record multiple times.
// It records the number of times after flushing the record(10s) to the table or len(map) exceeds the threshold(1024).
// We only consider `resource_group_name`, `sql_digest`, `plan_digest` and `match_type` when comparing records.
// default value is 1.
Repeats int
}

// recordMap is used to save records which will be inserted into `mysql.tidb_runaway_queries` by function `flushRunawayRecords`.
var recordMap map[recordKey]*Record

// recordKey represents the composite key for record key in `tidb_runaway_queries`.
type recordKey struct {
ResourceGroupName string
SQLDigest string
PlanDigest string
Match string
}

// Hash generates a hash for the recordKey.
// Because `tidb_runaway_queries` is informational and not performance-critical,
// we can lose some accuracy for other component's performance.
func (k recordKey) Hash() uint64 {
h := fnv.New64a()
h.Write([]byte(k.ResourceGroupName))
h.Write([]byte(k.SQLDigest))
h.Write([]byte(k.PlanDigest))
h.Write([]byte(k.Match))
return h.Sum64()
}

// genRunawayQueriesStmt generates statement with given RunawayRecords.
func genRunawayQueriesStmt(records []*Record) (string, []any) {
func genRunawayQueriesStmt(recordMap map[recordKey]*Record) (string, []any) {
var builder strings.Builder
params := make([]any, 0, len(records)*7)
builder.WriteString("insert into mysql.tidb_runaway_queries VALUES ")
for count, r := range records {
if count > 0 {
params := make([]any, 0, len(recordMap)*9)
builder.WriteString("INSERT INTO mysql.tidb_runaway_queries " +
"(resource_group_name, start_time, match_type, action, sample_sql, sql_digest, plan_digest, tidb_server, repeats) VALUES ")
firstRecord := true
for _, r := range recordMap {
if !firstRecord {
builder.WriteByte(',')
}
builder.WriteString("(%?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ResourceGroupName)
params = append(params, r.Time)
params = append(params, r.Match)
params = append(params, r.Action)
params = append(params, r.SQLText)
params = append(params, r.PlanDigest)
params = append(params, r.Source)
firstRecord = false
builder.WriteString("(%?, %?, %?, %?, %?, %?, %?, %?, %?)")
params = append(params, r.ResourceGroupName, r.StartTime, r.Match, r.Action, r.SampleText, r.SQLDigest, r.PlanDigest, r.Source, r.Repeats)
}
return builder.String(), params
}

// QuarantineRecord is used to save records which will be insert into mysql.tidb_runaway_watch.
// QuarantineRecord is used to save records which will be inserted into mysql.tidb_runaway_watch.
type QuarantineRecord struct {
ID int64
ResourceGroupName string
Expand Down Expand Up @@ -183,7 +210,7 @@ func (r *QuarantineRecord) genDeletionStmt() (string, []any) {
func (rm *Manager) deleteExpiredRows(expiredDuration time.Duration) {
const (
tableName = "tidb_runaway_queries"
colName = "time"
colName = "start_time"
)
var systemSchemaCIStr = model.NewCIStr("mysql")

Expand Down
81 changes: 81 additions & 0 deletions pkg/resourcegroup/runaway/record_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package runaway

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestRecordKey(t *testing.T) {
// Initialize test data
key1 := recordKey{
ResourceGroupName: "group1",
SQLDigest: "digest1",
PlanDigest: "plan1",
}
// key2 is identical to key1
key2 := recordKey{
ResourceGroupName: "group1",
SQLDigest: "digest1",
PlanDigest: "plan1",
}
key3 := recordKey{
ResourceGroupName: "group2",
}
// Test Hash method
hash1 := key1.Hash()
hash2 := key2.Hash()
hash3 := key3.Hash()

assert.Equal(t, hash1, hash2, "Hashes should be equal for identical keys")
assert.NotEqual(t, hash1, hash3, "Hashes should not be equal for different keys")

// Test MapKey method
recordMap = make(map[recordKey]*Record)
record1 := &Record{
ResourceGroupName: "group1",
SQLDigest: "digest1",
PlanDigest: "plan1",
}
// put key1 into recordMap
recordMap[key1] = record1
assert.Len(t, recordMap, 1, "recordMap should have 1 element")
assert.Equal(t, "group1", recordMap[key1].ResourceGroupName, "Repeats should not be updated")
assert.Equal(t, 0, recordMap[key1].Repeats, "Repeats should be incremented")
// key2 is identical to key1, so we can use key2 to get the record
assert.NotNil(t, recordMap[key1], "key1 should exist in recordMap")
assert.NotNil(t, recordMap[key2], "key2 should exist in recordMap")
assert.Nil(t, recordMap[key3], "key3 should not exist in recordMap")

// put key2 into recordMap and update Repeats
record2 := &Record{
ResourceGroupName: "group1",
Repeats: 1,
}
recordMap[key2] = record2
assert.Len(t, recordMap, 1, "recordMap should have 1 element")
assert.Equal(t, 1, recordMap[key1].Repeats, "Repeats should be updated")
// change ResourceGroupName of key2 will not affect key1
key2.ResourceGroupName = "group2"
record3 := &Record{
ResourceGroupName: "group2",
}
recordMap[key2] = record3
assert.Len(t, recordMap, 2, "recordMap should have 1 element")
assert.Equal(t, "group1", recordMap[key1].ResourceGroupName, "Repeats should not be updated")
assert.Equal(t, "group2", recordMap[key2].ResourceGroupName, "ResourceGroupName should be updated")
}
2 changes: 1 addition & 1 deletion pkg/resourcegroup/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ go_test(
srcs = ["resource_group_test.go"],
flaky = True,
race = "on",
shard_count = 6,
shard_count = 7,
deps = [
"//pkg/ddl/resourcegroup",
"//pkg/domain",
Expand Down
Loading

0 comments on commit bb3456b

Please sign in to comment.