Skip to content

Commit

Permalink
add cause
Browse files Browse the repository at this point in the history
Signed-off-by: husharp <[email protected]>
  • Loading branch information
HuSharp committed Aug 16, 2024
1 parent db7e3f5 commit 3ee3027
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 37 deletions.
13 changes: 6 additions & 7 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,6 @@ func (r *RunawayChecker) CheckRuleKillAction() bool {
// If the group settings are available and it's not marked by rule, check the execution time.
if r.settings != nil && !r.markedByRule.Load() {
now := time.Now()
//
if _, exceed := r.exceedsThresholds(now, nil, 0); !exceed {
return false
}
Expand Down Expand Up @@ -610,7 +609,7 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
// Take action if needed.
switch r.settings.Action {
case rmpb.RunawayAction_Kill:
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause)
case rmpb.RunawayAction_CoolDown:
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
return nil
Expand Down Expand Up @@ -688,7 +687,7 @@ func (r *RunawayChecker) CheckThresholds(ruDetail *util.RUDetails, processKeys i
if !r.markedByRule.Load() {
if exceedCause, exceed := r.exceedsThresholds(now, ruDetail, processKeys); exceed {
if r.markRunawayByIdentify(r.settings.Action, &now, exceedCause.String()) {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause)
}
}
}
Expand All @@ -710,7 +709,7 @@ func (r *RunawayChecker) CheckThresholds(ruDetail *util.RUDetails, processKeys i
}
// Due to concurrency, check again.
if r.markedByRule.Load() {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs("concurrency")
}
return nil
}
Expand All @@ -734,11 +733,11 @@ func (t RunawayExceedCause) String() string {
case RunawayExceedNone:
return "None"
case RunawayExceedCauseTime:
return fmt.Sprintf("Execution time(%s) exceed threshold(%s)", t.actualValue, t.thresholdValue)
return fmt.Sprintf("ElapsedTime = %s(%s)", t.actualValue, t.thresholdValue)
case RunawayExceedCauseRU:
return fmt.Sprintf("Request Unit(%s) exceed threshold(%s)", t.actualValue, t.thresholdValue)
return fmt.Sprintf("RequestUnit = %d(%d)", t.actualValue, t.thresholdValue)
case RunawayExceedCauseProcessKeys:
return fmt.Sprintf("Processed keys(%d) exceed threshold(%d)", t.actualValue, t.thresholdValue)
return fmt.Sprintf("ProcessedKeys = %d(%d)", t.actualValue, t.thresholdValue)
default:
panic("unknown type")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil),
ErrResourceGroupConfigUnavailable: mysql.Message("Resource group configuration is unavailable", nil),
ErrResourceGroupThrottled: mysql.Message("Exceeded resource group quota limitation", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query", nil),
ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query [%s]", nil),
ErrResourceGroupQueryRunawayQuarantine: mysql.Message("Quarantined and interrupted because of being in runaway watch list", nil),
ErrResourceGroupInvalidBackgroundTaskName: mysql.Message("Unknown background task name '%-.192s'", nil),

Expand Down
14 changes: 7 additions & 7 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ const (
time TIMESTAMP NOT NULL,
match_type varchar(12) NOT NULL,
action varchar(12) NOT NULL,
check_rule VARCHAR(512) DEFAULT '',
rule VARCHAR(512) DEFAULT '',
original_sql TEXT NOT NULL,
plan_digest TEXT NOT NULL,
tidb_server varchar(512),
Expand All @@ -642,7 +642,7 @@ const (
watch_text TEXT NOT NULL,
source varchar(512) NOT NULL,
action bigint(10),
check_rule VARCHAR(512) DEFAULT '',
rule VARCHAR(512) DEFAULT '',
INDEX sql_index(resource_group_name,watch_text(700)) COMMENT "accelerate the speed when select quarantined query",
INDEX time_index(end_time) COMMENT "accelerate the speed when querying with active watch"
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`
Expand All @@ -658,7 +658,7 @@ const (
watch_text TEXT NOT NULL,
source varchar(512) NOT NULL,
action bigint(10),
check_rule VARCHAR(512) DEFAULT '',
rule VARCHAR(512) DEFAULT '',
done_time TIMESTAMP(6) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`

Expand Down Expand Up @@ -1114,7 +1114,7 @@ const (
// version211 add column `summary` to `mysql.tidb_background_subtask_history`.
version211 = 211

// version212 add column `check_rule` to `mysql.tidb_runaway_watch`, `mysql.tidb_runaway_watch_done` and `mysql.tidb_runaway_queries`.
// version212 add column `rule` to `mysql.tidb_runaway_watch`, `mysql.tidb_runaway_watch_done` and `mysql.tidb_runaway_queries`.
version212 = 212
)

Expand Down Expand Up @@ -3091,9 +3091,9 @@ func upgradeToVer212(s sessiontypes.Session, ver int64) {
if ver >= version211 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch ADD COLUMN `check_rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch_done ADD COLUMN `check_rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `check_rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch ADD COLUMN `rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch_done ADD COLUMN `rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists)
doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists)
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
Expand Down
50 changes: 30 additions & 20 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1281,7 +1281,9 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
err = worker.handleTiDBSendReqErr(err, task, ch)
return nil, err
}
worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx)
if err = worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx); err != nil {
return nil, err
}
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -1475,16 +1477,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
} else if task.ranges != nil && task.ranges.Len() > 0 {
resp.startKey = task.ranges.At(0).StartKey
}
worker.handleCollectExecutionInfo(bo, rpcCtx, resp)
// check execution info
if worker.req.RunawayChecker != nil {
var ruDetail *util.RUDetails
if ruDetailRaw := bo.GetCtx().Value(util.RUDetailsCtxKey); ruDetailRaw != nil {
ruDetail = ruDetailRaw.(*util.RUDetails)
}
if err := worker.req.RunawayChecker.CheckThresholds(ruDetail, resp.detail.ScanDetail.ProcessedKeys, nil); err != nil {
return nil, err
}
if err := worker.handleCollectExecutionInfo(bo, rpcCtx, resp); err != nil {
return nil, err
}
resp.respTime = costTime

Expand Down Expand Up @@ -1612,7 +1606,9 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t
}
return nil, errors.Trace(err)
}
worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp)
if err := worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp); err != nil {
return nil, err
}
worker.sendToRespCh(resp, ch, true)
}
for _, t := range tasks {
Expand Down Expand Up @@ -1786,12 +1782,12 @@ func (worker *copIteratorWorker) getLockResolverDetails() *util.ResolveLockDetai
return &util.ResolveLockDetail{}
}

func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) error {
defer func() {
worker.kvclient.Stats = nil
}()
if !worker.enableCollectExecutionInfo {
return
return nil
}
failpoint.Inject("disable-collect-execution", func(val failpoint.Value) {
if val.(bool) {
Expand All @@ -1801,10 +1797,10 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
if resp.detail == nil {
resp.detail = new(CopRuntimeStats)
}
worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
return worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
}

func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) error {
copStats.ReqStats = worker.kvclient.Stats
backoffTimes := bo.GetBackoffTimes()
copStats.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
Expand All @@ -1818,7 +1814,7 @@ func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStat
copStats.CalleeAddress = rpcCtx.Addr
}
if resp == nil {
return
return nil
}
sd := &util.ScanDetail{}
td := util.TimeDetail{}
Expand All @@ -1843,18 +1839,32 @@ func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStat
}
copStats.ScanDetail = sd
copStats.TimeDetail = td

if worker.req.RunawayChecker != nil {
var ruDetail *util.RUDetails
if ruDetailRaw := bo.GetCtx().Value(util.RUDetailsCtxKey); ruDetailRaw != nil {
ruDetail = ruDetailRaw.(*util.RUDetails)
}
if err := worker.req.RunawayChecker.CheckThresholds(ruDetail, sd.ProcessedKeys, nil); err != nil {
return err
}
}
return nil
}

func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) {
func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) error {
if worker.kvclient.Stats == nil {
return
return nil
}
copStats := &CopRuntimeStats{}
worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil)
if err := worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil); err != nil {
return err
}
worker.unconsumedStats.Lock()
worker.unconsumedStats.stats = append(worker.unconsumedStats.stats, copStats)
worker.unconsumedStats.Unlock()
worker.kvclient.Stats = nil
return nil
}

// CopRuntimeStats contains execution detail information.
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/driver/error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func ToTiDBErr(err error) error {
return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(-1)
}
if stderrs.Is(err, tikverr.ErrQueryInterruptedWithSignal{Signal: sqlkiller.RunawayQueryExceeded}) {
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.GenWithStackByArgs()
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs("exceed tidb side")
}

if stderrs.Is(err, tikverr.ErrTiKVServerBusy) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/sqlkiller/sqlkiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (killer *SQLKiller) getKillError(status killSignal) error {
case ServerMemoryExceeded:
return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(killer.ConnID)
case RunawayQueryExceeded:
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.GenWithStackByArgs()
return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs("exceed tidb side")
default:
}
return nil
Expand Down

0 comments on commit 3ee3027

Please sign in to comment.