From 1363cd7ea9408eec3a6199d9aeb8f1e421ccf0d5 Mon Sep 17 00:00:00 2001 From: airborne12 Date: Mon, 21 Aug 2023 20:03:42 +0800 Subject: [PATCH] [Feature](inverted index) push count on index down to scan node --- be/src/olap/rowset/segment_v2/segment.cpp | 3 +- .../rowset/segment_v2/segment_iterator.cpp | 14 +++- be/src/vec/exec/scan/new_olap_scan_node.cpp | 3 +- be/src/vec/exec/scan/new_olap_scanner.cpp | 9 +- .../translator/PhysicalPlanTranslator.java | 3 + .../apache/doris/nereids/rules/RuleType.java | 1 + .../implementation/AggregateStrategies.java | 82 +++++++++++++++++++ .../PhysicalStorageLayerAggregate.java | 3 +- .../org/apache/doris/qe/SessionVariable.java | 17 +++- gensrc/thrift/PlanNodes.thrift | 3 +- 10 files changed, 127 insertions(+), 11 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index c6d472d0358baeb..f70d1bd1d514945 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -160,7 +160,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o RETURN_IF_ERROR(load_index()); if (read_options.delete_condition_predicates->num_of_column_predicate() == 0 && - read_options.push_down_agg_type_opt != TPushAggOp::NONE) { + read_options.push_down_agg_type_opt != TPushAggOp::NONE && + read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) { iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(), *schema)); } else { iter->reset(new SegmentIterator(this->shared_from_this(), schema)); diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index f1c5b15bce9ea42..cc5d4b9fdb8f515 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -956,9 +956,19 @@ bool SegmentIterator::_need_read_data(ColumnId cid) { // occurring, return true here that column data needs to be read return true; } + // Check the following conditions: + // 1. If the column represented by the unique ID is an inverted index column (indicated by '_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id]') + // and it's not marked for projection in '_output_columns'. + // 2. Or, if the column is an inverted index column and it's marked for projection in '_output_columns', + // and the operation is a push down of the 'COUNT_ON_INDEX' aggregation function. + // If any of the above conditions are met, log a debug message indicating that there's no need to read data for the indexed column. + // Then, return false. int32_t unique_id = _opts.tablet_schema->column(cid).unique_id(); - if (_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] && - _output_columns.count(unique_id) < 1) { + if ((_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] && + _output_columns.count(unique_id) < 1) || + (_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] && + _output_columns.count(unique_id) == 1 && + _opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) { VLOG_DEBUG << "SegmentIterator no need read data for column: " << _opts.tablet_schema->column_by_uid(unique_id).name(); return false; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 3af5bb9f8945e50..c5f9e75586faa84 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -249,7 +249,8 @@ Status NewOlapScanNode::_process_conjuncts() { } Status NewOlapScanNode::_build_key_ranges_and_filters() { - if (_push_down_agg_type == TPushAggOp::NONE) { + if (_push_down_agg_type == TPushAggOp::NONE || + _push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) { const std::vector& column_names = _olap_scan_node.key_column_name; const std::vector& column_types = _olap_scan_node.key_column_type; DCHECK(column_types.size() == column_names.size()); diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 862735ff359a4b1..c988452babccb46 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -291,10 +291,11 @@ Status NewOlapScanner::_init_tablet_reader_params( _tablet_reader_params.direct_mode = true; _aggregation = true; } else { - _tablet_reader_params.direct_mode = - _aggregation || single_version || - (_parent ? _parent->get_push_down_agg_type() - : _local_state->get_push_down_agg_type()) != TPushAggOp::NONE; + auto push_down_agg_type = _parent ? _parent->get_push_down_agg_type() + : _local_state->get_push_down_agg_type(); + _tablet_reader_params.direct_mode = _aggregation || single_version || + (push_down_agg_type != TPushAggOp::NONE && + push_down_agg_type != TPushAggOp::COUNT_ON_INDEX); } RETURN_IF_ERROR(_init_return_columns()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 2a5ba6474e52bf5..7fd1a49a0484fb9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -799,6 +799,9 @@ public PlanFragment visitPhysicalStorageLayerAggregate( case COUNT: pushAggOp = TPushAggOp.COUNT; break; + case COUNT_ON_MATCH: + pushAggOp = TPushAggOp.COUNT_ON_INDEX; + break; case MIN_MAX: pushAggOp = TPushAggOp.MINMAX; break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 5bc135ae49ce128..ca37658693ef483 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -325,6 +325,7 @@ public enum RuleType { STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION), STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION), STORAGE_LAYER_AGGREGATE_WITH_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION), + COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION), ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION), TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION), TWO_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index 03962ff75205f7e..8d23e741e6de9ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.IsNull; +import org.apache.doris.nereids.trees.expressions.Match; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait; @@ -56,6 +57,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Project; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; @@ -97,6 +99,28 @@ public List buildRules() { PatternDescriptor> basePattern = logicalAggregate(); return ImmutableList.of( + RuleType.COUNT_ON_INDEX.build( + logicalAggregate( + logicalProject( + logicalFilter( + logicalOlapScan().when(this::isDupOrMowKeyTable) + ).when(filter -> containsMatchExpression(filter.getExpressions()) + && filter.getExpressions().size() == 1) + )) + .when(agg -> enablePushDownCountOnIndex()) + .when(agg -> agg.getGroupByExpressions().size() == 0) + .when(agg -> { + Set funcs = agg.getAggregateFunctions(); + return !funcs.isEmpty() && funcs.stream().allMatch(f -> f instanceof Count && !f.isDistinct()); + }) + .thenApply(ctx -> { + LogicalAggregate>> agg = ctx.root; + LogicalProject> project = agg.child(); + LogicalFilter filter = project.child(); + LogicalOlapScan olapScan = filter.child(); + return pushdownCountOnIndex(agg, project, filter, olapScan, ctx.cascadesContext); + }) + ), RuleType.STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT.build( logicalAggregate( logicalOlapScan() @@ -188,6 +212,64 @@ && couldConvertToMulti(agg)) ); } + private boolean containsMatchExpression(List expressions) { + return expressions.stream().allMatch(expr -> expr instanceof Match); + } + + private boolean enablePushDownCountOnIndex() { + ConnectContext connectContext = ConnectContext.get(); + return connectContext != null && connectContext.getSessionVariable().isEnablePushDownCountOnIndex(); + } + + private boolean isDupOrMowKeyTable(LogicalOlapScan logicalScan) { + if (logicalScan != null) { + KeysType keysType = logicalScan.getTable().getKeysType(); + return (keysType == KeysType.DUP_KEYS) + || (keysType == KeysType.UNIQUE_KEYS && logicalScan.getTable().getEnableUniqueKeyMergeOnWrite()); + } + return false; + } + + /** + * sql: select count(*) from tbl where column match 'token' + *

+ * before: + *

+ * LogicalAggregate(groupBy=[], output=[count(*)]) + * | + * LogicalFilter(column match 'token') + * | + * LogicalOlapScan(table=tbl) + *

+ * after: + *

+ * LogicalAggregate(groupBy=[], output=[count(*)]) + * | + * LogicalFilter(column match 'token') + * | + * PhysicalStorageLayerAggregate(pushAggOp=COUNT_ON_INDEX, table=PhysicalOlapScan(table=tbl)) + * + */ + private LogicalAggregate pushdownCountOnIndex( + LogicalAggregate agg, + LogicalProject project, + LogicalFilter filter, + LogicalOlapScan olapScan, + CascadesContext cascadesContext) { + PhysicalOlapScan physicalOlapScan + = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan() + .build() + .transform(olapScan, cascadesContext) + .get(0); + return agg.withChildren(ImmutableList.of( + project.withChildren(ImmutableList.of( + filter.withChildren(ImmutableList.of( + new PhysicalStorageLayerAggregate( + physicalOlapScan, + PushDownAggOp.COUNT_ON_MATCH))))) + )); + } + /** * sql: select count(*) from tbl *

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java index 86a43aaabc981d5..6637e02ca118dc6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalStorageLayerAggregate.java @@ -108,8 +108,9 @@ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalPr /** PushAggOp */ public enum PushDownAggOp { - COUNT, MIN_MAX, MIX; + COUNT, MIN_MAX, MIX, COUNT_ON_MATCH; + /** supportedFunctions */ public static Map, PushDownAggOp> supportedFunctions() { return ImmutableMap., PushDownAggOp>builder() .put(Count.class, PushDownAggOp.COUNT) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b549cd507a9af22..3758eda75af186b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -322,6 +322,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_INVERTED_INDEX_QUERY = "enable_inverted_index_query"; + public static final String ENABLE_PUSHDOWN_COUNT_ON_INDEX = "enable_count_on_index_pushdown"; + public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = "group_by_and_having_use_alias_first"; public static final String DROP_TABLE_IF_CTAS_FAILED = "drop_table_if_ctas_failed"; @@ -1004,9 +1006,14 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) { // Whether enable query with inverted index. @VariableMgr.VarAttr(name = ENABLE_INVERTED_INDEX_QUERY, needForward = true, description = { - "是否启用inverted index query。", "Set wether to use inverted index query."}) + "是否启用inverted index query。", "Set whether to use inverted index query."}) public boolean enableInvertedIndexQuery = true; + // Whether enable pushdown count agg to scan node when using inverted index match. + @VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_COUNT_ON_INDEX, needForward = true, description = { + "是否启用count_on_index pushdown。", "Set whether to pushdown count_on_index."}) + public boolean enablePushDownCountOnIndex = true; + // Whether drop table when create table as select insert data appear error. @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; @@ -2081,6 +2088,14 @@ public void setEnableInvertedIndexQuery(boolean enableInvertedIndexQuery) { this.enableInvertedIndexQuery = enableInvertedIndexQuery; } + public boolean isEnablePushDownCountOnIndex() { + return enablePushDownCountOnIndex; + } + + public void setEnablePushDownCountOnIndex(boolean enablePushDownCountOnIndex) { + this.enablePushDownCountOnIndex = enablePushDownCountOnIndex; + } + public int getMaxTableCountUseCascadesJoinReorder() { return this.maxTableCountUseCascadesJoinReorder; } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 0717fc498dfc205..63d1fc98ca1bf17 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -623,7 +623,8 @@ enum TPushAggOp { NONE = 0, MINMAX = 1, COUNT = 2, - MIX = 3 + MIX = 3, + COUNT_ON_INDEX = 4 } struct TOlapScanNode {