Skip to content

Commit

Permalink
[Fix]unified cancel method in coordinator (#41038)
Browse files Browse the repository at this point in the history
## Proposed changes
Coordinator has two cancel method, with args and without args.
They should has the same behavior, so query queue cancel should be moved
to cancel method with args.
  • Loading branch information
wangbo authored Sep 22, 2024
1 parent 63f957d commit 858b530
Show file tree
Hide file tree
Showing 15 changed files with 43 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
Expand All @@ -45,6 +46,7 @@
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -316,7 +318,7 @@ protected void unprotectedExecuteRetry(FailMsg failMsg) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, "load job failed"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.proc.CurrentQueryStatementsProcNode;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.profile.ProfileTreeNode;
Expand All @@ -38,6 +39,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -576,7 +578,7 @@ public Object killQuery(HttpServletRequest request, HttpServletResponse response
}

ExecuteEnv env = ExecuteEnv.getInstance();
env.getScheduler().cancelQuery(queryId);
env.getScheduler().cancelQuery(queryId, new Status(TStatusCode.CANCELLED, "cancel query by rest api"));
return ResponseEntityBuilder.ok();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
Expand All @@ -33,6 +34,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -219,7 +221,7 @@ protected void executeCancelLogic() {
}
isCanceled.getAndSet(true);
if (null != stmtExecutor) {
stmtExecutor.cancel();
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "insert task cancelled"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
Expand All @@ -50,6 +51,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -258,7 +260,7 @@ public synchronized void onSuccess() throws JobException {
protected synchronized void executeCancelLogic() {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
executor.cancel();
executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled"));
}
after();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Status;
import org.apache.doris.load.ExportFailMsg.CancelType;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
Expand All @@ -35,6 +36,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -156,7 +158,7 @@ public void cancel() throws JobException {
}
isCanceled.getAndSet(true);
if (stmtExecutor != null) {
stmtExecutor.cancel();
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "export task cancelled"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
Expand All @@ -57,6 +58,7 @@
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -607,7 +609,7 @@ protected void unprotectedExecuteRetry(FailMsg failMsg) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, failMsg.getMsg()));
}
}

Expand Down Expand Up @@ -671,7 +673,7 @@ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, failMsg.getMsg()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
Expand All @@ -37,6 +38,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -146,7 +148,7 @@ protected final void execImpl(StmtExecutor executor, long jobId) throws Exceptio
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, "insert timeout"));
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ public void kill(boolean killConnection) {
closeChannel();
}
// Now, cancel running query.
cancelQuery();
cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by user"));
}

// kill operation with no protect by timeout.
Expand All @@ -960,10 +960,10 @@ private void killByTimeout(boolean killConnection) {
}
}

public void cancelQuery() {
public void cancelQuery(Status cancelReason) {
StmtExecutor executorRef = executor;
if (executorRef != null) {
executorRef.cancel();
executorRef.cancel(cancelReason);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
Expand Down Expand Up @@ -145,11 +146,11 @@ public ConnectContext getContext(String flightToken) {
return null;
}

public void cancelQuery(String queryId) {
public void cancelQuery(String queryId, Status cancelReason) {
for (ConnectContext ctx : connectionMap.values()) {
TUniqueId qid = ctx.queryId();
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
ctx.cancelQuery();
ctx.cancelQuery(cancelReason);
break;
}
}
Expand Down
11 changes: 2 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1278,18 +1278,11 @@ public Status shouldCancel(List<Backend> currentBackends) {
}
}

// Cancel execution of query. This includes the execution of the local plan
// fragment,
// if any, as well as all plan fragments on remote nodes.
public void cancel() {
cancel(new Status(TStatusCode.CANCELLED, "query is cancelled by user"));
@Override
public void cancel(Status cancelReason) {
if (queueToken != null) {
queueToken.cancel();
}
}

@Override
public void cancel(Status cancelReason) {
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
Expand Down
18 changes: 2 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,7 @@ private void resetAnalyzerAndStmt() {
}

// Because this is called by other thread
public void cancel() {
public void cancel(Status cancelReason) {
if (masterOpExecutor != null) {
try {
masterOpExecutor.cancel();
Expand All @@ -1544,7 +1544,7 @@ public void cancel() {
}
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel();
coordRef.cancel(cancelReason);
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
Expand All @@ -1570,20 +1570,6 @@ private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
return Optional.empty();
}

// Because this is called by other thread
public void cancel(Status cancelReason) {
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel(cancelReason);
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
}
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
}

// Handle kill statement.
private void handleKill() throws UserException {
KillStmt killStmt = (KillStmt) parsedStmt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.doris.resource.workloadschedpolicy;

import org.apache.doris.common.Status;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -32,7 +34,7 @@ public void exec(WorkloadQueryInfo queryInfo) {
&& queryInfo.tUniqueId != null
&& QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId);
queryInfo.context.cancelQuery();
queryInfo.context.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by workload policy"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherException;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
Expand Down Expand Up @@ -1065,7 +1066,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
TUniqueId queryId = params.getQueryId();
ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId);
if (ctx != null) {
ctx.cancelQuery();
ctx.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by forward request."));
}
final TMasterOpResult result = new TMasterOpResult();
result.setStatusCode(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.doris.service.arrowflight.sessions;

import org.apache.doris.common.Status;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -74,7 +76,7 @@ public void kill(boolean killConnection) {
connectScheduler.unregisterConnection(this);
}
// Now, cancel running query.
cancelQuery();
cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query killed by user"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AuditLogHelper;
Expand All @@ -36,6 +37,7 @@
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -267,7 +269,7 @@ protected void setTaskStateToRunning() {
public void cancel() {
killed = true;
if (stmtExecutor != null) {
stmtExecutor.cancel();
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "analysis task cancelled"));
}
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.FAILED,
Expand Down

0 comments on commit 858b530

Please sign in to comment.