diff --git a/examples/gcworker/go.mod b/examples/gcworker/go.mod index a81da93f2..8222c6c00 100644 --- a/examples/gcworker/go.mod +++ b/examples/gcworker/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/rawkv/go.mod b/examples/rawkv/go.mod index ef7c9279a..c4efef50e 100644 --- a/examples/rawkv/go.mod +++ b/examples/rawkv/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/1pc_txn/go.mod b/examples/txnkv/1pc_txn/go.mod index f7dcef09c..74f381560 100644 --- a/examples/txnkv/1pc_txn/go.mod +++ b/examples/txnkv/1pc_txn/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/async_commit/go.mod b/examples/txnkv/async_commit/go.mod index a09833288..73e051f04 100644 --- a/examples/txnkv/async_commit/go.mod +++ b/examples/txnkv/async_commit/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/delete_range/go.mod b/examples/txnkv/delete_range/go.mod index 79b958fe0..c15194ed9 100644 --- a/examples/txnkv/delete_range/go.mod +++ b/examples/txnkv/delete_range/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/go.mod b/examples/txnkv/go.mod index 19ca706d3..884cdd666 100644 --- a/examples/txnkv/go.mod +++ b/examples/txnkv/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/pessimistic_txn/go.mod b/examples/txnkv/pessimistic_txn/go.mod index dd5e13bd4..7edbc1886 100644 --- a/examples/txnkv/pessimistic_txn/go.mod +++ b/examples/txnkv/pessimistic_txn/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/examples/txnkv/unsafedestoryrange/go.mod b/examples/txnkv/unsafedestoryrange/go.mod index 38911f96f..6c9f434ea 100644 --- a/examples/txnkv/unsafedestoryrange/go.mod +++ b/examples/txnkv/unsafedestoryrange/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect - github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect + github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect github.com/docker/go-units v0.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 8cadf63df..7d9dbbe24 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -486,7 +486,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end ) runner.SetStatLogInterval(30 * time.Second) - go func() { + c.txn.spawnWithStorePool(func() { if err = runner.RunOnRange(bo.GetCtx(), start, end); err != nil { logutil.Logger(bo.GetCtx()).Error("[pipelined dml] resolve flushed locks failed", zap.String("txn-status", status), @@ -529,5 +529,5 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end c.resourceGroupTag, ) } - }() + }) } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 7c124a678..4eaa84112 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -842,23 +842,27 @@ func (txn *KVTxn) Rollback() error { // no need to clean up locks when no flush triggered. pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd needCleanUpLocks := len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 - broadcastToAllStores( - txn, - txn.committer.store, - retry.NewBackofferWithVars( - txn.store.Ctx(), - broadcastMaxBackoff, - txn.committer.txn.vars, - ), - &kvrpcpb.TxnStatus{ - StartTs: txn.startTS, - MinCommitTs: txn.committer.minCommitTSMgr.get(), - CommitTs: 0, - RolledBack: true, - IsCompleted: !needCleanUpLocks, + txn.spawnWithStorePool( + func() { + broadcastToAllStores( + txn, + txn.committer.store, + retry.NewBackofferWithVars( + txn.store.Ctx(), + broadcastMaxBackoff, + txn.committer.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: txn.startTS, + MinCommitTs: txn.committer.minCommitTSMgr.get(), + CommitTs: 0, + RolledBack: true, + IsCompleted: !needCleanUpLocks, + }, + txn.resourceGroupName, + txn.resourceGroupTag, + ) }, - txn.resourceGroupName, - txn.resourceGroupTag, ) if needCleanUpLocks { rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars)