From 962dd2e4b46a8e58a6233bc393c9e9d7688b1108 Mon Sep 17 00:00:00 2001 From: ekexium Date: Mon, 8 Jan 2024 13:09:29 +0800 Subject: [PATCH] fix: check kill signal against 0 (#50029) ref pingcap/tidb#49643 --- .../internal/mpp/local_mpp_coordinator.go | 15 +++++++++++---- pkg/store/copr/batch_coprocessor.go | 7 ++++++- pkg/store/copr/coprocessor.go | 18 +++++++++++++++--- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/pkg/executor/internal/mpp/local_mpp_coordinator.go b/pkg/executor/internal/mpp/local_mpp_coordinator.go index bb91bba1474e3..7cb45b46e8175 100644 --- a/pkg/executor/internal/mpp/local_mpp_coordinator.go +++ b/pkg/executor/internal/mpp/local_mpp_coordinator.go @@ -668,10 +668,17 @@ func (c *localMppCoordinator) nextImpl(ctx context.Context) (resp *mppResponse, case resp, ok = <-c.respChan: return case <-ticker.C: - if c.vars != nil && c.vars.Killed != nil && atomic.LoadUint32(c.vars.Killed) == 1 { - err = derr.ErrQueryInterrupted - exit = true - return + if c.vars != nil && c.vars.Killed != nil { + killed := atomic.LoadUint32(c.vars.Killed) + if killed != 0 { + logutil.Logger(ctx).Info( + "a killed signal is received", + zap.Uint32("signal", killed), + ) + err = derr.ErrQueryInterrupted + exit = true + return + } } case <-c.finishCh: exit = true diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index aa92dff0ef83c..6246aa87b6eb9 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -1167,7 +1167,12 @@ func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopRe case resp, ok = <-b.respChan: return case <-ticker.C: - if atomic.LoadUint32(b.vars.Killed) == 1 { + killed := atomic.LoadUint32(b.vars.Killed) + if killed != 0 { + logutil.Logger(ctx).Info( + "a killed signal is received", + zap.Uint32("signal", killed), + ) resp = &batchCopResponse{err: derr.ErrQueryInterrupted} ok = true return diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go index 233827f762db4..006a76e309abe 100644 --- a/pkg/store/copr/coprocessor.go +++ b/pkg/store/copr/coprocessor.go @@ -937,7 +937,12 @@ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copRes exit = true return case <-ticker.C: - if atomic.LoadUint32(it.vars.Killed) == 1 { + killed := atomic.LoadUint32(it.vars.Killed) + if killed != 0 { + logutil.Logger(ctx).Info( + "a killed signal is received", + zap.Uint32("signal", killed), + ) resp = &copResponse{err: derr.ErrQueryInterrupted} ok = true return @@ -1862,8 +1867,15 @@ func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *copro // finished checks the flags and finished channel, it tells whether the worker is finished. func (worker *copIteratorWorker) finished() bool { - if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 { - return true + if worker.vars != nil && worker.vars.Killed != nil { + killed := atomic.LoadUint32(worker.vars.Killed) + if killed != 0 { + logutil.BgLogger().Info( + "a killed signal is received in copIteratorWorker", + zap.Uint32("signal", killed), + ) + return true + } } select { case <-worker.finishCh: