Skip to content

Commit

Permalink
allow cleanup after the connection is killed
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <[email protected]>
  • Loading branch information
YangKeao committed Aug 14, 2024
1 parent 0820519 commit e2c6441
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,9 +1075,15 @@ func (c *twoPhaseCommitter) doActionOnBatches(
status,
),
)
// TODO: There might be various signals besides a query interruption,
// but we are unable to differentiate them, because the definition is in TiDB.
return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: status})

// TODO: at least provide a way to force kill the action.
// The cleanup action is explicitly not killed to avoid leaking stale locks.
_, isCleanUp := action.(actionCleanup)
if !isCleanUp {
// TODO: There might be various signals besides a query interruption,
// but we are unable to differentiate them, because the definition is in TiDB.
return errors.WithStack(tikverr.ErrQueryInterruptedWithSignal{Signal: status})
}
}
}
if len(batches) == 0 {
Expand Down Expand Up @@ -1638,6 +1644,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
return err
}

if c.txn.resourceGroupName == "testslowcommit" {
time.Sleep(5 * time.Second)
}

// return assertion error found in TiDB after prewrite succeeds to prevent false positive. Note this is only visible
// when async commit or 1PC is disabled.
if c.stashedAssertionError != nil {
Expand Down Expand Up @@ -1761,11 +1771,26 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
c.store.WaitGroup().Add(1)
go func() {
defer c.store.WaitGroup().Done()
if _, err := util.EvalFailpoint("asyncCommitDoNothing"); err == nil {

var err error
defer func() {
// The error means the async commit should not succeed.
if err != nil {
if c.getUndeterminedErr() == nil {
c.cleanup(ctx)
}
metrics.AsyncCommitTxnCounterError.Inc()
} else {
metrics.AsyncCommitTxnCounterOk.Inc()
}
}()

_, err = util.EvalFailpoint("asyncCommitDoNothing")
if err == nil {
return
}
commitBo := retry.NewBackofferWithVars(c.store.Ctx(), CommitSecondaryMaxBackoff, c.txn.vars)
err := c.commitMutations(commitBo, c.mutations)
err = c.commitMutations(commitBo, c.mutations)
if err != nil {
logutil.Logger(ctx).Warn("2PC async commit failed", zap.Uint64("sessionID", c.sessionID),
zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", c.commitTS), zap.Error(err))
Expand Down

0 comments on commit e2c6441

Please sign in to comment.