diff --git a/pkg/domain/resourcegroup/runaway.go b/pkg/domain/resourcegroup/runaway.go index 8760ca867128b..0e956daab4b71 100644 --- a/pkg/domain/resourcegroup/runaway.go +++ b/pkg/domain/resourcegroup/runaway.go @@ -552,7 +552,6 @@ func (r *RunawayChecker) CheckRuleKillAction() bool { // If the group settings are available and it's not marked by rule, check the execution time. if r.settings != nil && !r.markedByRule.Load() { now := time.Now() - // if _, exceed := r.exceedsThresholds(now, nil, 0); !exceed { return false } @@ -610,7 +609,7 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error { // Take action if needed. switch r.settings.Action { case rmpb.RunawayAction_Kill: - return exeerrors.ErrResourceGroupQueryRunawayInterrupted + return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause) case rmpb.RunawayAction_CoolDown: req.ResourceControlContext.OverridePriority = 1 // set priority to lowest return nil @@ -688,7 +687,7 @@ func (r *RunawayChecker) CheckThresholds(ruDetail *util.RUDetails, processKeys i if !r.markedByRule.Load() { if exceedCause, exceed := r.exceedsThresholds(now, ruDetail, processKeys); exceed { if r.markRunawayByIdentify(r.settings.Action, &now, exceedCause.String()) { - return exeerrors.ErrResourceGroupQueryRunawayInterrupted + return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs(exceedCause) } } } @@ -710,7 +709,7 @@ func (r *RunawayChecker) CheckThresholds(ruDetail *util.RUDetails, processKeys i } // Due to concurrency, check again. if r.markedByRule.Load() { - return exeerrors.ErrResourceGroupQueryRunawayInterrupted + return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs("concurrency") } return nil } @@ -734,11 +733,11 @@ func (t RunawayExceedCause) String() string { case RunawayExceedNone: return "None" case RunawayExceedCauseTime: - return fmt.Sprintf("Execution time(%s) exceed threshold(%s)", t.actualValue, t.thresholdValue) + return fmt.Sprintf("ElapsedTime = %s(%s)", t.actualValue, t.thresholdValue) case RunawayExceedCauseRU: - return fmt.Sprintf("Request Unit(%s) exceed threshold(%s)", t.actualValue, t.thresholdValue) + return fmt.Sprintf("RequestUnit = %d(%d)", t.actualValue, t.thresholdValue) case RunawayExceedCauseProcessKeys: - return fmt.Sprintf("Processed keys(%d) exceed threshold(%d)", t.actualValue, t.thresholdValue) + return fmt.Sprintf("ProcessedKeys = %d(%d)", t.actualValue, t.thresholdValue) default: panic("unknown type") } diff --git a/pkg/errno/errname.go b/pkg/errno/errname.go index aadcbb3dfcbca..854cf1114b771 100644 --- a/pkg/errno/errname.go +++ b/pkg/errno/errname.go @@ -1148,7 +1148,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrResourceGroupSupportDisabled: mysql.Message("Resource control feature is disabled. Run `SET GLOBAL tidb_enable_resource_control='on'` to enable the feature", nil), ErrResourceGroupConfigUnavailable: mysql.Message("Resource group configuration is unavailable", nil), ErrResourceGroupThrottled: mysql.Message("Exceeded resource group quota limitation", nil), - ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query", nil), + ErrResourceGroupQueryRunawayInterrupted: mysql.Message("Query execution was interrupted, identified as runaway query [%s]", nil), ErrResourceGroupQueryRunawayQuarantine: mysql.Message("Quarantined and interrupted because of being in runaway watch list", nil), ErrResourceGroupInvalidBackgroundTaskName: mysql.Message("Unknown background task name '%-.192s'", nil), diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index a5bb2cad170d4..f836242e523de 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -624,7 +624,7 @@ const ( time TIMESTAMP NOT NULL, match_type varchar(12) NOT NULL, action varchar(12) NOT NULL, - check_rule VARCHAR(512) DEFAULT '', + rule VARCHAR(512) DEFAULT '', original_sql TEXT NOT NULL, plan_digest TEXT NOT NULL, tidb_server varchar(512), @@ -642,7 +642,7 @@ const ( watch_text TEXT NOT NULL, source varchar(512) NOT NULL, action bigint(10), - check_rule VARCHAR(512) DEFAULT '', + rule VARCHAR(512) DEFAULT '', INDEX sql_index(resource_group_name,watch_text(700)) COMMENT "accelerate the speed when select quarantined query", INDEX time_index(end_time) COMMENT "accelerate the speed when querying with active watch" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` @@ -658,7 +658,7 @@ const ( watch_text TEXT NOT NULL, source varchar(512) NOT NULL, action bigint(10), - check_rule VARCHAR(512) DEFAULT '', + rule VARCHAR(512) DEFAULT '', done_time TIMESTAMP(6) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` @@ -1114,7 +1114,7 @@ const ( // version211 add column `summary` to `mysql.tidb_background_subtask_history`. version211 = 211 - // version212 add column `check_rule` to `mysql.tidb_runaway_watch`, `mysql.tidb_runaway_watch_done` and `mysql.tidb_runaway_queries`. + // version212 add column `rule` to `mysql.tidb_runaway_watch`, `mysql.tidb_runaway_watch_done` and `mysql.tidb_runaway_queries`. version212 = 212 ) @@ -3091,9 +3091,9 @@ func upgradeToVer212(s sessiontypes.Session, ver int64) { if ver >= version211 { return } - doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch ADD COLUMN `check_rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) - doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch_done ADD COLUMN `check_rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) - doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `check_rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch ADD COLUMN `rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_watch_done ADD COLUMN `rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) + doReentrantDDL(s, "ALTER TABLE mysql.tidb_runaway_queries ADD COLUMN `rule` VARCHAR(512) DEFAULT '' AFTER `action`;", infoschema.ErrColumnExists) } // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist. diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index 083881dcf5c7d..47f97451d289e 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -1281,7 +1281,9 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch err = worker.handleTiDBSendReqErr(err, task, ch) return nil, err } - worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx) + if err = worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx); err != nil { + return nil, err + } return nil, errors.Trace(err) } @@ -1475,16 +1477,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R } else if task.ranges != nil && task.ranges.Len() > 0 { resp.startKey = task.ranges.At(0).StartKey } - worker.handleCollectExecutionInfo(bo, rpcCtx, resp) - // check execution info - if worker.req.RunawayChecker != nil { - var ruDetail *util.RUDetails - if ruDetailRaw := bo.GetCtx().Value(util.RUDetailsCtxKey); ruDetailRaw != nil { - ruDetail = ruDetailRaw.(*util.RUDetails) - } - if err := worker.req.RunawayChecker.CheckThresholds(ruDetail, resp.detail.ScanDetail.ProcessedKeys, nil); err != nil { - return nil, err - } + if err := worker.handleCollectExecutionInfo(bo, rpcCtx, resp); err != nil { + return nil, err } resp.respTime = costTime @@ -1612,7 +1606,9 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *t } return nil, errors.Trace(err) } - worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp) + if err := worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp); err != nil { + return nil, err + } worker.sendToRespCh(resp, ch, true) } for _, t := range tasks { @@ -1786,12 +1782,12 @@ func (worker *copIteratorWorker) getLockResolverDetails() *util.ResolveLockDetai return &util.ResolveLockDetail{} } -func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) { +func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) error { defer func() { worker.kvclient.Stats = nil }() if !worker.enableCollectExecutionInfo { - return + return nil } failpoint.Inject("disable-collect-execution", func(val failpoint.Value) { if val.(bool) { @@ -1801,10 +1797,10 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt if resp.detail == nil { resp.detail = new(CopRuntimeStats) } - worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp) + return worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp) } -func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) { +func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) error { copStats.ReqStats = worker.kvclient.Stats backoffTimes := bo.GetBackoffTimes() copStats.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond @@ -1818,7 +1814,7 @@ func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStat copStats.CalleeAddress = rpcCtx.Addr } if resp == nil { - return + return nil } sd := &util.ScanDetail{} td := util.TimeDetail{} @@ -1843,18 +1839,32 @@ func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStat } copStats.ScanDetail = sd copStats.TimeDetail = td + + if worker.req.RunawayChecker != nil { + var ruDetail *util.RUDetails + if ruDetailRaw := bo.GetCtx().Value(util.RUDetailsCtxKey); ruDetailRaw != nil { + ruDetail = ruDetailRaw.(*util.RUDetails) + } + if err := worker.req.RunawayChecker.CheckThresholds(ruDetail, sd.ProcessedKeys, nil); err != nil { + return err + } + } + return nil } -func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) { +func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) error { if worker.kvclient.Stats == nil { - return + return nil } copStats := &CopRuntimeStats{} - worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil) + if err := worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil); err != nil { + return err + } worker.unconsumedStats.Lock() worker.unconsumedStats.stats = append(worker.unconsumedStats.stats, copStats) worker.unconsumedStats.Unlock() worker.kvclient.Stats = nil + return nil } // CopRuntimeStats contains execution detail information. diff --git a/pkg/store/driver/error/error.go b/pkg/store/driver/error/error.go index f1c2afcbb31a9..914b02a2c76a2 100644 --- a/pkg/store/driver/error/error.go +++ b/pkg/store/driver/error/error.go @@ -134,7 +134,7 @@ func ToTiDBErr(err error) error { return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(-1) } if stderrs.Is(err, tikverr.ErrQueryInterruptedWithSignal{Signal: sqlkiller.RunawayQueryExceeded}) { - return exeerrors.ErrResourceGroupQueryRunawayInterrupted.GenWithStackByArgs() + return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs("exceed tidb side") } if stderrs.Is(err, tikverr.ErrTiKVServerBusy) { diff --git a/pkg/util/sqlkiller/sqlkiller.go b/pkg/util/sqlkiller/sqlkiller.go index 1abae2dd3cc24..798661d8ee515 100644 --- a/pkg/util/sqlkiller/sqlkiller.go +++ b/pkg/util/sqlkiller/sqlkiller.go @@ -79,7 +79,7 @@ func (killer *SQLKiller) getKillError(status killSignal) error { case ServerMemoryExceeded: return exeerrors.ErrMemoryExceedForInstance.GenWithStackByArgs(killer.ConnID) case RunawayQueryExceeded: - return exeerrors.ErrResourceGroupQueryRunawayInterrupted.GenWithStackByArgs() + return exeerrors.ErrResourceGroupQueryRunawayInterrupted.FastGenByArgs("exceed tidb side") default: } return nil