Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](transaction) commit txn check txn status #40064

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ public void preCommitTransaction2PC(Database db, List<Table> tableList, long tra
}
}

@Deprecated
@Override
public void commitTransaction(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1783,20 +1783,16 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
List<String> tables = new ArrayList<>();
// if table was dropped, transaction must be aborted
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
for (Table table : tableList) {
tables.add(table.getName());
}
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);

// Step 3: check auth
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
Expand Down Expand Up @@ -1991,19 +1987,15 @@ private void rollbackTxnImpl(TRollbackTxnRequest request) throws UserException {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
List<String> tables = new ArrayList<>();
tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
for (Table table : tableList) {
tables.add(table.getName());
}
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);

// Step 3: check auth
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,6 @@ private enum PublishResult {
// transactionId -> running TransactionState
private final Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();

/**
* the multi table ids that are in transaction, used to check whether a table is in transaction
* multi table transaction state
* txnId -> tableId list
*/
private final ConcurrentHashMap<Long, List<Long>> multiTableRunningTransactionTableIdMaps =
new ConcurrentHashMap<>();

// transactionId -> final status TransactionState
private final Map<Long, TransactionState> idToFinalStatusTransactionState = Maps.newHashMap();
private final Map<Long, Long> subTxnIdToTxnId = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -478,8 +470,13 @@ public void preCommitTransaction2PC(List<Table> tableList, long transactionId,
checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds,
tableToPartition, totalInvolvedBackends);

unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition,
totalInvolvedBackends, db);
writeLock();
try {
unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition,
totalInvolvedBackends, db);
} finally {
writeUnlock();
}
LOG.info("transaction:[{}] successfully pre-committed", transactionState);
}

Expand Down Expand Up @@ -850,7 +847,7 @@ public void commitTransaction(List<Table> tableList, long transactionId, List<Ta
LOG.info("transaction:[{}] successfully committed", transactionState);
}

public void commitTransaction(long transactionId, List<Table> tableList,
protected void commitTransaction(long transactionId, List<Table> tableList,
List<SubTransactionState> subTransactionStates) throws UserException {
// check status
// the caller method already own tables' write lock
Expand Down Expand Up @@ -1538,8 +1535,18 @@ private PartitionCommitInfo generatePartitionCommitInfo(OlapTable table, long pa
protected void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
Map<Long, Set<Long>> tableToPartition, Set<Long> totalInvolvedBackends,
Database db) {
checkBeforeUnprotectedCommitTransaction(transactionState, errorReplicaIds);

// transaction state is modified during check if the transaction could committed
if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
return;
}
// update transaction state version
long commitTime = System.currentTimeMillis();
transactionState.setCommitTime(commitTime);
if (MetricRepo.isInit) {
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(commitTime - transactionState.getPrepareTime());
}
transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
transactionState.setErrorReplicas(errorReplicaIds);
for (long tableId : tableToPartition.keySet()) {
OlapTable table = (OlapTable) db.getTableNullable(tableId);
TableCommitInfo tableCommitInfo = new TableCommitInfo(tableId);
Expand All @@ -1555,7 +1562,9 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S
transactionState.setInvolvedBackends(totalInvolvedBackends);
}

private void checkBeforeUnprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds) {
protected void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
Map<Long, Set<Long>> subTxnToPartition, Set<Long> totalInvolvedBackends,
List<SubTransactionState> subTransactionStates, Database db) {
// transaction state is modified during check if the transaction could committed
if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE) {
return;
Expand All @@ -1569,15 +1578,6 @@ private void checkBeforeUnprotectedCommitTransaction(TransactionState transactio
transactionState.setTransactionStatus(TransactionStatus.COMMITTED);
transactionState.setErrorReplicas(errorReplicaIds);

// persist transactionState
unprotectUpsertTransactionState(transactionState, false);
}

protected void unprotectedCommitTransaction(TransactionState transactionState, Set<Long> errorReplicaIds,
Map<Long, Set<Long>> subTxnToPartition, Set<Long> totalInvolvedBackends,
List<SubTransactionState> subTransactionStates, Database db) {
checkBeforeUnprotectedCommitTransaction(transactionState, errorReplicaIds);

Map<Long, List<SubTransactionState>> tableToSubTransactionState = new HashMap<>();
for (SubTransactionState subTransactionState : subTransactionStates) {
long tableId = subTransactionState.getTable().getId();
Expand Down Expand Up @@ -2199,14 +2199,14 @@ private void updatePartitionNextVersion(TransactionState transactionState, Datab
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("txn_id={}, partition to next version={}", transactionState.getTransactionId(),
LOG.debug("txn_id={}, partition to version={}", transactionState.getTransactionId(),
partitionToVersionMap);
}
for (Entry<Partition, Long> entry : partitionToVersionMap.entrySet()) {
Partition partition = entry.getKey();
long version = entry.getValue();
partition.setNextVersion(version + 1);
LOG.debug("set partition={}, next_version={}", partition.getId(), partition.getNextVersion());
LOG.debug("set partition={}, next_version={}", partition.getId(), version + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void preCommitTransaction2PC(Database db, List<Table> tableList, long tra
}
}

public void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId,
private void preCommitTransaction2PC(long dbId, List<Table> tableList, long transactionId,
List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment)
throws UserException {
if (Config.disable_load_job) {
Expand All @@ -214,6 +214,7 @@ public void preCommitTransaction2PC(long dbId, List<Table> tableList, long trans
dbTransactionMgr.preCommitTransaction2PC(tableList, transactionId, tabletCommitInfos, txnCommitAttachment);
}

@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException {
Expand Down Expand Up @@ -634,6 +635,7 @@ public void abortTxnWhenCoordinateBeRestart(long coordinateBeId, String coordina
TransactionState transactionState = dbTransactionMgr.getTransactionState(txnInfo.second);
long coordStartTime = transactionState.getCoordinator().startTime;
if (coordStartTime > 0 && coordStartTime < beStartTime) {
// does not hold table write lock
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE restart", null);
}
} catch (UserException e) {
Expand All @@ -652,6 +654,7 @@ public void abortTxnWhenCoordinateBeDown(long coordinateBeId, String coordinateH
= getPrepareTransactionIdByCoordinateBe(coordinateBeId, coordinateHost, limit);
for (Pair<Long, Long> txnInfo : transactionIdByCoordinateBe) {
try {
// does not hold table write lock
DatabaseTransactionMgr dbTransactionMgr = getDatabaseTransactionMgr(txnInfo.first);
dbTransactionMgr.abortTransaction(txnInfo.second, "coordinate BE is down", null);
} catch (UserException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public void preCommitTransaction2PC(Database db, List<Table> tableList, long tra
TxnCommitAttachment txnCommitAttachment)
throws UserException;

@Deprecated
public void commitTransaction(long dbId, List<Table> tableList,
long transactionId, List<TabletCommitInfo> tabletCommitInfos)
throws UserException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null);
transactionId, null, null);
}

@Test
Expand All @@ -220,7 +220,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null);
transactionId, null, null);
}

@Test
Expand All @@ -247,7 +247,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null);
transactionId, null, null);
});
}

Expand Down Expand Up @@ -279,7 +279,7 @@ public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request) {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId, null);
transactionId, null, null);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public Map<String, Long> addTransactionToTransactionMgr() throws UserException {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId1,
transTablets);
transTablets, null);
TransactionState transactionState1 = fakeEditLog.getTransaction(transactionId1);
Map<String, Map<Long, Long>> keyToSuccessTablets = new HashMap<>();
DatabaseTransactionMgrTest.setSuccessTablet(keyToSuccessTablets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void testCommitTransaction() throws UserException {
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
.getTableOrMetaException(CatalogTestUtil.testTableId1);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets);
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down Expand Up @@ -210,7 +210,7 @@ public void testCommitTransactionWithOneFailed() throws UserException {
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2));
// commit txn
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets);
transTablets, null);
checkVersion(testTable1, CatalogTestUtil.testPartition1, CatalogTestUtil.testIndexId1,
CatalogTestUtil.testTabletId1, CatalogTestUtil.testStartVersion,
CatalogTestUtil.testStartVersion + 2,
Expand Down Expand Up @@ -241,7 +241,7 @@ public void testCommitTransactionWithOneFailed() throws UserException {
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId3));
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2, transTablets);
transactionId2, transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
TransactionState transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1, transactionId2);
Expand All @@ -261,7 +261,7 @@ public void testCommitTransactionWithOneFailed() throws UserException {
if (true) {
List<TabletCommitInfo> transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets);
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is committed
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down Expand Up @@ -430,7 +430,7 @@ public void testFinishTransaction() throws UserException {
OlapTable testTable1 = (OlapTable) (masterEnv.getInternalCatalog()
.getDbOrMetaException(CatalogTestUtil.testDbId1).getTableOrMetaException(CatalogTestUtil.testTableId1));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets);
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
checkTableVersion(testTable1, 1, 2);
Expand Down Expand Up @@ -498,7 +498,7 @@ public void testFinishTransactionWithOneFailed() throws UserException {
List<TabletCommitInfo> transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1,
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId2));
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId,
transTablets);
transTablets, null);

// follower catalog replay the transaction
TransactionState transactionState = fakeEditLog.getTransaction(transactionId);
Expand Down Expand Up @@ -563,7 +563,7 @@ public void testFinishTransactionWithOneFailed() throws UserException {
Lists.newArrayList(CatalogTestUtil.testBackendId1, CatalogTestUtil.testBackendId3));
try {
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1),
transactionId2, transTablets);
transactionId2, transTablets, null);
Assert.fail();
} catch (TabletQuorumFailedException e) {
TransactionState transactionState = masterTransMgr.getTransactionState(CatalogTestUtil.testDbId1,
Expand All @@ -577,7 +577,7 @@ public void testFinishTransactionWithOneFailed() throws UserException {
if (true) {
List<TabletCommitInfo> transTablets = generateTabletCommitInfos(CatalogTestUtil.testTabletId1, allBackends);
masterTransMgr.commitTransaction(CatalogTestUtil.testDbId1, Lists.newArrayList(testTable1), transactionId2,
transTablets);
transTablets, null);
TransactionState transactionState = fakeEditLog.getTransaction(transactionId2);
// check status is commit
Assert.assertEquals(TransactionStatus.COMMITTED, transactionState.getTransactionStatus());
Expand Down
Loading