diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java index 9116817834db24..154bd4ec7f17c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java @@ -17,27 +17,25 @@ package org.apache.doris.mtmv; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.MTMV; -import org.apache.doris.catalog.Partition; +import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; -import org.apache.doris.nereids.analyzer.UnboundResultSink; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils; +import org.apache.doris.nereids.rules.rewrite.EliminateSort; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; -import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink; +import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; -import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; -import com.google.common.collect.Lists; - -import java.util.stream.Collectors; +import com.google.common.collect.ImmutableList; /** * The cache for materialized view cache @@ -70,27 +68,25 @@ public static MTMVCache from(MTMV mtmv, ConnectContext connectContext) { if 
(mvSqlStatementContext.getConnectContext().getStatementContext() == null) { mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext); } - unboundMvPlan = unboundMvPlan.accept(new DefaultPlanVisitor() { - // convert to table sink to eliminate sort under table sink, because sort under result sink can not be - // eliminated - @Override - public LogicalPlan visitUnboundResultSink(UnboundResultSink unboundResultSink, - Void context) { - return new UnboundTableSink<>(mtmv.getFullQualifiers(), - mtmv.getBaseSchema().stream().map(Column::getName).collect(Collectors.toList()), - Lists.newArrayList(), - mtmv.getPartitions().stream().map(Partition::getName).collect(Collectors.toList()), - unboundResultSink.child()); - } - }, null); + // Can not convert to table sink, because use the same column from different table when self join + // the out slot is wrong Plan originPlan = planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN); - // eliminate logicalTableSink because sink operator is useless in query rewrite by materialized view - Plan mvPlan = planner.getCascadesContext().getRewritePlan().accept(new DefaultPlanRewriter() { + // Eliminate result sink because sink operator is useless in query rewrite by materialized view + // and the top sort can also be removed + Plan mvPlan = originPlan.accept(new DefaultPlanRewriter() { @Override - public Plan visitLogicalTableSink(LogicalTableSink logicalTableSink, Object context) { - return logicalTableSink.child().accept(this, context); + public Plan visitLogicalResultSink(LogicalResultSink logicalResultSink, Object context) { + return logicalResultSink.child().accept(this, context); } }, null); + // Optimize by rules to remove top sort + CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan, + PhysicalProperties.ANY); + mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> { + 
Rewriter.getCteChildrenRewriter(childContext, + ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute(); + return childContext.getRewritePlan(); + }, mvPlan, originPlan); return new MTMVCache(mvPlan, originPlan); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java index 44e65dee1c9873..8f558aaba0239b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/CommonSubExpressionOpt.java @@ -83,8 +83,9 @@ private List> computeMultiLayerProjections( // 'case slot whenClause2 END' // This is illegal. Expression rewritten = expr.accept(ExpressionReplacer.INSTANCE, aliasMap); - Alias alias = new Alias(rewritten); - aliasMap.put(expr, alias); + // If rewritten is already an Alias, use it directly, because materialized view rewriting + // must keep the output slot unchanged after a successful rewrite + aliasMap.put(expr, rewritten instanceof Alias ? 
(Alias) rewritten : new Alias(rewritten)); } }); layer.addAll(aliasMap.values()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index b596408851f04d..6bf47f00c359c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -17,27 +17,33 @@ package org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.analysis.PartitionKeyDesc; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.Pair; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.mtmv.MTMVRewriteUtil; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.executor.Rewriter; +import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory; import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate; +import org.apache.doris.nereids.rules.exploration.mv.StructInfo.InvalidPartitionRemover; +import org.apache.doris.nereids.rules.exploration.mv.StructInfo.QueryScanPartitionsCollector; import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping; import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping; import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping; import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.ExprId; 
import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Not; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable; import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable; @@ -46,20 +52,26 @@ import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeUtils; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -129,20 +141,22 @@ public List rewrite(Plan queryPlan, CascadesContext cascadesContext) { */ protected List getValidQueryStructInfos(Plan queryPlan, CascadesContext cascadesContext, BitSet materializedViewTableSet) { - return MaterializedViewUtils.extractStructInfo(queryPlan, 
cascadesContext, materializedViewTableSet) - .stream() - .filter(queryStructInfo -> { - boolean valid = checkPattern(queryStructInfo); - if (!valid) { - cascadesContext.getMaterializationContexts().forEach(ctx -> - ctx.recordFailReason(queryStructInfo, "Query struct info is invalid", - () -> String.format("query table bitmap is %s, plan is %s", - queryStructInfo.getTableBitSet(), queryPlan.treeString()) - )); - } - return valid; - }) - .collect(Collectors.toList()); + List validStructInfos = new ArrayList<>(); + List uncheckedStructInfos = MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext, + materializedViewTableSet); + uncheckedStructInfos.forEach(queryStructInfo -> { + boolean valid = checkPattern(queryStructInfo); + if (!valid) { + cascadesContext.getMaterializationContexts().forEach(ctx -> + ctx.recordFailReason(queryStructInfo, "Query struct info is invalid", + () -> String.format("query table bitmap is %s, plan is %s", + queryStructInfo.getTableBitSet(), queryPlan.treeString()) + )); + } else { + validStructInfos.add(queryStructInfo); + } + }); + return validStructInfos; } /** @@ -200,13 +214,13 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext casca } Plan rewrittenPlan; Plan mvScan = materializationContext.getMvScanPlan(); - Plan topPlan = queryStructInfo.getTopPlan(); + Plan queryPlan = queryStructInfo.getTopPlan(); if (compensatePredicates.isAlwaysTrue()) { rewrittenPlan = mvScan; } else { // Try to rewrite compensate predicates by using mv scan List rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(), - topPlan, materializationContext.getMvExprToMvScanExprMapping(), + queryPlan, materializationContext.getMvExprToMvScanExprMapping(), viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet()); if (rewriteCompensatePredicates.isEmpty()) { materializationContext.recordFailReason(queryStructInfo, @@ -225,65 +239,125 @@ protected List doRewrite(StructInfo queryStructInfo, CascadesContext 
casca if (rewrittenPlan == null) { continue; } - final Plan finalRewrittenPlan = rewriteByRules(cascadesContext, rewrittenPlan, topPlan); - if (!isOutputValid(topPlan, finalRewrittenPlan)) { - materializationContext.recordFailReason(queryStructInfo, - "RewrittenPlan output logical properties is different with target group", - () -> String.format("planOutput logical" - + " properties = %s,\n groupOutput logical properties = %s", - finalRewrittenPlan.getLogicalProperties(), topPlan.getLogicalProperties())); - continue; - } + rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext, + childContext -> { + Rewriter.getWholeTreeRewriter(childContext).execute(); + return childContext.getRewritePlan(); + }, rewrittenPlan, queryPlan); // check the partitions used by rewritten plan is valid or not - Set invalidPartitionsQueryUsed = - calcInvalidPartitions(finalRewrittenPlan, materializationContext, cascadesContext); - if (!invalidPartitionsQueryUsed.isEmpty()) { + Multimap, Partition> invalidPartitionsQueryUsed = + calcUsedInvalidMvPartitions(rewrittenPlan, materializationContext, cascadesContext); + // All partition used by query is valid + if (!invalidPartitionsQueryUsed.isEmpty() && !cascadesContext.getConnectContext().getSessionVariable() + .isEnableMaterializedViewUnionRewrite()) { materializationContext.recordFailReason(queryStructInfo, "Check partition query used validation fail", () -> String.format("the partition used by query is invalid by materialized view," + "invalid partition info query used is %s", - materializationContext.getMTMV().getPartitions().stream() - .filter(partition -> - invalidPartitionsQueryUsed.contains(partition.getId())) + invalidPartitionsQueryUsed.values().stream() + .map(Partition::getName) .collect(Collectors.toSet()))); continue; } + boolean partitionValid = invalidPartitionsQueryUsed.isEmpty(); + if (checkCanUnionRewrite(invalidPartitionsQueryUsed, queryPlan, cascadesContext)) { + // construct filter on originalPlan + Map> 
filterOnOriginPlan; + try { + filterOnOriginPlan = Predicates.constructFilterByPartitions(invalidPartitionsQueryUsed, + queryToViewSlotMapping); + if (filterOnOriginPlan.isEmpty()) { + materializationContext.recordFailReason(queryStructInfo, + "construct invalid partition filter on query fail", + () -> String.format("the invalid partitions used by query is %s, query plan is %s", + invalidPartitionsQueryUsed.values().stream().map(Partition::getName) + .collect(Collectors.toSet()), + queryStructInfo.getOriginalPlan().treeString())); + continue; + } + } catch (org.apache.doris.common.AnalysisException e) { + materializationContext.recordFailReason(queryStructInfo, + "construct invalid partition filter on query analysis fail", + () -> String.format("the invalid partitions used by query is %s, query plan is %s", + invalidPartitionsQueryUsed.values().stream().map(Partition::getName) + .collect(Collectors.toSet()), + queryStructInfo.getOriginalPlan().treeString())); + continue; + } + // For rewrittenPlan which contains materialized view should remove invalid partition ids + List children = Lists.newArrayList( + rewrittenPlan.accept(new InvalidPartitionRemover(), Pair.of(materializationContext.getMTMV(), + invalidPartitionsQueryUsed.values().stream() + .map(Partition::getId).collect(Collectors.toSet()))), + StructInfo.addFilterOnTableScan(queryPlan, filterOnOriginPlan, cascadesContext)); + // Union query materialized view and source table + rewrittenPlan = new LogicalUnion(Qualifier.ALL, + queryPlan.getOutput().stream().map(NamedExpression.class::cast).collect(Collectors.toList()), + children.stream() + .map(plan -> plan.getOutput().stream() + .map(slot -> (SlotReference) slot.toSlot()).collect(Collectors.toList())) + .collect(Collectors.toList()), + ImmutableList.of(), + false, + children); + partitionValid = true; + } + if (!partitionValid) { + materializationContext.recordFailReason(queryStructInfo, + "materialized view partition is invalid union fail", + () -> 
String.format("invalidPartitionsQueryUsed = %s,\n query plan = %s", + invalidPartitionsQueryUsed, queryPlan.treeString())); + continue; + } + rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan); + if (!isOutputValid(queryPlan, rewrittenPlan)) { + LogicalProperties logicalProperties = rewrittenPlan.getLogicalProperties(); + materializationContext.recordFailReason(queryStructInfo, + "RewrittenPlan output logical properties is different with target group", + () -> String.format("planOutput logical" + + " properties = %s,\n groupOutput logical properties = %s", + logicalProperties, queryPlan.getLogicalProperties())); + continue; + } recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext); - rewriteResults.add(finalRewrittenPlan); + rewriteResults.add(rewrittenPlan); } return rewriteResults; } - /** - * Rewrite by rules and try to make output is the same after optimize by rules - */ - protected Plan rewriteByRules(CascadesContext cascadesContext, Plan rewrittenPlan, Plan originPlan) { - List originOutputs = originPlan.getOutput(); - if (originOutputs.size() != rewrittenPlan.getOutput().size()) { - return null; - } - Map originSlotToRewrittenExprId = Maps.newLinkedHashMap(); - for (int i = 0; i < originOutputs.size(); i++) { - originSlotToRewrittenExprId.put(originOutputs.get(i), rewrittenPlan.getOutput().get(i).getExprId()); + private boolean checkCanUnionRewrite(Multimap, Partition> + invalidPartitionsQueryUsed, Plan queryPlan, CascadesContext cascadesContext) { + if (invalidPartitionsQueryUsed.isEmpty() + || !cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewUnionRewrite()) { + return false; } - // run rbo job on mv rewritten plan - CascadesContext rewrittenPlanContext = CascadesContext.initContext( - cascadesContext.getStatementContext(), rewrittenPlan, - cascadesContext.getCurrentJobContext().getRequiredProperties()); - Rewriter.getWholeTreeRewriter(rewrittenPlanContext).execute(); - rewrittenPlan = 
rewrittenPlanContext.getRewritePlan(); - - // for get right nullable after rewritten, we need this map - Map exprIdToNewRewrittenSlot = Maps.newLinkedHashMap(); - for (Slot slot : rewrittenPlan.getOutput()) { - exprIdToNewRewrittenSlot.put(slot.getExprId(), slot); + // if mv can not offer valid partition data for query, bail out union rewrite + Map> mvRelatedTablePartitionMap = new LinkedHashMap<>(); + invalidPartitionsQueryUsed.keySet().forEach(invalidPartition -> + mvRelatedTablePartitionMap.put(invalidPartition.key().getRelatedTableInfo().getTableId(), + new HashSet<>())); + queryPlan.accept(new QueryScanPartitionsCollector(), mvRelatedTablePartitionMap); + Set partitionKeyDescSetQueryUsed = mvRelatedTablePartitionMap.values().stream() + .flatMap(Collection::stream) + .map(PartitionItem::toPartitionKeyDesc) + .collect(Collectors.toSet()); + Set mvInvalidPartitionKeyDescSet = new HashSet<>(); + for (Map.Entry, Collection> entry : + invalidPartitionsQueryUsed.asMap().entrySet()) { + entry.getValue().forEach(invalidPartition -> mvInvalidPartitionKeyDescSet.add( + entry.getKey().value().getItem(invalidPartition.getId()).toPartitionKeyDesc())); } + return !mvInvalidPartitionKeyDescSet.containsAll(partitionKeyDescSetQueryUsed); + } + // Normalize expression such as nullable property and output slot id + protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) { // normalize nullable - ImmutableList convertNullable = originOutputs.stream() - .map(s -> normalizeExpression(s, exprIdToNewRewrittenSlot.get(originSlotToRewrittenExprId.get(s)))) - .collect(ImmutableList.toImmutableList()); - return new LogicalProject<>(convertNullable, rewrittenPlan); + List normalizeProjects = new ArrayList<>(); + for (int i = 0; i < originPlan.getOutput().size(); i++) { + normalizeProjects.add(normalizeExpression(originPlan.getOutput().get(i), rewrittenPlan.getOutput().get(i))); + } + return new LogicalProject<>(normalizeProjects, rewrittenPlan); } /** @@ -303,35 +377,47 @@ 
protected boolean isOutputValid(Plan sourcePlan, Plan rewrittenPlan) { * catalog relation. * Maybe only just some partitions is valid in materialized view, so we should check if the mv can * offer the partitions which query used or not. + * + * @return the invalid materialized view partitions used by the rewritten plan, keyed by partition info */ - protected Set calcInvalidPartitions(Plan rewrittenPlan, MaterializationContext materializationContext, + protected Multimap, Partition> calcUsedInvalidMvPartitions( + Plan rewrittenPlan, + MaterializationContext materializationContext, CascadesContext cascadesContext) { // check partition is valid or not MTMV mtmv = materializationContext.getMTMV(); PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo(); if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) { // if not partition, if rewrite success, it means mv is available - return ImmutableSet.of(); + return ImmutableMultimap.of(); } // check mv related table partition is valid or not MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo(); BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo(); if (relatedPartitionTable == null) { - return ImmutableSet.of(); + return ImmutableMultimap.of(); } // get mv valid partitions Set mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, cascadesContext.getConnectContext(), System.currentTimeMillis()).stream() .map(Partition::getId) .collect(Collectors.toSet()); - Set queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan + // get partitions query used + Set mvPartitionSetQueryUsed = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan && Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName())) .stream() .map(node -> ((LogicalOlapScan) node).getSelectedPartitionIds()) .flatMap(Collection::stream) .collect(Collectors.toSet()); - queryUsedPartitionIdSet.removeAll(mvDataValidPartitionIdSet); - return queryUsedPartitionIdSet; + // get invalid 
partition ids + Set invalidMvPartitionIdSet = new HashSet<>(mvPartitionSetQueryUsed); + invalidMvPartitionIdSet.removeAll(mvDataValidPartitionIdSet); + ImmutableMultimap.Builder, Partition> invalidPartitionMapBuilder = + ImmutableMultimap.builder(); + Pair partitionInfo = Pair.of(mvCustomPartitionInfo, mvPartitionInfo); + invalidMvPartitionIdSet.forEach(invalidPartitionId -> + invalidPartitionMapBuilder.put(partitionInfo, mtmv.getPartition(invalidPartitionId))); + return invalidPartitionMapBuilder.build(); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java index ac4b9ccad38174..46d0adde06e978 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java @@ -27,6 +27,7 @@ import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.StructInfoMap; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; @@ -49,15 +50,17 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import java.util.ArrayList; import java.util.BitSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -147,13 +150,19 @@ public static List extractStructInfo(Plan plan, CascadesContext casc StructInfoMap structInfoMap = 
ownerGroup.getstructInfoMap(); structInfoMap.refresh(ownerGroup); Set queryTableSets = structInfoMap.getTableMaps(); + ImmutableList.Builder structInfosBuilder = ImmutableList.builder(); if (!queryTableSets.isEmpty()) { - return queryTableSets.stream() - // Just construct the struct info which mv table set contains all the query table set - .filter(queryTableSet -> materializedViewTableSet.isEmpty() - || StructInfo.containsAll(materializedViewTableSet, queryTableSet)) - .map(tableMap -> structInfoMap.getStructInfo(tableMap, tableMap, ownerGroup, plan)) - .collect(Collectors.toList()); + for (BitSet queryTableSet : queryTableSets) { + if (!materializedViewTableSet.isEmpty() + && !StructInfo.containsAll(materializedViewTableSet, queryTableSet)) { + continue; + } + StructInfo structInfo = structInfoMap.getStructInfo(queryTableSet, queryTableSet, ownerGroup, plan); + if (structInfo != null) { + structInfosBuilder.add(structInfo); + } + } + return structInfosBuilder.build(); } } // if plan doesn't belong to any group, construct it directly @@ -172,8 +181,8 @@ public static Plan generateMvScanPlan(MTMV materializedView, CascadesContext cas materializedView, ImmutableList.of(materializedView.getQualifiedDbName()), // this must be empty, or it will be used to sample - Lists.newArrayList(), - Lists.newArrayList(), + ImmutableList.of(), + ImmutableList.of(), Optional.empty()); mvScan = mvScan.withMaterializedIndexSelected(PreAggStatus.on(), materializedView.getBaseIndexId()); List mvProjects = mvScan.getOutput().stream().map(NamedExpression.class::cast) @@ -181,6 +190,43 @@ public static Plan generateMvScanPlan(MTMV materializedView, CascadesContext cas return new LogicalProject(mvProjects, mvScan); } + /** + * Optimize by rules, this support optimize by custom rules by define different rewriter according to different + * rules + */ + public static Plan rewriteByRules( + CascadesContext cascadesContext, + Function planRewriter, + Plan rewrittenPlan, Plan originPlan) { + List 
originOutputs = originPlan.getOutput(); + if (originOutputs.size() != rewrittenPlan.getOutput().size()) { + return null; + } + // After RBO, slot order may change, so need originSlotToRewrittenExprId which record + // origin plan slot order + List originalRewrittenPlanExprIds = + rewrittenPlan.getOutput().stream().map(Slot::getExprId).collect(Collectors.toList()); + // run rbo job on mv rewritten plan + CascadesContext rewrittenPlanContext = CascadesContext.initContext( + cascadesContext.getStatementContext(), rewrittenPlan, + cascadesContext.getCurrentJobContext().getRequiredProperties()); + rewrittenPlan = planRewriter.apply(rewrittenPlanContext); + Map exprIdToNewRewrittenSlot = Maps.newLinkedHashMap(); + for (Slot slot : rewrittenPlan.getOutput()) { + exprIdToNewRewrittenSlot.put(slot.getExprId(), slot); + } + List rewrittenPlanExprIds = rewrittenPlan.getOutput().stream() + .map(Slot::getExprId).collect(Collectors.toList()); + // If project order doesn't change, return rewrittenPlan directly + if (originalRewrittenPlanExprIds.equals(rewrittenPlanExprIds)) { + return rewrittenPlan; + } + // If project order change, return rewrittenPlan with reordered projects + return new LogicalProject<>(originalRewrittenPlanExprIds.stream() + .map(exprId -> (NamedExpression) exprIdToNewRewrittenSlot.get(exprId)).collect(Collectors.toList()), + rewrittenPlan); + } + private static final class TableQueryOperatorChecker extends DefaultPlanVisitor { public static final TableQueryOperatorChecker INSTANCE = new TableQueryOperatorChecker(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java index 139230be5d4b97..c801e683d65bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java @@ -17,8 +17,15 @@ package 
org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Pair; +import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping; +import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedSlot; import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping; import org.apache.doris.nereids.rules.expression.ExpressionNormalization; import org.apache.doris.nereids.rules.expression.ExpressionOptimization; @@ -27,13 +34,17 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; +import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.Utils; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -220,6 +231,47 @@ public String toString() { return Utils.toSqlString("Predicates", "pulledUpPredicates", pulledUpPredicates); } + /** Construct filter by partition + * @param partitions this is the partition which filter should be constructed from + * @param queryToViewSlotMapping construct filter on slot, the slot belong the slotmapping + * */ + public static Map> constructFilterByPartitions( + Multimap, Partition> partitions, + SlotMapping queryToViewSlotMapping) throws AnalysisException { + Map> 
constructedFilterMap = new HashMap<>(); + for (Map.Entry, Collection> entry : + partitions.asMap().entrySet()) { + // Get the base table partition column mv related + String relatedCol = entry.getKey().key().getRelatedCol(); + TableIf relatedTableInfo = entry.getKey().key().getRelatedTable(); + // Find the query slot which mv partition col mapped to + Optional partitionSlotQueryUsed = queryToViewSlotMapping.getRelationSlotMap() + .keySet() + .stream() + .filter(mappedSlot -> mappedSlot.getSlot().isColumnFromTable() + && mappedSlot.getSlot().getName().equals(relatedCol) + && mappedSlot.getBelongedRelation() != null + && mappedSlot.getBelongedRelation().getTable().getId() == relatedTableInfo.getId()) + .findFirst(); + if (!partitionSlotQueryUsed.isPresent()) { + return ImmutableMap.of(); + } + // Constructed filter which should add on the query base table, + // after supported data roll up this method should keep logic consistency to partition mapping + Set partitionExpressions = UpdateMvByPartitionCommand.constructPredicates( + // get mv partition items + entry.getValue().stream() + .map(partition -> entry.getKey().value().getItem(partition.getId())) + .collect(Collectors.toSet()), + partitionSlotQueryUsed.get().getSlot()); + // Put partition expressions on query base table + constructedFilterMap.computeIfPresent(relatedTableInfo, + (key, existExpressions) -> Sets.union(existExpressions, partitionExpressions)); + constructedFilterMap.computeIfAbsent(relatedTableInfo, key -> partitionExpressions); + } + return constructedFilterMap; + } + /** * The split different representation for predicate expression, such as equal, range and residual predicate. 
*/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java index 604e7853d48894..4979f11c28e5db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java @@ -17,13 +17,21 @@ package org.apache.doris.nereids.rules.exploration.mv; +import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph; import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.JoinEdge; import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate; +import org.apache.doris.nereids.trees.copier.DeepCopierContext; +import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier; import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; @@ -38,11 +46,16 @@ import org.apache.doris.nereids.trees.plans.algebra.Filter; import org.apache.doris.nereids.trees.plans.algebra.Join; import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import 
org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalSort; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor; import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer; import org.apache.doris.nereids.util.ExpressionUtils; @@ -584,4 +597,61 @@ private Boolean doVisit(Plan plan, PlanCheckContext checkContext) { return true; } } + + /** + * Remove the invalid partitions from the selected partition ids of the materialized view olap scan + */ + public static class InvalidPartitionRemover extends DefaultPlanRewriter>> { + // The materialized view scan is always a LogicalOlapScan, so only LogicalOlapScan is handled + @Override + public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Pair> context) { + if (olapScan.getTable().getName().equals(context.key().getName())) { + List selectedPartitionIds = olapScan.getSelectedPartitionIds(); + return olapScan.withSelectedPartitionIds(selectedPartitionIds.stream() + .filter(partitionId -> !context.value().contains(partitionId)) + .collect(Collectors.toList())); + } + return olapScan; + } + } + + /** Collect the partition items selected by the scans of the tables whose ids are keys of the context map */ + public static class QueryScanPartitionsCollector extends DefaultPlanVisitor>> { + @Override + public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, + Map> context) { + TableIf table = catalogRelation.getTable(); + if (!context.containsKey(table.getId())) { + return catalogRelation; + } + // Only olap scan partitions are collected currently + if (catalogRelation instanceof LogicalOlapScan) { + LogicalOlapScan logicalOlapScan = 
(LogicalOlapScan) catalogRelation; + PartitionInfo partitionInfo = logicalOlapScan.getTable().getPartitionInfo(); + logicalOlapScan.getSelectedPartitionIds().stream() + .map(partitionInfo::getItem) + .forEach(partitionItem -> context.computeIfPresent(table.getId(), (key, oldValue) -> { + oldValue.add(partitionItem); + return oldValue; + })); + } + return catalogRelation; + } + } + + /** Add the filters of the table filter map on the matching table scans, deep copy the plan and rewrite it by rules */ + public static Plan addFilterOnTableScan(Plan queryPlan, Map> filterOnOriginPlan, + CascadesContext parentCascadesContext) { + // Firstly, construct the filter from the invalid partitions, this filter should be added on the origin plan + Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), filterOnOriginPlan); + // Deep copy the plan so that its output is not the same as the later union output, which may cause + // exec by mistake + queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy( + (LogicalPlan) queryPlanWithUnionFilter, new DeepCopierContext()); + // RBO rewrite after adding the filter on the origin plan + return MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> { + Rewriter.getWholeTreeRewriter(context).execute(); + return context.getRewritePlan(); + }, queryPlanWithUnionFilter, queryPlan); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java index 9dd807717856ba..152d4c5d92ebed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/EliminateSort.java @@ -37,7 +37,7 @@ public class EliminateSort extends DefaultPlanRewriter implements CustomRewriter { @Override public Plan rewriteRoot(Plan plan, JobContext jobContext) { - Boolean eliminateSort = false; + Boolean eliminateSort = true; return plan.accept(this, eliminateSort); } @@ -76,7 +76,7 @@ 
public Plan visitLogicalSink(LogicalSink logicalSink, Boolean el // eliminate sort return visit(logicalSink, true); } - return skipEliminateSort(logicalSink, eliminateSort); + return skipEliminateSort(logicalSink, false); } private Plan skipEliminateSort(Plan plan, Boolean eliminateSort) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 67864187bb2a48..d63aee6ea709e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -120,18 +121,27 @@ private static Map> constructTableWithPredicates(MTMV m */ @VisibleForTesting public static Set constructPredicates(Set partitions, String colName) { - Set predicates = new HashSet<>(); UnboundSlot slot = new UnboundSlot(colName); + return constructPredicates(partitions, slot); + } + + /** + * construct predicates for partition items, the min key is the min key of range items. + * For list partition or less than partition items, the min key is null. 
+ */ + @VisibleForTesting + public static Set constructPredicates(Set partitions, Slot colSlot) { + Set predicates = new HashSet<>(); if (partitions.isEmpty()) { return Sets.newHashSet(BooleanLiteral.TRUE); } if (partitions.iterator().next() instanceof ListPartitionItem) { for (PartitionItem item : partitions) { - predicates.add(convertListPartitionToIn(item, slot)); + predicates.add(convertListPartitionToIn(item, colSlot)); } } else { for (PartitionItem item : partitions) { - predicates.add(convertRangePartitionToCompare(item, slot)); + predicates.add(convertRangePartitionToCompare(item, colSlot)); } } return predicates; @@ -186,7 +196,10 @@ private static Expression convertRangePartitionToCompare(PartitionItem item, Slo return predicate; } - static class PredicateAdder extends DefaultPlanRewriter>> { + /** + * Add predicates on base table, a catalog relation found in the map is wrapped by a filter ORing its predicates + */ + public static class PredicateAdder extends DefaultPlanRewriter>> { @Override public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map> predicates) { List tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(), @@ -198,5 +211,16 @@ public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map> predicates) { + TableIf table = catalogRelation.getTable(); + if (predicates.containsKey(table)) { + return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))), + catalogRelation); + } + return catalogRelation; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java index 2060718ec1359b..1252f3b4bbf899 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/ExpressionLineageReplacer.java @@ -45,12 +45,12 @@ * Get from rewrite plan and can also get 
from plan struct info, if from plan struct info it depends on * the nodes from graph. */ -public class ExpressionLineageReplacer extends DefaultPlanVisitor { +public class ExpressionLineageReplacer extends DefaultPlanVisitor { public static final ExpressionLineageReplacer INSTANCE = new ExpressionLineageReplacer(); @Override - public Void visit(Plan plan, ExpressionReplaceContext context) { + public Expression visit(Plan plan, ExpressionReplaceContext context) { List expressions = plan.getExpressions(); Map targetExpressionMap = context.getExprIdExpressionMap(); // Filter the namedExpression used by target and collect the namedExpression @@ -62,7 +62,7 @@ public Void visit(Plan plan, ExpressionReplaceContext context) { } @Override - public Void visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) { + public Expression visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) { Group group = groupPlan.getGroup(); if (group == null) { return visit(groupPlan, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5b0512683aa486..0148d30508ca06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -521,6 +521,9 @@ public class SessionVariable implements Serializable, Writable { public static final String MATERIALIZED_VIEW_REWRITE_SUCCESS_CANDIDATE_NUM = "materialized_view_rewrite_success_candidate_num"; + public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE + = "enable_materialized_view_union_rewrite"; + public static final String CREATE_TABLE_PARTITION_MAX_NUM = "create_table_partition_max_num"; @@ -1638,6 +1641,13 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { "The max candidate num which participate in CBO when using asynchronous materialized views"}) public int materializedViewRewriteSuccessCandidateNum 
= 3; + @VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_UNION_REWRITE, needForward = true, + description = {"当物化视图不足以提供查询的全部数据时,是否允许基表和物化视图 union 来响应查询", + "When the materialized view is not enough to provide all the data for the query, " + + "whether to allow the union of the base table and the materialized view to " + + "respond to the query"}) + public boolean enableMaterializedViewUnionRewrite = false; + @VariableMgr.VarAttr(name = CREATE_TABLE_PARTITION_MAX_NUM, needForward = true, description = {"建表时创建分区的最大数量", "The maximum number of partitions created during table creation"}) @@ -3674,6 +3684,10 @@ public int getMaterializedViewRewriteSuccessCandidateNum() { return materializedViewRewriteSuccessCandidateNum; } + public boolean isEnableMaterializedViewUnionRewrite() { + return enableMaterializedViewUnionRewrite; + } + public int getCreateTablePartitionMaxNum() { return createTablePartitionMaxNum; } diff --git a/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out new file mode 100644 index 00000000000000..8559da6305b03b --- /dev/null +++ b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !query_all_direct_before -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_all_direct_after -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_partition_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_partition_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_all_before -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_all_after -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + diff --git a/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy b/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy new file mode 100644 index 00000000000000..9a2e893f7e4e8c --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/dimension/dimension_self_conn.groovy @@ -0,0 +1,564 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +/* +This suite tests materialized view rewrite for self join cases + */ +suite("partition_mv_rewrite_dimension_self_conn") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders_self_conn + """ + + sql """CREATE TABLE `orders_self_conn` ( + `o_orderkey` BIGINT NULL, + `o_custkey` INT NULL, + `o_orderstatus` VARCHAR(1) NULL, + `o_totalprice` DECIMAL(15, 2) NULL, + `o_orderpriority` VARCHAR(15) NULL, + `o_clerk` VARCHAR(15) NULL, + `o_shippriority` INT NULL, + `o_comment` VARCHAR(79) NULL, + `o_orderdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(`o_orderkey`, `o_custkey`) + COMMENT 'OLAP' + auto partition by range (date_trunc(`o_orderdate`, 'day')) () + DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists lineitem_self_conn + """ + + sql """CREATE TABLE `lineitem_self_conn` ( + `l_orderkey` BIGINT NULL, + `l_linenumber` INT NULL, + `l_partkey` INT NULL, + `l_suppkey` INT NULL, + `l_quantity` DECIMAL(15, 2) NULL, + `l_extendedprice` DECIMAL(15, 2) NULL, + `l_discount` DECIMAL(15, 2) NULL, + `l_tax` DECIMAL(15, 2) NULL, + `l_returnflag` VARCHAR(1) NULL, + `l_linestatus` VARCHAR(1) NULL, + `l_commitdate` DATE NULL, + `l_receiptdate` DATE NULL, + `l_shipinstruct` VARCHAR(25) NULL, + `l_shipmode` VARCHAR(10) NULL, + `l_comment` VARCHAR(44) NULL, + `l_shipdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) + COMMENT 'OLAP' + auto partition by range (date_trunc(`l_shipdate`, 'day')) () + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + insert into orders_self_conn values + (null, 1, 'k', 99.5, 'a', 'b', 1, 
'yy', '2023-10-17'), + (1, null, 'o', 109.2, 'c','d',2, 'mm', '2023-10-17'), + (3, 3, null, 99.5, 'a', 'b', 1, 'yy', '2023-10-19'), + (1, 2, 'o', null, 'a', 'b', 1, 'yy', '2023-10-20'), + (2, 3, 'k', 109.2, null,'d',2, 'mm', '2023-10-21'), + (3, 1, 'k', 99.5, 'a', null, 1, 'yy', '2023-10-22'), + (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'), + (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'), + (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); + """ + + sql """ + insert into lineitem_self_conn values + (null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (1, null, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (3, 3, null, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 2, 3, null, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'); + """ + + sql """analyze table orders_self_conn with sync;""" + sql """analyze table lineitem_self_conn with sync;""" + + def create_mv = { mv_name, mv_sql -> + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};""" + sql """DROP TABLE IF EXISTS ${mv_name}""" + sql""" + CREATE MATERIALIZED VIEW ${mv_name} + BUILD IMMEDIATE REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mv_sql} + """ + } + + def compare_res = { def stmt -> + sql "SET enable_materialized_view_rewrite=false" + def origin_res = sql stmt + logger.info("origin_res: " + origin_res) + sql "SET 
enable_materialized_view_rewrite=true" + def mv_origin_res = sql stmt + logger.info("mv_origin_res: " + mv_origin_res) + assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size())) + for (int row = 0; row < mv_origin_res.size(); row++) { + assertTrue(mv_origin_res[row].size() == origin_res[row].size()) + for (int col = 0; col < mv_origin_res[row].size(); col++) { + assertTrue(mv_origin_res[row][col] == origin_res[row][col]) + } + } + } + + + // join direction + def mv_name_1 = "mv_self_conn" + + def join_direction_mv_1 = """ + select t1.l_Shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + """ + + create_mv(mv_name_1, join_direction_mv_1) + def job_name_1 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(job_name_1) + + def join_direction_sql_1 = """ + select t1.L_SHIPDATE + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + """ + explain { + sql("${join_direction_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(join_direction_sql_1 + " order by 1") + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};""" + + // join filter position + def join_filter_stmt_1 = """select t1.L_SHIPDATE, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey""" + def join_filter_stmt_2 = """select t1.l_shipdate, t2.L_partkey, t1.l_suppkey + from (select * from lineitem_self_conn where l_shipdate = '2023-10-17' ) t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey""" + def join_filter_stmt_3 = """select t1.l_shipdate, t2.l_Partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join (select * from lineitem_self_conn where l_shipdate = '2023-10-17' ) t2 + on t1.l_orderkey = t2.l_orderkey""" + def join_filter_stmt_4 = """select t1.l_shipdate, t2.l_parTkey, t1.l_suppkey + from 
lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_shipdate = '2023-10-17'""" + def join_filter_stmt_5 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_suppkey=1""" + + def mv_list = [ + join_filter_stmt_1, join_filter_stmt_2, join_filter_stmt_3, join_filter_stmt_4, join_filter_stmt_5] + def join_self_conn_order = " order by 1, 2, 3" + for (int i =0; i < mv_list.size(); i++) { + logger.info("i:" + i) + def join_self_conn_mv = """join_self_conn_mv_${i}""" + create_mv(join_self_conn_mv, mv_list[i]) + def job_name = getJobName(db, join_self_conn_mv) + waitingMTMVTaskFinished(job_name) + if (i == 0) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 2) { + continue + } + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } + } else if (i == 1) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 1 || j == 3) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + } + } else if (i == 2) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 2) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + + } + } else if (i == 3) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 1 || j == 3) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + 
compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + } + } else if (i == 4) { + for (int j = 0; j < mv_list.size(); j++) { + logger.info("j:" + j) + if (j == 4) { + explain { + sql("${mv_list[j]}") + contains "${join_self_conn_mv}(${join_self_conn_mv})" + } + compare_res(mv_list[j] + join_self_conn_order) + } else { + explain { + sql("${mv_list[j]}") + notContains "${join_self_conn_mv}(${join_self_conn_mv})" + } + } + } + } + sql """DROP MATERIALIZED VIEW IF EXISTS ${join_self_conn_mv};""" + } + + // join type + def join_type_stmt_1 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_2 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_3 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + right join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_5 = """select t1.l_shipdate, t2.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + full join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_6 = """select t1.l_shipdate, t1.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left semi join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_7 = """select t2.l_shipdate, t2.l_partkey, t2.l_suppkey + from lineitem_self_conn as t1 + right semi join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_8 = """select t1.l_shipdate, t1.l_partkey, t1.l_suppkey + from lineitem_self_conn as t1 + left anti join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_9 = """select t2.l_shipdate, t2.l_partkey, 
t2.l_suppkey + from lineitem_self_conn as t1 + right anti join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY""" + def join_type_stmt_list = [join_type_stmt_1, join_type_stmt_2, join_type_stmt_3, + join_type_stmt_5, join_type_stmt_6, join_type_stmt_7, join_type_stmt_8, join_type_stmt_9] + for (int i = 0; i < join_type_stmt_list.size(); i++) { + logger.info("i:" + i) + String join_type_self_conn_mv = """join_type_self_conn_mv_${i}""" + create_mv(join_type_self_conn_mv, join_type_stmt_list[i]) + def job_name = getJobName(db, join_type_self_conn_mv) + waitingMTMVTaskFinished(job_name) + if (i in [4, 5]) { + for (int j = 0; j < join_type_stmt_list.size(); j++) { + logger.info("j: " + j) + if (j in [4, 5]) { + explain { + sql("${join_type_stmt_list[j]}") + contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + compare_res(join_type_stmt_list[j] + " order by 1,2,3") + } else { + explain { + sql("${join_type_stmt_list[j]}") + notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + } + } + } else if (i in [6, 7]) { + for (int j = 0; j < join_type_stmt_list.size(); j++) { + logger.info("j: " + j) + if (j in [6, 7]) { + explain { + sql("${join_type_stmt_list[j]}") + contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + compare_res(join_type_stmt_list[j] + " order by 1,2,3") + } else { + explain { + sql("${join_type_stmt_list[j]}") + notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + } + } + } else { + for (int j = 0; j < join_type_stmt_list.size(); j++) { + logger.info("j:" + j) + if (i == j) { + explain { + sql("${join_type_stmt_list[j]}") + contains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + compare_res(join_type_stmt_list[j] + " order by 1,2,3") + } else { + explain { + sql("${join_type_stmt_list[j]}") + notContains "${join_type_self_conn_mv}(${join_type_self_conn_mv})" + } + } + } + } + sql """DROP MATERIALIZED VIEW IF EXISTS ${join_type_self_conn_mv};""" + } + + // agg + 
// agg + without group by + with agg function + agg_mv_stmt = """ + select t2.o_orderkey, + sum(t1.O_TOTALPRICE) as sum_total, + max(t1.o_totalprice) as max_total, + min(t1.o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when t1.o_shippriority > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end)) as cnt_2 + from orders_self_conn as t1 + left join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by t2.o_orderkey + """ + + create_mv(mv_name_1, agg_mv_stmt) + job_name_1 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(job_name_1) + + def agg_sql_1 = """select t2.o_orderkey, + count(distinct case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end) as cnt_1, + count(distinct case when t1.O_SHIPPRIORITY > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end) as cnt_2, + sum(t1.O_totalprice), + max(t1.o_totalprice), + min(t1.o_totalprice), + count(*) + from orders_self_conn as t1 + left join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by t2.o_orderkey + """ + explain { + sql("${agg_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_1 + " order by 1,2,3,4,5,6,7") + + agg_sql_1 = """select t2.o_orderkey, + count(distinct case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end) as cnt_1, + count(distinct case when t1.O_SHIPPRIORITY > 2 and t1.o_orderkey IN (2) then t1.o_custkey else null end) as cnt_2, + sum(t1.O_totalprice), + max(t1.o_totalprice), + min(t1.o_totalprice), + count(*) + from orders_self_conn as t1 + left join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by t2.o_orderkey + """ + explain { + sql("${agg_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_1 + " order by 1,2,3,4,5,6") + sql """DROP MATERIALIZED 
VIEW IF EXISTS ${mv_name_1};""" + + // agg + with group by + without agg function + + def agg_mv_stmt_2 = """ + select t1.o_orderdatE, t2.O_SHIPPRIORITY, t1.o_comment + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t1.o_orderdate, + t2.o_shippriority, + t1.o_comment + """ + create_mv(mv_name_1, agg_mv_stmt_2) + def agg_job_name_2 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(agg_job_name_2) + sql """analyze table ${mv_name_1} with sync;""" + + def agg_sql_2 = """ + select t2.O_SHIPPRIORITY, t1.o_comment + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t2.o_shippriority, + t1.o_comment + """ + explain { + sql("${agg_sql_2}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_2 + " order by 1,2") + + // agg + with group by + with agg function + def agg_mv_stmt_3 = """ + select t1.o_orderdatE, t2.o_shippriority, t1.o_comment, + sum(t1.o_totalprice) as sum_total, + max(t2.o_totalpricE) as max_total, + min(t1.o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when t2.o_shippriority > 2 and t2.o_orderkey IN (2) then t2.o_custkey else null end)) as cnt_2 + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t1.o_orderdatE, + t2.o_shippriority, + t1.o_comment + """ + create_mv(mv_name_1, agg_mv_stmt_3) + def agg_job_name_3 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(agg_job_name_3) + sql """analyze table ${mv_name_1} with sync;""" + + def agg_sql_3 = """ + select t2.o_shippriority, t1.o_comment, + sum(t1.o_totalprice) as sum_total, + max(t2.o_totalpricE) as max_total, + min(t1.o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when t1.o_shippriority > 1 and 
t1.o_orderkey IN (1, 3) then t1.o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when t2.o_shippriority > 2 and t2.o_orderkey IN (2) then t2.o_custkey else null end)) as cnt_2 + from orders_self_conn as t1 + inner join orders_self_conn as t2 + on t1.o_orderkey = t2.o_orderkey + group by + t2.o_shippriority, + t1.o_comment + """ + explain { + sql("${agg_sql_3}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(agg_sql_3 + " order by 1,2,3,4,5,6") + + + // view partial rewriting + def view_partition_mv_stmt_1 = """ + select t1.l_shipdatE, t2.l_partkey, t1.l_orderkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY + group by t1.l_shipdate, t2.l_partkey, t1.l_orderkeY""" + create_mv(mv_name_1, view_partition_mv_stmt_1) + def view_partition_job_name_1 = getJobName(db, mv_name_1) + waitingMTMVTaskFinished(view_partition_job_name_1) + + def view_partition_sql_1 = """select t.l_shipdate, lineitem_self_conn.l_orderkey, t.l_partkey + from ( + select t1.l_shipdatE as l_shipdatE, t2.l_partkey as l_partkey, t1.l_orderkey as l_orderkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.L_ORDERKEY = t2.L_ORDERKEY + group by t1.l_shipdate, t2.l_partkey, t1.l_orderkeY + ) t + inner join lineitem_self_conn + on t.l_partkey = lineitem_self_conn.l_partkey + group by t.l_shipdate, lineitem_self_conn.l_orderkey, t.l_partkey + """ + explain { + sql("${view_partition_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(view_partition_sql_1 + " order by 1,2,3") + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};""" + + + // predicate compensate + def predicate_mv_stmt_1 = """ + select t1.l_shipdatE, t2.l_shipdate, t1.l_partkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_shipdate >= "2023-10-17" + """ + create_mv(mv_name_1, predicate_mv_stmt_1) + def predicate_job_name_1 = getJobName(db, 
mv_name_1) + waitingMTMVTaskFinished(predicate_job_name_1) + + def predicate_sql_1 = """ + select t1.l_shipdatE, t2.l_shipdate, t1.l_partkey + from lineitem_self_conn as t1 + inner join lineitem_self_conn as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_shipdate >= "2023-10-17" and t1.l_partkey = 1 + """ + explain { + sql("${predicate_sql_1}") + contains "${mv_name_1}(${mv_name_1})" + } + compare_res(predicate_sql_1 + " order by 1,2,3") + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name_1};""" + +} diff --git a/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy b/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy new file mode 100644 index 00000000000000..4a0f513f0fae88 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/nested_mtmv/nested_mtmv.groovy @@ -0,0 +1,859 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("nested_mtmv") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders_1 + """ + + sql """CREATE TABLE `orders_1` ( + `o_orderkey` BIGINT NULL, + `o_custkey` INT NULL, + `o_orderstatus` VARCHAR(1) NULL, + `o_totalprice` DECIMAL(15, 2) NULL, + `o_orderpriority` VARCHAR(15) NULL, + `o_clerk` VARCHAR(15) NULL, + `o_shippriority` INT NULL, + `o_comment` VARCHAR(79) NULL, + `o_orderdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(`o_orderkey`, `o_custkey`) + COMMENT 'OLAP' + auto partition by range (date_trunc(`o_orderdate`, 'day')) () + DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists lineitem_1 + """ + + sql """CREATE TABLE `lineitem_1` ( + `l_orderkey` BIGINT NULL, + `l_linenumber` INT NULL, + `l_partkey` INT NULL, + `l_suppkey` INT NULL, + `l_quantity` DECIMAL(15, 2) NULL, + `l_extendedprice` DECIMAL(15, 2) NULL, + `l_discount` DECIMAL(15, 2) NULL, + `l_tax` DECIMAL(15, 2) NULL, + `l_returnflag` VARCHAR(1) NULL, + `l_linestatus` VARCHAR(1) NULL, + `l_commitdate` DATE NULL, + `l_receiptdate` DATE NULL, + `l_shipinstruct` VARCHAR(25) NULL, + `l_shipmode` VARCHAR(10) NULL, + `l_comment` VARCHAR(44) NULL, + `l_shipdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) + COMMENT 'OLAP' + auto partition by range (date_trunc(`l_shipdate`, 'day')) () + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists partsupp_1 + """ + + sql """CREATE TABLE `partsupp_1` ( + `ps_partkey` INT NULL, + `ps_suppkey` INT NULL, + `ps_availqty` INT NULL, + `ps_supplycost` 
DECIMAL(15, 2) NULL, + `ps_comment` VARCHAR(199) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`ps_partkey`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`ps_partkey`) BUCKETS 24 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + insert into orders_1 values + (null, 1, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (1, null, 'o', 109.2, 'c','d',2, 'mm', '2023-10-17'), + (3, 3, null, 99.5, 'a', 'b', 1, 'yy', '2023-10-19'), + (1, 2, 'o', null, 'a', 'b', 1, 'yy', '2023-10-20'), + (2, 3, 'k', 109.2, null,'d',2, 'mm', '2023-10-21'), + (3, 1, 'k', 99.5, 'a', null, 1, 'yy', '2023-10-22'), + (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'), + (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'), + (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); + """ + + sql """ + insert into lineitem_1 values + (null, 1, 2, 3, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (1, 1, 3, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (3, 3, 3, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 2, 3, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (2, 1, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 1, 3, 1, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 2, 1, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'), + (2, 2, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 3, 3, 3, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 1, 1, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'); + """ + + sql""" + insert into 
partsupp_1 values + (1, 1, 1, 99.5, 'yy'), + (2, 2, 2, 109.2, 'mm'), + (3, 3, 1, 99.5, 'yy'), + (3, null, 1, 99.5, 'yy'); + """ + + sql """analyze table orders_1 with sync;""" + sql """analyze table lineitem_1 with sync;""" + sql """analyze table partsupp_1 with sync;""" + + def create_mv = { mv_name, mv_sql -> + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};""" + sql """DROP TABLE IF EXISTS ${mv_name}""" + sql""" + CREATE MATERIALIZED VIEW ${mv_name} + BUILD IMMEDIATE REFRESH AUTO ON MANUAL + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mv_sql} + """ + } + + def compare_res = { def stmt -> + sql "SET enable_materialized_view_rewrite=false" + def origin_res = sql stmt + logger.info("origin_res: " + origin_res) + sql "SET enable_materialized_view_rewrite=true" + def mv_origin_res = sql stmt + logger.info("mv_origin_res: " + mv_origin_res) + assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size())) + for (int row = 0; row < mv_origin_res.size(); row++) { + assertTrue(mv_origin_res[row].size() == origin_res[row].size()) + for (int col = 0; col < mv_origin_res[row].size(); col++) { + assertTrue(mv_origin_res[row][col] == origin_res[row][col]) + } + } + } + + // sr + def mv_stmt_1 = """SELECT l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey + FROM lineitem_1 INNER JOIN orders_1 + ON l_orderkey = o_orderkey""" + def mv_name_1 = "join_mv1" + def mv_stmt_2 = """SELECT + l_orderkey, + l_linenumber, + o_orderkey, + sum(l_partkey) AS total_revenue, + max(o_custkey) AS max_discount + FROM ${mv_name_1} + GROUP BY l_orderkey, l_linenumber, o_orderkey;""" + def mv_name_2 = "agg_mv2" + def mv_stmt_3 = """SELECT + l_orderkey, + sum(total_revenue) AS total_revenue, + max(max_discount) AS max_discount + FROM ${mv_name_2} + GROUP BY l_orderkey;""" + def mv_name_3 = "join_agg_mv3" + create_mv(mv_name_1, mv_stmt_1) + def job_name_1 = getJobName(db, mv_name_1) + 
waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_name_2, mv_stmt_2) + job_name_1 = getJobName(db, mv_name_2) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_name_3, mv_stmt_3) + job_name_1 = getJobName(db, mv_name_3) + waitingMTMVTaskFinished(job_name_1) + + def query_stmt_1 = """SELECT + l_orderkey, + sum(l_partkey) AS total_revenue, + max(o_custkey) AS max_discount + FROM lineitem_1 INNER JOIN orders_1 + ON l_orderkey = o_orderkey + GROUP BY l_orderkey""" + explain { + sql("${query_stmt_1}") + contains "${mv_name_3}(${mv_name_3})" + } + compare_res(query_stmt_1 + " order by 1,2,3") + + // user + def mv_stmt_4 = """ + select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 + from lineitem_1 + inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey + inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey + """ + def mv_level1_name = "mv_level1_name" + def mv_stmt_5 = """ + select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 + from ${mv_level1_name} + """ + def mv_level2_name = "mv_level2_name" + def mv_stmt_6 = """ + select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1 + from ${mv_level1_name} as t1 + left join ${mv_level1_name} as t2 + on t1.l_orderkey = t2.l_orderkey + """ + def mv_level3_name = "mv_level3_name" + def mv_stmt_7 = """ + select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1 + from ${mv_level2_name} as t1 + left join ${mv_level2_name} as t2 + on t1.l_orderkey = t2.l_orderkey + """ + def mv_level4_name = "mv_level4_name" + + create_mv(mv_level1_name, mv_stmt_4) + job_name_1 = getJobName(db, 
mv_level1_name) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_level2_name, mv_stmt_5) + job_name_1 = getJobName(db, mv_level2_name) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_level3_name, mv_stmt_6) + job_name_1 = getJobName(db, mv_level3_name) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_level4_name, mv_stmt_7) + job_name_1 = getJobName(db, mv_level4_name) + waitingMTMVTaskFinished(job_name_1) + + def query_stmt_2 = """ + select t1.l_orderkey, t2.l_linenumber, t1.l_partkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.col1 + from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 + from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 + from lineitem_1 + inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey + inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey + ) as t1 + ) as t1 + left join (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, col1 + from (select l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey, cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as col1 + from lineitem_1 + inner join orders_1 on lineitem_1.l_orderkey = orders_1.o_orderkey + inner join partsupp_1 on lineitem_1.l_partkey = partsupp_1.ps_partkey and lineitem_1.l_suppkey = partsupp_1.ps_suppkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_linenumber, l_partkey, o_orderkey, o_custkey, ps_partkey + ) as t1 + ) as t2 on t1.l_orderkey = t2.l_orderkey + """ + explain { + sql("${query_stmt_2}") + contains "${mv_level3_name}(${mv_level3_name})" + } + compare_res(query_stmt_2 + " order by 1,2,3,4,5,6,7") + + // 
five level + def mv_1 = "mv1" + def mv_2 = "mv2" + def mv_3 = "mv3" + def mv_4 = "mv4" + def mv_5 = "mv5" + + def join_mv_1 = """ + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + """ + def join_mv_2 = """ + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ${mv_1} as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + """ + def join_mv_3 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ${mv_2} as t1 + left join ${mv_2} as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def join_mv_4 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, 
t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ${mv_3} as t1 + left join ${mv_3} as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def join_mv_5 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ${mv_4} as t1 + left join ${mv_4} as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + + create_mv(mv_1, join_mv_1) + job_name_1 = getJobName(db, mv_1) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_2, join_mv_2) + job_name_1 = getJobName(db, mv_2) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_3, join_mv_3) + job_name_1 = getJobName(db, mv_3) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_4, join_mv_4) + job_name_1 = getJobName(db, mv_4) + waitingMTMVTaskFinished(job_name_1) + + create_mv(mv_5, join_mv_5) + job_name_1 = getJobName(db, mv_5) + waitingMTMVTaskFinished(job_name_1) + + + def sql_2 = """ + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then 
o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + """ + def sql_3 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, 
l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def sql_4 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + 
from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + 
bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, 
l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, 
t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + def sql_5 = """ + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as 
agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * 
IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey 
= partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, 
o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t1 + left join ( + select t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, 
t1.agg6 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t1 + left join ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, + t.agg1 as agg1, + t.sum_total as agg3, + t.max_total as agg4, + t.min_total as agg5, + t.count_all as agg6, + cast(sum(IFNULL(ps_suppkey, 0) * IFNULL(ps_partkey, 0)) as decimal(28, 8)) as agg2 + from ( + select l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, cast(sum(IFNULL(o_orderkey, 0) * IFNULL(o_custkey, 0)) as decimal(28, 8)) as agg1, + sum(o_totalprice) as sum_total, + max(o_totalprice) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) 
cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from lineitem_1 + inner join orders_1 + on lineitem_1.l_orderkey = orders_1.o_orderkey + where lineitem_1.l_shipdate >= "2023-10-17" + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey + ) as t + inner join partsupp_1 + on t.l_partkey = partsupp_1.ps_partkey and t.l_suppkey = partsupp_1.ps_suppkey + where partsupp_1.ps_suppkey > 1 + group by l_orderkey, l_partkey, l_suppkey, o_orderkey, o_custkey, ps_partkey, ps_suppkey, agg1, agg3, agg4, agg5, agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + where t1.l_orderkey > 1 + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + ) as t2 + on t1.l_orderkey = t2.l_orderkey + group by t1.l_orderkey, t2.l_partkey, t1.l_suppkey, t2.o_orderkey, t1.o_custkey, t2.ps_partkey, t1.ps_suppkey, t2.agg1, t1.agg2, t2.agg3, t1.agg4, t2.agg5, t1.agg6 + """ + + explain { + sql("${sql_2}") + contains "${mv_2}(${mv_2})" + } + compare_res(sql_2 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + explain { + sql("${sql_3}") + contains "${mv_3}(${mv_3})" + } + compare_res(sql_3 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + explain { + sql("${sql_4}") + contains "${mv_4}(${mv_4})" + } + compare_res(sql_4 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + explain { + sql("${sql_5}") + contains "${mv_5}(${mv_5})" + } + compare_res(sql_5 + " order by 1,2,3,4,5,6,7,8,9,10,11,12,13") + + + + +} diff --git a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy index e799c01fff96b9..1d34e9617da0ac 100644 --- 
a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy @@ -166,12 +166,29 @@ suite("partition_mv_rewrite") { // wait partition is invalid sleep(5000) // only can use valid partition + sql "SET enable_materialized_view_union_rewrite=false" + // Test query all partition when disable enable_materialized_view_union_rewrite + order_qt_query_all_direct_before "${all_partition_sql}" explain { sql("${all_partition_sql}") notContains("${mv_name}(${mv_name})") } + order_qt_query_all_direct_after "${all_partition_sql}" + + // Test query part partition when disable enable_materialized_view_union_rewrite + order_qt_query_partition_before "${partition_sql}" explain { sql("${partition_sql}") contains("${mv_name}(${mv_name})") } + order_qt_query_partition_after "${partition_sql}" + + // Test query all partition when enable enable_materialized_view_union_rewrite + sql "SET enable_materialized_view_union_rewrite=true" + order_qt_query_all_before "${all_partition_sql}" + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + order_qt_query_all_after "${all_partition_sql}" } diff --git a/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy new file mode 100644 index 00000000000000..cfdd327c0d2199 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/partition_curd_union_rewrite.groovy @@ -0,0 +1,238 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite ("partition_curd_union_rewrite") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "set runtime_filter_mode=OFF" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders + """ + + sql """ + CREATE TABLE IF NOT EXISTS orders ( + o_orderkey integer not null, + o_custkey integer not null, + o_orderstatus char(1) not null, + o_totalprice decimalv3(15,2) not null, + o_orderdate date not null, + o_orderpriority char(15) not null, + o_clerk char(15) not null, + o_shippriority integer not null, + o_comment varchar(79) not null + ) + DUPLICATE KEY(o_orderkey, o_custkey) + PARTITION BY RANGE(o_orderdate)( + FROM ('2023-10-17') TO ('2023-11-01') INTERVAL 1 DAY + ) + DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql """ + drop table if exists lineitem + """ + + sql""" + CREATE TABLE IF NOT EXISTS lineitem ( + l_orderkey integer not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimalv3(15,2) not null, + l_extendedprice decimalv3(15,2) not null, + l_discount decimalv3(15,2) not null, + l_tax decimalv3(15,2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not 
null, + l_comment varchar(44) not null + ) + DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber) + PARTITION BY RANGE(l_shipdate) + (FROM ('2023-10-17') TO ('2023-11-01') INTERVAL 1 DAY) + DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql""" + insert into orders values + (1, 1, 'ok', 99.5, '2023-10-17', 'a', 'b', 1, 'yy'), + (2, 2, 'ok', 109.2, '2023-10-18', 'c','d',2, 'mm'), + (3, 3, 'ok', 99.5, '2023-10-19', 'a', 'b', 1, 'yy'); + """ + + sql """ + insert into lineitem values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (2, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'), + (3, 2, 3, 6, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'); + """ + + + def mv_def_sql = """ + select l_shipdate, o_orderdate, l_partkey, + l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey + """ + + def all_partition_sql = """ + select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey + """ + + + def partition_sql = """ + select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total + from lineitem + left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate + where (l_shipdate>= '2023-10-18' and l_shipdate <= '2023-10-19') + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey + """ + + sql """DROP MATERIALIZED VIEW IF EXISTS mv_10086""" + sql """DROP TABLE IF EXISTS mv_10086""" + sql""" + CREATE MATERIALIZED VIEW mv_10086 + BUILD IMMEDIATE 
REFRESH AUTO ON MANUAL + partition by(l_shipdate) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mv_def_sql} + """ + + + def compare_res = { def stmt -> + sql "SET enable_materialized_view_rewrite=false" + def origin_res = sql stmt + logger.info("origin_res: " + origin_res) + sql "SET enable_materialized_view_rewrite=true" + def mv_origin_res = sql stmt + logger.info("mv_origin_res: " + mv_origin_res) + assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size())) + for (int row = 0; row < mv_origin_res.size(); row++) { + assertTrue(mv_origin_res[row].size() == origin_res[row].size()) + for (int col = 0; col < mv_origin_res[row].size(); col++) { + assertTrue(mv_origin_res[row][col] == origin_res[row][col]) + } + } + } + + def mv_name = "mv_10086" + def order_by_stmt = " order by 1,2,3,4,5" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + + // All partition is valid, test query and rewrite by materialized view + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + + /* + // Part partition is invalid, test can not use partition 2023-10-17 to rewrite + sql """ + insert into lineitem values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'); + """ + // wait partition is invalid + sleep(5000) + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + // Test when base table create partition + sql """ + insert into lineitem values + (1, 2, 
3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-21', '2023-10-21', '2023-10-21', 'a', 'b', 'yyyyyyyyy'); + """ + // Wait partition is invalid + sleep(5000) + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + + // Test when base table delete partition test + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + sql """ ALTER TABLE lineitem DROP PARTITION IF EXISTS p_20231021 FORCE; + """ + // Wait partition is invalid + sleep(3000) + explain { + sql("${all_partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(all_partition_sql + order_by_stmt) + explain { + sql("${partition_sql}") + contains("${mv_name}(${mv_name})") + } + compare_res(partition_sql + order_by_stmt) + */ +} diff --git a/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy new file mode 100644 index 00000000000000..1e474abc8ffb48 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/mv/union_rewrite/usercase_union_rewrite.groovy @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite ("usercase_union_rewrite") { + String db = context.config.getDbNameByFile(context.file) + sql "use ${db}" + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_materialized_view_rewrite=true" + sql "SET enable_nereids_timeout = false" + + sql """ + drop table if exists orders_user + """ + + sql """CREATE TABLE `orders_user` ( + `o_orderkey` BIGINT NULL, + `o_custkey` INT NULL, + `o_orderstatus` VARCHAR(1) NULL, + `o_totalprice` DECIMAL(15, 2) NULL, + `o_orderpriority` VARCHAR(15) NULL, + `o_clerk` VARCHAR(15) NULL, + `o_shippriority` INT NULL, + `o_comment` VARCHAR(79) NULL, + `o_orderdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE KEY(`o_orderkey`, `o_custkey`) + COMMENT 'OLAP' + auto partition by range (date_trunc(`o_orderdate`, 'day')) () + DISTRIBUTED BY HASH(`o_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + drop table if exists lineitem_user + """ + + sql """CREATE TABLE `lineitem_user` ( + `l_orderkey` BIGINT NULL, + `l_linenumber` INT NULL, + `l_partkey` INT NULL, + `l_suppkey` INT NULL, + `l_quantity` DECIMAL(15, 2) NULL, + `l_extendedprice` DECIMAL(15, 2) NULL, + `l_discount` DECIMAL(15, 2) NULL, + `l_tax` DECIMAL(15, 2) NULL, + `l_returnflag` VARCHAR(1) NULL, + `l_linestatus` VARCHAR(1) NULL, + `l_commitdate` DATE NULL, + `l_receiptdate` DATE NULL, + `l_shipinstruct` VARCHAR(25) NULL, + `l_shipmode` VARCHAR(10) NULL, + `l_comment` VARCHAR(44) NULL, + `l_shipdate` DATE not NULL + ) ENGINE=OLAP + DUPLICATE 
KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey ) + COMMENT 'OLAP' + auto partition by range (date_trunc(`l_shipdate`, 'day')) () + DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + );""" + + sql """ + insert into orders_user values + (1, 3, 'o', 99.5, 'a', 'b', null, 'yy', '2023-10-19'), + (2, 1, 'o', 109.2, 'c','d',2, null, '2023-10-18'), + (3, 2, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-17'), + (4, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19'); + """ + + sql """ + insert into lineitem_user values + (2, 3, 2, 1, 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy', '2023-10-18'), + (3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'), + (1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17'); + """ + + sql """analyze table orders_user with sync;""" + sql """analyze table lineitem_user with sync;""" + + def create_mv_orders = { mv_name, mv_sql -> + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name};""" + sql """DROP TABLE IF EXISTS ${mv_name}""" + sql""" + CREATE MATERIALIZED VIEW ${mv_name} + BUILD IMMEDIATE REFRESH AUTO ON MANUAL + partition by(o_orderdate) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ('replication_num' = '1') + AS + ${mv_sql} + """ + } + + def compare_res = { def stmt -> + sql "SET enable_materialized_view_rewrite=false" + def origin_res = sql stmt + logger.info("origin_res: " + origin_res) + sql "SET enable_materialized_view_rewrite=true" + def mv_origin_res = sql stmt + logger.info("mv_origin_res: " + mv_origin_res) + assertTrue((mv_origin_res == [] && origin_res == []) || (mv_origin_res.size() == origin_res.size())) + for (int row = 0; row < mv_origin_res.size(); row++) { + assertTrue(mv_origin_res[row].size() == origin_res[row].size()) + for (int col = 0; col < mv_origin_res[row].size(); col++) { + assertTrue(mv_origin_res[row][col] == 
origin_res[row][col]) + } + } + } + + def mv_name = "mv_usercase" + def mv_stmt = """select o_orderdatE, o_shippriority, o_comment, o_orderdate, + sum(o_totalprice) as sum_total, + max(o_totalpricE) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from (select * from orders_user) as t1 + group by + o_orderdatE, + o_shippriority, + o_comment, + o_orderdate + """ + create_mv_orders(mv_name, mv_stmt) + def job_name_1 = getJobName(db, mv_name) + waitingMTMVTaskFinished(job_name_1) + + def query_stmt = """select o_orderdatE, o_shippriority, o_comment, o_orderdate, + sum(o_totalprice) as sum_total, + max(o_totalpricE) as max_total, + min(o_totalprice) as min_total, + count(*) as count_all, + bitmap_union(to_bitmap(case when o_shippriority > 1 and o_orderkey IN (1, 3) then o_custkey else null end)) cnt_1, + bitmap_union(to_bitmap(case when o_shippriority > 2 and o_orderkey IN (2) then o_custkey else null end)) as cnt_2 + from (select * from orders_user) as t1 + group by + o_orderdatE, + o_shippriority, + o_comment, + o_orderdate + """ + explain { + sql("${query_stmt}") + contains "${mv_name}(${mv_name})" + } + compare_res(query_stmt + " order by 1,2,3,4,5,6,7,8") + + sql """insert into orders_user values (5, 5, 'k', 99.5, 'a', 'b', 1, 'yy', '2023-10-19');""" + sql "SET enable_materialized_view_union_rewrite=true" + sleep(10 * 1000) + explain { + sql("${query_stmt}") + contains "${mv_name}(${mv_name})" + } + compare_res(query_stmt + " order by 1,2,3,4,5,6,7,8") +}