Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Sep 29, 2024
1 parent ae766d3 commit 3e49928
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 60 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ DEFINE_mInt32(orc_natural_read_size_mb, "8");
DEFINE_mInt64(big_column_size_buffer, "65535");
DEFINE_mInt64(small_column_size_buffer, "100");

// rf will decide whether the next sampling_frequency blocks need to be filtered based on the filtering rate of the current block.
// Perform the always_true check at intervals determined by runtime_filter_sampling_frequency
DEFINE_mInt32(runtime_filter_sampling_frequency, "64");

// cooldown task configs
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1214,7 +1214,7 @@ Status IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
// The runtime filter is pushed down, adding filtering information.
auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, "expr_filtered_rows", TUnit::UNIT);
auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", TUnit::UNIT);
auto* always_true_counter = ADD_COUNTER(_profile, "always_true", TUnit::UNIT);
auto* always_true_counter = ADD_COUNTER(_profile, "always_true_pass_rows", TUnit::UNIT);
for (auto i = origin_size; i < push_exprs.size(); i++) {
push_exprs[i]->attach_profile_counter(expr_filtered_rows_counter, expr_input_rows_counter,
always_true_counter);
Expand Down
47 changes: 34 additions & 13 deletions be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class ColumnPredicate {
explicit ColumnPredicate(uint32_t column_id, bool opposite = false)
: _column_id(column_id), _opposite(opposite) {
_predicate_params = std::make_shared<PredicateParams>();
reset_judge_selectivity();
}

virtual ~ColumnPredicate() = default;
Expand All @@ -188,16 +189,15 @@ class ColumnPredicate {
// evaluate predicate on IColumn
// a short circuit eval way
uint16_t evaluate(const vectorized::IColumn& column, uint16_t* sel, uint16_t size) const {
if (always_true(true)) {
if (always_true()) {
return size;
}

uint16_t new_size = _evaluate_inner(column, sel, size);
_evaluated_rows += size;
_passed_rows += new_size;
if (_can_ignore() && !_judge_counter) {
vectorized::VRuntimeFilterWrapper::judge_selectivity(
get_ignore_threshold(), size - new_size, size, _always_true, _judge_counter);
if (_can_ignore()) {
do_judge_selectivity(size - new_size, size);
}
return new_size;
}
Expand Down Expand Up @@ -302,15 +302,7 @@ class ColumnPredicate {
}
}

bool always_true(bool update) const {
if (update) {
_judge_counter--;
if (!_judge_counter) {
_always_true = false;
}
}
return _always_true;
}
// Returns the cached _always_true flag. This is a plain accessor: it does not
// touch _judge_counter or any of the other sampling state.
bool always_true() const { return _always_true; }

protected:
virtual std::string _debug_string() const = 0;
Expand All @@ -326,13 +318,42 @@ class ColumnPredicate {
throw Exception(INTERNAL_ERROR, "Not Implemented _evaluate_inner");
}

// Starts a fresh sampling period for the always_true decision.
// All four sampling members are mutable, which is why this can be const.
void reset_judge_selectivity() const {
    // Discard the row statistics accumulated so far.
    _judge_filter_rows = 0;
    _judge_input_rows = 0;
    // Re-arm the countdown from the configured sampling frequency.
    _judge_counter = config::runtime_filter_sampling_frequency;
    // Clear the shortcut flag so always_true() reports false until the
    // selectivity is judged again.
    _always_true = false;
}

// Feeds one batch of evaluation results into the always_true sampling logic.
//   filter_rows: number of rows the predicate rejected in this batch.
//   input_rows:  number of rows the predicate was evaluated on in this batch.
// Each call consumes one tick of _judge_counter; when the counter is found at
// zero the sampling state is reset before this batch is accounted for.
// NOTE(review): in the code visible here this is the only place the counter is
// decremented, and evaluate() returns before calling it once always_true() is
// set -- confirm the period still advances in that state.
void do_judge_selectivity(int64_t filter_rows, int64_t input_rows) const {
    // Post-decrement: compare the value held before this call against zero.
    const bool period_expired = (_judge_counter-- == 0);
    if (period_expired) {
        reset_judge_selectivity();
    }

    // Once the flag is set, nothing is accumulated or re-judged until a reset.
    if (_always_true) {
        return;
    }

    _judge_input_rows += input_rows;
    _judge_filter_rows += filter_rows;
    // Re-judge on the totals accumulated since the last reset, not just this batch.
    vectorized::VRuntimeFilterWrapper::judge_selectivity(get_ignore_threshold(),
                                                         _judge_filter_rows,
                                                         _judge_input_rows, _always_true);
}

uint32_t _column_id;
// TODO: the value is only in delete condition, better be template value
bool _opposite;
std::shared_ptr<PredicateParams> _predicate_params;
mutable uint64_t _evaluated_rows = 1;
mutable uint64_t _passed_rows = 0;
// VRuntimeFilterWrapper and ColumnPredicate share the same logic,
// but it's challenging to unify them, so the code is duplicated.
// _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true
// are variables used to implement the _always_true logic, calculated periodically
// based on runtime_filter_sampling_frequency. During each period, if _always_true
// is evaluated as true, the logic for always_true is applied for the rest of that period
// without recalculating. At the beginning of the next period,
// reset_judge_selectivity is used to reset these variables.
mutable int _judge_counter = 0;
mutable int _judge_input_rows = 0;
mutable int _judge_filter_rows = 0;
mutable bool _always_true = false;
};

Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,13 @@ class ComparisonPredicateBase : public ColumnPredicate {
}
}

if (_can_ignore() && !_judge_counter) {
if (_can_ignore()) {
for (uint16_t i = 0; i < size; i++) {
current_passed_rows += flags[i];
}
_passed_rows += current_passed_rows;
vectorized::VRuntimeFilterWrapper::judge_selectivity(
get_ignore_threshold(), current_evaluated_rows - current_passed_rows,
current_evaluated_rows, _always_true, _judge_counter);
do_judge_selectivity(current_evaluated_rows - current_passed_rows,
current_evaluated_rows);
}
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1760,14 +1760,14 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_
SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
bool all_pred_always_true = true;
for (const auto& pred : _pre_eval_block_predicate) {
if (!pred->always_true(false)) {
if (!pred->always_true()) {
all_pred_always_true = false;
break;
}
}
if (all_pred_always_true) {
for (const auto& pred : _pre_eval_block_predicate) {
pred->always_true(true);
pred->always_true();
}
}

Expand All @@ -1784,7 +1784,7 @@ uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_
DCHECK(!_pre_eval_block_predicate.empty());
bool is_first = true;
for (auto& pred : _pre_eval_block_predicate) {
if (pred->always_true(true)) {
if (pred->always_true()) {
continue;
}
auto column_id = pred->column_id();
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exprs/vexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ class VExpr {
VExprSPtr get_child(int i) const { return _children[i]; }
int get_num_children() const { return _children.size(); }

virtual bool need_judge_selectivity() {
virtual bool is_rf_wrapper() const {
return std::ranges::any_of(_children.begin(), _children.end(),
[](VExprSPtr child) { return child->need_judge_selectivity(); });
[](VExprSPtr child) { return child->is_rf_wrapper(); });
}

virtual void do_judge_selectivity(int64_t filter_rows, int64_t input_rows) {
Expand Down
36 changes: 17 additions & 19 deletions be/src/vec/exprs/vexpr_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
*can_filter_all = false;
auto* __restrict result_filter_data = result_filter->data();
for (const auto& ctx : ctxs) {
bool need_judge_selectivity = ctx->root()->need_judge_selectivity();
// Statistics are only required when an rf wrapper exists in the expr.
bool is_rf_wrapper = ctx->root()->is_rf_wrapper();
int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
ColumnPtr& filter_column = block->get_by_position(result_column_id).column;
Expand All @@ -186,9 +187,8 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();

int input_rows =
rows - (need_judge_selectivity
? simd::count_zero_num((int8*)result_filter_data, rows)
: 0);
rows -
(is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0);

if (accept_null) {
for (size_t i = 0; i < rows; ++i) {
Expand All @@ -201,16 +201,15 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
}

int output_rows =
rows - (need_judge_selectivity
? simd::count_zero_num((int8*)result_filter_data, rows)
: 0);
rows -
(is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0);

if (need_judge_selectivity) {
if (is_rf_wrapper) {
ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
}

if ((need_judge_selectivity && output_rows == 0) ||
(!need_judge_selectivity && memchr(result_filter_data, 0x1, rows) == nullptr)) {
if ((is_rf_wrapper && output_rows == 0) ||
(!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
*can_filter_all = true;
return Status::OK();
}
Expand All @@ -227,25 +226,24 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
assert_cast<const ColumnUInt8&>(*filter_column).get_data();
const auto* __restrict filter_data = filter.data();

int input_rows = rows - (need_judge_selectivity
? simd::count_zero_num((int8*)result_filter_data, rows)
: 0);
int input_rows =
rows -
(is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0);

for (size_t i = 0; i < rows; ++i) {
result_filter_data[i] &= filter_data[i];
}

int output_rows =
rows - (need_judge_selectivity
? simd::count_zero_num((int8*)result_filter_data, rows)
: 0);
rows -
(is_rf_wrapper ? simd::count_zero_num((int8*)result_filter_data, rows) : 0);

if (need_judge_selectivity) {
if (is_rf_wrapper) {
ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
}

if ((need_judge_selectivity && output_rows == 0) ||
(!need_judge_selectivity && memchr(result_filter_data, 0x1, rows) == nullptr)) {
if ((is_rf_wrapper && output_rows == 0) ||
(!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
*can_filter_all = true;
return Status::OK();
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exprs/vruntimefilter_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ namespace doris::vectorized {

VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, const VExprSPtr& impl,
double ignore_thredhold, bool null_aware)
: VExpr(node), _impl(impl), _ignore_thredhold(ignore_thredhold), _null_aware(null_aware) {}
: VExpr(node), _impl(impl), _ignore_thredhold(ignore_thredhold), _null_aware(null_aware) {
reset_judge_selectivity();
}

Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
Expand All @@ -88,7 +90,6 @@ void VRuntimeFilterWrapper::close(VExprContext* context,

Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, int* result_column_id) {
DCHECK(_open_finished || _getting_const_col);
_judge_counter--;
if (_always_true) {
size_t size = block->rows();
block->insert({create_always_true_column(size, _data_type->is_nullable()), _data_type,
Expand Down
54 changes: 39 additions & 15 deletions be/src/vec/exprs/vruntimefilter_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,34 +72,58 @@ class VRuntimeFilterWrapper final : public VExpr {
_always_true_counter = always_true_counter;
}

template <typename T, typename TT>
// Adds one batch's row counts to the attached profile counters.
//   filter_rows: rows rejected by this expression in the batch.
//   input_rows:  rows that were fed into this expression in the batch.
// A counter that has not been attached (null) is skipped.
void update_counters(int64_t filter_rows, int64_t input_rows) {
    if (_expr_filtered_rows_counter != nullptr) {
        COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows);
    }
    if (_expr_input_rows_counter != nullptr) {
        COUNTER_UPDATE(_expr_input_rows_counter, input_rows);
    }
}

template <typename T>
static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows,
T& always_true, TT& judge_counter) {
T& always_true) {
always_true = filter_rows / (input_rows * 1.0) < ignore_threshold;
judge_counter = config::runtime_filter_sampling_frequency;
}

bool need_judge_selectivity() override {
if (_judge_counter <= 0) {
_always_true = false;
return true;
}
return false;
}
// This class is the runtime-filter wrapper itself, so the answer is always true.
bool is_rf_wrapper() const override { return true; }

void do_judge_selectivity(int64_t filter_rows, int64_t input_rows) override {
judge_selectivity(_ignore_thredhold, filter_rows, input_rows, _always_true, _judge_counter);
if (_expr_filtered_rows_counter) {
COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows);
update_counters(filter_rows, input_rows);

if (_judge_counter.fetch_sub(1) == 0) {
reset_judge_selectivity();
}
if (_expr_input_rows_counter) {
COUNTER_UPDATE(_expr_input_rows_counter, input_rows);

if (!_always_true) {
_judge_filter_rows += filter_rows;
_judge_input_rows += input_rows;
judge_selectivity(_ignore_thredhold, _judge_filter_rows, _judge_input_rows,
_always_true);
}
}

private:
// Starts a fresh sampling period for the always_true decision: clears the
// shortcut flag, re-arms the countdown from the configured sampling frequency,
// and zeroes the accumulated row statistics.
// The members written here are std::atomic_int, so each assignment is an
// individual atomic store; the four stores together are not one atomic step.
void reset_judge_selectivity() {
_always_true = false;
_judge_counter = config::runtime_filter_sampling_frequency;
_judge_input_rows = 0;
_judge_filter_rows = 0;
}

VExprSPtr _impl;
// VRuntimeFilterWrapper and ColumnPredicate share the same logic,
// but it's challenging to unify them, so the code is duplicated.
// _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true
// are variables used to implement the _always_true logic, calculated periodically
// based on runtime_filter_sampling_frequency. During each period, if _always_true
// is evaluated as true, the logic for always_true is applied for the rest of that period
// without recalculating. At the beginning of the next period,
// reset_judge_selectivity is used to reset these variables.
std::atomic_int _judge_counter = 0;
std::atomic_int _judge_input_rows = 0;
std::atomic_int _judge_filter_rows = 0;
std::atomic_int _always_true = false;

RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr;
Expand Down

0 comments on commit 3e49928

Please sign in to comment.