Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runaway: Add processed_keys and request_ru to check #55147

Merged
merged 16 commits into from
Sep 24, 2024
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,13 @@ def go_deps():
name = "com_github_benbjohnson_clock",
build_file_proto_mode = "disable_global",
importpath = "github.com/benbjohnson/clock",
sha256 = "b615224e45f86907cfb0acc2b198dacea85ced624ed6c497ca2e7e705a53f2f9",
strip_prefix = "github.com/benbjohnson/[email protected].5",
sha256 = "b710f0d542532ecb521fcaeda6c09977dced8722b05956fecccc97464d3dcee8",
strip_prefix = "github.com/benbjohnson/[email protected].0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"http://ats.apps.svc/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"https://cache.hawkingrei.com/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.5.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
"http://ats.apps.svc/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/benbjohnson/clock/com_github_benbjohnson_clock-v1.3.0.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1948,7 +1948,7 @@ Failed to split region ranges: %s

["executor:8253"]
error = '''
Query execution was interrupted, identified as runaway query
Query execution was interrupted, identified as runaway query [%s]
'''

["executor:8254"]
Expand Down
22 changes: 14 additions & 8 deletions pkg/ddl/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func SetDirectResourceGroupSettings(groupInfo *model.ResourceGroupInfo, opt *ast
case ast.ResourceGroupRunaway:
if len(opt.RunawayOptionList) == 0 {
resourceGroupSettings.Runaway = nil
} else {
resourceGroupSettings.Runaway = &model.ResourceGroupRunawaySettings{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part changed the behavior, for the old code, if resourceGroupSettings.Runaway is not nil, it won't be overwritten in SetDirectResourceGroupRunawayOption, but now it will be reset to an empty new one if len(opt.RunawayOptionList) > 0. Will this be a problem?

Copy link
Contributor Author

@HuSharp HuSharp Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, fixed. Yes, the new behavior has been discussed with PM.

}
for _, opt := range opt.RunawayOptionList {
if err := SetDirectResourceGroupRunawayOption(resourceGroupSettings, opt); err != nil {
Expand Down Expand Up @@ -260,18 +262,22 @@ func SetDirectResourceGroupRUSecondOption(resourceGroupSettings *model.ResourceG

// SetDirectResourceGroupRunawayOption tries to set runaway part of the ResourceGroupSettings.
func SetDirectResourceGroupRunawayOption(resourceGroupSettings *model.ResourceGroupSettings, opt *ast.ResourceGroupRunawayOption) error {
if resourceGroupSettings.Runaway == nil {
resourceGroupSettings.Runaway = &model.ResourceGroupRunawaySettings{}
}
settings := resourceGroupSettings.Runaway
switch opt.Tp {
case pmodel.RunawayRule:
// because execute time won't be too long, we use `time` pkg which does not support to parse unit 'd'.
dur, err := time.ParseDuration(opt.RuleOption.ExecElapsed)
if err != nil {
return err
switch opt.RuleOption.Tp {
case ast.RunawayRuleExecElapsed:
// because execute time won't be too long, we use `time` pkg which does not support to parse unit 'd'.
dur, err := time.ParseDuration(opt.RuleOption.ExecElapsed)
if err != nil {
return err
}
settings.ExecElapsedTimeMs = uint64(dur.Milliseconds())
case ast.RunawayRuleProcessedKeys:
settings.ProcessedKeys = opt.RuleOption.ProcessedKeys
case ast.RunawayRuleRequestUnit:
settings.RequestUnit = opt.RuleOption.RequestUnit
}
settings.ExecElapsedTimeMs = uint64(dur.Milliseconds())
case pmodel.RunawayAction:
settings.Action = opt.ActionOption.Type
settings.SwitchGroupName = opt.ActionOption.SwitchGroupName.String()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/resourcegroup/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ var (
ErrUnknownResourceGroupMode = errors.New("unknown resource group mode")
// ErrDroppingInternalResourceGroup is from group.go
ErrDroppingInternalResourceGroup = errors.New("can't drop reserved resource group")
// ErrInvalidResourceGroupRunawayExecElapsedTime is from group.go.
ErrInvalidResourceGroupRunawayExecElapsedTime = errors.New("invalid exec elapsed time")
// ErrResourceGroupRunawayRuleIsEmpty is from group.go.
ErrResourceGroupRunawayRuleIsEmpty = errors.New("please set at least one field(exec_elapsed_time_ms, processed_keys, ru)")
// ErrUnknownResourceGroupRunawayAction is from group.go.
ErrUnknownResourceGroupRunawayAction = errors.New("unknown resource group runaway action")
// ErrUnknownResourceGroupRunawaySwitchGroupName is from group.go.
Expand Down
11 changes: 6 additions & 5 deletions pkg/ddl/resourcegroup/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@ func NewGroupFromOptions(groupName string, options *model.ResourceGroupSettings)

group.Priority = uint32(options.Priority)
if options.Runaway != nil {
if options.Runaway.ExecElapsedTimeMs == 0 && options.Runaway.ProcessedKeys == 0 && options.Runaway.RequestUnit == 0 {
return nil, ErrResourceGroupRunawayRuleIsEmpty
}
runaway := &rmpb.RunawaySettings{
Rule: &rmpb.RunawayRule{},
}

// Update the rule settings.
if options.Runaway.ExecElapsedTimeMs == 0 {
return nil, ErrInvalidResourceGroupRunawayExecElapsedTime
}
runaway.Rule.ExecElapsedTimeMs = options.Runaway.ExecElapsedTimeMs
runaway.Rule.ProcessedKeys = options.Runaway.ProcessedKeys
runaway.Rule.RequestUnit = options.Runaway.RequestUnit
// Update the action settings.
if options.Runaway.Action == pmodel.RunawayActionNone {
return nil, ErrUnknownResourceGroupRunawayAction
}
// Update the action settings.
runaway.Action = rmpb.RunawayAction(options.Runaway.Action)
if options.Runaway.Action == pmodel.RunawayActionSwitchGroup && len(options.Runaway.SwitchGroupName) == 0 {
return nil, ErrUnknownResourceGroupRunawaySwitchGroupName
Expand Down
2 changes: 1 addition & 1 deletion pkg/errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil),
ErrResourceGroupConfigUnavailable: mysql.Message("Resource group configuration is unavailable", nil),
ErrResourceGroupThrottled: mysql.Message("Exceeded resource group quota limitation", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query [%s]", nil),
ErrResourceGroupQueryRunawayQuarantine: mysql.Message("Quarantined and interrupted because of being in runaway watch list", nil),
ErrResourceGroupInvalidBackgroundTaskName: mysql.Message("Unknown background task name '%-.192s'", nil),

Expand Down
38 changes: 1 addition & 37 deletions pkg/executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package executor

import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -138,7 +137,7 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
ruDetailsRaw := ctx.Value(clientutil.RUDetailsCtxKey)
if coll := e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl; coll != nil && ruDetailsRaw != nil {
ruDetails := ruDetailsRaw.(*clientutil.RUDetails)
coll.RegisterStats(e.explain.TargetPlan.ID(), &ruRuntimeStats{ruDetails})
coll.RegisterStats(e.explain.TargetPlan.ID(), &execdetails.RURuntimeStats{RUDetails: ruDetails})
}
}
return err
Expand Down Expand Up @@ -318,38 +317,3 @@ func getHeapProfile() (fileName string, err error) {
}
return fileName, nil
}

// ruRuntimeStats is a wrapper of clientutil.RUDetails,
// which implements the RuntimeStats interface.
type ruRuntimeStats struct {
*clientutil.RUDetails
}

// String implements the RuntimeStats interface.
func (e *ruRuntimeStats) String() string {
if e.RUDetails != nil {
return fmt.Sprintf("RU:%f", e.RRU()+e.WRU())
}
return ""
}

// Clone implements the RuntimeStats interface.
func (e *ruRuntimeStats) Clone() execdetails.RuntimeStats {
return &ruRuntimeStats{RUDetails: e.RUDetails.Clone()}
}

// Merge implements the RuntimeStats interface.
func (e *ruRuntimeStats) Merge(other execdetails.RuntimeStats) {
if tmp, ok := other.(*ruRuntimeStats); ok {
if e.RUDetails != nil {
e.RUDetails.Merge(tmp.RUDetails)
} else {
e.RUDetails = tmp.RUDetails.Clone()
}
}
}

// Tp implements the RuntimeStats interface.
func (*ruRuntimeStats) Tp() int {
return execdetails.TpRURuntimeStats
}
24 changes: 22 additions & 2 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3655,6 +3655,7 @@ func (e *memtableRetriever) setDataFromRunawayWatches(sctx sessionctx.Context) e
watch.WatchText,
watch.Source,
watch.GetActionString(),
watch.GetExceedCause(),
)
if watch.EndTime.Equal(runaway.NullTime) {
row[3].SetString("UNLIMITED", mysql.DefaultCollationName)
Expand Down Expand Up @@ -3694,8 +3695,27 @@ func (e *memtableRetriever) setDataFromResourceGroups() error {
if setting.Rule == nil {
return errors.Errorf("unexpected runaway config in resource group")
}
dur := time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond
fmt.Fprintf(limitBuilder, "EXEC_ELAPSED='%s'", dur.String())
// rule settings
firstParam := true
if setting.Rule.ExecElapsedTimeMs > 0 {
dur := time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond
fmt.Fprintf(limitBuilder, "EXEC_ELAPSED='%s'", dur.String())
firstParam = false
}
if setting.Rule.ProcessedKeys > 0 {
if !firstParam {
fmt.Fprintf(limitBuilder, ", ")
}
fmt.Fprintf(limitBuilder, "PROCESSED_KEYS=%d", setting.Rule.ProcessedKeys)
firstParam = false
}
if setting.Rule.RequestUnit > 0 {
if !firstParam {
fmt.Fprintf(limitBuilder, ", ")
}
fmt.Fprintf(limitBuilder, "RU=%d", setting.Rule.RequestUnit)
}
// action settings
actionType := pmodel.RunawayActionType(setting.Action)
switch actionType {
case pmodel.RunawayActionDryRun, pmodel.RunawayActionCooldown, pmodel.RunawayActionKill:
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ func TestColumnTable(t *testing.T) {
testkit.RowsWithSep("|",
"test|tbl1|col_2"))
tk.MustQuery(`select count(*) from information_schema.columns;`).Check(
testkit.RowsWithSep("|", "4961"))
testkit.RowsWithSep("|", "4965"))
}

func TestIndexUsageTable(t *testing.T) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/executor/internal/querywatch/query_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,10 @@ func setWatchOption(ctx context.Context,
func fromQueryWatchOptionList(ctx context.Context, sctx, newSctx sessionctx.Context,
optionList []*ast.QueryWatchOption) (*runaway.QuarantineRecord, error) {
record := &runaway.QuarantineRecord{
Source: runaway.ManualSource,
StartTime: time.Now().UTC(),
EndTime: runaway.NullTime,
Source: runaway.ManualSource,
StartTime: time.Now().UTC(),
EndTime: runaway.NullTime,
ExceedCause: "None",
}
for _, op := range optionList {
if err := setWatchOption(ctx, sctx, newSctx, record, op); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1674,6 +1674,7 @@ var tableRunawayWatchListCols = []columnInfo{
{name: "WATCH_TEXT", tp: mysql.TypeBlob, size: types.UnspecifiedLength, flag: mysql.NotNullFlag},
{name: "SOURCE", tp: mysql.TypeVarchar, size: 128, flag: mysql.NotNullFlag},
{name: "ACTION", tp: mysql.TypeVarchar, size: 12, flag: mysql.NotNullFlag},
{name: "RULE", tp: mysql.TypeVarchar, size: 128, flag: mysql.NotNullFlag},
}

// information_schema.CHECK_CONSTRAINTS
Expand Down
24 changes: 23 additions & 1 deletion pkg/meta/model/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// ResourceGroupRunawaySettings is the runaway settings of the resource group
type ResourceGroupRunawaySettings struct {
ExecElapsedTimeMs uint64 `json:"exec_elapsed_time_ms"`
ProcessedKeys int64 `json:"processed_keys"`
RequestUnit int64 `json:"request_unit"`
Action model.RunawayActionType `json:"action"`
SwitchGroupName string `json:"switch_group_name"`
WatchType model.RunawayWatchType `json:"watch_type"`
Expand Down Expand Up @@ -84,7 +86,27 @@ func (p *ResourceGroupSettings) String() string {
writeSettingItemToBuilder(sb, "BURSTABLE", separatorFn)
}
if p.Runaway != nil {
writeSettingDurationToBuilder(sb, "QUERY_LIMIT=(EXEC_ELAPSED", time.Duration(p.Runaway.ExecElapsedTimeMs)*time.Millisecond, separatorFn)
fmt.Fprintf(sb, ", QUERY_LIMIT=(")
// rule settings
firstParam := true
if p.Runaway.ExecElapsedTimeMs > 0 {
fmt.Fprintf(sb, "EXEC_ELAPSED=\"%s\"", (time.Duration(p.Runaway.ExecElapsedTimeMs) * time.Millisecond).String())
firstParam = false
}
if p.Runaway.ProcessedKeys > 0 {
if !firstParam {
sb.WriteString(" ")
}
fmt.Fprintf(sb, "PROCESSED_KEYS=%d", p.Runaway.ProcessedKeys)
firstParam = false
}
if p.Runaway.RequestUnit > 0 {
if !firstParam {
sb.WriteString(" ")
}
fmt.Fprintf(sb, "RU=%d", p.Runaway.RequestUnit)
}
// action settings
if p.Runaway.Action == model.RunawayActionSwitchGroup {
writeSettingItemToBuilder(sb, fmt.Sprintf("ACTION=%s(%s)", p.Runaway.Action.String(), p.Runaway.SwitchGroupName))
} else {
Expand Down
34 changes: 30 additions & 4 deletions pkg/parser/ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2326,13 +2326,35 @@ func (n *ResourceGroupRunawayOption) Restore(ctx *format.RestoreCtx) error {

// ResourceGroupRunawayRuleOption is used for parsing the resource group/query watch runaway rule.
type ResourceGroupRunawayRuleOption struct {
ExecElapsed string
Tp RunawayRuleOptionType
ExecElapsed string
ProcessedKeys int64
RequestUnit int64
}

type RunawayRuleOptionType int

const (
RunawayRuleExecElapsed RunawayRuleOptionType = iota
RunawayRuleProcessedKeys
RunawayRuleRequestUnit
)

func (n *ResourceGroupRunawayRuleOption) restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("EXEC_ELAPSED ")
ctx.WritePlain("= ")
ctx.WriteString(n.ExecElapsed)
switch n.Tp {
case RunawayRuleExecElapsed:
ctx.WriteKeyWord("EXEC_ELAPSED ")
ctx.WritePlain("= ")
ctx.WriteString(n.ExecElapsed)
case RunawayRuleProcessedKeys:
ctx.WriteKeyWord("PROCESSED_KEYS ")
ctx.WritePlain("= ")
ctx.WritePlainf("%d", n.ProcessedKeys)
case RunawayRuleRequestUnit:
ctx.WriteKeyWord("RU ")
ctx.WritePlain("= ")
ctx.WritePlainf("%d", n.RequestUnit)
}
return nil
}

Expand Down Expand Up @@ -4777,6 +4799,10 @@ func CheckAppend(ops []*ResourceGroupOption, newOp *ResourceGroupOption) bool {
func CheckRunawayAppend(ops []*ResourceGroupRunawayOption, newOp *ResourceGroupRunawayOption) bool {
for _, op := range ops {
if op.Tp == newOp.Tp {
// support multiple runaway rules.
if op.Tp == model.RunawayRule {
continue
}
return false
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/parser/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/goleak v1.2.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3
golang.org/x/text v0.14.0
modernc.org/parser v1.1.0
modernc.org/y v1.1.0
)

require (
github.com/benbjohnson/clock v1.3.5 // indirect
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/tools v0.17.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
modernc.org/golex v1.1.0 // indirect
Expand Down
Loading