From 7b99d1712330a09e75ab606100b0965aa8a8877d Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 22 May 2024 12:17:50 +0800 Subject: [PATCH] fix cloud case --- .../doris/transaction/TransactionEntry.java | 31 +++++++++++++------ .../doris/transaction/TransactionState.java | 4 +++ 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 62cc059e5574d7e..7672384d959133e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -83,6 +83,10 @@ public class TransactionEntry { private long transactionId = -1; private TransactionState transactionState; private long timeoutTimestamp = -1; + // 1. For cloud mode, we keep subTransactionStates in TransactionEntry; + // 2. For doris, we keep subTransactionStates in TransactionState, because if executed in observer, + // the dml statements will be forwarded to master, so keep the subTransactionStates is in master. + private List subTransactionStates = new ArrayList<>(); public TransactionEntry() { } @@ -193,7 +197,7 @@ public long beginTransaction(TableIf table) throws Exception { if (!isTransactionBegan) { long timeoutSecond = ConnectContext.get().getExecTimeout(); this.timeoutTimestamp = System.currentTimeMillis() + timeoutSecond * 1000; - if (Env.getCurrentEnv().isMaster()) { + if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) { this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), Lists.newArrayList(table.getId()), label, new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), @@ -240,7 +244,7 @@ public long beginTransaction(TableIf table) throws Exception { public TransactionStatus commitTransaction() throws Exception { if (isTransactionBegan) { try { - if (Env.getCurrentEnv().isMaster()) { + if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) { beforeFinishTransaction(); if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database, transactionId, transactionState.getSubTransactionStates(), @@ -299,7 +303,7 @@ public long abortTransaction() throws Exception { private long abortTransaction(String reason) throws Exception { if (isTransactionBegan) { - if (Env.getCurrentEnv().isMaster()) { + if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) { beforeFinishTransaction(); Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), transactionId, reason); return transactionId; @@ -325,7 +329,9 @@ private void beforeFinishTransaction() { if (isTransactionBegan) { List tableIds = transactionState.getTableIdList().stream().distinct().collect(Collectors.toList()); transactionState.setTableIdList(tableIds); - transactionState.getSubTransactionStates().sort((s1, s2) -> { + List subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates + : transactionState.getSubTransactionStates(); + subTransactionStatesPtr.sort((s1, s2) -> { if (s1.getSubTransactionType() == SubTransactionType.INSERT && s2.getSubTransactionType() == SubTransactionType.DELETE) { return 1; @@ -336,6 +342,9 @@ private void beforeFinishTransaction() { return Long.compare(s1.getSubTransactionId(), s2.getSubTransactionId()); } }); + if (Config.isCloudMode()) { + transactionState.setSubTransactionStates(subTransactionStatesPtr); + } LOG.info("subTransactionStates={}", transactionState.getSubTransactionStates()); transactionState.resetSubTxnIds(); } @@ -374,13 +383,15 @@ public void addTabletCommitInfos(long subTxnId, Table table, List subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates + : transactionState.getSubTransactionStates(); + subTransactionStatesPtr .add(new SubTransactionState(subTxnId, table, commitInfos, subTransactionType)); - Preconditions.checkState( - transactionState.getTableIdList().size() == transactionState.getSubTransactionStates().size(), - "txn_id=" + transactionId + ", expect table_list=" + transactionState.getSubTransactionStates().stream() - .map(s -> s.getTable().getId()).collect(Collectors.toList()) + ", real table_list=" - + transactionState.getTableIdList()); + Preconditions.checkState(transactionState.getTableIdList().size() == subTransactionStatesPtr.size(), + "txn_id=" + transactionId + + ", expect table_list=" + + subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList()) + + ", real table_list=" + transactionState.getTableIdList()); } public boolean isTransactionBegan() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 652f22852f0a9ee..f628c945b6cd1ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -859,6 +859,10 @@ public void resetSubTransactionStates() { this.subTransactionStates = new ArrayList<>(); } + public void setSubTransactionStates(List subTransactionStates) { + this.subTransactionStates = subTransactionStates; + } + public void resetSubTxnIds() { this.subTxnIds = subTransactionStates.stream().map(SubTransactionState::getSubTransactionId) .collect(Collectors.toList());