Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](inverted index) add inlist condition handling to compound #34375

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,24 +269,26 @@ class ColumnValueRange {
}

void to_condition_in_compound(std::vector<TCondition>& filters) {
for (const auto& value : _compound_values) {
for (const auto& compound_value : _compound_values) {
TCondition condition;
condition.__set_column_name(_column_name);
if (value.first == FILTER_LARGER) {
if (compound_value.first == FILTER_LARGER) {
condition.__set_condition_op(">>");
} else if (value.first == FILTER_LARGER_OR_EQUAL) {
} else if (compound_value.first == FILTER_LARGER_OR_EQUAL) {
condition.__set_condition_op(">=");
} else if (value.first == FILTER_LESS) {
} else if (compound_value.first == FILTER_LESS) {
condition.__set_condition_op("<<");
} else if (value.first == FILTER_LESS_OR_EQUAL) {
} else if (compound_value.first == FILTER_LESS_OR_EQUAL) {
condition.__set_condition_op("<=");
} else if (value.first == FILTER_IN) {
} else if (compound_value.first == FILTER_IN) {
condition.__set_condition_op("*=");
} else if (value.first == FILTER_NOT_IN) {
} else if (compound_value.first == FILTER_NOT_IN) {
condition.__set_condition_op("!*=");
}
condition.condition_values.push_back(
cast_to_string<primitive_type, CppType>(value.second, _scale));
for (const auto& value : compound_value.second) {
condition.condition_values.push_back(
cast_to_string<primitive_type, CppType>(value, _scale));
}
if (condition.condition_values.size() != 0) {
filters.push_back(std::move(condition));
}
Expand Down Expand Up @@ -438,7 +440,7 @@ class ColumnValueRange {
primitive_type == PrimitiveType::TYPE_DATETIMEV2;

// range value except leaf node of and node in compound expr tree
std::set<std::pair<SQLFilterOp, CppType>> _compound_values;
std::map<SQLFilterOp, std::set<CppType>> _compound_values;
bool _marked_runtime_filter_predicate = false;
};

Expand Down Expand Up @@ -591,8 +593,7 @@ Status ColumnValueRange<primitive_type>::add_fixed_value(const CppType& value) {

template <PrimitiveType primitive_type>
Status ColumnValueRange<primitive_type>::add_compound_value(SQLFilterOp op, CppType value) {
std::pair<SQLFilterOp, CppType> val_with_op(op, value);
_compound_values.insert(val_with_op);
_compound_values[op].insert(value);
_contain_null = false;

_high_value = TYPE_MIN;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/olap_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ inline SQLFilterOp to_olap_filter_type(const std::string& function_name, bool op
return opposite ? FILTER_NOT_IN : FILTER_IN;
} else if (function_name == "ne") {
return opposite ? FILTER_IN : FILTER_NOT_IN;
} else if (function_name == "in_list") {
return opposite ? FILTER_IN : FILTER_NOT_IN;
} else if (function_name == "not_in_list") {
return opposite ? FILTER_NOT_IN : FILTER_IN;
} else {
DCHECK(false) << "Function Name: " << function_name;
return FILTER_IN;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace doris {
class Schema;

struct PredicateParams {
std::string value;
std::vector<std::string> values;
bool marked_by_runtime_filter = false;
};

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ComparisonPredicateBase : public ColumnPredicate {

void clone(ColumnPredicate** to) const override {
auto* cloned = new ComparisonPredicateBase(_column_id, _value, _opposite);
cloned->predicate_params()->value = _predicate_params->value;
cloned->predicate_params()->values = _predicate_params->values;
cloned->_cache_code_enabled = true;
cloned->predicate_params()->marked_by_runtime_filter =
_predicate_params->marked_by_runtime_filter;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
// record condition value into predicate_params in order to pushdown segment_iterator,
// _gen_predicate_result_sign will build predicate result unique sign with condition value
auto predicate_params = predicate->predicate_params();
predicate_params->value = condition.condition_values[0];
predicate_params->values = condition.condition_values;
predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) {
_value_col_predicates.push_back(predicate);
Expand Down Expand Up @@ -556,7 +556,7 @@ void TabletReader::_init_conditions_param_except_leafnode_of_andnode(
if (predicate != nullptr) {
auto predicate_params = predicate->predicate_params();
predicate_params->marked_by_runtime_filter = condition.marked_by_runtime_filter;
predicate_params->value = condition.condition_values[0];
predicate_params->values = condition.condition_values;
_col_preds_except_leafnode_of_andnode.push_back(predicate);
}
}
Expand Down
29 changes: 23 additions & 6 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -698,10 +698,17 @@ Status SegmentIterator::_execute_predicates_except_leafnode_of_andnode(
_column_predicate_info->column_name = expr->expr_name();
} else if (_is_literal_node(node_type)) {
auto v_literal_expr = std::dynamic_pointer_cast<doris::vectorized::VLiteral>(expr);
_column_predicate_info->query_value = v_literal_expr->value();
} else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_values.push_back(v_literal_expr->value());
} else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED ||
node_type == TExprNodeType::IN_PRED) {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in_list";
} else {
_column_predicate_info->query_op = "not_in_list";
}
} else {
_column_predicate_info->query_op = expr->fn().name.function_name;
}
Expand Down Expand Up @@ -823,7 +830,9 @@ Status SegmentIterator::_apply_index_except_leafnode_of_andnode() {
bool is_support = pred_type == PredicateType::EQ || pred_type == PredicateType::NE ||
pred_type == PredicateType::LT || pred_type == PredicateType::LE ||
pred_type == PredicateType::GT || pred_type == PredicateType::GE ||
pred_type == PredicateType::MATCH;
pred_type == PredicateType::MATCH ||
pred_type == PredicateType::IN_LIST ||
pred_type == PredicateType::NOT_IN_LIST;
if (!is_support) {
continue;
}
Expand Down Expand Up @@ -905,15 +914,17 @@ std::string SegmentIterator::_gen_predicate_result_sign(ColumnPredicate* predica
auto pred_type = predicate->type();
auto predicate_params = predicate->predicate_params();
pred_result_sign = BeConsts::BLOCK_TEMP_COLUMN_PREFIX + column_desc->name() + "_" +
predicate->pred_type_string(pred_type) + "_" + predicate_params->value;
predicate->pred_type_string(pred_type) + "_";
pred_result_sign += join(predicate_params->values, ",");

return pred_result_sign;
}

std::string SegmentIterator::_gen_predicate_result_sign(ColumnPredicateInfo* predicate_info) {
std::string pred_result_sign;
pred_result_sign = BeConsts::BLOCK_TEMP_COLUMN_PREFIX + predicate_info->column_name + "_" +
predicate_info->query_op + "_" + predicate_info->query_value;
predicate_info->query_op + "_";
pred_result_sign += join(predicate_info->query_values, ",");
return pred_result_sign;
}

Expand Down Expand Up @@ -2459,10 +2470,16 @@ void SegmentIterator::_calculate_pred_in_remaining_conjunct_root(
}
} else if (_is_literal_node(node_type)) {
auto v_literal_expr = static_cast<const doris::vectorized::VLiteral*>(expr.get());
_column_predicate_info->query_value = v_literal_expr->value();
_column_predicate_info->query_values.push_back(v_literal_expr->value());
} else {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
if (expr->op() == TExprOpcode::type::FILTER_IN) {
_column_predicate_info->query_op = "in_list";
} else {
_column_predicate_info->query_op = "not_in_list";
}
} else if (node_type != TExprNodeType::COMPOUND_PRED) {
_column_predicate_info->query_op = expr->fn().name.function_name;
}
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,20 @@ struct ColumnPredicateInfo {
std::string debug_string() const {
std::stringstream ss;
ss << "column_name=" << column_name << ", query_op=" << query_op
<< ", query_value=" << query_value;
<< ", query_value=" << join(query_values, ",");
return ss.str();
}

bool is_empty() const { return column_name.empty() && query_value.empty() && query_op.empty(); }
bool is_empty() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'is_empty' should be marked [[nodiscard]] [modernize-use-nodiscard]

Suggested change
bool is_empty() const {
[[nodiscard]] bool is_empty() const {

return column_name.empty() && query_values.empty() && query_op.empty();
}

bool is_equal(const ColumnPredicateInfo& column_pred_info) const {
if (column_pred_info.column_name != column_name) {
return false;
}

if (column_pred_info.query_value != query_value) {
if (column_pred_info.query_values != query_values) {
return false;
}

Expand All @@ -95,7 +97,7 @@ struct ColumnPredicateInfo {
}

std::string column_name;
std::string query_value;
std::vector<std::string> query_values;
std::string query_op;
};

Expand Down
104 changes: 70 additions & 34 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,9 @@ Status VScanNode::_normalize_compound_predicate(
auto children_num = expr->children().size();
for (auto i = 0; i < children_num; ++i) {
auto child_expr = expr->children()[i].get();
if (TExprNodeType::BINARY_PRED == child_expr->node_type()) {
if (TExprNodeType::BINARY_PRED == child_expr->node_type() ||
TExprNodeType::IN_PRED == child_expr->node_type() ||
TExprNodeType::MATCH_PRED == child_expr->node_type()) {
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range_on_slot = nullptr;
if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot,
Expand All @@ -1027,30 +1029,16 @@ Status VScanNode::_normalize_compound_predicate(
value_range.mark_runtime_filter_predicate(
_is_runtime_filter_predicate);
}};
_normalize_binary_in_compound_predicate(child_expr, expr_ctx, slot,
if (TExprNodeType::BINARY_PRED == child_expr->node_type()) {
_normalize_binary_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
} else if (TExprNodeType::IN_PRED == child_expr->node_type()) {
_normalize_in_and_not_in_compound_predicate(
child_expr, expr_ctx, slot, value_range, pdt);
} else {
_normalize_match_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
},
active_range);

_compound_value_ranges.emplace_back(active_range);
}
} else if (TExprNodeType::MATCH_PRED == child_expr->node_type()) {
SlotDescriptor* slot = nullptr;
ColumnValueRangeType* range_on_slot = nullptr;
if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot,
&range_on_slot) ||
_is_predicate_acting_on_slot(child_expr, eq_predicate_checker, &slot,
&range_on_slot)) {
ColumnValueRangeType active_range =
*range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range
std::visit(
[&](auto& value_range) {
Defer mark_runtime_filter_flag {[&]() {
value_range.mark_runtime_filter_predicate(
_is_runtime_filter_predicate);
}};
_normalize_match_in_compound_predicate(child_expr, expr_ctx, slot,
value_range, pdt);
}
},
active_range);

Expand All @@ -1068,11 +1056,10 @@ Status VScanNode::_normalize_compound_predicate(
}

template <PrimitiveType T>
Status VScanNode::_normalize_binary_in_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
Status VScanNode::_normalize_binary_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx, SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
DCHECK(expr->children().size() == 2);
if (TExprNodeType::BINARY_PRED == expr->node_type()) {
auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; };
Expand Down Expand Up @@ -1126,11 +1113,60 @@ Status VScanNode::_normalize_binary_in_compound_predicate(vectorized::VExpr* exp
}

template <PrimitiveType T>
Status VScanNode::_normalize_match_in_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
Status VScanNode::_normalize_in_and_not_in_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx,
SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
if (TExprNodeType::IN_PRED == expr->node_type()) {
std::string fn_name =
expr->op() == TExprOpcode::type::FILTER_IN ? "in_list" : "not_in_list";

HybridSetBase::IteratorBase* iter = nullptr;
auto hybrid_set = expr->get_set_func();

if (hybrid_set != nullptr) {
if (hybrid_set->size() <= _max_pushdown_conditions_per_column) {
iter = hybrid_set->begin();
} else {
_filter_predicates.in_filters.emplace_back(slot->col_name(), expr->get_set_func());
*pdt = PushDownType::ACCEPTABLE;
return Status::OK();
}
} else {
VInPredicate* pred = static_cast<VInPredicate*>(expr);

InState* state = reinterpret_cast<InState*>(
expr_ctx->fn_context(pred->fn_context_index())
->get_function_state(FunctionContext::FRAGMENT_LOCAL));

if (!state->use_set) {
return Status::OK();
}

iter = state->hybrid_set->begin();
}

while (iter->has_next()) {
if (nullptr == iter->get_value()) {
iter->next();
continue;
}
auto value = const_cast<void*>(iter->get_value());
RETURN_IF_ERROR(_change_value_range<false>(
range, value, ColumnValueRange<T>::add_compound_value_range, fn_name));
iter->next();
}
*pdt = PushDownType::ACCEPTABLE;
}
return Status::OK();
}

template <PrimitiveType T>
Status VScanNode::_normalize_match_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx, SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt) {
DCHECK(expr->children().size() == 2);
if (TExprNodeType::MATCH_PRED == expr->node_type()) {
RETURN_IF_ERROR(_normalize_match_predicate(expr, expr_ctx, slot, range, pdt));
Expand Down
18 changes: 12 additions & 6 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,20 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer {
eq_predicate_checker);

template <PrimitiveType T>
Status _normalize_binary_in_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);
Status _normalize_binary_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);

template <PrimitiveType T>
Status _normalize_match_in_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);
Status _normalize_in_and_not_in_compound_predicate(vectorized::VExpr* expr,
VExprContext* expr_ctx, SlotDescriptor* slot,
ColumnValueRange<T>& range,
PushDownType* pdt);

template <PrimitiveType T>
Status _normalize_match_compound_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
SlotDescriptor* slot, ColumnValueRange<T>& range,
PushDownType* pdt);

template <PrimitiveType T>
Status _normalize_is_null_predicate(vectorized::VExpr* expr, VExprContext* expr_ctx,
Expand Down
Loading
Loading