Skip to content

Commit

Permalink
[Improvementation](join) empty_block shall be set true when build blo… (
Browse files Browse the repository at this point in the history
#33977)

empty_block shall be set true when build block only one row
  • Loading branch information
BiteTheDDDDt authored Apr 25, 2024
1 parent df11c8c commit 4073aba
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 61 deletions.
17 changes: 9 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,22 @@ bool HashJoinBuildSinkLocalState::build_unique() const {

void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
bool empty_block =
!_shared_state->build_block ||
!(_shared_state->build_block->rows() > 1); // build size always mock a row into block
_shared_state->short_circuit_for_probe =
(_shared_state->_has_null_in_build_side &&
p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !p._is_mark_join) ||
(!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN &&
!p._is_mark_join) ||
(!_shared_state->build_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN &&
!p._is_mark_join) ||
(!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
(!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
(!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);
(empty_block && p._join_op == TJoinOp::INNER_JOIN && !p._is_mark_join) ||
(empty_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN && !p._is_mark_join) ||
(empty_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
(empty_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
(empty_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);

//when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
_shared_state->empty_right_table_need_probe_dispose =
(!_shared_state->build_block && !p._have_other_join_conjunct && !p._is_mark_join) &&
(empty_block && !p._have_other_join_conjunct && !p._is_mark_join) &&
(p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN ||
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}
Expand Down
80 changes: 36 additions & 44 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
}

//TODO: this short circuit maybe could refactor, no need to check at here.
if (local_state._shared_state->empty_right_table_need_probe_dispose) {
if (local_state.empty_right_table_shortcut()) {
// when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
// we could get the result is probe table + null-column(if need output)
// If we use a short-circuit strategy, should return block directly by add additional null data.
Expand All @@ -257,12 +257,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::OK();
}

vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}

//create build side null column, if need output
for (int i = 0;
(_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) {
Expand All @@ -273,8 +267,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
vectorized::ColumnVector<vectorized::UInt8>::create(block_rows, 1);
auto nullable_column = vectorized::ColumnNullable::create(std::move(column),
std::move(null_map_column));
temp_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
local_state._probe_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
}
if (_is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8*>(
Expand All @@ -290,8 +284,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
/// No need to check the block size in `_filter_data_and_build_output` because here dose not
/// increase the output rows count(just same as `_probe_block`'s rows count).
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos,
&temp_block, false));
temp_block.clear();
&local_state._probe_block, false));
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
return Status::OK();
}
Expand Down Expand Up @@ -374,36 +367,52 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
}

Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
vectorized::ColumnUInt8::MutablePtr& null_map,
vectorized::ColumnRawPtrs& raw_ptrs,
const std::vector<int>& res_col_ids) {
if (empty_right_table_shortcut()) {
return Status::OK();
}

_probe_columns.resize(_probe_expr_ctxs.size());

if (!_has_set_need_null_map_for_probe) {
_has_set_need_null_map_for_probe = true;
_need_null_map_for_probe = _need_probe_null_map(block, res_col_ids);
}
if (_need_null_map_for_probe) {
if (_null_map_column == nullptr) {
_null_map_column = vectorized::ColumnUInt8::create();
}
_null_map_column->get_data().assign(block.rows(), (uint8_t)0);
}

auto& shared_state = *_shared_state;
auto& p = _parent->cast<HashJoinProbeOperatorX>();
for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
if (p._should_convert_to_nullable[i]) {
_key_columns_holder.emplace_back(
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
raw_ptrs[i] = _key_columns_holder.back().get();
_probe_columns[i] = _key_columns_holder.back().get();
continue;
}

if (shared_state.is_null_safe_eq_join[i]) {
raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get();
_probe_columns[i] = block.get_by_position(res_col_ids[i]).column.get();
} else {
auto column = block.get_by_position(res_col_ids[i]).column.get();
if (auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column)) {
auto& col_nested = nullable->get_nested_column();
auto& col_nullmap = nullable->get_null_map_data();

DCHECK(null_map != nullptr);
vectorized::VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
const auto* column = block.get_by_position(res_col_ids[i]).column.get();
if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column)) {
const auto& col_nested = nullable->get_nested_column();
const auto& col_nullmap = nullable->get_null_map_data();

DCHECK(_null_map_column != nullptr);
vectorized::VectorizedUtils::update_null_map(_null_map_column->get_data(),
col_nullmap);
if (shared_state.store_null_in_hash_table[i]) {
raw_ptrs[i] = nullable;
_probe_columns[i] = nullable;
} else {
raw_ptrs[i] = &col_nested;
_probe_columns[i] = &col_nested;
}
} else {
raw_ptrs[i] = column;
_probe_columns[i] = column;
}
}
}
Expand Down Expand Up @@ -482,33 +491,16 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
local_state._probe_eos = eos;
if (input_block->rows() > 0) {
COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows());
int probe_expr_ctxs_sz = local_state._probe_expr_ctxs.size();
local_state._probe_columns.resize(probe_expr_ctxs_sz);

std::vector<int> res_col_ids(probe_expr_ctxs_sz);
std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size());
RETURN_IF_ERROR(_do_evaluate(*input_block, local_state._probe_expr_ctxs,
*local_state._probe_expr_call_timer, res_col_ids));
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
local_state._probe_column_convert_to_null =
local_state._convert_block_to_null(*input_block);
}

// TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc`
// so we have to initialize this flag by the first probe block.
if (!local_state._has_set_need_null_map_for_probe) {
local_state._has_set_need_null_map_for_probe = true;
local_state._need_null_map_for_probe =
local_state._need_probe_null_map(*input_block, res_col_ids);
}
if (local_state._need_null_map_for_probe) {
if (local_state._null_map_column == nullptr) {
local_state._null_map_column = vectorized::ColumnUInt8::create();
}
local_state._null_map_column->get_data().assign(input_block->rows(), (uint8_t)0);
}
RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids));

RETURN_IF_ERROR(local_state._extract_join_column(*input_block, local_state._null_map_column,
local_state._probe_columns, res_col_ids));
if (&local_state._probe_block != input_block) {
input_block->swap(local_state._probe_block);
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ class HashJoinProbeLocalState final
const std::shared_ptr<vectorized::Block>& build_block() const {
return _shared_state->build_block;
}
bool empty_right_table_shortcut() const {
// !Base::_projections.empty() means nereids planner
return _shared_state->empty_right_table_need_probe_dispose && !Base::_projections.empty();
}

private:
void _prepare_probe_block();
bool _need_probe_null_map(vectorized::Block& block, const std::vector<int>& res_col_ids);
std::vector<uint16_t> _convert_block_to_null(vectorized::Block& block);
Status _extract_join_column(vectorized::Block& block,
vectorized::ColumnUInt8::MutablePtr& null_map,
vectorized::ColumnRawPtrs& raw_ptrs,
const std::vector<int>& res_col_ids);
Status _extract_join_column(vectorized::Block& block, const std::vector<int>& res_col_ids);
friend class HashJoinProbeOperatorX;
template <int JoinOpType, typename Parent>
friend struct vectorized::ProcessHashTableProbe;
Expand Down
12 changes: 7 additions & 5 deletions be/src/vec/core/column_with_type_and_name.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ void ColumnWithTypeAndName::dump_structure(std::ostream& out) const {
out << name;
}

if (type)
if (type) {
out << " " << type->get_name();
else
} else {
out << " nullptr";
}

if (column)
out << ' ' << column->dump_structure();
else
if (column) {
out << ' ' << column->dump_structure() << "(use_count=" << column->use_count() << ')';
} else {
out << " nullptr";
}
}

String ColumnWithTypeAndName::dump_structure() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,9 @@
4 \N \N \N \N \N
5 1111 1111 3 1111 1111

-- !shortcut --
1 11 11
2 111 111
3 1111 1111
4 111 111

Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,8 @@ suite("test_half_join_nullable_build_side", "query,p0") {
left join test_half_join_nullable_build_side_l r on l.v2 <=> r.v2
order by 1, 2, 3;
"""

qt_shortcut """
select * from test_half_join_nullable_build_side_l l left anti join test_half_join_nullable_build_side_r r on l.v2 <=> r.v2 and r.k1=5 order by 1, 2, 3;
"""
}

0 comments on commit 4073aba

Please sign in to comment.