Skip to content

Commit

Permalink
[fix](Nereids) lock table when generate distribute plan (#38950) (#39037
Browse files Browse the repository at this point in the history
)


We should lock the table while generating the distribute plan, because an insert overwrite issued by an async materialized view can drop partitions in parallel, and the query thread will then throw an exception:
```
java.lang.RuntimeException: Cannot invoke "org.apache.doris.catalog.Partition.getBaseIndex()" because "partition" is null
    at org.apache.doris.nereids.util.Utils.execWithUncheckedException(Utils.java:76) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.glue.translator.PhysicalPlanTranslator.translatePlan(PhysicalPlanTranslator.java:278) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.NereidsPlanner.splitFragments(NereidsPlanner.java:341) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.NereidsPlanner.distribute(NereidsPlanner.java:400) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.NereidsPlanner.plan(NereidsPlanner.java:147) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:796) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:605) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.queryRetry(StmtExecutor.java:558) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:548) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:385) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:237) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:260) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:288) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:342) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52) ~[doris-fe.jar:1.2-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.doris.catalog.Partition.getBaseIndex()" because "partition" is null
    at org.apache.doris.planner.OlapScanNode.mockRowCountInStatistic(OlapScanNode.java:589) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.planner.OlapScanNode.finalizeForNereids(OlapScanNode.java:1733) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.util.Utils.execWithUncheckedException(Utils.java:74) ~[doris-fe.jar:1.2-SNAPSHOT]
    ... 17 more
2024-07-29 00:46:17,608 WARN (mysql-nio-pool-114|201) Analyze failed. stmt[210035, 49d3041004ba4b6a-b07fe4491d03c5de]
org.apache.doris.common.NereidsException: errCode = 2, detailMessage = Cannot invoke "org.apache.doris.catalog.Partition.getBaseIndex()" because "partition" is null
    at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:803) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:605) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.queryRetry(StmtExecutor.java:558) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.execute(StmtExecutor.java:548) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.executeQuery(ConnectProcessor.java:385) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.ConnectProcessor.handleQuery(ConnectProcessor.java:237) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.handleQuery(MysqlConnectProcessor.java:260) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.dispatch(MysqlConnectProcessor.java:288) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.MysqlConnectProcessor.processOnce(MysqlConnectProcessor.java:342) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.mysql.ReadListener.lambda$handleEvent$0(ReadListener.java:52) ~[doris-fe.jar:1.2-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
```

This exception is too hard to reproduce, so I cannot write a test case.
  • Loading branch information
924060929 authored Aug 7, 2024
1 parent 7d27634 commit 17b72a6
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 121 deletions.
225 changes: 118 additions & 107 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -123,43 +124,43 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
setParsedPlan(parsedPlan);
PhysicalProperties requireProperties = buildInitRequireProperties();
statementContext.getStopwatch().reset().start();
Plan resultPlan = null;
try {
resultPlan = plan(parsedPlan, requireProperties, explainLevel);
planWithLock(parsedPlan, requireProperties, explainLevel, plan -> {
setOptimizedPlan(plan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) plan;
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext);
PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(planTranslatorContext,
statementContext.getConnectContext().getStatsErrorEstimator());
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsTranslateTime();
}
if (cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) {
CounterEvent.clearCounter();
}
if (cascadesContext.getConnectContext().getSessionVariable().isPlayNereidsDump()) {
return;
}
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);

scanNodeList.addAll(planTranslatorContext.getScanNodes());
descTable = planTranslatorContext.getDescTable();
fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());
for (int seq = 0; seq < fragments.size(); seq++) {
fragments.get(seq).setFragmentSequenceNum(seq);
}
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());
ArrayList<String> columnLabelList = physicalPlan.getOutput().stream().map(NamedExpression::getName)
.collect(Collectors.toCollection(ArrayList::new));
logicalPlanAdapter.setColLabels(columnLabelList);
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
});
} finally {
statementContext.getStopwatch().stop();
}
setOptimizedPlan(resultPlan);
if (explainLevel.isPlanLevel) {
return;
}
physicalPlan = (PhysicalPlan) resultPlan;
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext);
PhysicalPlanTranslator physicalPlanTranslator = new PhysicalPlanTranslator(planTranslatorContext,
statementContext.getConnectContext().getStatsErrorEstimator());
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsTranslateTime();
}
if (cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) {
CounterEvent.clearCounter();
}
if (cascadesContext.getConnectContext().getSessionVariable().isPlayNereidsDump()) {
return;
}
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);

scanNodeList.addAll(planTranslatorContext.getScanNodes());
descTable = planTranslatorContext.getDescTable();
fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());
for (int seq = 0; seq < fragments.size(); seq++) {
fragments.get(seq).setFragmentSequenceNum(seq);
}
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());
ArrayList<String> columnLabelList = physicalPlan.getOutput().stream().map(NamedExpression::getName)
.collect(Collectors.toCollection(ArrayList::new));
logicalPlanAdapter.setColLabels(columnLabelList);
logicalPlanAdapter.setViewDdlSqls(statementContext.getViewDdlSqls());
}

@VisibleForTesting
Expand All @@ -171,19 +172,22 @@ public void plan(StatementBase queryStmt) {
}
}

public PhysicalPlan plan(LogicalPlan plan, PhysicalProperties outputProperties) {
return (PhysicalPlan) plan(plan, outputProperties, ExplainLevel.NONE);
public PhysicalPlan planWithLock(LogicalPlan plan, PhysicalProperties outputProperties) {
Consumer<Plan> noCallback = p -> {};
return (PhysicalPlan) planWithLock(plan, outputProperties, ExplainLevel.NONE, noCallback);
}

/**
* Do analyze and optimize for query plan.
*
* @param plan the logical plan waiting to be planned
* @param requireProperties request physical properties constraints
* @param lockCallback callback invoked with the resulting plan while the table lock is still held
* @return plan generated by this planner
* @throws AnalysisException if any stage fails
*/
public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel) {
public Plan planWithLock(LogicalPlan plan, PhysicalProperties requireProperties, ExplainLevel explainLevel,
Consumer<Plan> lockCallback) {
if (explainLevel == ExplainLevel.PARSED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
parsedPlan = plan;
if (explainLevel == ExplainLevel.PARSED_PLAN) {
Expand All @@ -197,89 +201,96 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties, Explain
initCascadesContext(plan, requireProperties);

try (Lock lock = new Lock(plan, cascadesContext)) {
// resolve column, table and function
Span queryAnalysisSpan =
statementContext.getConnectContext().getTracer()
.spanBuilder("query analysis").setParent(Context.current()).startSpan();
try (Scope scope = queryAnalysisSpan.makeCurrent()) {
// analyze this query
analyze();
} catch (Exception e) {
queryAnalysisSpan.recordException(e);
throw e;
} finally {
queryAnalysisSpan.end();
}
Plan resultPlan = planWithoutLock(plan, explainLevel, requireProperties);
lockCallback.accept(resultPlan);
return resultPlan;
}
}

// minidump of input must be serialized first, this process ensure minidump string not null
if (!statementContext.getConnectContext().getSessionVariable().isPlayNereidsDump()
&& statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.init();
String queryId = DebugUtil.printId(statementContext.getConnectContext().queryId());
try {
statementContext.getConnectContext().setMinidump(serializeInputsToDumpFile(plan, queryId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private Plan planWithoutLock(
LogicalPlan plan, ExplainLevel explainLevel, PhysicalProperties requireProperties) {
// resolve column, table and function
Span queryAnalysisSpan =
statementContext.getConnectContext().getTracer()
.spanBuilder("query analysis").setParent(Context.current()).startSpan();
try (Scope scope = queryAnalysisSpan.makeCurrent()) {
// analyze this query
analyze();
} catch (Exception e) {
queryAnalysisSpan.recordException(e);
throw e;
} finally {
queryAnalysisSpan.end();
}

if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
// minidump of input must be serialized first, this process ensure minidump string not null
if (!statementContext.getConnectContext().getSessionVariable().isPlayNereidsDump()
&& statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.init();
String queryId = DebugUtil.printId(statementContext.getConnectContext().queryId());
try {
statementContext.getConnectContext().setMinidump(serializeInputsToDumpFile(plan, queryId));
} catch (IOException e) {
throw new RuntimeException(e);
}
}

if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsAnalysisTime();
}

// rule-based optimize
rewrite();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
analyzedPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.ANALYZED_PLAN) {
return analyzedPlan;
}
}

optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
// rule-based optimize
rewrite();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsRewriteTime();
}
if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) {
rewrittenPlan = cascadesContext.getRewritePlan();
if (explainLevel == ExplainLevel.REWRITTEN_PLAN) {
return rewrittenPlan;
}
}

int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);
optimize();
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsOptimizeTime();
}
// print memo before choose plan.
// if chooseNthPlan failed, we could get memo to debug
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String memo = cascadesContext.getMemo().toString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + memo);
}

physicalPlan = postProcess(physicalPlan);
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
serializeOutputToDumpFile(physicalPlan, statementContext.getConnectContext());
if (statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.saveMinidumpString(statementContext.getConnectContext().getMinidump(),
DebugUtil.printId(statementContext.getConnectContext().queryId()));
}
NereidsTracer.output(statementContext.getConnectContext());
int nth = cascadesContext.getConnectContext().getSessionVariable().getNthOptimizedPlan();
PhysicalPlan physicalPlan = chooseNthPlan(getRoot(), requireProperties, nth);

return physicalPlan;
physicalPlan = postProcess(physicalPlan);
if (cascadesContext.getConnectContext().getSessionVariable().dumpNereidsMemo) {
String tree = physicalPlan.treeString();
LOG.info(ConnectContext.get().getQueryIdentifier() + "\n" + tree);
}
if (explainLevel == ExplainLevel.OPTIMIZED_PLAN
|| explainLevel == ExplainLevel.ALL_PLAN
|| explainLevel == ExplainLevel.SHAPE_PLAN) {
optimizedPlan = physicalPlan;
}
// serialize optimized plan to dumpfile, dumpfile do not have this part means optimize failed
serializeOutputToDumpFile(physicalPlan, statementContext.getConnectContext());
if (statementContext.getConnectContext().getSessionVariable().isEnableMinidump()) {
MinidumpUtils.saveMinidumpString(statementContext.getConnectContext().getMinidump(),
DebugUtil.printId(statementContext.getConnectContext().queryId()));
}
NereidsTracer.output(statementContext.getConnectContext());

return physicalPlan;
}

private LogicalPlan preprocess(LogicalPlan logicalPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testGroupByAndHavingUseAliasFirstThrowException() {
}

private void runPlanner(String sql) {
new NereidsPlanner(MemoTestUtils.createStatementContext(connectContext, sql)).plan(
new NereidsPlanner(MemoTestUtils.createStatementContext(connectContext, sql)).planWithLock(
new NereidsParser().parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testFallbackToOriginalPlanner() throws Exception {
sv.setEnableNereidsPlanner(true);
sv.enableFallbackToOriginalPlanner = false;
Assertions.assertThrows(AnalysisException.class, () -> new NereidsPlanner(statementContext)
.plan(new NereidsParser().parseSingle(sql), PhysicalProperties.ANY));
.planWithLock(new NereidsParser().parseSingle(sql), PhysicalProperties.ANY));

// manually recover sv
sv.setEnableNereidsPlanner(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public List<Rule> getExplorationRules() {
for (String sql : testSqls) {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
PhysicalPlan plan = new NereidsPlanner(statementContext).plan(
PhysicalPlan plan = new NereidsPlanner(statementContext).planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void testTranslateCase() throws Exception {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public List<Rule> getExplorationRules() {
try {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
PhysicalPlan plan = new NereidsPlanner(statementContext).plan(
PhysicalPlan plan = new NereidsPlanner(statementContext).planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testGeneratePhysicalPlan() {
"SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE < 10",
"SELECT * FROM T1 JOIN (SELECT ID, SUM(SCORE) SCORE FROM T2 GROUP BY ID ORDER BY ID) T ON T1.ID + 1 = T.ID AND T.SCORE < 10"
);
testSql.forEach(sql -> new NereidsPlanner(createStatementCtx(sql)).plan(
testSql.forEach(sql -> new NereidsPlanner(createStatementCtx(sql)).planWithLock(
new NereidsParser().parseSingle(sql),
PhysicalProperties.ANY
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testTranslateAllCase() throws Exception {
System.out.println("\n\n***** " + sql + " *****\n\n");
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
new NereidsParser().parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private PlanFragment getOutputFragment(String sql) throws Exception {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
((ExplainCommand) parser.parseSingle(sql)).getLogicalPlan(),
PhysicalProperties.ANY
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private PlanFragment getOutputFragment(String sql) throws Exception {
StatementScopeIdGenerator.clear();
StatementContext statementContext = MemoTestUtils.createStatementContext(connectContext, sql);
NereidsPlanner planner = new NereidsPlanner(statementContext);
PhysicalPlan plan = planner.plan(
PhysicalPlan plan = planner.planWithLock(
parser.parseSingle(sql),
PhysicalProperties.ANY
);
Expand Down
Loading

0 comments on commit 17b72a6

Please sign in to comment.