From 527f80a18663674d8d86bfcd7bf7d0d249853ac8 Mon Sep 17 00:00:00 2001 From: ekexium Date: Wed, 25 Sep 2024 09:34:14 +0800 Subject: [PATCH] feat: Broadcast min_commit_ts for pipelined transactions (#1458) Signed-off-by: ekexium --- go.mod | 2 +- go.sum | 4 +- integration_tests/go.mod | 2 +- integration_tests/go.sum | 4 +- internal/locate/region_cache.go | 39 +++++ internal/locate/region_request_test.go | 4 + tikv/gc.go | 4 +- tikv/kv_test.go | 1 - tikvrpc/tikvrpc.go | 13 ++ txnkv/transaction/2pc.go | 201 +++++++++++++++++++++++-- txnkv/transaction/2pc_test.go | 124 +++++++++++++++ txnkv/transaction/pipelined_flush.go | 56 ++++++- txnkv/transaction/prewrite.go | 8 +- txnkv/transaction/test_probe.go | 6 +- txnkv/transaction/txn.go | 21 ++- 15 files changed, 458 insertions(+), 31 deletions(-) create mode 100644 txnkv/transaction/2pc_test.go diff --git a/go.mod b/go.mod index f8bbc87b3..d226e9750 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index 74adbea16..029e0d04d 100644 --- a/go.sum +++ b/go.sum @@ -74,8 +74,8 @@ github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c h1:CgbKAHto5CQgW github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0= -github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc= +github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 8e2f7d930..c5b8f4a68 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240527053858-9b3b6e34194a - github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 + github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d github.com/pingcap/tidb v1.1.0-beta.0.20240703042657-230bbc2ef5ef github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.9.0 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index d50e94ff3..77a57ebb5 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -357,8 +357,8 @@ github.com/pingcap/fn v1.0.0/go.mod 
h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
-github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4 h1:6aIKNB2YGAec4IUDLw6G2eDECiGiufZcgEbZSCELBx0=
-github.com/pingcap/kvproto v0.0.0-20240620063548-118a4cab53e4/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
+github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d h1:vSdKTrF6kpcd56G5BLP0Bz88Nho2tDo7IR1+oSsBAfc=
+github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d h1:y3EueKVfVykdpTyfUnQGqft0ud+xVFuCdp1XkVL0X1E=
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index ef575f834..01dfa385c 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -726,9 +726,47 @@ func NewRegionCache(pdClient pd.Client, opt ...RegionCacheOpt) *RegionCache {
 		// cache GC is incompatible with cache refresh
 		c.bg.schedule(c.gcRoundFunc(cleanRegionNumPerRound), cleanCacheInterval)
 	}
+	c.bg.schedule(
+		func(ctx context.Context, _ time.Time) bool {
+			refreshFullStoreList(ctx, c.stores)
+			return false
+		}, refreshStoreListInterval,
+	)
 	return c
 }
 
+// Try to refresh the full store list. Errors are ignored.
+func refreshFullStoreList(ctx context.Context, stores storeCache) {
+	storeList, err := stores.fetchAllStores(ctx)
+	if err != nil {
+		logutil.Logger(ctx).Info("refresh full store list failed", zap.Error(err))
+		return
+	}
+	for _, store := range storeList {
+		_, exist := stores.get(store.GetId())
+		if exist {
+			continue
+		}
+		// GetAllStores is supposed to return only Up and Offline stores.
+		// This check is defensive, and keeps the behavior consistent with the store-resolving code.
+		if store == nil || store.GetState() == metapb.StoreState_Tombstone {
+			continue
+		}
+		addr := store.GetAddress()
+		if addr == "" {
+			continue
+		}
+		s := stores.getOrInsertDefault(store.GetId())
+		// TODO: maybe refactor this, together with other places initializing Store
+		s.addr = addr
+		s.peerAddr = store.GetPeerAddress()
+		s.saddr = store.GetStatusAddress()
+		s.storeType = tikvrpc.GetStoreTypeByMeta(store)
+		s.labels = store.GetLabels()
+		s.changeResolveStateTo(unresolved, resolved)
+	}
+}
+
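The job registered above is a plain best-effort refresher: tick at refreshStoreListInterval, fetch the full store list, swallow errors, repeat. A minimal self-contained sketch of the same pattern, with the interval shortened and the fetch replaced by a stand-in callback (periodicRefresh and the callback are illustrative names, not the real region-cache types):

package main

import (
	"context"
	"fmt"
	"time"
)

// periodicRefresh mirrors the c.bg.schedule call above: run refresh on every
// tick, only log failures, and stop when the context is cancelled.
func periodicRefresh(ctx context.Context, interval time.Duration, refresh func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := refresh(ctx); err != nil {
				// Errors are ignored; the next tick simply retries.
				fmt.Println("refresh full store list failed:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	periodicRefresh(ctx, time.Second, func(context.Context) error {
		fmt.Println("fetching all stores") // stand-in for stores.fetchAllStores(ctx)
		return nil
	})
}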
 // only used for test.
 func newTestRegionCache() *RegionCache {
 	c := &RegionCache{}
@@ -2649,6 +2687,7 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
 const cleanCacheInterval = time.Second
 const cleanRegionNumPerRound = 50
+const refreshStoreListInterval = 10 * time.Second
 
 // gcScanItemHook is only used for testing
 var gcScanItemHook = new(atomic.Pointer[func(*btreeItem)])
diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go
index 8e10e30b8..c43aecb08 100644
--- a/internal/locate/region_request_test.go
+++ b/internal/locate/region_request_test.go
@@ -582,6 +582,10 @@ func (s *mockTikvGrpcServer) GetHealthFeedback(ctx context.Context, request *kvr
 	return nil, errors.New("unreachable")
 }
 
+func (s *mockTikvGrpcServer) BroadcastTxnStatus(ctx context.Context, request *kvrpcpb.BroadcastTxnStatusRequest) (*kvrpcpb.BroadcastTxnStatusResponse, error) {
+	return nil, errors.New("unreachable")
+}
+
 func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() {
 	// prepare a mock tikv grpc server
 	addr := "localhost:56341"
diff --git a/tikv/gc.go b/tikv/gc.go
index a3b4eefce..da0022f33 100644
--- a/tikv/gc.go
+++ b/tikv/gc.go
@@ -309,7 +309,7 @@ const unsafeDestroyRangeTimeout = 5 * time.Minute
 // multiple times on a single range.
 func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKey []byte) error {
 	// Get all stores every time deleting a region. So the store list is less likely to be stale.
-	stores, err := s.listStoresForUnsafeDestory(ctx)
+	stores, err := s.listStoresForUnsafeDestroy(ctx)
 	if err != nil {
 		metrics.TiKVUnsafeDestroyRangeFailuresCounterVec.WithLabelValues("get_stores").Inc()
 		return err
@@ -366,7 +366,7 @@ func (s *KVStore) UnsafeDestroyRange(ctx context.Context, startKey []byte, endKe
 	return nil
 }
 
-func (s *KVStore) listStoresForUnsafeDestory(ctx context.Context) ([]*metapb.Store, error) {
+func (s *KVStore) listStoresForUnsafeDestroy(ctx context.Context) ([]*metapb.Store, error) {
 	stores, err := s.pdClient.GetAllStores(ctx)
 	if err != nil {
 		return nil, errors.WithStack(err)
diff --git a/tikv/kv_test.go b/tikv/kv_test.go
index c2e4631e2..1a9a4618f 100644
--- a/tikv/kv_test.go
+++ b/tikv/kv_test.go
@@ -51,7 +51,6 @@ type testKVSuite struct {
 func (s *testKVSuite) SetupTest() {
 	client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
 	s.Require().Nil(err)
-	testutils.BootstrapWithSingleStore(cluster)
 	s.setGetMinResolvedTSByStoresIDs(func(ctx context.Context, ids []uint64) (uint64, map[uint64]uint64, error) {
 		return 0, nil, nil
 	})
diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go
index 350c5b7c7..96a480c91 100644
--- a/tikvrpc/tikvrpc.go
+++ b/tikvrpc/tikvrpc.go
@@ -100,6 +100,7 @@ const (
 	CmdLockWaitInfo
 
 	CmdGetHealthFeedback
+	CmdBroadcastTxnStatus
 
 	CmdCop CmdType = 512 + iota
 	CmdCopStream
@@ -221,6 +222,8 @@ func (t CmdType) String() string {
 		return "LockWaitInfo"
 	case CmdGetHealthFeedback:
 		return "GetHealthFeedback"
+	case CmdBroadcastTxnStatus:
+		return "BroadcastTxnStatus"
 	case CmdFlashbackToVersion:
 		return "FlashbackToVersion"
 	case CmdPrepareFlashbackToVersion:
@@ -568,6 +571,10 @@ func (req *Request) GetHealthFeedback() *kvrpcpb.GetHealthFeedbackRequest {
 	return req.Req.(*kvrpcpb.GetHealthFeedbackRequest)
 }
 
+// BroadcastTxnStatus returns BroadcastTxnStatusRequest in request.
+func (req *Request) BroadcastTxnStatus() *kvrpcpb.BroadcastTxnStatusRequest {
+	return req.Req.(*kvrpcpb.BroadcastTxnStatusRequest)
+}
+
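With the command constant, the String case, and the typed accessor in place, the new RPC can be issued like any other tikvrpc command. A hedged sketch of a single-store call; rpcClient here is a stand-in interface matching the SendRequest shape that store.GetTiKVClient() exposes later in this patch, and the five-second timeout mirrors broadcastRpcTimeout — neither name is defined at this point:

package example

import (
	"context"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/tikv/client-go/v2/tikvrpc"
)

// rpcClient is the minimal client surface this sketch assumes.
type rpcClient interface {
	SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error)
}

// broadcastToStore sends one BroadcastTxnStatus request to a single store.
func broadcastToStore(ctx context.Context, c rpcClient, addr string, status *kvrpcpb.TxnStatus) error {
	req := tikvrpc.NewRequest(tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{
		TxnStatus: []*kvrpcpb.TxnStatus{status},
	})
	_, err := c.SendRequest(ctx, addr, req, 5*time.Second)
	return err
}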
 // FlashbackToVersion returns FlashbackToVersionRequest in request.
 func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest {
 	return req.Req.(*kvrpcpb.FlashbackToVersionRequest)
@@ -653,6 +660,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques
 		return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BufferBatchGet{BufferBatchGet: req.BufferBatchGet()}}
 	case CmdGetHealthFeedback:
 		return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_GetHealthFeedback{GetHealthFeedback: req.GetHealthFeedback()}}
+	case CmdBroadcastTxnStatus:
+		return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_BroadcastTxnStatus{BroadcastTxnStatus: req.BroadcastTxnStatus()}}
 	}
 	return nil
 }
@@ -730,6 +739,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res
 		return &Response{Resp: res.BufferBatchGet}, nil
 	case *tikvpb.BatchCommandsResponse_Response_GetHealthFeedback:
 		return &Response{Resp: res.GetHealthFeedback}, nil
+	case *tikvpb.BatchCommandsResponse_Response_BroadcastTxnStatus:
+		return &Response{Resp: res.BroadcastTxnStatus}, nil
 	}
 	panic("unreachable")
 }
@@ -1143,6 +1154,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp
 		resp.Resp, err = client.KvBufferBatchGet(ctx, req.BufferBatchGet())
 	case CmdGetHealthFeedback:
 		resp.Resp, err = client.GetHealthFeedback(ctx, req.GetHealthFeedback())
+	case CmdBroadcastTxnStatus:
+		resp.Resp, err = client.BroadcastTxnStatus(ctx, req.BroadcastTxnStatus())
 	default:
 		return nil, errors.Errorf("invalid request type: %v", req.Type)
 	}
diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go
index fbc8a85a4..2a01befe6 100644
--- a/txnkv/transaction/2pc.go
+++ b/txnkv/transaction/2pc.go
@@ -164,7 +164,7 @@ type twoPhaseCommitter struct {
 	}
 
 	useAsyncCommit uint32
-	minCommitTS    uint64
+	minCommitTSMgr *minCommitTsManager
 	maxCommitTS    uint64
 	prewriteStarted   bool
 	prewriteCancelled uint32
@@ -477,6 +477,7 @@ func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, err
 		binlog:            txn.binlog,
 		diskFullOpt:       kvrpcpb.DiskFullOpt_NotAllowedOnFull,
 		resourceGroupName: txn.resourceGroupName,
+		minCommitTSMgr:    newMinCommitTsManager(),
 	}
 	return committer, nil
 }
@@ -1137,6 +1138,69 @@ const (
 	stateClosed
 )
 
+// WriteAccessLevel represents the level of write access required to modify the value.
+type WriteAccessLevel int
+
+const (
+	ttlAccess   WriteAccessLevel = 1
+	twoPCAccess WriteAccessLevel = 2
+)
+
+// minCommitTsManager manages a minimum commit timestamp with different write access levels.
+type minCommitTsManager struct {
+	mutex               sync.Mutex
+	value               uint64
+	requiredWriteAccess WriteAccessLevel
+}
+
+// newMinCommitTsManager creates and returns a new minCommitTsManager.
+func newMinCommitTsManager() *minCommitTsManager {
+	return &minCommitTsManager{requiredWriteAccess: ttlAccess}
+}
+
+// tryUpdate updates the value if the provided write access level is sufficient and
+// the new value is greater.
+func (m *minCommitTsManager) tryUpdate(newValue uint64, writeAccess WriteAccessLevel) {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if writeAccess < m.requiredWriteAccess {
+		return
+	}
+
+	if newValue > m.value {
+		m.value = newValue
+	}
+}
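tryUpdate is gated twice: the caller's access level must meet the manager's current requirement, and the value only ever moves forward. A short illustration of those two rules that should compile inside this package (timestamps invented; get is defined just below):

// Illustration only: exercising tryUpdate's rules as defined above.
func exampleTryUpdate() {
	m := newMinCommitTsManager()

	m.tryUpdate(10, ttlAccess) // accepted: ttlAccess meets the initial requirement and 10 > 0
	m.tryUpdate(5, ttlAccess)  // no-op: the value is monotonic and never decreases

	if m.get() != 10 {
		panic("unexpected min_commit_ts")
	}
}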
+
+// elevateWriteAccess elevates the required write access level.
+// It returns the current value.
+func (m *minCommitTsManager) elevateWriteAccess(newLevel WriteAccessLevel) uint64 {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if newLevel > m.requiredWriteAccess {
+		m.requiredWriteAccess = newLevel
+	}
+	return m.value
+}
+
+// get returns the current value. This is a read operation and doesn't require write access.
+func (m *minCommitTsManager) get() uint64 {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	return m.value
+}
+
+// getRequiredWriteAccess returns the current required write access level.
+func (m *minCommitTsManager) getRequiredWriteAccess() WriteAccessLevel {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	return m.requiredWriteAccess
+}
+
 type ttlManager struct {
 	state ttlManagerState
 	ch    chan struct{}
@@ -1175,7 +1239,11 @@ func (tm *ttlManager) reset() {
 const keepAliveMaxBackoff = 20000
 const pessimisticLockMaxBackoff = 20000
 const maxConsecutiveFailure = 10
+const broadcastGracePeriod = 5 * time.Second
+const broadcastMaxBackoff = 10000
 
+// keepAlive keeps sending heartbeats to update the primary key's TTL.
+// For pipelined transactions, it also updates min_commit_ts and broadcasts it to all TiKVs.
 func keepAlive(
 	c *twoPhaseCommitter, closeCh chan struct{}, tm *ttlManager, primaryKey []byte,
 	lockCtx *kv.LockCtx, isPipelinedTxn bool,
@@ -1234,6 +1302,14 @@ func keepAlive(
 				return
 			}
 
+			// update minCommitTS if it's a non-async-commit pipelined transaction
+			if isPipelinedTxn &&
+				!c.isOnePC() &&
+				!c.isAsyncCommit() &&
+				c.minCommitTSMgr.getRequiredWriteAccess() <= ttlAccess {
+				c.minCommitTSMgr.tryUpdate(now, ttlAccess)
+			}
+
 			newTTL := uptime + atomic.LoadUint64(&ManagedLockTTL)
 			logutil.Logger(bo.GetCtx()).Info("send TxnHeartBeat",
 				zap.Uint64("startTS", c.startTS),
@@ -1241,7 +1317,9 @@ func keepAlive(
 				zap.Bool("isPipelinedTxn", isPipelinedTxn),
 			)
 			startTime := time.Now()
-			_, stopHeartBeat, err := sendTxnHeartBeat(bo, c.store, primaryKey, c.startTS, newTTL)
+			_, stopHeartBeat, err := sendTxnHeartBeat(
+				bo, c.store, primaryKey, c.startTS, newTTL, c.minCommitTSMgr.get(),
+			)
 			if err != nil {
 				keepFail++
 				metrics.TxnHeartBeatHistogramError.Observe(time.Since(startTime).Seconds())
@@ -1265,19 +1343,122 @@ func keepAlive(
 					}
 					return
 				}
-				continue
+			} else {
+				keepFail = 0
+				metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
+			}
+
+			// broadcast to all stores
+			if isPipelinedTxn {
+				broadcastToAllStores(
+					c.txn,
+					c.store,
+					retry.NewBackofferWithVars(
+						context.Background(),
+						broadcastMaxBackoff,
+						c.txn.vars,
+					),
+					&kvrpcpb.TxnStatus{
+						StartTs:     c.startTS,
+						MinCommitTs: c.minCommitTSMgr.get(),
+						CommitTs:    0,
+						RolledBack:  false,
+						IsCompleted: false,
+					},
+					c.resourceGroupName,
+					c.resourceGroupTag,
+				)
 			}
-			keepFail = 0
-			metrics.TxnHeartBeatHistogramOK.Observe(time.Since(startTime).Seconds())
 		}
 	}
}
 
-func sendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) {
+const broadcastRpcTimeout = time.Second * 5
+const broadcastMaxConcurrency = 10
+
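Why a fresh timestamp makes a useful min_commit_ts is easiest to see with numbers. The walkthrough below is inferred from how min_commit_ts is used for lock resolution in TiKV generally; the patch itself does not spell this out:

// Worked example (timestamps invented):
//
//	pipelined txn T:  start_ts = 100, its flushed locks are spread across stores
//	heartbeat at 150: T broadcasts TxnStatus{StartTs: 100, MinCommitTs: 150, IsCompleted: false}
//	reader R:         start_ts = 120 encounters one of T's locks
//
// T can only commit at some commit_ts >= 150 > 120, so T's writes can never be
// visible to R's snapshot, and a store that cached the broadcast can let R read
// past the lock instead of blocking on it.

+// broadcastToAllStores asynchronously broadcasts the transaction status to all stores.
+// Errors are ignored.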
+func broadcastToAllStores( + txn *KVTxn, + store kvstore, + bo *retry.Backoffer, + status *kvrpcpb.TxnStatus, + resourceGroupName string, + resourceGroupTag []byte, +) { + broadcastFunc := func() { + stores := store.GetRegionCache().GetStoresByType(tikvrpc.TiKV) + concurrency := min(broadcastMaxConcurrency, len(stores)) + rateLimit := make(chan struct{}, concurrency) + + var wg sync.WaitGroup + + for _, s := range stores { + rateLimit <- struct{}{} + wg.Add(1) + target := s + + err := txn.spawnWithStorePool(func() { + defer wg.Done() + defer func() { <-rateLimit }() + + req := tikvrpc.NewRequest( + tikvrpc.CmdBroadcastTxnStatus, &kvrpcpb.BroadcastTxnStatusRequest{ + TxnStatus: []*kvrpcpb.TxnStatus{status}, + }, + ) + req.Context.ClusterId = store.GetClusterID() + req.Context.ResourceControlContext = &kvrpcpb.ResourceControlContext{ + ResourceGroupName: resourceGroupName, + } + req.Context.ResourceGroupTag = resourceGroupTag + + _, err := store.GetTiKVClient().SendRequest( + bo.GetCtx(), + target.GetAddr(), + req, + broadcastRpcTimeout, + ) + if err != nil { + logutil.Logger(store.Ctx()).Info( + "broadcast txn status failed", + zap.Uint64("storeID", target.StoreID()), + zap.String("storeAddr", target.GetAddr()), + zap.Stringer("status", status), + zap.Error(err), + ) + } + }) + + if err != nil { + // If spawning the goroutine fails, release the slot and mark done + <-rateLimit + wg.Done() + logutil.Logger(store.Ctx()).Error("failed to spawn worker goroutine", zap.Error(err)) + } + } + + wg.Wait() + } + + if err := txn.spawnWithStorePool(broadcastFunc); err != nil { + logutil.Logger(store.Ctx()).Error("failed to spawn goroutine for broadcasting txn status", + zap.Error(err)) + } +} + +func sendTxnHeartBeat( + bo *retry.Backoffer, + store kvstore, + primary []byte, + startTS, ttl uint64, + minCommitTS uint64, +) (newTTL uint64, stopHeartBeat bool, err error) { req := tikvrpc.NewRequest(tikvrpc.CmdTxnHeartBeat, &kvrpcpb.TxnHeartBeatRequest{ PrimaryLock: primary, StartVersion: startTS, AdviseLockTtl: ttl, + MinCommitTs: minCommitTS, }) for { loc, err := store.GetRegionCache().LocateKey(bo, primary) @@ -1424,6 +1605,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { var err error if c.txn.IsPipelined() { // TODO: cleanup pipelined txn + // TODO: broadcast txn status } else if !c.isOnePC() { err = c.cleanupMutations(retry.NewBackofferWithVars(cleanupKeysCtx, cleanupMaxBackoff, c.txn.vars), c.mutations) } else if c.isPessimistic { @@ -1444,6 +1626,7 @@ func (c *twoPhaseCommitter) cleanup(ctx context.Context) { // execute executes the two-phase commit protocol. 
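broadcastToAllStores bounds its fan-out with a buffered-channel semaphore plus a WaitGroup, acquiring a slot before each spawn so that at most broadcastMaxConcurrency RPCs are in flight. The same concurrency shape, reduced to a self-contained sketch (all names here are invented):

package main

import (
	"fmt"
	"sync"
)

// fanOut runs work(target) for every target with at most limit calls in
// flight, and returns once all of them have finished.
func fanOut(targets []string, limit int, work func(string)) {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	for _, t := range targets {
		sem <- struct{}{} // acquire a slot; blocks while limit workers are busy
		wg.Add(1)
		go func(target string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			work(target)
		}(target)
	}
	wg.Wait()
}

func main() {
	fanOut([]string{"s1", "s2", "s3"}, 2, func(t string) { fmt.Println("broadcast to", t) })
}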
func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { + c.minCommitTSMgr.elevateWriteAccess(twoPCAccess) var binlogSkipped bool defer func() { if c.isOnePC() { @@ -1547,7 +1730,7 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } commitDetail.GetLatestTsTime = time.Since(start) // Plus 1 to avoid producing the same commit TS with previously committed transactions - c.minCommitTS = latestTS + 1 + c.minCommitTSMgr.tryUpdate(latestTS+1, twoPCAccess) } // Calculate maxCommitTS if necessary if commitTSMayBeCalculated { @@ -1666,10 +1849,10 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) { } if c.isAsyncCommit() { - if c.minCommitTS == 0 { + if c.minCommitTSMgr.get() == 0 { return errors.Errorf("session %d invalid minCommitTS for async commit protocol after prewrite, startTS=%v", c.sessionID, c.startTS) } - commitTS = c.minCommitTS + commitTS = c.minCommitTSMgr.get() } else { start = time.Now() logutil.Event(ctx, "start get commit ts") diff --git a/txnkv/transaction/2pc_test.go b/txnkv/transaction/2pc_test.go new file mode 100644 index 000000000..3e1a80d1c --- /dev/null +++ b/txnkv/transaction/2pc_test.go @@ -0,0 +1,124 @@ +// Copyright 2024 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// NOTE: The code in this file is based on code from the +// TiDB project, licensed under the Apache License v 2.0 +// +// https://github.com/pingcap/tidb/tree/cc5e161ac06827589c4966674597c137cc9e809c/store/tikv/tests/snapshot_test.go +// + +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
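The first statement of execute() above is the elevation call, which revokes the heartbeat's write access to min_commit_ts before 2PC starts computing a commit ts. A sketch of the ordering this enforces, in the style of the tests that follow (timestamps invented):

func exampleElevationOrdering() {
	m := newMinCommitTsManager()

	m.tryUpdate(100, ttlAccess)               // keepAlive advanced it while flushing
	prev := m.elevateWriteAccess(twoPCAccess) // execute() takes over; prev == 100
	_ = prev

	m.tryUpdate(200, ttlAccess)   // a late heartbeat can no longer move it
	m.tryUpdate(101, twoPCAccess) // only 2PC-level writers may advance it now

	if m.get() != 101 {
		panic("unexpected ordering")
	}
}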
+ +package transaction + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMinCommitTsManager(t *testing.T) { + t.Run( + "Initial state", func(t *testing.T) { + manager := newMinCommitTsManager() + assert.Equal(t, uint64(0), manager.get(), "Initial value should be 0") + assert.Equal( + t, + ttlAccess, + manager.getRequiredWriteAccess(), + "Initial write access should be ttlAccess", + ) + }, + ) + + t.Run( + "TTL updates", func(t *testing.T) { + manager := newMinCommitTsManager() + + manager.tryUpdate(10, ttlAccess) + assert.Equal(t, uint64(10), manager.get(), "Value should be 10") + + manager.tryUpdate(5, ttlAccess) + assert.Equal(t, uint64(10), manager.get(), "Value should remain 10") + }, + ) + + t.Run( + "Elevate write access", func(t *testing.T) { + manager := newMinCommitTsManager() + manager.tryUpdate(10, ttlAccess) + + currentValue := manager.elevateWriteAccess(twoPCAccess) + assert.Equal(t, uint64(10), currentValue, "Current value should be 10") + assert.Equal( + t, + twoPCAccess, + manager.getRequiredWriteAccess(), + "Required write access should be twoPCAccess", + ) + }, + ) + + t.Run( + "Updates after elevation", func(t *testing.T) { + manager := newMinCommitTsManager() + manager.tryUpdate(10, ttlAccess) + manager.elevateWriteAccess(twoPCAccess) + + manager.tryUpdate(20, ttlAccess) + assert.Equal(t, uint64(10), manager.get(), "Value should remain 10") + + manager.tryUpdate(30, twoPCAccess) + assert.Equal(t, uint64(30), manager.get(), "Value should be 30") + }, + ) + + t.Run( + "Concurrent updates", func(t *testing.T) { + manager := newMinCommitTsManager() + done := make(chan bool) + + go func() { + for i := 0; i < 1000; i++ { + manager.tryUpdate(uint64(i), ttlAccess) + } + done <- true + }() + + go func() { + for i := 0; i < 1000; i++ { + manager.tryUpdate(uint64(1000+i), ttlAccess) + } + done <- true + }() + + <-done + <-done + + assert.Equal(t, manager.get(), uint64(1999)) + }, + ) +} diff --git a/txnkv/transaction/pipelined_flush.go b/txnkv/transaction/pipelined_flush.go index 3ba39d279..8cadf63df 100644 --- a/txnkv/transaction/pipelined_flush.go +++ b/txnkv/transaction/pipelined_flush.go @@ -330,14 +330,32 @@ func (c *twoPhaseCommitter) commitFlushedMutations(bo *retry.Backoffer) error { if err = c.commitMutations(bo, &primaryMutation); err != nil { return errors.Trace(err) } - c.mu.RLock() + c.mu.Lock() c.mu.committed = true - c.mu.RUnlock() + c.mu.Unlock() logutil.Logger(bo.GetCtx()).Info( "[pipelined dml] transaction is committed", zap.Uint64("startTS", c.startTS), zap.Uint64("commitTS", commitTS), ) + broadcastToAllStores( + c.txn, + c.store, + retry.NewBackofferWithVars( + bo.GetCtx(), + broadcastMaxBackoff, + c.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: c.startTS, + MinCommitTs: c.minCommitTSMgr.get(), + CommitTs: commitTS, + RolledBack: false, + IsCompleted: false, + }, + c.resourceGroupName, + c.resourceGroupTag, + ) if _, err := util.EvalFailpoint("pipelinedSkipResolveLock"); err == nil { return nil @@ -439,13 +457,17 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end const RESOLVE_CONCURRENCY = 8 var resolved atomic.Uint64 handler, err := c.buildPipelinedResolveHandler(commit, &resolved) + commitTs := uint64(0) + if commit { + commitTs = atomic.LoadUint64(&c.commitTS) + } if err != nil { logutil.Logger(bo.GetCtx()).Error( "[pipelined dml] build buildPipelinedResolveHandler error", zap.Error(err), zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), - 
zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("commitTS", commitTs), ) return } @@ -470,7 +492,7 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end zap.String("txn-status", status), zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), - zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("commitTS", commitTs), zap.Uint64("session", c.sessionID), zap.Error(err), ) @@ -479,9 +501,33 @@ func (c *twoPhaseCommitter) resolveFlushedLocks(bo *retry.Backoffer, start, end zap.String("txn-status", status), zap.Uint64("resolved regions", resolved.Load()), zap.Uint64("startTS", c.startTS), - zap.Uint64("commitTS", atomic.LoadUint64(&c.commitTS)), + zap.Uint64("commitTS", commitTs), zap.Uint64("session", c.sessionID), ) + + // wait a while before notifying txn_status_cache to evict the txn, + // which tolerates slow followers and avoids the situation that the + // txn is evicted before the follower catches up. + time.Sleep(broadcastGracePeriod) + + broadcastToAllStores( + c.txn, + c.store, + retry.NewBackofferWithVars( + bo.GetCtx(), + broadcastMaxBackoff, + c.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: c.startTS, + MinCommitTs: 0, + CommitTs: commitTs, + RolledBack: !commit, + IsCompleted: true, + }, + c.resourceGroupName, + c.resourceGroupTag, + ) } }() } diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index a50e40719..26e317e10 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -116,7 +116,7 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u } } c.mu.Lock() - minCommitTS := c.minCommitTS + minCommitTS := c.minCommitTSMgr.get() c.mu.Unlock() if c.forUpdateTS > 0 && c.forUpdateTS >= minCommitTS { minCommitTS = c.forUpdateTS + 1 @@ -387,7 +387,7 @@ func (action actionPrewrite) handleSingleBatch( c.setOnePC(false) c.setAsyncCommit(false) } else { - // For 1PC, there's no racing to access to access `onePCCommmitTS` so it's safe + // For 1PC, there's no racing to access `onePCCommitTS` so it's safe // not to lock the mutex. if c.onePCCommitTS != 0 { logutil.Logger(bo.GetCtx()).Fatal( @@ -419,8 +419,8 @@ func (action actionPrewrite) handleSingleBatch( c.setAsyncCommit(false) } else { c.mu.Lock() - if prewriteResp.MinCommitTs > c.minCommitTS { - c.minCommitTS = prewriteResp.MinCommitTs + if prewriteResp.MinCommitTs > c.minCommitTSMgr.get() { + c.minCommitTSMgr.tryUpdate(prewriteResp.MinCommitTs, twoPCAccess) } c.mu.Unlock() } diff --git a/txnkv/transaction/test_probe.go b/txnkv/transaction/test_probe.go index 598bd70c0..b782a3a40 100644 --- a/txnkv/transaction/test_probe.go +++ b/txnkv/transaction/test_probe.go @@ -194,12 +194,12 @@ func (c CommitterProbe) GetCommitTS() uint64 { // GetMinCommitTS returns the minimal commit ts can be used. func (c CommitterProbe) GetMinCommitTS() uint64 { - return c.minCommitTS + return c.minCommitTSMgr.get() } // SetMinCommitTS sets the minimal commit ts can be used. func (c CommitterProbe) SetMinCommitTS(ts uint64) { - c.minCommitTS = ts + c.minCommitTSMgr.tryUpdate(ts, twoPCAccess) } // SetMaxCommitTS sets the max commit ts can be used. @@ -381,7 +381,7 @@ func (c CommitterProbe) ResolveFlushedLocks(bo *retry.Backoffer, start, end []by // SendTxnHeartBeat renews a txn's ttl. 
func SendTxnHeartBeat(bo *retry.Backoffer, store kvstore, primary []byte, startTS, ttl uint64) (newTTL uint64, stopHeartBeat bool, err error) { - return sendTxnHeartBeat(bo, store, primary, startTS, ttl) + return sendTxnHeartBeat(bo, store, primary, startTS, ttl, 0) } // ConfigProbe exposes configurations and global variables for testing purpose. diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index dfa1461e9..7c124a678 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -841,7 +841,26 @@ func (txn *KVTxn) Rollback() error { txn.committer.ttlManager.close() // no need to clean up locks when no flush triggered. pipelinedStart, pipelinedEnd := txn.committer.pipelinedCommitInfo.pipelinedStart, txn.committer.pipelinedCommitInfo.pipelinedEnd - if len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 { + needCleanUpLocks := len(pipelinedStart) != 0 && len(pipelinedEnd) != 0 + broadcastToAllStores( + txn, + txn.committer.store, + retry.NewBackofferWithVars( + txn.store.Ctx(), + broadcastMaxBackoff, + txn.committer.txn.vars, + ), + &kvrpcpb.TxnStatus{ + StartTs: txn.startTS, + MinCommitTs: txn.committer.minCommitTSMgr.get(), + CommitTs: 0, + RolledBack: true, + IsCompleted: !needCleanUpLocks, + }, + txn.resourceGroupName, + txn.resourceGroupTag, + ) + if needCleanUpLocks { rollbackBo := retry.NewBackofferWithVars(txn.store.Ctx(), CommitSecondaryMaxBackoff, txn.vars) txn.committer.resolveFlushedLocks(rollbackBo, pipelinedStart, pipelinedEnd, false) }
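Read end to end, the patch gives a pipelined transaction a fixed broadcast lifecycle; the summary below is distilled from the call sites above and adds no behavior of its own:

// Broadcast lifecycle of a pipelined transaction:
//
//	running:   keepAlive, every heartbeat        {StartTs, MinCommitTs: ~now, IsCompleted: false}
//	commit:    commitFlushedMutations            {StartTs, MinCommitTs, CommitTs, IsCompleted: false}
//	           resolveFlushedLocks (after grace) {StartTs, CommitTs, IsCompleted: true}
//	rollback:  KVTxn.Rollback                    {StartTs, RolledBack: true, IsCompleted: !needCleanUpLocks}
//	           resolveFlushedLocks (if flushed)  {StartTs, RolledBack: true, IsCompleted: true}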