Skip to content

Commit

Permalink
fix cloud case
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed May 22, 2024
1 parent 16a2691 commit 7b99d17
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public class TransactionEntry {
private long transactionId = -1;
private TransactionState transactionState;
private long timeoutTimestamp = -1;
// 1. For cloud mode, we keep subTransactionStates in TransactionEntry;
// 2. For doris, we keep subTransactionStates in TransactionState, because if executed in observer,
// the dml statements will be forwarded to master, so keep the subTransactionStates is in master.
private List<SubTransactionState> subTransactionStates = new ArrayList<>();

public TransactionEntry() {
}
Expand Down Expand Up @@ -193,7 +197,7 @@ public long beginTransaction(TableIf table) throws Exception {
if (!isTransactionBegan) {
long timeoutSecond = ConnectContext.get().getExecTimeout();
this.timeoutTimestamp = System.currentTimeMillis() + timeoutSecond * 1000;
if (Env.getCurrentEnv().isMaster()) {
if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
this.transactionId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), Lists.newArrayList(table.getId()), label,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
Expand Down Expand Up @@ -240,7 +244,7 @@ public long beginTransaction(TableIf table) throws Exception {
public TransactionStatus commitTransaction() throws Exception {
if (isTransactionBegan) {
try {
if (Env.getCurrentEnv().isMaster()) {
if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
beforeFinishTransaction();
if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(database, transactionId,
transactionState.getSubTransactionStates(),
Expand Down Expand Up @@ -299,7 +303,7 @@ public long abortTransaction() throws Exception {

private long abortTransaction(String reason) throws Exception {
if (isTransactionBegan) {
if (Env.getCurrentEnv().isMaster()) {
if (Env.getCurrentEnv().isMaster() || Config.isCloudMode()) {
beforeFinishTransaction();
Env.getCurrentGlobalTransactionMgr().abortTransaction(database.getId(), transactionId, reason);
return transactionId;
Expand All @@ -325,7 +329,9 @@ private void beforeFinishTransaction() {
if (isTransactionBegan) {
List<Long> tableIds = transactionState.getTableIdList().stream().distinct().collect(Collectors.toList());
transactionState.setTableIdList(tableIds);
transactionState.getSubTransactionStates().sort((s1, s2) -> {
List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
: transactionState.getSubTransactionStates();
subTransactionStatesPtr.sort((s1, s2) -> {
if (s1.getSubTransactionType() == SubTransactionType.INSERT
&& s2.getSubTransactionType() == SubTransactionType.DELETE) {
return 1;
Expand All @@ -336,6 +342,9 @@ private void beforeFinishTransaction() {
return Long.compare(s1.getSubTransactionId(), s2.getSubTransactionId());
}
});
if (Config.isCloudMode()) {
transactionState.setSubTransactionStates(subTransactionStatesPtr);
}
LOG.info("subTransactionStates={}", transactionState.getSubTransactionStates());
transactionState.resetSubTxnIds();
}
Expand Down Expand Up @@ -374,13 +383,15 @@ public void addTabletCommitInfos(long subTxnId, Table table, List<TTabletCommitI
LOG.debug("label={}, txn_id={}, sub_txn_id={}, table={}, commit_infos={}",
label, transactionId, subTxnId, table, commitInfos);
}
transactionState.getSubTransactionStates()
List<SubTransactionState> subTransactionStatesPtr = Config.isCloudMode() ? subTransactionStates
: transactionState.getSubTransactionStates();
subTransactionStatesPtr
.add(new SubTransactionState(subTxnId, table, commitInfos, subTransactionType));
Preconditions.checkState(
transactionState.getTableIdList().size() == transactionState.getSubTransactionStates().size(),
"txn_id=" + transactionId + ", expect table_list=" + transactionState.getSubTransactionStates().stream()
.map(s -> s.getTable().getId()).collect(Collectors.toList()) + ", real table_list="
+ transactionState.getTableIdList());
Preconditions.checkState(transactionState.getTableIdList().size() == subTransactionStatesPtr.size(),
"txn_id=" + transactionId
+ ", expect table_list="
+ subTransactionStatesPtr.stream().map(s -> s.getTable().getId()).collect(Collectors.toList())
+ ", real table_list=" + transactionState.getTableIdList());
}

public boolean isTransactionBegan() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,10 @@ public void resetSubTransactionStates() {
this.subTransactionStates = new ArrayList<>();
}

public void setSubTransactionStates(List<SubTransactionState> subTransactionStates) {
this.subTransactionStates = subTransactionStates;
}

public void resetSubTxnIds() {
this.subTxnIds = subTransactionStates.stream().map(SubTransactionState::getSubTransactionId)
.collect(Collectors.toList());
Expand Down

0 comments on commit 7b99d17

Please sign in to comment.