Skip to content

Commit

Permalink
session, executor, domain: use the SQL start time to calculate the ru…
Browse files Browse the repository at this point in the history
…naway deadline (#54496)

ref #54434
  • Loading branch information
JmPotato authored Jul 8, 2024
1 parent 0f81cea commit bf50430
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
8 changes: 4 additions & 4 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (rm *RunawayManager) MarkSyncerInitialized() {
}

// DeriveChecker derives a RunawayChecker from the given resource group
func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, planDigest string) *RunawayChecker {
func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDigest, planDigest string, startTime time.Time) *RunawayChecker {
group, err := rm.resourceGroupCtl.GetResourceGroup(resourceGroupName)
if err != nil || group == nil {
logutil.BgLogger().Warn("cannot setup up runaway checker", zap.Error(err))
Expand All @@ -272,7 +272,7 @@ func (rm *RunawayManager) DeriveChecker(resourceGroupName, originalSQL, sqlDiges
rm.metricsMap.Store(resourceGroupName, counter)
}
counter.Inc()
return newRunawayChecker(rm, resourceGroupName, group.RunawaySettings, originalSQL, sqlDigest, planDigest)
return newRunawayChecker(rm, resourceGroupName, group.RunawaySettings, originalSQL, sqlDigest, planDigest, startTime)
}

func (rm *RunawayManager) markQuarantine(resourceGroupName, convict string, watchType rmpb.RunawayWatchType, action rmpb.RunawayAction, ttl time.Duration, now *time.Time) {
Expand Down Expand Up @@ -476,7 +476,7 @@ type RunawayChecker struct {
watchAction rmpb.RunawayAction
}

func newRunawayChecker(manager *RunawayManager, resourceGroupName string, setting *rmpb.RunawaySettings, originalSQL, sqlDigest, planDigest string) *RunawayChecker {
func newRunawayChecker(manager *RunawayManager, resourceGroupName string, setting *rmpb.RunawaySettings, originalSQL, sqlDigest, planDigest string, startTime time.Time) *RunawayChecker {
c := &RunawayChecker{
manager: manager,
resourceGroupName: resourceGroupName,
Expand All @@ -488,7 +488,7 @@ func newRunawayChecker(manager *RunawayManager, resourceGroupName string, settin
markedByWatch: false,
}
if setting != nil {
c.deadline = time.Now().Add(time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond)
c.deadline = startTime.Add(time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond)
}
return c
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,10 +538,11 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
// must set plan according to the `Execute` plan before getting planDigest
a.inheritContextFromExecuteStmt()
if rm := domain.GetDomain(sctx).RunawayManager(); variable.EnableResourceControl.Load() && rm != nil {
stmtCtx := sctx.GetSessionVars().StmtCtx
sessionVars := sctx.GetSessionVars()
stmtCtx := sessionVars.StmtCtx
_, planDigest := GetPlanDigest(stmtCtx)
_, sqlDigest := stmtCtx.SQLDigest()
stmtCtx.RunawayChecker = rm.DeriveChecker(sctx.GetSessionVars().StmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String())
stmtCtx.RunawayChecker = rm.DeriveChecker(stmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String(), sessionVars.StartTime)
if err := stmtCtx.RunawayChecker.BeforeExecutor(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1404,7 +1405,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
a.updatePrevStmt()
a.recordLastQueryInfo(err)
a.observePhaseDurations(sessVars.InRestrictedSQL, execDetail.CommitDetail)
executeDuration := time.Since(sessVars.StartTime) - sessVars.DurationCompile
executeDuration := sessVars.GetExecuteDuration()
if sessVars.InRestrictedSQL {
executor_metrics.SessionExecuteRunDurationInternal.Observe(executeDuration.Seconds())
} else {
Expand Down Expand Up @@ -1543,7 +1544,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
stmtCtx := sessVars.StmtCtx
level := log.GetLevel()
cfg := config.GetGlobalConfig()
costTime := time.Since(sessVars.StartTime) + sessVars.DurationParse
costTime := sessVars.GetTotalCostDuration()
threshold := time.Duration(atomic.LoadUint64(&cfg.Instance.SlowThreshold)) * time.Millisecond
enable := cfg.Instance.EnableSlowLog.Load()
// if the level is Debug, or trace is enabled, print slow logs anyway
Expand Down Expand Up @@ -1875,7 +1876,7 @@ func (a *ExecStmt) SummaryStmt(succ bool) {
stmtCtx.StmtType = ast.GetStmtLabel(a.StmtNode)
}
normalizedSQL, digest := stmtCtx.SQLDigest()
costTime := time.Since(sessVars.StartTime) + sessVars.DurationParse
costTime := sessVars.GetTotalCostDuration()
charset, collation := sessVars.GetCharsetInfo()

var prevSQL, prevSQLDigest string
Expand Down Expand Up @@ -2116,7 +2117,7 @@ func (a *ExecStmt) observeStmtFinishedForTopSQL() {
}
if stats := a.Ctx.GetStmtStats(); stats != nil && topsqlstate.TopSQLEnabled() {
sqlDigest, planDigest := a.getSQLPlanDigest()
execDuration := time.Since(vars.StartTime) + vars.DurationParse
execDuration := vars.GetTotalCostDuration()
stats.OnExecutionFinished(sqlDigest, planDigest, execDuration)
}
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ type SessionVars struct {
// NoopFuncsMode allows OFF/ON/WARN values as 0/1/2.
NoopFuncsMode int

// StartTime is the start time of the last query.
// StartTime is the start time of the last query. It's set after the query is parsed and before the query is compiled.
StartTime time.Time

// DurationParse is the duration of parsing SQL string to AST of the last query.
Expand Down Expand Up @@ -1833,6 +1833,16 @@ func (s *SessionVars) AllocNewPlanID() int {
return int(s.PlanID.Add(1))
}

// GetTotalCostDuration returns the total cost duration of the last statement in the current session.
func (s *SessionVars) GetTotalCostDuration() time.Duration {
return time.Since(s.StartTime) + s.DurationParse
}

// GetExecuteDuration returns the execute duration of the last statement in the current session.
func (s *SessionVars) GetExecuteDuration() time.Duration {
return time.Since(s.StartTime) - s.DurationCompile
}

const (
// PlacementModeStrict indicates all placement operations should be checked strictly in ddl
PlacementModeStrict string = "STRICT"
Expand Down

0 comments on commit bf50430

Please sign in to comment.