Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvementation](join) empty_block shall be set true when build blo… #33977

Merged
merged 3 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,21 +155,22 @@ bool HashJoinBuildSinkLocalState::build_unique() const {

// Derives the probe-side short-circuit flags kept on the shared state from
// whether the build block is empty, the join type, and the mark-join flag.
// Writes `short_circuit_for_probe` and `empty_right_table_need_probe_dispose`.
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
bool empty_block =
!_shared_state->build_block ||
!(_shared_state->build_block->rows() > 1); // the build side always adds one mock row to the block, so only more than one row counts as non-empty
_shared_state->short_circuit_for_probe =
(_shared_state->_has_null_in_build_side &&
p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !p._is_mark_join) ||
(!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN &&
!p._is_mark_join) ||
(!_shared_state->build_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN &&
!p._is_mark_join) ||
(!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
(!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
(!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);
(empty_block && p._join_op == TJoinOp::INNER_JOIN && !p._is_mark_join) ||
(empty_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN && !p._is_mark_join) ||
(empty_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
(empty_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
(empty_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);

// When the build table has 0 rows, there is no other_join_conjunct, this is not a mark join,
// and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN,
// the result is the probe table plus null columns (if they need to be output).
_shared_state->empty_right_table_need_probe_dispose =
(!_shared_state->build_block && !p._have_other_join_conjunct && !p._is_mark_join) &&
(empty_block && !p._have_other_join_conjunct && !p._is_mark_join) &&
(p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN ||
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}
Expand Down
80 changes: 36 additions & 44 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
}

//TODO: this short circuit could perhaps be refactored so that it does not need to be checked here.
if (local_state._shared_state->empty_right_table_need_probe_dispose) {
if (local_state.empty_right_table_shortcut()) {
// When the build table has 0 rows, there is no other_join_conjunct, and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN,
// the result is the probe table plus null columns (if they need to be output).
// With the short-circuit strategy, the block should be returned directly after adding the additional null data.
Expand All @@ -257,12 +257,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::OK();
}

vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}

//create build side null column, if need output
for (int i = 0;
(_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) {
Expand All @@ -273,8 +267,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
vectorized::ColumnVector<vectorized::UInt8>::create(block_rows, 1);
auto nullable_column = vectorized::ColumnNullable::create(std::move(column),
std::move(null_map_column));
temp_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
local_state._probe_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
}
if (_is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8*>(
Expand All @@ -290,8 +284,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
/// No need to check the block size in `_filter_data_and_build_output` because this path does not
/// increase the output row count (it stays the same as `_probe_block`'s row count).
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos,
&temp_block, false));
temp_block.clear();
&local_state._probe_block, false));
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
return Status::OK();
}
Expand Down Expand Up @@ -374,36 +367,52 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
}

Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
vectorized::ColumnUInt8::MutablePtr& null_map,
vectorized::ColumnRawPtrs& raw_ptrs,
const std::vector<int>& res_col_ids) {
if (empty_right_table_shortcut()) {
return Status::OK();
}

_probe_columns.resize(_probe_expr_ctxs.size());

if (!_has_set_need_null_map_for_probe) {
_has_set_need_null_map_for_probe = true;
_need_null_map_for_probe = _need_probe_null_map(block, res_col_ids);
}
if (_need_null_map_for_probe) {
if (_null_map_column == nullptr) {
_null_map_column = vectorized::ColumnUInt8::create();
}
_null_map_column->get_data().assign(block.rows(), (uint8_t)0);
}

auto& shared_state = *_shared_state;
auto& p = _parent->cast<HashJoinProbeOperatorX>();
for (size_t i = 0; i < shared_state.build_exprs_size; ++i) {
if (p._should_convert_to_nullable[i]) {
_key_columns_holder.emplace_back(
vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column));
raw_ptrs[i] = _key_columns_holder.back().get();
_probe_columns[i] = _key_columns_holder.back().get();
continue;
}

if (shared_state.is_null_safe_eq_join[i]) {
raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get();
_probe_columns[i] = block.get_by_position(res_col_ids[i]).column.get();
} else {
auto column = block.get_by_position(res_col_ids[i]).column.get();
if (auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column)) {
auto& col_nested = nullable->get_nested_column();
auto& col_nullmap = nullable->get_null_map_data();

DCHECK(null_map != nullptr);
vectorized::VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap);
const auto* column = block.get_by_position(res_col_ids[i]).column.get();
if (const auto* nullable = check_and_get_column<vectorized::ColumnNullable>(*column)) {
const auto& col_nested = nullable->get_nested_column();
const auto& col_nullmap = nullable->get_null_map_data();

DCHECK(_null_map_column != nullptr);
vectorized::VectorizedUtils::update_null_map(_null_map_column->get_data(),
col_nullmap);
if (shared_state.store_null_in_hash_table[i]) {
raw_ptrs[i] = nullable;
_probe_columns[i] = nullable;
} else {
raw_ptrs[i] = &col_nested;
_probe_columns[i] = &col_nested;
}
} else {
raw_ptrs[i] = column;
_probe_columns[i] = column;
}
}
}
Expand Down Expand Up @@ -482,33 +491,16 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu
local_state._probe_eos = eos;
if (input_block->rows() > 0) {
COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows());
int probe_expr_ctxs_sz = local_state._probe_expr_ctxs.size();
local_state._probe_columns.resize(probe_expr_ctxs_sz);

std::vector<int> res_col_ids(probe_expr_ctxs_sz);
std::vector<int> res_col_ids(local_state._probe_expr_ctxs.size());
RETURN_IF_ERROR(_do_evaluate(*input_block, local_state._probe_expr_ctxs,
*local_state._probe_expr_call_timer, res_col_ids));
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
local_state._probe_column_convert_to_null =
local_state._convert_block_to_null(*input_block);
}

// TODO: For now we cannot tell whether a column is nullable from ExecNode's `row_desc` alone,
// so we have to initialize this flag from the first probe block.
if (!local_state._has_set_need_null_map_for_probe) {
local_state._has_set_need_null_map_for_probe = true;
local_state._need_null_map_for_probe =
local_state._need_probe_null_map(*input_block, res_col_ids);
}
if (local_state._need_null_map_for_probe) {
if (local_state._null_map_column == nullptr) {
local_state._null_map_column = vectorized::ColumnUInt8::create();
}
local_state._null_map_column->get_data().assign(input_block->rows(), (uint8_t)0);
}
RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids));

RETURN_IF_ERROR(local_state._extract_join_column(*input_block, local_state._null_map_column,
local_state._probe_columns, res_col_ids));
if (&local_state._probe_block != input_block) {
input_block->swap(local_state._probe_block);
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ class HashJoinProbeLocalState final
const std::shared_ptr<vectorized::Block>& build_block() const {
return _shared_state->build_block;
}
bool empty_right_table_shortcut() const {
// The shortcut requires the shared-state flag for an empty right table to be set.
const bool need_probe_dispose = _shared_state->empty_right_table_need_probe_dispose;
// A non-empty Base::_projections means the plan came from the nereids planner.
const bool has_projections = !Base::_projections.empty();
return need_probe_dispose && has_projections;
}

private:
void _prepare_probe_block();
bool _need_probe_null_map(vectorized::Block& block, const std::vector<int>& res_col_ids);
std::vector<uint16_t> _convert_block_to_null(vectorized::Block& block);
Status _extract_join_column(vectorized::Block& block,
vectorized::ColumnUInt8::MutablePtr& null_map,
vectorized::ColumnRawPtrs& raw_ptrs,
const std::vector<int>& res_col_ids);
Status _extract_join_column(vectorized::Block& block, const std::vector<int>& res_col_ids);
friend class HashJoinProbeOperatorX;
template <int JoinOpType, typename Parent>
friend struct vectorized::ProcessHashTableProbe;
Expand Down
12 changes: 7 additions & 5 deletions be/src/vec/core/column_with_type_and_name.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ void ColumnWithTypeAndName::dump_structure(std::ostream& out) const {
out << name;
}

if (type)
if (type) {
out << " " << type->get_name();
else
} else {
out << " nullptr";
}

if (column)
out << ' ' << column->dump_structure();
else
if (column) {
out << ' ' << column->dump_structure() << "(use_count=" << column->use_count() << ')';
} else {
out << " nullptr";
}
}

String ColumnWithTypeAndName::dump_structure() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,9 @@
4 \N \N \N \N \N
5 1111 1111 3 1111 1111

-- !shortcut --
1 11 11
2 111 111
3 1111 1111
4 111 111

Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,8 @@ suite("test_half_join_nullable_build_side", "query,p0") {
left join test_half_join_nullable_build_side_l r on l.v2 <=> r.v2
order by 1, 2, 3;
"""

qt_shortcut """
select * from test_half_join_nullable_build_side_l l left anti join test_half_join_nullable_build_side_r r on l.v2 <=> r.v2 and r.k1=5 order by 1, 2, 3;
"""
}
Loading