Skip to content

Commit

Permalink
[opt](inverted index) add inlist condition handling to compound
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed May 1, 2024
1 parent 74ed6e8 commit 11298c6
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 66 deletions.
25 changes: 13 additions & 12 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,24 +269,26 @@ class ColumnValueRange {
}

void to_condition_in_compound(std::vector<TCondition>& filters) {
for (const auto& value : _compound_values) {
for (const auto& compound_value : _compound_values) {
TCondition condition;
condition.__set_column_name(_column_name);
if (value.first == FILTER_LARGER) {
if (compound_value.first == FILTER_LARGER) {
condition.__set_condition_op(">>");
} else if (value.first == FILTER_LARGER_OR_EQUAL) {
} else if (compound_value.first == FILTER_LARGER_OR_EQUAL) {
condition.__set_condition_op(">=");
} else if (value.first == FILTER_LESS) {
} else if (compound_value.first == FILTER_LESS) {
condition.__set_condition_op("<<");
} else if (value.first == FILTER_LESS_OR_EQUAL) {
} else if (compound_value.first == FILTER_LESS_OR_EQUAL) {
condition.__set_condition_op("<=");
} else if (value.first == FILTER_IN) {
} else if (compound_value.first == FILTER_IN) {
condition.__set_condition_op("*=");
} else if (value.first == FILTER_NOT_IN) {
} else if (compound_value.first == FILTER_NOT_IN) {
condition.__set_condition_op("!*=");
}
condition.condition_values.push_back(
cast_to_string<primitive_type, CppType>(value.second, _scale));
for (const auto& value : compound_value.second) {
condition.condition_values.push_back(
cast_to_string<primitive_type, CppType>(value, _scale));
}
if (condition.condition_values.size() != 0) {
filters.push_back(std::move(condition));
}
Expand Down Expand Up @@ -438,7 +440,7 @@ class ColumnValueRange {
primitive_type == PrimitiveType::TYPE_DATETIMEV2;

// Range values from the compound expr tree, excluding leaf nodes of AND nodes
std::set<std::pair<SQLFilterOp, CppType>> _compound_values;
std::map<SQLFilterOp, std::set<CppType>> _compound_values;
bool _marked_runtime_filter_predicate = false;
};

Expand Down Expand Up @@ -591,8 +593,7 @@ Status ColumnValueRange<primitive_type>::add_fixed_value(const CppType& value) {

template <PrimitiveType primitive_type>
Status ColumnValueRange<primitive_type>::add_compound_value(SQLFilterOp op, CppType value) {
std::pair<SQLFilterOp, CppType> val_with_op(op, value);
_compound_values.insert(val_with_op);
_compound_values[op].insert(value);
_contain_null = false;

_high_value = TYPE_MIN;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/olap_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool op
return opposite ? FILTER_NOT_IN : FILTER_IN;
} else if (function_name == "ne") {
return opposite ? FILTER_IN : FILTER_NOT_IN;
} else if (function_name == "in_list") {
return opposite ? FILTER_IN : FILTER_NOT_IN;
} else if (function_name == "not_in_list") {
return opposite ? FILTER_NOT_IN : FILTER_IN;
} else {
DCHECK(false) << "Function Name: " << function_name;
return FILTER_IN;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace doris {
class Schema;

// Extra parameters recorded on a column predicate (reached through predicate_params()).
struct PredicateParams {
// Single condition value. In this diff view this is the pre-change field; the
// `values` vector below is what the commit replaces it with.
std::string value;
// Condition values of the predicate, one string per value. The reader fills this
// from the condition's condition_values.
std::vector<std::string> values;
// Mirrors the marked_by_runtime_filter flag of the condition the predicate was built from.
bool marked_by_runtime_filter = false;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ComparisonPredicateBase : public ColumnPredicate {

void clone(ColumnPredicate** to) const override {
auto* cloned = new ComparisonPredicateBase(_column_id, _value, _opposite);
cloned->predicate_params()->value = _predicate_params->value;
cloned->predicate_params()->values = _predicate_params->values;
cloned->_cache_code_enabled = true;
cloned->predicate_params()->marked_by_runtime_filter =
_predicate_params->marked_by_runtime_filter;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
// Record the condition values in predicate_params so they can be pushed down to the
// segment iterator; _gen_predicate_result_sign builds a unique predicate-result sign from them.
auto predicate_params = predicate->predicate_params();
predicate_params->value = condition.condition_values[0];
predicate_params->values = condition.condition_values;
predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
_value_col_predicates.push_back(predicate);
Expand Down Expand Up @@ -556,7 +556,7 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
if (predicate != nullptr) {
auto predicate_params = predicate->predicate_params();
predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
predicate_params->value = condition.condition_values[0];
predicate_params->values = condition.condition_values;
_col_preds_except_leafnode_of_andnode.push_back(predicate);
}
}
Expand Down
29 changes: 23 additions & 6 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,10 +698,17 @@ Status SegmentIterator::_execute_predicates_except_leafnode_of_andnode(
_column_predicate_info->column_name = expr->expr_name();
} else if (_is_literal_node(node_type)) {
auto v_literal_expr = std::dynamic_pointer_cast<doris::vectorized::VLiteral>(expr);
_column_predicate_info->query_value = v_literal_expr->value();
} else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_values.push_back(v_literal_expr->value());
} else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED ||
node_type == TExprNodeType::IN_PRED) {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in_list";
} else {
_column_predicate_info->query_op = "not_in_list";
}
} else {
_column_predicate_info->query_op = expr->fn().name.function_name;
}
Expand Down Expand Up @@ -823,7 +830,9 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
bool is_support = pred_type == PredicateType::EQ || pred_type == PredicateType::NE ||
pred_type == PredicateType::LT || pred_type == PredicateType::LE ||
pred_type == PredicateType::GT || pred_type == PredicateType::GE ||
pred_type == PredicateType::MATCH;
pred_type == PredicateType::MATCH ||
pred_type == PredicateType::IN_LIST ||
pred_type == PredicateType::NOT_IN_LIST;
if (!is_support) {
continue;
}
Expand Down Expand Up @@ -905,15 +914,17 @@ std::string SegmentIterator::_gen_predicate_result_sign(ColumnPredicate* predica
auto pred_type = predicate->type();
auto predicate_params = predicate->predicate_params();
pred_result_sign = BeConsts::BLOCK_TEMP_COLUMN_PREFIX + column_desc->name() + "_" +
predicate->pred_type_string(pred_type) + "_" + predicate_params->value;
predicate->pred_type_string(pred_type) + "_";
pred_result_sign += join(predicate_params->values, ",");

return pred_result_sign;
}

// Builds the result sign (a string key) for a predicate described by `predicate_info`.
// The sign is BeConsts::BLOCK_TEMP_COLUMN_PREFIX + column_name + "_" + query_op + "_"
// followed by the query value(s); with `query_values` they are joined with ",".
// NOTE(review): this diff view shows both the pre-change expression line (using
// `query_value`) and the post-change lines (using `query_values`) back to back.
std::string SegmentIterator::_gen_predicate_result_sign(ColumnPredicateInfo* predicate_info) {
std::string pred_result_sign;
pred_result_sign = BeConsts::BLOCK_TEMP_COLUMN_PREFIX + predicate_info->column_name + "_" +
predicate_info->query_op + "_" + predicate_info->query_value;
predicate_info->query_op + "_";
pred_result_sign += join(predicate_info->query_values, ",");
return pred_result_sign;
}

Expand Down Expand Up @@ -2459,10 +2470,16 @@ void SegmentIterator::_calculate_pred_in_remaining_conjunct_root(
}
} else if (_is_literal_node(node_type)) {
auto v_literal_expr = static_cast<const doris::vectorized::VLiteral*>(expr.get());
_column_predicate_info->query_value = v_literal_expr->value();
_column_predicate_info->query_values.push_back(v_literal_expr->value());
} else {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in_list";
} else {
_column_predicate_info->query_op = "not_in_list";
}
} else if (node_type != TExprNodeType::COMPOUND_PRED) {
_column_predicate_info->query_op = expr->fn().name.function_name;
}
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,20 @@ struct ColumnPredicateInfo {
std::string debug_string() const {
std::stringstream ss;
ss << "column_name=" << column_name << ", query_op=" << query_op
<< ", query_value=" << query_value;
<< ", query_value=" << join(query_values, ",");
return ss.str();
}

bool is_empty() const { return column_name.empty() && query_value.empty() && query_op.empty(); }
bool is_empty() const {
return column_name.empty() && query_values.empty() && query_op.empty();
}

bool is_equal(const ColumnPredicateInfo& column_pred_info) const {
if (column_pred_info.column_name != column_name) {
return false;
}

if (column_pred_info.query_value != query_value) {
if (column_pred_info.query_values != query_values) {
return false;
}

Expand All @@ -95,7 +97,7 @@ struct ColumnPredicateInfo {
}

std::string column_name;
std::string query_value;
std::vector<std::string> query_values;
std::string query_op;
};

Expand Down
104 changes: 70 additions & 34 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,9 @@ Status VScanNode::_normalize_compound_predicate(
auto children_num = expr->children().size();
for (auto i = 0; i < children_num; ++i) {
auto child_expr = expr->children()[i].get();
if (TExprNodeType::BINARY_PRED == child_expr->node_type()) {
if (TExprNodeType::BINARY_PRED == child_expr->node_type() ||
TExprNodeType::IN_PRED == child_expr->node_type() ||
TExprNodeType::MATCH_PRED == child_expr->node_type()) {
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range_on_slot = nullptr;
if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot,
Expand All @@ -1027,30 +1029,16 @@ Status VScanNode::_normalize_compound_predicate(
value_range.mark_runtime_filter_predicate(
_is_runtime_filter_predicate);
}};
_normalize_binary_in_compound_predicate(child_expr, expr_ctx, slot,
if (TExprNodeType::BINARY_PRED == child_expr->node_type()) {
_normalize_binary_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
} else if (TExprNodeType::IN_PRED == child_expr->node_type()) {
_normalize_in_and_not_in_compound_predicate(
child_expr, expr_ctx, slot, value_range, pdt);
} else {
_normalize_match_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
},
active_range);

_compound_value_ranges.emplace_back(active_range);
}
} else if (TExprNodeType::MATCH_PRED == child_expr->node_type()) {
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range_on_slot = nullptr;
if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot,
&range_on_slot) ||
_is_predicate_acting_on_slot(child_expr, eq_predicate_checker, &slot,
&range_on_slot)) {
ColumnValueRangeType active_range =
*range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range
std::visit(
[&](auto& value_range) {
Defer mark_runtime_filter_flag {[&]() {
value_range.mark_runtime_filter_predicate(
_is_runtime_filter_predicate);
}};
_normalize_match_in_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
}
},
active_range);

Expand All @@ -1068,11 +1056,10 @@ Status VScanNode::_normalize_compound_predicate(
}

template <PrimitiveType T>
Status VScanNode::_normalize_binary_in_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
Status VScanNode::_normalize_binary_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx, SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
DCHECK(expr->children().size() == 2);
if (TExprNodeType::BINARY_PRED == expr->node_type()) {
auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; };
Expand Down Expand Up @@ -1126,11 +1113,60 @@ Status VScanNode::_normalize_binary_in_compound_predicate(vectorized::VExpr* exp
}

template <PrimitiveType T>
Status VScanNode::_normalize_match_in_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
// Normalizes an IN / NOT IN child of a compound predicate into compound values on `range`.
//
// - Nodes that are not IN_PRED are ignored: Status::OK() is returned and *pdt is untouched.
// - The function name handed to _change_value_range is "in_list" when expr->op() is
//   FILTER_IN and "not_in_list" otherwise.
// - If the expression exposes a set through get_set_func() and that set holds more than
//   _max_pushdown_conditions_per_column entries, the set is appended to
//   _filter_predicates.in_filters instead of being expanded value by value.
// - If no set is exposed, the InState stored in the FRAGMENT_LOCAL function state is used;
//   when its use_set flag is false nothing is pushed down and *pdt is left untouched.
// - Null entries in the set are skipped. Any error from _change_value_range is returned
//   immediately.
// On every successful push-down path *pdt is set to PushDownType::ACCEPTABLE.
Status VScanNode::_normalize_in_and_not_in_compound_predicate(vectorized::VExpr* expr,
                                                              VExprContext* expr_ctx,
                                                              SlotDescriptor* slot,
                                                              ColumnValueRange<T>& range,
                                                              PushDownType* pdt) {
    // Guard clause: only IN_PRED nodes are handled here.
    if (TExprNodeType::IN_PRED != expr->node_type()) {
        return Status::OK();
    }

    std::string fn_name = "not_in_list";
    if (expr->op() == TExprOpcode::type::FILTER_IN) {
        fn_name = "in_list";
    }

    HybridSetBase::IteratorBase* value_iter = nullptr;
    auto set_func = expr->get_set_func();

    if (set_func == nullptr) {
        // No set attached to the expression: fall back to the IN function's own state.
        auto* in_pred = static_cast<VInPredicate*>(expr);
        auto* in_state = reinterpret_cast<InState*>(
                expr_ctx->fn_context(in_pred->fn_context_index())
                        ->get_function_state(FunctionContext::FRAGMENT_LOCAL));
        if (!in_state->use_set) {
            return Status::OK();
        }
        value_iter = in_state->hybrid_set->begin();
    } else if (set_func->size() > _max_pushdown_conditions_per_column) {
        // Too many values to expand one by one: hand the whole set over as an in-filter.
        _filter_predicates.in_filters.emplace_back(slot->col_name(), expr->get_set_func());
        *pdt = PushDownType::ACCEPTABLE;
        return Status::OK();
    } else {
        value_iter = set_func->begin();
    }

    for (; value_iter->has_next(); value_iter->next()) {
        if (value_iter->get_value() == nullptr) {
            continue; // skip null entries
        }
        auto value = const_cast<void*>(value_iter->get_value());
        RETURN_IF_ERROR(_change_value_range<false>(
                range, value, ColumnValueRange<T>::add_compound_value_range, fn_name));
    }

    *pdt = PushDownType::ACCEPTABLE;
    return Status::OK();
}

template <PrimitiveType T>
Status VScanNode::_normalize_match_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx, SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
DCHECK(expr->children().size() == 2);
if (TExprNodeType::MATCH_PRED == expr->node_type()) {
RETURN_IF_ERROR(_normalize_match_predicate(expr, expr_ctx, slot, range, pdt));
Expand Down
18 changes: 12 additions & 6 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,20 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
eq_predicate_checker);

template <PrimitiveType T>
Status _normalize_binary_in_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);
Status _normalize_binary_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);

template <PrimitiveType T>
Status _normalize_match_in_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);
Status _normalize_in_and_not_in_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx, SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt);

template <PrimitiveType T>
Status _normalize_match_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);

template <PrimitiveType T>
Status _normalize_is_null_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
Expand Down
Loading

0 comments on commit 11298c6

Please sign in to comment.