Skip to content

Commit

Permalink
[Feature](inverted index) push count on index down to scan node
Browse files Browse the repository at this point in the history
  • Loading branch information
airborne12 committed Aug 17, 2023
1 parent 330f369 commit a5b19fa
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 7 deletions.
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ Status Segment::new_iterator(SchemaSPtr schema, const StorageReadOptions& read_o

RETURN_IF_ERROR(load_index());
if (read_options.delete_condition_predicates->num_of_column_predicate() == 0 &&
read_options.push_down_agg_type_opt != TPushAggOp::NONE) {
read_options.push_down_agg_type_opt != TPushAggOp::NONE &&
read_options.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
iter->reset(vectorized::new_vstatistics_iterator(this->shared_from_this(), *schema));
} else {
iter->reset(new SegmentIterator(this->shared_from_this(), schema));
Expand Down
14 changes: 12 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -956,9 +956,19 @@ bool SegmentIterator::_need_read_data(ColumnId cid) {
// occurring, return true here that column data needs to be read
return true;
}
// The column data does not need to be read when the column is an inverted index column
// (indicated by '_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id]')
// and either of the following conditions holds:
// 1. The column is not marked for projection in '_output_columns'.
// 2. The column is marked for projection in '_output_columns', but the operation is a
//    push down of the 'COUNT_ON_INDEX' aggregation function.
// If either condition is met, log a debug message indicating that there's no need to read
// data for the indexed column, then return false.
int32_t unique_id = _opts.tablet_schema->column(cid).unique_id();
if (_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] &&
_output_columns.count(unique_id) < 1) {
if ((_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] &&
_output_columns.count(unique_id) < 1) ||
(_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id] &&
_output_columns.count(unique_id) == 1 &&
_opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) {
VLOG_DEBUG << "SegmentIterator no need read data for column: "
<< _opts.tablet_schema->column_by_uid(unique_id).name();
return false;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/scan/new_olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ Status NewOlapScanNode::_process_conjuncts() {
}

Status NewOlapScanNode::_build_key_ranges_and_filters() {
if (_push_down_agg_type == TPushAggOp::NONE) {
if (_push_down_agg_type == TPushAggOp::NONE ||
_push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) {
const std::vector<std::string>& column_names = _olap_scan_node.key_column_name;
const std::vector<TPrimitiveType::type>& column_types = _olap_scan_node.key_column_type;
DCHECK(column_types.size() == column_names.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,9 @@ public PlanFragment visitPhysicalStorageLayerAggregate(
case COUNT:
pushAggOp = TPushAggOp.COUNT;
break;
case COUNT_ON_MATCH:
pushAggOp = TPushAggOp.COUNT_ON_INDEX;
break;
case MIN_MAX:
pushAggOp = TPushAggOp.MINMAX;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ public enum RuleType {
STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITH_PROJECT_FOR_FILE_SCAN(RuleTypeClass.IMPLEMENTATION),
COUNT_ON_INDEX(RuleTypeClass.IMPLEMENTATION),
ONE_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
TWO_PHASE_AGGREGATE_WITHOUT_DISTINCT(RuleTypeClass.IMPLEMENTATION),
TWO_PHASE_AGGREGATE_WITH_COUNT_DISTINCT_MULTI(RuleTypeClass.IMPLEMENTATION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.Match;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
Expand All @@ -56,6 +57,7 @@
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
Expand Down Expand Up @@ -97,6 +99,28 @@ public List<Rule> buildRules() {
PatternDescriptor<LogicalAggregate<GroupPlan>> basePattern = logicalAggregate();

return ImmutableList.of(
RuleType.COUNT_ON_INDEX.build(
logicalAggregate(
logicalProject(
logicalFilter(
logicalOlapScan()
).when(filter -> containsMatchExpression(filter.getExpressions())
&& filter.getExpressions().size() == 1)
))
.when(agg -> enablePushDownCountOnIndex())
.when(agg -> agg.getGroupByExpressions().size() == 0)
.when(agg -> {
Set<AggregateFunction> funcs = agg.getAggregateFunctions();
return !funcs.isEmpty() && funcs.stream().allMatch(f -> f instanceof Count && !f.isDistinct());
})
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
LogicalFilter<LogicalOlapScan> filter = project.child();
LogicalOlapScan olapScan = filter.child();
return pushdownCountOnIndex(agg, project, filter, olapScan, ctx.cascadesContext);
})
),
RuleType.STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT.build(
logicalAggregate(
logicalOlapScan()
Expand Down Expand Up @@ -188,6 +212,55 @@ && couldConvertToMulti(agg))
);
}

/**
 * Tells whether every expression in the given list is a {@link Match}.
 * <p>
 * Note that this is an "all of" check rather than an "any of" check: a single
 * non-Match expression makes the result {@code false}, and an empty list yields
 * {@code true} because no element fails the check.
 *
 * @param expressions the expressions to inspect
 * @return {@code true} if no element of {@code expressions} is something other than a Match
 */
private boolean containsMatchExpression(List<Expression> expressions) {
    for (Expression candidate : expressions) {
        if (!(candidate instanceof Match)) {
            return false;
        }
    }
    return true;
}

/**
 * Reads the count-on-index push-down switch from the session variables of the
 * context returned by {@code ConnectContext.get()}.
 *
 * @return {@code false} when no connect context is available; otherwise the value of
 *         {@code isEnablePushDownCountOnIndex()} on its session variable
 */
private boolean enablePushDownCountOnIndex() {
    ConnectContext ctx = ConnectContext.get();
    if (ctx == null) {
        return false;
    }
    return ctx.getSessionVariable().isEnablePushDownCountOnIndex();
}

/**
 * Replaces the olap scan underneath an aggregate / project / filter chain with a
 * PhysicalStorageLayerAggregate tagged {@code PushDownAggOp.COUNT_ON_MATCH}, keeping
 * the aggregate, project and filter nodes above it.
 * <p>
 * sql: select count(*) from tbl where column match 'token'
 * <p>
 * before:
 * <p>
 *     LogicalAggregate(groupBy=[], output=[count(*)])
 *                       |
 *                LogicalProject
 *                       |
 *     LogicalFilter(column match 'token')
 *                       |
 *          LogicalOlapScan(table=tbl)
 * <p>
 * after:
 * <p>
 *     LogicalAggregate(groupBy=[], output=[count(*)])
 *                       |
 *                LogicalProject
 *                       |
 *     LogicalFilter(column match 'token')
 *                       |
 *     PhysicalStorageLayerAggregate(pushAggOp=COUNT_ON_MATCH, table=PhysicalOlapScan(table=tbl))
 *
 * @param agg the aggregate at the root of the matched pattern
 * @param project the project directly below {@code agg}
 * @param filter the filter directly below {@code project}
 * @param olapScan the scan directly below {@code filter}
 * @param cascadesContext context handed to the logical-to-physical scan transformation
 * @return {@code agg} rebuilt on top of the new project / filter / storage-layer aggregate chain
 */
private LogicalAggregate<? extends Plan> pushdownCountOnIndex(
        LogicalAggregate<? extends Plan> agg,
        LogicalProject<? extends Plan> project,
        LogicalFilter<? extends Plan> filter,
        LogicalOlapScan olapScan,
        CascadesContext cascadesContext) {
    // Convert the logical scan through LogicalOlapScanToPhysicalOlapScan and take its first result.
    PhysicalOlapScan physicalScan = (PhysicalOlapScan) new LogicalOlapScanToPhysicalOlapScan()
            .build()
            .transform(olapScan, cascadesContext)
            .get(0);

    // Rebuild the chain bottom-up: storage-layer aggregate -> filter -> project -> aggregate.
    Plan storageLayerCount = new PhysicalStorageLayerAggregate(
            physicalScan,
            PushDownAggOp.COUNT_ON_MATCH);
    Plan rebuiltFilter = filter.withChildren(ImmutableList.of(storageLayerCount));
    Plan rebuiltProject = project.withChildren(ImmutableList.of(rebuiltFilter));
    return agg.withChildren(ImmutableList.of(rebuiltProject));
}

/**
* sql: select count(*) from tbl
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalPr

/** PushAggOp */
public enum PushDownAggOp {
COUNT, MIN_MAX, MIX;
COUNT, MIN_MAX, MIX, COUNT_ON_MATCH;

/** supportedFunctions */
public static Map<Class<? extends AggregateFunction>, PushDownAggOp> supportedFunctions() {
return ImmutableMap.<Class<? extends AggregateFunction>, PushDownAggOp>builder()
.put(Count.class, PushDownAggOp.COUNT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_INVERTED_INDEX_QUERY = "enable_inverted_index_query";

public static final String ENABLE_PUSHDOWN_COUNT_ON_INDEX = "enable_count_on_index_pushdown";

public static final String GROUP_BY_AND_HAVING_USE_ALIAS_FIRST = "group_by_and_having_use_alias_first";
public static final String DROP_TABLE_IF_CTAS_FAILED = "drop_table_if_ctas_failed";

Expand Down Expand Up @@ -999,9 +1001,14 @@ public void setMaxJoinNumberOfReorder(int maxJoinNumberOfReorder) {

// Whether enable query with inverted index.
@VariableMgr.VarAttr(name = ENABLE_INVERTED_INDEX_QUERY, needForward = true, description = {
"是否启用inverted index query。", "Set wether to use inverted index query."})
"是否启用inverted index query。", "Set whether to use inverted index query."})
public boolean enableInvertedIndexQuery = true;

// Whether to push the count aggregation down to the scan node when using an inverted index match.
@VariableMgr.VarAttr(name = ENABLE_PUSHDOWN_COUNT_ON_INDEX, needForward = true, description = {
"是否启用count_on_index pushdown。", "Set whether to pushdown count_on_index."})
public boolean enablePushDownCountOnIndex = true;

// Whether to drop the table when inserting data fails during CREATE TABLE AS SELECT.
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true)
public boolean dropTableIfCtasFailed = true;
Expand Down Expand Up @@ -2072,6 +2079,14 @@ public void setEnableInvertedIndexQuery(boolean enableInvertedIndexQuery) {
this.enableInvertedIndexQuery = enableInvertedIndexQuery;
}

/**
 * Returns the current value of the {@code enablePushDownCountOnIndex} session variable.
 */
public boolean isEnablePushDownCountOnIndex() {
    return this.enablePushDownCountOnIndex;
}

/**
 * Sets the {@code enablePushDownCountOnIndex} session variable.
 *
 * @param enablePushDownCountOnIndex the new value to store on this object
 */
public void setEnablePushDownCountOnIndex(boolean enablePushDownCountOnIndex) {
this.enablePushDownCountOnIndex = enablePushDownCountOnIndex;
}

public int getMaxTableCountUseCascadesJoinReorder() {
return this.maxTableCountUseCascadesJoinReorder;
}
Expand Down
3 changes: 2 additions & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ enum TPushAggOp {
NONE = 0,
MINMAX = 1,
COUNT = 2,
MIX = 3
MIX = 3,
COUNT_ON_INDEX = 4
}

struct TOlapScanNode {
Expand Down

0 comments on commit a5b19fa

Please sign in to comment.