Skip to content

Commit

Permalink
[fix](timeout) unify query_timeout, use setting from user property first
Browse files Browse the repository at this point in the history
  • Loading branch information
Yulei-Yang committed Jul 10, 2024
1 parent 9087f1e commit 911041c
Show file tree
Hide file tree
Showing 8 changed files with 24 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public void analyze(Analyzer analyzer) throws UserException {

db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
// create label and begin transaction
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
long timeoutSecond = ConnectContext.get().getExecTimeoutS();
if (Strings.isNullOrEmpty(label)) {
label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,7 @@ public final synchronized void makeSureInitialized() {
if (!initialized) {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300 : ConnectContext.get().getSessionVariable()
.getQueryTimeoutS();
int waitTimeOut = ConnectContext.get() == null ? 300 : (int) ConnectContext.get().getExecTimeoutS();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(extCatalog.getId(), id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public final synchronized void makeSureInitialized() {
if (!Env.getCurrentEnv().isMaster()) {
// Forward to master and wait the journal to replay.
int waitTimeOut = ConnectContext.get() == null ? 300
: ConnectContext.get().getSessionVariable().getQueryTimeoutS();
: (int) ConnectContext.get().getExecTimeoutS();
MasterCatalogExecutor remoteExecutor = new MasterCatalogExecutor(waitTimeOut * 1000);
try {
remoteExecutor.forward(id, -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ public static UpdateStmtExecutor fromUpdateStmt(UpdateStmt updateStmt) throws An
updateStmtExecutor.dbId = database.getId();
updateStmtExecutor.analyzer = updateStmt.getAnalyzer();
updateStmtExecutor.queryId = updateStmtExecutor.analyzer.getContext().queryId();
updateStmtExecutor.timeoutSecond = updateStmtExecutor.analyzer.getContext()
.getSessionVariable().getQueryTimeoutS();
updateStmtExecutor.timeoutSecond = (int) updateStmtExecutor.analyzer.getContext()
.getExecTimeoutS();
updateStmtExecutor.updatePlanner = new UpdatePlanner(updateStmtExecutor.dbId, updateStmtExecutor.targetTable,
updateStmt.getSetExprs(), updateStmt.getSrcTupleDesc(),
updateStmt.getAnalyzer());
Expand Down
29 changes: 13 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ public void setUserQueryTimeout(long queryTimeout) {
this.userQueryTimeout = queryTimeout;
}

/**
* use query_timeout from user property first, and then from session variable
*
* @return exact execution timeout in second
*/
public long getExecTimeoutS() {
return userQueryTimeout > 0 ? userQueryTimeout : sessionVariable.getQueryTimeoutS();
}

private StatementContext statementContext;

public SessionContext getSessionContext() {
Expand Down Expand Up @@ -571,23 +580,11 @@ public void checkTimeout(long now) {
killConnection = true;
}
} else {
if (userQueryTimeout > 0) {
// user set query_timeout property
if (delta > userQueryTimeout * 1000) {
LOG.warn("kill query timeout, remote: {}, query timeout: {}",
getMysqlChannel().getRemoteHostPortString(), userQueryTimeout);
if (delta > getExecTimeoutS() * 1000) {
LOG.warn("kill query timeout, remote: {}, query timeout: {}",
getMysqlChannel().getRemoteHostPortString(), getExecTimeoutS());

killFlag = true;
}
} else {
// default use session query_timeout
if (delta > sessionVariable.getQueryTimeoutS() * 1000) {
LOG.warn("kill query timeout, remote: {}, query timeout: {}",
getMysqlChannel().getRemoteHostPortString(), sessionVariable.getQueryTimeoutS());

// Only kill
killFlag = true;
}
killFlag = true;
}
}
if (killFlag) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, Redirect
this.originStmt = originStmt;
this.ctx = ctx;
if (status.isNeedToWaitJournalSync()) {
this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.waitTimeoutMs = (int) ctx.getExecTimeoutS() * 1000;
} else {
this.waitTimeoutMs = 0;
}
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.thriftTimeoutMs = (int) ctx.getExecTimeoutS() * 1000;
// if isQuery=false, we shouldn't retry twice when catch exception because of Idempotency
this.shouldNotRetry = !isQuery;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public class MasterTxnExecutor {

public MasterTxnExecutor(ConnectContext ctx) {
this.ctx = ctx;
this.waitTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.thriftTimeoutMs = ctx.getSessionVariable().getQueryTimeoutS() * 1000;
this.waitTimeoutMs = (int) ctx.getExecTimeoutS() * 1000;
this.thriftTimeoutMs = (int) ctx.getExecTimeoutS() * 1000;
}

private TNetworkAddress getMasterAddress() throws TException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,7 @@ private void beginTxn(String dbName, String tblName) throws UserException, TExce
InterruptedException, ExecutionException, TimeoutException {
TransactionEntry txnEntry = context.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
long timeoutSecond = ConnectContext.get().getExecTimeoutS();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
Database dbObj = Env.getCurrentInternalCatalog()
.getDbOrException(dbName, s -> new TException("database is invalid for dbName: " + s));
Expand Down Expand Up @@ -1478,7 +1478,7 @@ private void handleInsertStmt() throws Exception {

coord.exec();

boolean notTimeout = coord.join(context.getSessionVariable().getQueryTimeoutS());
boolean notTimeout = coord.join((int) context.getExecTimeoutS());
if (!coord.isDone()) {
coord.cancel();
if (notTimeout) {
Expand Down

0 comments on commit 911041c

Please sign in to comment.