diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 5bbb7b26973a4c..13ba9ef10f469c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2005,7 +2005,7 @@ private PartitionSortNode translatePartitionSortNode(PhysicalPartitionTopN partitionExprs, - SortInfo info, boolean hasGlobalLimit, long partitionLimit) { + SortInfo info, boolean hasGlobalLimit, long partitionLimit, PartitionTopnPhase phase) { super(id, "PartitionTopN", StatisticalType.PARTITION_TOPN_NODE); Preconditions.checkArgument(info.getOrderingExprs().size() == info.getIsAscOrder().size()); this.function = function; @@ -58,6 +61,7 @@ public PartitionSortNode(PlanNodeId id, PlanNode input, WindowFuncType function, this.info = info; this.hasGlobalLimit = hasGlobalLimit; this.partitionLimit = partitionLimit; + this.phase = phase; this.tupleIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId())); this.tblRefIds.addAll(Lists.newArrayList(info.getSortTupleDescriptor().getId())); this.nullableTupleIds.addAll(input.getNullableTupleIds()); @@ -120,6 +124,9 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { output.append(prefix).append("has global limit: ").append(hasGlobalLimit).append("\n"); output.append(prefix).append("partition limit: ").append(partitionLimit).append("\n"); + // mark partition topn phase + output.append(prefix).append("partition topn phase: ").append(phase).append("\n"); + return output.toString(); } @@ -139,12 +146,24 @@ protected void toThrift(TPlanNode msg) { topNAlgorithm = TopNAlgorithm.DENSE_RANK; } + TPartTopNPhase pTopNPhase; + if (phase == PartitionTopnPhase.ONE_PHASE_GLOBAL_PTOPN) { + pTopNPhase = TPartTopNPhase.ONE_PAHSE_GLOBAL; + } else if (phase == PartitionTopnPhase.TWO_PHASE_LOCAL_PTOPN) { + pTopNPhase = TPartTopNPhase.TWO_PAHSE_LOCAL; + } else if (phase == PartitionTopnPhase.TWO_PHASE_GLOBAL_PTOPN) { + pTopNPhase = TPartTopNPhase.TWO_PAHSE_GLOBAL; + } else { + pTopNPhase = TPartTopNPhase.UNKNOWN; + } + TPartitionSortNode partitionSortNode = new TPartitionSortNode(); partitionSortNode.setTopNAlgorithm(topNAlgorithm); partitionSortNode.setPartitionExprs(Expr.treesToThrift(partitionExprs)); partitionSortNode.setSortInfo(sortInfo); partitionSortNode.setHasGlobalLimit(hasGlobalLimit); partitionSortNode.setPartitionInnerLimit(partitionLimit); + partitionSortNode.setPtopnPhase(pTopNPhase); msg.partition_sort_node = partitionSortNode; } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 3cc1c56911fe2c..ff587ddc68c96c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -850,12 +850,20 @@ enum TopNAlgorithm { ROW_NUMBER } +enum TPartTopNPhase { + UNKNOWN, + ONE_PAHSE_GLOBAL, + TWO_PAHSE_LOCAL, + TWO_PAHSE_GLOBAL +} + struct TPartitionSortNode { 1: optional list partition_exprs 2: optional TSortInfo sort_info 3: optional bool has_global_limit 4: optional TopNAlgorithm top_n_algorithm 5: optional i64 partition_inner_limit + 6: optional TPartTopNPhase ptopn_phase } enum TAnalyticWindowType { // Specifies the window as a logical offset