Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: refine the runaway code and fix typos #54435

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,9 +1064,8 @@ func (do *Domain) Close() {
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
}
if rm := do.RunawayManager(); rm != nil {
rm.Stop()
}

do.runawayManager.Stop()

if do.unprefixedEtcdCli != nil {
terror.Log(errors.Trace(do.unprefixedEtcdCli.Close()))
Expand Down
7 changes: 5 additions & 2 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (rm *RunawayManager) addWatchList(record *QuarantineRecord, ttl time.Durati
} else {
if item == nil {
rm.queryLock.Lock()
// When watchlist get record, it will check whether the record is stale, so add new record if returns nil.
// When watchList get record, it will check whether the record is stale, so add new record if returns nil.
if rm.watchList.Get(key) == nil {
rm.watchList.Set(key, record, ttl)
} else {
Expand All @@ -340,7 +340,7 @@ func (rm *RunawayManager) addWatchList(record *QuarantineRecord, ttl time.Durati
defer rm.queryLock.Unlock()
rm.watchList.Set(key, record, ttl)
} else if item.ID != record.ID {
// check the ID because of the eariler scan.
// check the ID because of the earlier scan.
rm.staleQuarantineRecord <- record
}
}
Expand Down Expand Up @@ -452,6 +452,9 @@ func (rm *RunawayManager) examineWatchList(resourceGroupName string, convict str

// Stop stops the watchList which is a ttlcache.
func (rm *RunawayManager) Stop() {
if rm == nil {
return
}
if rm.watchList != nil {
rm.watchList.Stop()
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/domain/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,21 +242,21 @@ func (do *Domain) runawayRecordFlushLoop() {

// this times is used to batch flushing records, with 1s duration,
// we can guarantee a watch record can be seen by the user within 1s.
runawayRecordFluashTimer := time.NewTimer(runawayRecordFlushInterval)
runawayRecordFlushTimer := time.NewTimer(runawayRecordFlushInterval)
runawayRecordGCTicker := time.NewTicker(runawayRecordGCInterval)
failpoint.Inject("FastRunawayGC", func() {
runawayRecordFluashTimer.Stop()
runawayRecordFlushTimer.Stop()
runawayRecordGCTicker.Stop()
runawayRecordFluashTimer = time.NewTimer(time.Millisecond * 50)
runawayRecordFlushTimer = time.NewTimer(time.Millisecond * 50)
runawayRecordGCTicker = time.NewTicker(time.Millisecond * 200)
})

fired := false
recordCh := do.RunawayManager().RunawayRecordChan()
quarantineRecordCh := do.RunawayManager().QuarantineRecordChan()
staleQuarantineRecordCh := do.RunawayManager().StaleQuarantineRecordChan()
flushThrehold := do.runawayManager.FlushThreshold()
records := make([]*resourcegroup.RunawayRecord, 0, flushThrehold)
recordCh := do.runawayManager.RunawayRecordChan()
quarantineRecordCh := do.runawayManager.QuarantineRecordChan()
staleQuarantineRecordCh := do.runawayManager.StaleQuarantineRecordChan()
flushThreshold := do.runawayManager.FlushThreshold()
records := make([]*resourcegroup.RunawayRecord, 0, flushThreshold)

flushRunawayRecords := func() {
if len(records) == 0 {
Expand All @@ -273,20 +273,20 @@ func (do *Domain) runawayRecordFlushLoop() {
select {
case <-do.exit:
return
case <-runawayRecordFluashTimer.C:
case <-runawayRecordFlushTimer.C:
flushRunawayRecords()
fired = true
case r := <-recordCh:
records = append(records, r)
failpoint.Inject("FastRunawayGC", func() {
flushRunawayRecords()
})
if len(records) >= flushThrehold {
if len(records) >= flushThreshold {
flushRunawayRecords()
} else if fired {
fired = false
// meet a new record, reset the timer.
runawayRecordFluashTimer.Reset(runawayRecordFlushInterval)
runawayRecordFlushTimer.Reset(runawayRecordFlushInterval)
}
case <-runawayRecordGCTicker.C:
go do.deleteExpiredRows("tidb_runaway_queries", "time", runawayRecordExpiredDuration)
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,11 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {

// must set plan according to the `Execute` plan before getting planDigest
a.inheritContextFromExecuteStmt()
if variable.EnableResourceControl.Load() && domain.GetDomain(sctx).RunawayManager() != nil {
if rm := domain.GetDomain(sctx).RunawayManager(); variable.EnableResourceControl.Load() && rm != nil {
stmtCtx := sctx.GetSessionVars().StmtCtx
_, planDigest := GetPlanDigest(stmtCtx)
_, sqlDigest := stmtCtx.SQLDigest()
stmtCtx.RunawayChecker = domain.GetDomain(sctx).RunawayManager().DeriveChecker(sctx.GetSessionVars().StmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String())
stmtCtx.RunawayChecker = rm.DeriveChecker(sctx.GetSessionVars().StmtCtx.ResourceGroupName, stmtCtx.OriginalSQL, sqlDigest.String(), planDigest.String())
if err := stmtCtx.RunawayChecker.BeforeExecutor(); err != nil {
return nil, err
}
Expand Down