Skip to content

Commit

Permalink
[fix](Nereids) handle continuous filter or project in plan
Browse files Browse the repository at this point in the history
If we meet consecutive projects or filters in the translator, we generate a
SelectNode wherever necessary to avoid generating an invalid plan.

for example

```
Filter(conjuncts 1)
+-- Limit (limit 10)
    +-- Filter(conjuncts 2)
        +-- Aggregate
```

will be translated to

```
SELECT_NODE (conjuncts 1)
+-- AGGREGATE_NODE (conjuncts 2) (limit 10)
```
  • Loading branch information
morrySnow committed Aug 30, 2024
1 parent 721d460 commit e5fe1d4
Showing 1 changed file with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1256,21 +1256,27 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, P
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
if (CollectionUtils.isNotEmpty(dataStreamSink.getConjuncts())
|| CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
String errMsg = "generate invalid plan \n" + filter.treeString();
LOG.warn(errMsg);
throw new AnalysisException(errMsg);
}
filter.getConjuncts().stream()
.map(e -> ExpressionTranslator.translate(e, context))
.forEach(dataStreamSink::addConjunct);
return inputFragment;
}

PlanNode planNode = inputFragment.getPlanRoot();
Plan child = filter.child();
while (child instanceof PhysicalLimit) {
child = ((PhysicalLimit<?>) child).child();
}
// the three nodes don't support conjuncts, need create a SelectNode to filter data
if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode
// this means we have filter->limit->project, need a SelectNode
|| child instanceof PhysicalProject) {
// the three nodes don't support conjuncts, need create a SelectNode to filter data
// this means already have filter on this node, we should not override it, so need a new node
|| !planNode.getConjuncts().isEmpty()
// this means already have project on this node, filter need execute after project, so need a new node
|| CollectionUtils.isNotEmpty(planNode.getProjectList())
// this means already have limit on this node, filter need execute after limit, so need a new node
|| planNode.hasLimit()) {
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode);
selectNode.setNereidsId(filter.getId());
addConjunctsToPlanNode(filter, selectNode, context);
Expand Down Expand Up @@ -1864,8 +1870,15 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
}

PlanFragment inputFragment = project.child(0).accept(this, context);

PlanNode inputPlanNode = inputFragment.getPlanRoot();
// the input node already has a project list; the new projection must execute after it, so wrap it in a new node
if (CollectionUtils.isNotEmpty(inputPlanNode.getProjectList())) {
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), inputPlanNode);
selectNode.setNereidsId(project.getId());
addPlanRoot(inputFragment, selectNode, project);
inputPlanNode = selectNode;
}

List<Expr> projectionExprs = null;
List<Expr> allProjectionExprs = Lists.newArrayList();
List<Slot> slots = null;
Expand Down Expand Up @@ -1903,6 +1916,11 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
if (CollectionUtils.isNotEmpty(dataStreamSink.getProjections())) {
String errMsg = "generate invalid plan \n" + project.treeString();
LOG.warn(errMsg);
throw new AnalysisException(errMsg);
}
TupleDescriptor projectionTuple = generateTupleDesc(slots, null, context);
dataStreamSink.setProjections(projectionExprs);
dataStreamSink.setOutputTupleDesc(projectionTuple);
Expand Down

0 comments on commit e5fe1d4

Please sign in to comment.