Skip to content

Commit

Permalink
[improvement](mtmv) Support union rewrite when the materialized view …
Browse files Browse the repository at this point in the history
…is not enough to provide all the data for the query (#33800)

When the materialized view is not enough to provide all the data for the query, if the materialized view is increment update by partition. we can union materialized view and origin query to reponse the query.

this depends on #33362

such as materialized view def is as following:

>         CREATE MATERIALIZED VIEW mv_10086
>         BUILD IMMEDIATE REFRESH AUTO ON MANUAL
>         partition by(l_shipdate)
>         DISTRIBUTED BY RANDOM BUCKETS 2
>         PROPERTIES ('replication_num' = '1') 
>         AS 
>     select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total
>     from lineitem
>     left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
>     group by
>     l_shipdate,
>     o_orderdate,
>     l_partkey,
>     l_suppkey;

the materialized view data is as following:
+------------+-------------+-----------+-----------+-----------+
| l_shipdate | o_orderdate | l_partkey | l_suppkey | sum_total |
+------------+-------------+-----------+-----------+-----------+
| 2023-10-18 | 2023-10-18  |         2 |         3 |    109.20 |
| 2023-10-17 | 2023-10-17  |         2 |         3 |     99.50 |
| 2023-10-19 | 2023-10-19  |         2 |         3 |     99.50 |
+------------+-------------+-----------+-----------+-----------+

when we insert data to partition `2023-10-17`,  if we run query as following
```
    select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total
    from lineitem
    left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate
    group by
    l_shipdate,
    o_orderdate,
    l_partkey,
    l_suppkey;
```
query rewrite by materialzied view will fail with message   `Check partition query used validation fail`
if we turn on the switch `SET enable_materialized_view_union_rewrite = true;` default true
we run the query above again, it will success and will use union all  materialized view and origin query to response the query correctly. the plan is as following:


```
| Explain String(Nereids Planner)                                                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                                                                                                    |
|   OUTPUT EXPRS:                                                                                                                                                                                    |
|     l_shipdate[#52]                                                                                                                                                                                |
|     o_orderdate[#53]                                                                                                                                                                               |
|     l_partkey[#54]                                                                                                                                                                                 |
|     l_suppkey[#55]                                                                                                                                                                                 |
|     sum_total[#56]                                                                                                                                                                                 |
|   PARTITION: UNPARTITIONED                                                                                                                                                                         |
|                                                                                                                                                                                                    |
|   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
|                                                                                                                                                                                                    |
|   VRESULT SINK                                                                                                                                                                                     |
|      MYSQL_PROTOCAL                                                                                                                                                                                |
|                                                                                                                                                                                                    |
|   11:VEXCHANGE                                                                                                                                                                                     |
|      offset: 0                                                                                                                                                                                     |
|      distribute expr lists:                                                                                                                                                                        |
|                                                                                                                                                                                                    |
| PLAN FRAGMENT 1                                                                                                                                                                                    |
|                                                                                                                                                                                                    |
|   PARTITION: HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                   |
|                                                                                                                                                                                                    |
|   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
|                                                                                                                                                                                                    |
|   STREAM DATA SINK                                                                                                                                                                                 |
|     EXCHANGE ID: 11                                                                                                                                                                                |
|     UNPARTITIONED                                                                                                                                                                                  |
|                                                                                                                                                                                                    |
|   10:VUNION(756)                                                                                                                                                                                   |
|   |                                                                                                                                                                                                |
|   |----9:VAGGREGATE (merge finalize)(753)                                                                                                                                                          |
|   |    |  output: sum(partial_sum(o_totalprice)[#46])[#51]                                                                                                                                         |
|   |    |  group by: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                              |
|   |    |  cardinality=2                                                                                                                                                                            |
|   |    |  distribute expr lists: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                 |
|   |    |                                                                                                                                                                                           |
|   |    8:VEXCHANGE                                                                                                                                                                                 |
|   |       offset: 0                                                                                                                                                                                |
|   |       distribute expr lists: l_shipdate[#42]                                                                                                                                                   |
|   |                                                                                                                                                                                                |
|   1:VEXCHANGE                                                                                                                                                                                      |
|      offset: 0                                                                                                                                                                                     |
|      distribute expr lists:                                                                                                                                                                        |
|                                                                                                                                                                                                    |
| PLAN FRAGMENT 2                                                                                                                                                                                    |
|                                                                                                                                                                                                    |
|   PARTITION: HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25]                                                                                                                                   |
|                                                                                                                                                                                                    |
|   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
|                                                                                                                                                                                                    |
|   STREAM DATA SINK                                                                                                                                                                                 |
|     EXCHANGE ID: 08                                                                                                                                                                                |
|     HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45]                                                                                                            |
|                                                                                                                                                                                                    |
|   7:VAGGREGATE (update serialize)(747)                                                                                                                                                             |
|   |  STREAMING                                                                                                                                                                                     |
|   |  output: partial_sum(o_totalprice[#41])[#46]                                                                                                                                                   |
|   |  group by: l_shipdate[#37], o_orderdate[#38], l_partkey[#39], l_suppkey[#40]                                                                                                                   |
|   |  cardinality=2                                                                                                                                                                                 |
|   |  distribute expr lists: l_shipdate[#37]                                                                                                                                                        |
|   |                                                                                                                                                                                                |
|   6:VHASH JOIN(741)                                                                                                                                                                                |
|   |  join op: RIGHT OUTER JOIN(PARTITIONED)[]                                                                                                                                                      |
|   |  equal join conjunct: (o_orderkey[#21] = l_orderkey[#5])                                                                                                                                       |
|   |  equal join conjunct: (o_orderdate[#25] = l_shipdate[#15])                                                                                                                                     |
|   |  runtime filters: RF000[min_max] <- l_orderkey[#5](2/2/2048), RF001[bloom] <- l_orderkey[#5](2/2/2048), RF002[min_max] <- l_shipdate[#15](1/1/2048), RF003[bloom] <- l_shipdate[#15](1/1/2048) |
|   |  cardinality=2                                                                                                                                                                                 |
|   |  vec output tuple id: 4                                                                                                                                                                        |
|   |  output tuple id: 4                                                                                                                                                                            |
|   |  vIntermediate tuple ids: 3                                                                                                                                                                    |
|   |  hash output slot ids: 6 7 24 25 15                                                                                                                                                            |
|   |  final projections: l_shipdate[#36], o_orderdate[#32], l_partkey[#34], l_suppkey[#35], o_totalprice[#31]                                                                                       |
|   |  final project output tuple id: 4                                                                                                                                                              |
|   |  distribute expr lists: o_orderkey[#21], o_orderdate[#25]                                                                                                                                      |
|   |  distribute expr lists: l_orderkey[#5], l_shipdate[#15]                                                                                                                                        |
|   |                                                                                                                                                                                                |
|   |----3:VEXCHANGE                                                                                                                                                                                 |
|   |       offset: 0                                                                                                                                                                                |
|   |       distribute expr lists: l_orderkey[#5]                                                                                                                                                    |
|   |                                                                                                                                                                                                |
|   5:VEXCHANGE                                                                                                                                                                                      |
|      offset: 0                                                                                                                                                                                     |
|      distribute expr lists:                                                                                                                                                                        |
|                                                                                                                                                                                                    |
| PLAN FRAGMENT 3                                                                                                                                                                                    |
|                                                                                                                                                                                                    |
|   PARTITION: RANDOM                                                                                                                                                                                |
|                                                                                                                                                                                                    |
|   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
|                                                                                                                                                                                                    |
|   STREAM DATA SINK                                                                                                                                                                                 |
|     EXCHANGE ID: 05                                                                                                                                                                                |
|     HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25]                                                                                                                                            |
|                                                                                                                                                                                                    |
|   4:VOlapScanNode(722)                                                                                                                                                                             |
|      TABLE: union_db.orders(orders), PREAGGREGATION: ON                                                                                                                                            |
|      runtime filters: RF000[min_max] -> o_orderkey[#21], RF001[bloom] -> o_orderkey[#21], RF002[min_max] -> o_orderdate[#25], RF003[bloom] -> o_orderdate[#25]                                     |
|      partitions=3/3 (p_20231017,p_20231018,p_20231019), tablets=9/9, tabletList=161188,161190,161192 ...                                                                                           |
|      cardinality=3, avgRowSize=0.0, numNodes=1                                                                                                                                                     |
|      pushAggOp=NONE                                                                                                                                                                                |
|                                                                                                                                                                                                    |
| PLAN FRAGMENT 4                                                                                                                                                                                    |
|                                                                                                                                                                                                    |
|   PARTITION: HASH_PARTITIONED: l_orderkey[#5]                                                                                                                                                      |
|                                                                                                                                                                                                    |
|   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
|                                                                                                                                                                                                    |
|   STREAM DATA SINK                                                                                                                                                                                 |
|     EXCHANGE ID: 03                                                                                                                                                                                |
|     HASH_PARTITIONED: l_orderkey[#5], l_shipdate[#15]                                                                                                                                              |
|                                                                                                                                                                                                    |
|   2:VOlapScanNode(729)                                                                                                                                                                             |
|      TABLE: union_db.lineitem(lineitem), PREAGGREGATION: ON                                                                                                                                        |
|      PREDICATES: (l_shipdate[#15] >= '2023-10-17') AND (l_shipdate[#15] < '2023-10-18')                                                                                                            |
|      partitions=1/3 (p_20231017), tablets=3/3, tabletList=161223,161225,161227                                                                                                                     |
|      cardinality=2, avgRowSize=0.0, numNodes=1                                                                                                                                                     |
|      pushAggOp=NONE                                                                                                                                                                                |
|                                                                                                                                                                                                    |
| PLAN FRAGMENT 5                                                                                                                                                                                    |
|                                                                                                                                                                                                    |
|   PARTITION: RANDOM                                                                                                                                                                                |
|                                                                                                                                                                                                    |
|   HAS_COLO_PLAN_NODE: false                                                                                                                                                                        |
|                                                                                                                                                                                                    |
|   STREAM DATA SINK                                                                                                                                                                                 |
|     EXCHANGE ID: 01                                                                                                                                                                                |
|     RANDOM                                                                                                                                                                                         |
|                                                                                                                                                                                                    |
|   0:VOlapScanNode(718)                                                                                                                                                                             |
|      TABLE: union_db.mv_10086(mv_10086), PREAGGREGATION: ON                                                                                                                                        |
|      partitions=2/3 (p_20231018_20231019,p_20231019_20231020), tablets=4/4, tabletList=161251,161253,161265 ...                                                                                    |
|      cardinality=2, avgRowSize=0.0, numNodes=1                                                                                                                                                     |
|      pushAggOp=NONE                                                                                                                                                                                |
|                                                                                                                                                                                                    |
| MaterializedView                                                                                                                                                                                   |
| MaterializedViewRewriteSuccessAndChose:                                                                                                                                                            |
|   Names: mv_10086                                                                                                                                                                                  |
| MaterializedViewRewriteSuccessButNotChose:                                                                                                                                                         |
|                                                                                                                                                                                                    |
| MaterializedViewRewriteFail:                                                                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
  • Loading branch information
seawinde authored Apr 21, 2024
1 parent 4f06106 commit 06c9fb6
Show file tree
Hide file tree
Showing 16 changed files with 2,282 additions and 112 deletions.
48 changes: 22 additions & 26 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,25 @@

package org.apache.doris.mtmv;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.Partition;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.rewrite.EliminateSort;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;

import com.google.common.collect.Lists;

import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;

/**
* The cache for materialized view cache
Expand Down Expand Up @@ -70,27 +68,25 @@ public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) {
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
}
unboundMvPlan = unboundMvPlan.accept(new DefaultPlanVisitor<LogicalPlan, Void>() {
// convert to table sink to eliminate sort under table sink, because sort under result sink can not be
// eliminated
@Override
public LogicalPlan visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink,
Void context) {
return new UnboundTableSink<>(mtmv.getFullQualifiers(),
mtmv.getBaseSchema().stream().map(Column::getName).collect(Collectors.toList()),
Lists.newArrayList(),
mtmv.getPartitions().stream().map(Partition::getName).collect(Collectors.toList()),
unboundResultSink.child());
}
}, null);
// Can not convert to table sink, because use the same column from different table when self join
// the out slot is wrong
Plan originPlan = planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
// eliminate logicalTableSink because sink operator is useless in query rewrite by materialized view
Plan mvPlan = planner.getCascadesContext().getRewritePlan().accept(new DefaultPlanRewriter<Object>() {
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
// and the top sort can also be removed
Plan mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
@Override
public Plan visitLogicalTableSink(LogicalTableSink<? extends Plan> logicalTableSink, Object context) {
return logicalTableSink.child().accept(this, context);
public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, Object context) {
return logicalResultSink.child().accept(this, context);
}
}, null);
// Optimize by rules to remove top sort
CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan,
PhysicalProperties.ANY);
mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
return new MTMVCache(mvPlan, originPlan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ private List<List<NamedExpression>> computeMultiLayerProjections(
// 'case slot whenClause2 END'
// This is illegal.
Expression rewritten = expr.accept(ExpressionReplacer.INSTANCE, aliasMap);
Alias alias = new Alias(rewritten);
aliasMap.put(expr, alias);
// if rewritten is already alias, use it directly, because in materialized view rewriting
// Should keep out slot immutably after rewritten successfully
aliasMap.put(expr, rewritten instanceof Alias ? (Alias) rewritten : new Alias(rewritten));
}
});
layer.addAll(aliasMap.values());
Expand Down
Loading

0 comments on commit 06c9fb6

Please sign in to comment.