From 3e49928daa7797a8b4e0be2b5d1d7d5c4135a595 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Sun, 29 Sep 2024 18:33:04 +0800 Subject: [PATCH] upd --- be/src/common/config.cpp | 2 +- be/src/exprs/runtime_filter.cpp | 2 +- be/src/olap/column_predicate.h | 47 +++++++++++----- be/src/olap/comparison_predicate.h | 7 ++- .../rowset/segment_v2/segment_iterator.cpp | 6 +-- be/src/vec/exprs/vexpr.h | 4 +- be/src/vec/exprs/vexpr_context.cpp | 36 ++++++------- be/src/vec/exprs/vruntimefilter_wrapper.cpp | 5 +- be/src/vec/exprs/vruntimefilter_wrapper.h | 54 +++++++++++++------ 9 files changed, 103 insertions(+), 60 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index a742b1dc8dbf9a..1c9a2113243082 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -896,7 +896,7 @@ DEFINE_mInt32(orc_natural_read_size_mb, "8"); DEFINE_mInt64(big_column_size_buffer, "65535"); DEFINE_mInt64(small_column_size_buffer, "100"); -// rf will decide whether the next sampling_frequency blocks need to be filtered based on the filtering rate of the current block. +// Perform the always_true check at intervals determined by runtime_filter_sampling_frequency DEFINE_mInt32(runtime_filter_sampling_frequency, "64"); // cooldown task configs diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 96c99c606561af..777ae4630666be 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1214,7 +1214,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::listattach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter, always_true_counter); diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h index a4a8e637bc63a5..764521818f5b1f 100644 --- a/be/src/olap/column_predicate.h +++ b/be/src/olap/column_predicate.h @@ -165,6 +165,7 @@ class ColumnPredicate { explicit ColumnPredicate(uint32_t column_id, bool opposite = false) : _column_id(column_id), _opposite(opposite) { _predicate_params = std::make_shared(); + reset_judge_selectivity(); } virtual ~ColumnPredicate() = default; @@ -188,16 +189,15 @@ class ColumnPredicate { // evaluate predicate on IColumn // a short circuit eval way uint16_t evaluate(const vectorized::IColumn& column, uint16_t* sel, uint16_t size) const { - if (always_true(true)) { + if (always_true()) { return size; } uint16_t new_size = _evaluate_inner(column, sel, size); _evaluated_rows += size; _passed_rows += new_size; - if (_can_ignore() && !_judge_counter) { - vectorized::VRuntimeFilterWrapper::judge_selectivity( - get_ignore_threshold(), size - new_size, size, _always_true, _judge_counter); + if (_can_ignore()) { + do_judge_selectivity(size - new_size, size); } return new_size; } @@ -302,15 +302,7 @@ class ColumnPredicate { } } - bool always_true(bool update) const { - if (update) { - _judge_counter--; - if (!_judge_counter) { - _always_true = false; - } - } - return _always_true; - } + bool always_true() const { return _always_true; } protected: virtual std::string _debug_string() const = 0; @@ -326,13 +318,42 @@ class ColumnPredicate { throw Exception(INTERNAL_ERROR, "Not Implemented _evaluate_inner"); } + void reset_judge_selectivity() const { + _always_true = false; + _judge_counter = config::runtime_filter_sampling_frequency; + _judge_input_rows = 0; + _judge_filter_rows = 0; + } + + void do_judge_selectivity(int64_t filter_rows, int64_t input_rows) const { + if ((_judge_counter--) == 0) { + reset_judge_selectivity(); + } + if (!_always_true) { + _judge_filter_rows += filter_rows; + _judge_input_rows += input_rows; + vectorized::VRuntimeFilterWrapper::judge_selectivity( + get_ignore_threshold(), _judge_filter_rows, _judge_input_rows, _always_true); + } + } + uint32_t _column_id; // TODO: the value is only in delete condition, better be template value bool _opposite; std::shared_ptr _predicate_params; mutable uint64_t _evaluated_rows = 1; mutable uint64_t _passed_rows = 0; + // VRuntimeFilterWrapper and ColumnPredicate share the same logic, + // but it's challenging to unify them, so the code is duplicated. + // _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true + // are variables used to implement the _always_true logic, calculated periodically + // based on runtime_filter_sampling_frequency. During each period, if _always_true + // is evaluated as true, the logic for always_true is applied for the rest of that period + // without recalculating. At the beginning of the next period, + // reset_judge_selectivity is used to reset these variables. mutable int _judge_counter = 0; + mutable int _judge_input_rows = 0; + mutable int _judge_filter_rows = 0; mutable bool _always_true = false; }; diff --git a/be/src/olap/comparison_predicate.h b/be/src/olap/comparison_predicate.h index ece960f0250459..63227919652b8b 100644 --- a/be/src/olap/comparison_predicate.h +++ b/be/src/olap/comparison_predicate.h @@ -339,14 +339,13 @@ class ComparisonPredicateBase : public ColumnPredicate { } } - if (_can_ignore() && !_judge_counter) { + if (_can_ignore()) { for (uint16_t i = 0; i < size; i++) { current_passed_rows += flags[i]; } _passed_rows += current_passed_rows; - vectorized::VRuntimeFilterWrapper::judge_selectivity( - get_ignore_threshold(), current_evaluated_rows - current_passed_rows, - current_evaluated_rows, _always_true, _judge_counter); + do_judge_selectivity(current_evaluated_rows - current_passed_rows, + current_evaluated_rows); } } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 2a7da619c7a3ab..3db52596d688cc 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -1760,14 +1760,14 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_ SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns); bool all_pred_always_true = true; for (const auto& pred : _pre_eval_block_predicate) { - if (!pred->always_true(false)) { + if (!pred->always_true()) { all_pred_always_true = false; break; } } if (all_pred_always_true) { for (const auto& pred : _pre_eval_block_predicate) { - pred->always_true(true); + pred->always_true(); } } @@ -1784,7 +1784,7 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_ DCHECK(!_pre_eval_block_predicate.empty()); bool is_first = true; for (auto& pred : _pre_eval_block_predicate) { - if (pred->always_true(true)) { + if (pred->always_true()) { continue; } auto column_id = pred->column_id(); diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h index 382713b2afc018..ff72b156ef5a68 100644 --- a/be/src/vec/exprs/vexpr.h +++ b/be/src/vec/exprs/vexpr.h @@ -155,9 +155,9 @@ class VExpr { VExprSPtr get_child(int i) const { return _children[i]; } int get_num_children() const { return _children.size(); } - virtual bool need_judge_selectivity() { + virtual bool is_rf_wrapper() const { return std::ranges::any_of(_children.begin(), _children.end(), - [](VExprSPtr child) { return child->need_judge_selectivity(); }); + [](VExprSPtr child) { return child->is_rf_wrapper(); }); } virtual void do_judge_selectivity(int64_t filter_rows, int64_t input_rows) { diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index bcfd7cda102e2c..9f99949a9cd39e 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -169,7 +169,8 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, *can_filter_all = false; auto* __restrict result_filter_data = result_filter->data(); for (const auto& ctx : ctxs) { - bool need_judge_selectivity = ctx->root()->need_judge_selectivity(); + // Statistics are only required when an rf wrapper exists in the expr. + bool is_rf_wrapper = ctx->root()->is_rf_wrapper(); int result_column_id = -1; RETURN_IF_ERROR(ctx->execute(block, &result_column_id)); ColumnPtr& filter_column = block->get_by_position(result_column_id).column; @@ -186,9 +187,8 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); int input_rows = - rows - (need_judge_selectivity - ? simd::count_zero_num((int8*)result_filter_data, rows) - : 0); + rows - + (is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0); if (accept_null) { for (size_t i = 0; i < rows; ++i) { @@ -201,16 +201,15 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, } int output_rows = - rows - (need_judge_selectivity - ? simd::count_zero_num((int8*)result_filter_data, rows) - : 0); + rows - + (is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0); - if (need_judge_selectivity) { + if (is_rf_wrapper) { ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows); } - if ((need_judge_selectivity && output_rows == 0) || - (!need_judge_selectivity && memchr(result_filter_data, 0x1, rows) == nullptr)) { + if ((is_rf_wrapper && output_rows == 0) || + (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) { *can_filter_all = true; return Status::OK(); } @@ -227,25 +226,24 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, assert_cast(*filter_column).get_data(); const auto* __restrict filter_data = filter.data(); - int input_rows = rows - (need_judge_selectivity - ? simd::count_zero_num((int8*)result_filter_data, rows) - : 0); + int input_rows = + rows - + (is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0); for (size_t i = 0; i < rows; ++i) { result_filter_data[i] &= filter_data[i]; } int output_rows = - rows - (need_judge_selectivity - ? simd::count_zero_num((int8*)result_filter_data, rows) - : 0); + rows - + (is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0); - if (need_judge_selectivity) { + if (is_rf_wrapper) { ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows); } - if ((need_judge_selectivity && output_rows == 0) || - (!need_judge_selectivity && memchr(result_filter_data, 0x1, rows) == nullptr)) { + if ((is_rf_wrapper && output_rows == 0) || + (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) { *can_filter_all = true; return Status::OK(); } diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp b/be/src/vec/exprs/vruntimefilter_wrapper.cpp index 20cee5d2000fff..a8d0519a8fe7c9 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp +++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp @@ -63,7 +63,9 @@ namespace doris::vectorized { VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, const VExprSPtr& impl, double ignore_thredhold, bool null_aware) - : VExpr(node), _impl(impl), _ignore_thredhold(ignore_thredhold), _null_aware(null_aware) {} + : VExpr(node), _impl(impl), _ignore_thredhold(ignore_thredhold), _null_aware(null_aware) { + reset_judge_selectivity(); +} Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) { @@ -88,7 +90,6 @@ void VRuntimeFilterWrapper::close(VExprContext* context, Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) { DCHECK(_open_finished || _getting_const_col); - _judge_counter--; if (_always_true) { size_t size = block->rows(); block->insert({create_always_true_column(size, _data_type->is_nullable()), _data_type, diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h b/be/src/vec/exprs/vruntimefilter_wrapper.h index b75cb520ed4a4a..2bd604e46345b8 100644 --- a/be/src/vec/exprs/vruntimefilter_wrapper.h +++ b/be/src/vec/exprs/vruntimefilter_wrapper.h @@ -72,34 +72,58 @@ class VRuntimeFilterWrapper final : public VExpr { _always_true_counter = always_true_counter; } - template + void update_counters(int64_t filter_rows, int64_t input_rows) { + if (_expr_filtered_rows_counter) { + COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows); + } + if (_expr_input_rows_counter) { + COUNTER_UPDATE(_expr_input_rows_counter, input_rows); + } + } + + template static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows, - T& always_true, TT& judge_counter) { + T& always_true) { always_true = filter_rows / (input_rows * 1.0) < ignore_threshold; - judge_counter = config::runtime_filter_sampling_frequency; } - bool need_judge_selectivity() override { - if (_judge_counter <= 0) { - _always_true = false; - return true; - } - return false; - } + bool is_rf_wrapper() const override { return true; } void do_judge_selectivity(int64_t filter_rows, int64_t input_rows) override { - judge_selectivity(_ignore_thredhold, filter_rows, input_rows, _always_true, _judge_counter); - if (_expr_filtered_rows_counter) { - COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows); + update_counters(filter_rows, input_rows); + + if (_judge_counter.fetch_sub(1) == 0) { + reset_judge_selectivity(); } - if (_expr_input_rows_counter) { - COUNTER_UPDATE(_expr_input_rows_counter, input_rows); + + if (!_always_true) { + _judge_filter_rows += filter_rows; + _judge_input_rows += input_rows; + judge_selectivity(_ignore_thredhold, _judge_filter_rows, _judge_input_rows, + _always_true); } } private: + void reset_judge_selectivity() { + _always_true = false; + _judge_counter = config::runtime_filter_sampling_frequency; + _judge_input_rows = 0; + _judge_filter_rows = 0; + } + VExprSPtr _impl; + // VRuntimeFilterWrapper and ColumnPredicate share the same logic, + // but it's challenging to unify them, so the code is duplicated. + // _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true + // are variables used to implement the _always_true logic, calculated periodically + // based on runtime_filter_sampling_frequency. During each period, if _always_true + // is evaluated as true, the logic for always_true is applied for the rest of that period + // without recalculating. At the beginning of the next period, + // reset_judge_selectivity is used to reset these variables. std::atomic_int _judge_counter = 0; + std::atomic_int _judge_input_rows = 0; + std::atomic_int _judge_filter_rows = 0; std::atomic_int _always_true = false; RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr;