Skip to content

Commit

Permalink
[fix](transaction) commit txn check txn status (#40064) (#41229)
Browse files Browse the repository at this point in the history
pick #40064
  • Loading branch information
mymeiyi authored Sep 25, 2024
1 parent 2ceaa84 commit f8acd81
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1730,20 +1730,16 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
List<String> tables = new ArrayList<>();
// if table was dropped, transaction must be aborted
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
for (Table table : tableList) {
tables.add(table.getName());
}
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);

// Step 3: check auth
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
Expand Down Expand Up @@ -1912,19 +1908,15 @@ private void rollbackTxnImpl(TRollbackTxnRequest request) throws UserException {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
List<String> tables = new ArrayList<>();
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
for (Table table : tableList) {
tables.add(table.getName());
}
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);

// Step 3: check auth
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -119,14 +118,6 @@ private enum PublishResult {
// transactionId -> running TransactionState
private final Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();

/**
* the multi table ids that are in transaction, used to check whether a table is in transaction
* multi table transaction state
* txnId -> tableId list
*/
private final ConcurrentHashMap<Long, List<Long>> multiTableRunningTransactionTableIdMaps =
new ConcurrentHashMap<>();

// transactionId -> final status TransactionState
private final Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();

Expand Down Expand Up @@ -436,8 +427,13 @@ public void preCommitTransaction2PC(List<Table> tableList, long transactionId,
checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds,
tableToPartition, totalInvolvedBackends);

unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition,
totalInvolvedBackends, db);
writeLock();
try {
unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition,
totalInvolvedBackends, db);
} finally {
writeUnlock();
}
LOG.info("transaction:[{}] successfully pre-committed", transactionState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void preCommitTransaction2PC(Database db, List<Table> tableList, long tra
}
}

public void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId,
private void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
Expand All @@ -219,6 +219,7 @@ public void preCommitTransaction2PC(long dbId, List<Table> tableList, long trans
dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
}

@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
Expand Down Expand Up @@ -675,6 +676,7 @@ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordina
TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second);
long coordStartTime = transactionState.getCoordinator().startTime;
if (coordStartTime < beStartTime) {
// does not hold table write lock
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null);
}
} catch (UserException e) {
Expand All @@ -692,6 +694,7 @@ public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateH
= getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
// does not hold table write lock
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first);
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null);
} catch (UserException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Map<String, Long> addTransactionToTransactionMgr() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1, transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
setTransactionFinishPublish(transactionState1,
Lists.newArrayList(CatalogTestUtil.testBackendId1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testCommitTransaction1() throws UserException {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets);
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down Expand Up @@ -210,7 +210,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);

// follower catalog replay the transaction
transactionState = fakeEditLog.getTransaction(transactionId);
Expand All @@ -231,7 +232,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
Expand Down Expand Up @@ -260,7 +262,8 @@ public void testCommitTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down Expand Up @@ -465,7 +468,8 @@ public void testFinishTransaction() throws UserException {
transTablets.add(tabletCommitInfo3);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
slaveTransMgr.replayUpsertTransactionState(transactionState);
Expand Down Expand Up @@ -519,7 +523,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo2);
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets, null);

// follower catalog replay the transaction
transactionState = fakeEditLog.getTransaction(transactionId);
Expand Down Expand Up @@ -582,7 +587,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo3);
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
Expand All @@ -598,7 +604,8 @@ public void testFinishTransactionWithOneFailed() throws UserException {
transTablets.add(tabletCommitInfo1);
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2, transTablets);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets, null);
transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down

0 comments on commit f8acd81

Please sign in to comment.