Skip to content

Commit

Permalink
[fix](txn load) Fix txn insert connect to follower fe (#35075)
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi authored May 22, 2024
1 parent 454cdff commit b699cf3
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 63 deletions.
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -724,6 +725,13 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
}
}

// set transaction entry for transaction load
if (request.isSetTxnLoadInfo()) {
TransactionEntry transactionEntry = new TransactionEntry();
transactionEntry.setTxnInfoInMaster(request.getTxnLoadInfo());
ctx.setTxnEntry(transactionEntry);
}

TUniqueId queryId; // This query id will be set in ctx
if (request.isSetQueryId()) {
queryId = request.getQueryId();
Expand Down Expand Up @@ -758,6 +766,13 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
result.setStatus(ctx.getState().toString());
if (ctx.getState().getStateType() == MysqlStateType.OK) {
result.setStatusCode(0);
if (request.isSetTxnLoadInfo()) {
TransactionEntry transactionEntry = ConnectContext.get().getTxnEntry();
// null if this is a commit or rollback command
if (transactionEntry != null) {
result.setTxnLoadInfo(transactionEntry.getTxnInfoInMaster());
}
}
} else {
result.setStatusCode(ctx.getState().getErrorCode().getCode());
result.setErrMessage(ctx.getState().getErrorMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
Expand Down Expand Up @@ -84,6 +85,14 @@ public MasterOpExecutor(ConnectContext ctx) {

public void execute() throws Exception {
result = forward(buildStmtForwardParams());
if (result.getStatusCode() == 0 && ctx.isTxnModel()) {
if (result.isSetTxnLoadInfo()) {
ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo());
} else {
ctx.setTxnEntry(null);
LOG.info("set txn entry to null");
}
}
waitOnReplaying();
}

Expand Down Expand Up @@ -175,7 +184,7 @@ private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest
}
}

private TMasterOpRequest buildStmtForwardParams() {
private TMasterOpRequest buildStmtForwardParams() throws AnalysisException {
TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
Expand Down Expand Up @@ -203,6 +212,10 @@ private TMasterOpRequest buildStmtForwardParams() {
if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}
// set transaction load info
if (ctx.isTxnModel()) {
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
}
return params;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,18 @@ public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest requ

FrontendService.Client client = getClient(thriftAddress);

LOG.info("Send waiting transaction status {} to Master {}", ctx.getStmtId(), thriftAddress);
LOG.info("Send waiting transaction status stmtId={}, txnId={} to Master {}", ctx.getStmtId(),
request.getTxnId(), thriftAddress);

boolean isReturnToPool = false;
try {
TWaitingTxnStatusResult result = client.waitingTxnStatus(request);
isReturnToPool = true;
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new TException("get txn status failed.");
throw new TException(
"get txn status (id=" + request.getTxnId() + ") failed, status code: " + result.getStatus()
.getStatusCode() + ", msg: "
+ result.getStatus().getErrorMsgs() + ".");
}
return result;
} catch (TTransportException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,11 @@ public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest requ
throws AnalysisException, TimeoutException {
long dbId = request.getDbId();
int commitTimeoutSec = Config.commit_timeout_second;
TransactionStatus txnStatus = null;
for (int i = 0; i < commitTimeoutSec; ++i) {
Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbId);
TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult();
statusResult.status = new TStatus();
TransactionStatus txnStatus = null;
if (request.isSetTxnId()) {
long txnId = request.getTxnId();
TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(dbId, txnId);
Expand All @@ -551,7 +551,13 @@ public TWaitingTxnStatusResult getWaitingTxnStatus(TWaitingTxnStatusRequest requ
LOG.info("commit sleep exception.", e);
}
}
throw new TimeoutException("Operation is timeout");
if (txnStatus == TransactionStatus.COMMITTED) {
TWaitingTxnStatusResult statusResult = new TWaitingTxnStatusResult();
statusResult.status = new TStatus();
statusResult.setTxnStatusId(txnStatus.value());
return statusResult;
}
throw new TimeoutException("Operation is timeout, txn status is " + txnStatus);
}

@Override
Expand Down
Loading

0 comments on commit b699cf3

Please sign in to comment.