Skip to content

Commit

Permalink
runaway: Add processed_keys and request_ru to check (#55147)
Browse files Browse the repository at this point in the history
ref #54434
  • Loading branch information
HuSharp authored Sep 24, 2024
1 parent e87affb commit 19caf52
Show file tree
Hide file tree
Showing 35 changed files with 11,507 additions and 11,234 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,13 @@ def go_deps():
name = "com_github_benbjohnson_clock",
build_file_proto_mode = "disable_global",
importpath = "github.com/benbjohnson/clock",
sha256 = "b615224e45f86907cfb0acc2b198dacea85ced624ed6c497ca2e7e705a53f2f9",
strip_prefix = "github.com/benbjohnson/[email protected].5",
sha256 = "b710f0d542532ecb521fcaeda6c09977dced8722b05956fecccc97464d3dcee8",
strip_prefix = "github.com/benbjohnson/[email protected].0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"http://ats.apps.svc/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"https://cache.hawkingrei.com/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
"http://ats.apps.svc/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1948,7 +1948,7 @@ Failed to split region ranges: %s

["executor:8253"]
error = '''
Query execution was interrupted, identified as runaway query
Query execution was interrupted, identified as runaway query [%s]
'''

["executor:8254"]
Expand Down
22 changes: 14 additions & 8 deletions pkg/ddl/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func SetDirectResourceGroupSettings(groupInfo *model.ResourceGroupInfo, opt *ast
case ast.ResourceGroupRunaway:
if len(opt.RunawayOptionList) == 0 {
resourceGroupSettings.Runaway = nil
} else {
resourceGroupSettings.Runaway = &model.ResourceGroupRunawaySettings{}
}
for _, opt := range opt.RunawayOptionList {
if err := SetDirectResourceGroupRunawayOption(resourceGroupSettings, opt); err != nil {
Expand Down Expand Up @@ -260,18 +262,22 @@ func SetDirectResourceGroupRUSecondOption(resourceGroupSettings *model.ResourceG

// SetDirectResourceGroupRunawayOption tries to set runaway part of the ResourceGroupSettings.
func SetDirectResourceGroupRunawayOption(resourceGroupSettings *model.ResourceGroupSettings, opt *ast.ResourceGroupRunawayOption) error {
if resourceGroupSettings.Runaway == nil {
resourceGroupSettings.Runaway = &model.ResourceGroupRunawaySettings{}
}
settings := resourceGroupSettings.Runaway
switch opt.Tp {
case pmodel.RunawayRule:
// because execute time won't be too long, we use `time` pkg which does not support to parse unit 'd'.
dur, err := time.ParseDuration(opt.RuleOption.ExecElapsed)
if err != nil {
return err
switch opt.RuleOption.Tp {
case ast.RunawayRuleExecElapsed:
// because execute time won't be too long, we use `time` pkg which does not support to parse unit 'd'.
dur, err := time.ParseDuration(opt.RuleOption.ExecElapsed)
if err != nil {
return err
}
settings.ExecElapsedTimeMs = uint64(dur.Milliseconds())
case ast.RunawayRuleProcessedKeys:
settings.ProcessedKeys = opt.RuleOption.ProcessedKeys
case ast.RunawayRuleRequestUnit:
settings.RequestUnit = opt.RuleOption.RequestUnit
}
settings.ExecElapsedTimeMs = uint64(dur.Milliseconds())
case pmodel.RunawayAction:
settings.Action = opt.ActionOption.Type
settings.SwitchGroupName = opt.ActionOption.SwitchGroupName.String()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/resourcegroup/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ var (
ErrUnknownResourceGroupMode = errors.New("unknown resource group mode")
// ErrDroppingInternalResourceGroup is from group.go
ErrDroppingInternalResourceGroup = errors.New("can't drop reserved resource group")
// ErrInvalidResourceGroupRunawayExecElapsedTime is from group.go.
ErrInvalidResourceGroupRunawayExecElapsedTime = errors.New("invalid exec elapsed time")
// ErrResourceGroupRunawayRuleIsEmpty is from group.go.
ErrResourceGroupRunawayRuleIsEmpty = errors.New("please set at least one field(exec_elapsed_time_ms, processed_keys, ru)")
// ErrUnknownResourceGroupRunawayAction is from group.go.
ErrUnknownResourceGroupRunawayAction = errors.New("unknown resource group runaway action")
// ErrUnknownResourceGroupRunawaySwitchGroupName is from group.go.
Expand Down
11 changes: 6 additions & 5 deletions pkg/ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)

group.Priority = uint32(options.Priority)
if options.Runaway != nil {
if options.Runaway.ExecElapsedTimeMs == 0 && options.Runaway.ProcessedKeys == 0 && options.Runaway.RequestUnit == 0 {
return nil, ErrResourceGroupRunawayRuleIsEmpty
}
runaway := &rmpb.RunawaySettings{
Rule: &rmpb.RunawayRule{},
}

// Update the rule settings.
if options.Runaway.ExecElapsedTimeMs == 0 {
return nil, ErrInvalidResourceGroupRunawayExecElapsedTime
}
runaway.Rule.ExecElapsedTimeMs = options.Runaway.ExecElapsedTimeMs
runaway.Rule.ProcessedKeys = options.Runaway.ProcessedKeys
runaway.Rule.RequestUnit = options.Runaway.RequestUnit
// Update the action settings.
if options.Runaway.Action == pmodel.RunawayActionNone {
return nil, ErrUnknownResourceGroupRunawayAction
}
// Update the action settings.
runaway.Action = rmpb.RunawayAction(options.Runaway.Action)
if options.Runaway.Action == pmodel.RunawayActionSwitchGroup && len(options.Runaway.SwitchGroupName) == 0 {
return nil, ErrUnknownResourceGroupRunawaySwitchGroupName
Expand Down
2 changes: 1 addition & 1 deletion pkg/errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil),
ErrResourceGroupConfigUnavailable: mysql.Message("Resource group configuration is unavailable", nil),
ErrResourceGroupThrottled: mysql.Message("Exceeded resource group quota limitation", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query [%s]", nil),
ErrResourceGroupQueryRunawayQuarantine: mysql.Message("Quarantined and interrupted because of being in runaway watch list", nil),
ErrResourceGroupInvalidBackgroundTaskName: mysql.Message("Unknown background task name '%-.192s'", nil),

Expand Down
38 changes: 1 addition & 37 deletions pkg/executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package executor

import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -138,7 +137,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey)
if coll := e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl; coll != nil && ruDetailsRaw != nil {
ruDetails := ruDetailsRaw.(*clientutil.RUDetails)
coll.RegisterStats(e.explain.TargetPlan.ID(), &ruRuntimeStats{ruDetails})
coll.RegisterStats(e.explain.TargetPlan.ID(), &execdetails.RURuntimeStats{RUDetails: ruDetails})
}
}
return err
Expand Down Expand Up @@ -318,38 +317,3 @@ func getHeapProfile() (fileName string, err error) {
}
return fileName, nil
}

// ruRuntimeStats is a wrapper of clientutil.RUDetails,
// which implements the RuntimeStats interface.
type ruRuntimeStats struct {
*clientutil.RUDetails
}

// String implements the RuntimeStats interface.
func (e *ruRuntimeStats) String() string {
if e.RUDetails != nil {
return fmt.Sprintf("RU:%f", e.RRU()+e.WRU())
}
return ""
}

// Clone implements the RuntimeStats interface.
func (e *ruRuntimeStats) Clone() execdetails.RuntimeStats {
return &ruRuntimeStats{RUDetails: e.RUDetails.Clone()}
}

// Merge implements the RuntimeStats interface.
func (e *ruRuntimeStats) Merge(other execdetails.RuntimeStats) {
if tmp, ok := other.(*ruRuntimeStats); ok {
if e.RUDetails != nil {
e.RUDetails.Merge(tmp.RUDetails)
} else {
e.RUDetails = tmp.RUDetails.Clone()
}
}
}

// Tp implements the RuntimeStats interface.
func (*ruRuntimeStats) Tp() int {
return execdetails.TpRURuntimeStats
}
24 changes: 22 additions & 2 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3655,6 +3655,7 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
watch.WatchText,
watch.Source,
watch.GetActionString(),
watch.GetExceedCause(),
)
if watch.EndTime.Equal(runaway.NullTime) {
row[3].SetString("UNLIMITED", mysql.DefaultCollationName)
Expand Down Expand Up @@ -3694,8 +3695,27 @@ func (e *memtableRetriever) setDataFromResourceGroups() error {
if setting.Rule == nil {
return errors.Errorf("unexpected runaway config in resource group")
}
dur := time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond
fmt.Fprintf(limitBuilder, "EXEC_ELAPSED='%s'", dur.String())
// rule settings
firstParam := true
if setting.Rule.ExecElapsedTimeMs > 0 {
dur := time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond
fmt.Fprintf(limitBuilder, "EXEC_ELAPSED='%s'", dur.String())
firstParam = false
}
if setting.Rule.ProcessedKeys > 0 {
if !firstParam {
fmt.Fprintf(limitBuilder, ", ")
}
fmt.Fprintf(limitBuilder, "PROCESSED_KEYS=%d", setting.Rule.ProcessedKeys)
firstParam = false
}
if setting.Rule.RequestUnit > 0 {
if !firstParam {
fmt.Fprintf(limitBuilder, ", ")
}
fmt.Fprintf(limitBuilder, "RU=%d", setting.Rule.RequestUnit)
}
// action settings
actionType := pmodel.RunawayActionType(setting.Action)
switch actionType {
case pmodel.RunawayActionDryRun, pmodel.RunawayActionCooldown, pmodel.RunawayActionKill:
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func TestColumnTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|tbl1|col_2"))
tk.MustQuery(`select count(*) from information_schema.columns;`).Check(
testkit.RowsWithSep("|", "4961"))
testkit.RowsWithSep("|", "4965"))
}

func TestIndexUsageTable(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/internal/querywatch/query_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func setWatchOption(ctx context.Context,
func fromQueryWatchOptionList(ctx context.Context, sctx, newSctx sessionctx.Context,
optionList []*ast.QueryWatchOption) (*runaway.QuarantineRecord, error) {
record := &runaway.QuarantineRecord{
Source: runaway.ManualSource,
StartTime: time.Now().UTC(),
EndTime: runaway.NullTime,
Source: runaway.ManualSource,
StartTime: time.Now().UTC(),
EndTime: runaway.NullTime,
ExceedCause: "None",
}
for _, op := range optionList {
if err := setWatchOption(ctx, sctx, newSctx, record, op); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,7 @@ var tableRunawayWatchListCols = []columnInfo{
{name: "WATCH_TEXT", tp: mysql.TypeBlob, size: types.UnspecifiedLength, flag: mysql.NotNullFlag},
{name: "SOURCE", tp: mysql.TypeVarchar, size: 128, flag: mysql.NotNullFlag},
{name: "ACTION", tp: mysql.TypeVarchar, size: 12, flag: mysql.NotNullFlag},
{name: "RULE", tp: mysql.TypeVarchar, size: 128, flag: mysql.NotNullFlag},
}

// information_schema.CHECK_CONSTRAINTS
Expand Down
24 changes: 23 additions & 1 deletion pkg/meta/model/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// ResourceGroupRunawaySettings is the runaway settings of the resource group
type ResourceGroupRunawaySettings struct {
ExecElapsedTimeMs uint64 `json:"exec_elapsed_time_ms"`
ProcessedKeys int64 `json:"processed_keys"`
RequestUnit int64 `json:"request_unit"`
Action model.RunawayActionType `json:"action"`
SwitchGroupName string `json:"switch_group_name"`
WatchType model.RunawayWatchType `json:"watch_type"`
Expand Down Expand Up @@ -84,7 +86,27 @@ func (p *ResourceGroupSettings) String() string {
writeSettingItemToBuilder(sb, "BURSTABLE", separatorFn)
}
if p.Runaway != nil {
writeSettingDurationToBuilder(sb, "QUERY_LIMIT=(EXEC_ELAPSED", time.Duration(p.Runaway.ExecElapsedTimeMs)*time.Millisecond, separatorFn)
fmt.Fprintf(sb, ", QUERY_LIMIT=(")
// rule settings
firstParam := true
if p.Runaway.ExecElapsedTimeMs > 0 {
fmt.Fprintf(sb, "EXEC_ELAPSED=\"%s\"", (time.Duration(p.Runaway.ExecElapsedTimeMs) * time.Millisecond).String())
firstParam = false
}
if p.Runaway.ProcessedKeys > 0 {
if !firstParam {
sb.WriteString(" ")
}
fmt.Fprintf(sb, "PROCESSED_KEYS=%d", p.Runaway.ProcessedKeys)
firstParam = false
}
if p.Runaway.RequestUnit > 0 {
if !firstParam {
sb.WriteString(" ")
}
fmt.Fprintf(sb, "RU=%d", p.Runaway.RequestUnit)
}
// action settings
if p.Runaway.Action == model.RunawayActionSwitchGroup {
writeSettingItemToBuilder(sb, fmt.Sprintf("ACTION=%s(%s)", p.Runaway.Action.String(), p.Runaway.SwitchGroupName))
} else {
Expand Down
34 changes: 30 additions & 4 deletions pkg/parser/ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2326,13 +2326,35 @@ func (n *ResourceGroupRunawayOption) Restore(ctx *format.RestoreCtx) error {

// ResourceGroupRunawayRuleOption is used for parsing the resource group/query watch runaway rule.
type ResourceGroupRunawayRuleOption struct {
ExecElapsed string
Tp RunawayRuleOptionType
ExecElapsed string
ProcessedKeys int64
RequestUnit int64
}

type RunawayRuleOptionType int

const (
RunawayRuleExecElapsed RunawayRuleOptionType = iota
RunawayRuleProcessedKeys
RunawayRuleRequestUnit
)

func (n *ResourceGroupRunawayRuleOption) restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("EXEC_ELAPSED ")
ctx.WritePlain("= ")
ctx.WriteString(n.ExecElapsed)
switch n.Tp {
case RunawayRuleExecElapsed:
ctx.WriteKeyWord("EXEC_ELAPSED ")
ctx.WritePlain("= ")
ctx.WriteString(n.ExecElapsed)
case RunawayRuleProcessedKeys:
ctx.WriteKeyWord("PROCESSED_KEYS ")
ctx.WritePlain("= ")
ctx.WritePlainf("%d", n.ProcessedKeys)
case RunawayRuleRequestUnit:
ctx.WriteKeyWord("RU ")
ctx.WritePlain("= ")
ctx.WritePlainf("%d", n.RequestUnit)
}
return nil
}

Expand Down Expand Up @@ -4777,6 +4799,10 @@ func CheckAppend(ops []*ResourceGroupOption, newOp *ResourceGroupOption) bool {
func CheckRunawayAppend(ops []*ResourceGroupRunawayOption, newOp *ResourceGroupRunawayOption) bool {
for _, op := range ops {
if op.Tp == newOp.Tp {
// support multiple runaway rules.
if op.Tp == model.RunawayRule {
continue
}
return false
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/parser/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/goleak v1.2.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3
golang.org/x/text v0.14.0
modernc.org/parser v1.1.0
modernc.org/y v1.1.0
)

require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/tools v0.17.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/golex v1.1.0 // indirect
Expand Down
Loading

0 comments on commit 19caf52

Please sign in to comment.