From 4073aba5ad53fa35f2a0992316d59b101870b23c Mon Sep 17 00:00:00 2001 From: Pxl Date: Thu, 25 Apr 2024 11:40:05 +0800 Subject: [PATCH] =?UTF-8?q?[Improvementation](join)=20empty=5Fblock=20shal?= =?UTF-8?q?l=20be=20set=20true=20when=20build=20blo=E2=80=A6=20(#33977)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit empty_block shall be set true when build block only one row --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 17 ++-- .../pipeline/exec/hashjoin_probe_operator.cpp | 80 +++++++++---------- .../pipeline/exec/hashjoin_probe_operator.h | 9 ++- be/src/vec/core/column_with_type_and_name.cpp | 12 +-- .../test_half_join_nullable_build_side.out | 6 ++ .../test_half_join_nullable_build_side.groovy | 4 + 6 files changed, 67 insertions(+), 61 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index a0d111c63a7d98..2b2bdad86f70f8 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -156,21 +156,22 @@ bool HashJoinBuildSinkLocalState::build_unique() const { void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() { auto& p = _parent->cast(); + bool empty_block = + !_shared_state->build_block || + !(_shared_state->build_block->rows() > 1); // build size always mock a row into block _shared_state->short_circuit_for_probe = (_shared_state->_has_null_in_build_side && p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !p._is_mark_join) || - (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN && - !p._is_mark_join) || - (!_shared_state->build_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN && - !p._is_mark_join) || - (!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) || - (!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) || - (!_shared_state->build_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN); + (empty_block && p._join_op == TJoinOp::INNER_JOIN && !p._is_mark_join) || + (empty_block && p._join_op == TJoinOp::LEFT_SEMI_JOIN && !p._is_mark_join) || + (empty_block && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) || + (empty_block && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) || + (empty_block && p._join_op == TJoinOp::RIGHT_ANTI_JOIN); //when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN //we could get the result is probe table + null-column(if need output) _shared_state->empty_right_table_need_probe_dispose = - (!_shared_state->build_block && !p._have_other_join_conjunct && !p._is_mark_join) && + (empty_block && !p._have_other_join_conjunct && !p._is_mark_join) && (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN || p._join_op == TJoinOp::LEFT_ANTI_JOIN); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index a58ad62211ceb2..fc6f81f41902a2 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -247,7 +247,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc } //TODO: this short circuit maybe could refactor, no need to check at here. - if (local_state._shared_state->empty_right_table_need_probe_dispose) { + if (local_state.empty_right_table_shortcut()) { // when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN // we could get the result is probe table + null-column(if need output) // If we use a short-circuit strategy, should return block directly by add additional null data. @@ -257,12 +257,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return Status::OK(); } - vectorized::Block temp_block; - //get probe side output column - for (int i = 0; i < _left_output_slot_flags.size(); ++i) { - temp_block.insert(local_state._probe_block.get_by_position(i)); - } - //create build side null column, if need output for (int i = 0; (_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) { @@ -273,8 +267,8 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc vectorized::ColumnVector::create(block_rows, 1); auto nullable_column = vectorized::ColumnNullable::create(std::move(column), std::move(null_map_column)); - temp_block.insert({std::move(nullable_column), make_nullable(type), - _right_table_column_names[i]}); + local_state._probe_block.insert({std::move(nullable_column), make_nullable(type), + _right_table_column_names[i]}); } if (_is_outer_join) { reinterpret_cast( @@ -290,8 +284,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc /// No need to check the block size in `_filter_data_and_build_output` because here dose not /// increase the output rows count(just same as `_probe_block`'s rows count). RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, eos, - &temp_block, false)); - temp_block.clear(); + &local_state._probe_block, false)); local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); return Status::OK(); } @@ -374,36 +367,52 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc } Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block, - vectorized::ColumnUInt8::MutablePtr& null_map, - vectorized::ColumnRawPtrs& raw_ptrs, const std::vector& res_col_ids) { + if (empty_right_table_shortcut()) { + return Status::OK(); + } + + _probe_columns.resize(_probe_expr_ctxs.size()); + + if (!_has_set_need_null_map_for_probe) { + _has_set_need_null_map_for_probe = true; + _need_null_map_for_probe = _need_probe_null_map(block, res_col_ids); + } + if (_need_null_map_for_probe) { + if (_null_map_column == nullptr) { + _null_map_column = vectorized::ColumnUInt8::create(); + } + _null_map_column->get_data().assign(block.rows(), (uint8_t)0); + } + auto& shared_state = *_shared_state; auto& p = _parent->cast(); for (size_t i = 0; i < shared_state.build_exprs_size; ++i) { if (p._should_convert_to_nullable[i]) { _key_columns_holder.emplace_back( vectorized::make_nullable(block.get_by_position(res_col_ids[i]).column)); - raw_ptrs[i] = _key_columns_holder.back().get(); + _probe_columns[i] = _key_columns_holder.back().get(); continue; } if (shared_state.is_null_safe_eq_join[i]) { - raw_ptrs[i] = block.get_by_position(res_col_ids[i]).column.get(); + _probe_columns[i] = block.get_by_position(res_col_ids[i]).column.get(); } else { - auto column = block.get_by_position(res_col_ids[i]).column.get(); - if (auto* nullable = check_and_get_column(*column)) { - auto& col_nested = nullable->get_nested_column(); - auto& col_nullmap = nullable->get_null_map_data(); - - DCHECK(null_map != nullptr); - vectorized::VectorizedUtils::update_null_map(null_map->get_data(), col_nullmap); + const auto* column = block.get_by_position(res_col_ids[i]).column.get(); + if (const auto* nullable = check_and_get_column(*column)) { + const auto& col_nested = nullable->get_nested_column(); + const auto& col_nullmap = nullable->get_null_map_data(); + + DCHECK(_null_map_column != nullptr); + vectorized::VectorizedUtils::update_null_map(_null_map_column->get_data(), + col_nullmap); if (shared_state.store_null_in_hash_table[i]) { - raw_ptrs[i] = nullable; + _probe_columns[i] = nullable; } else { - raw_ptrs[i] = &col_nested; + _probe_columns[i] = &col_nested; } } else { - raw_ptrs[i] = column; + _probe_columns[i] = column; } } } @@ -482,10 +491,7 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu local_state._probe_eos = eos; if (input_block->rows() > 0) { COUNTER_UPDATE(local_state._probe_rows_counter, input_block->rows()); - int probe_expr_ctxs_sz = local_state._probe_expr_ctxs.size(); - local_state._probe_columns.resize(probe_expr_ctxs_sz); - - std::vector res_col_ids(probe_expr_ctxs_sz); + std::vector res_col_ids(local_state._probe_expr_ctxs.size()); RETURN_IF_ERROR(_do_evaluate(*input_block, local_state._probe_expr_ctxs, *local_state._probe_expr_call_timer, res_col_ids)); if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { @@ -493,22 +499,8 @@ Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* inpu local_state._convert_block_to_null(*input_block); } - // TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc` - // so we have to initialize this flag by the first probe block. - if (!local_state._has_set_need_null_map_for_probe) { - local_state._has_set_need_null_map_for_probe = true; - local_state._need_null_map_for_probe = - local_state._need_probe_null_map(*input_block, res_col_ids); - } - if (local_state._need_null_map_for_probe) { - if (local_state._null_map_column == nullptr) { - local_state._null_map_column = vectorized::ColumnUInt8::create(); - } - local_state._null_map_column->get_data().assign(input_block->rows(), (uint8_t)0); - } + RETURN_IF_ERROR(local_state._extract_join_column(*input_block, res_col_ids)); - RETURN_IF_ERROR(local_state._extract_join_column(*input_block, local_state._null_map_column, - local_state._probe_columns, res_col_ids)); if (&local_state._probe_block != input_block) { input_block->swap(local_state._probe_block); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index b4930307bcc818..1b45a2a258eb07 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -94,15 +94,16 @@ class HashJoinProbeLocalState final const std::shared_ptr& build_block() const { return _shared_state->build_block; } + bool empty_right_table_shortcut() const { + // !Base::_projections.empty() means nereids planner + return _shared_state->empty_right_table_need_probe_dispose && !Base::_projections.empty(); + } private: void _prepare_probe_block(); bool _need_probe_null_map(vectorized::Block& block, const std::vector& res_col_ids); std::vector _convert_block_to_null(vectorized::Block& block); - Status _extract_join_column(vectorized::Block& block, - vectorized::ColumnUInt8::MutablePtr& null_map, - vectorized::ColumnRawPtrs& raw_ptrs, - const std::vector& res_col_ids); + Status _extract_join_column(vectorized::Block& block, const std::vector& res_col_ids); friend class HashJoinProbeOperatorX; template friend struct vectorized::ProcessHashTableProbe; diff --git a/be/src/vec/core/column_with_type_and_name.cpp b/be/src/vec/core/column_with_type_and_name.cpp index cd0f7194004073..e93946804ffa60 100644 --- a/be/src/vec/core/column_with_type_and_name.cpp +++ b/be/src/vec/core/column_with_type_and_name.cpp @@ -62,15 +62,17 @@ void ColumnWithTypeAndName::dump_structure(std::ostream& out) const { out << name; } - if (type) + if (type) { out << " " << type->get_name(); - else + } else { out << " nullptr"; + } - if (column) - out << ' ' << column->dump_structure(); - else + if (column) { + out << ' ' << column->dump_structure() << "(use_count=" << column->use_count() << ')'; + } else { out << " nullptr"; + } } String ColumnWithTypeAndName::dump_structure() const { diff --git a/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out b/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out index 8404bee641f5b2..56c5f6e2229ba7 100644 --- a/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out +++ b/regression-test/data/query_p0/join/test_half_join_nullable_build_side.out @@ -134,3 +134,9 @@ 4 \N \N \N \N \N 5 1111 1111 3 1111 1111 +-- !shortcut -- +1 11 11 +2 111 111 +3 1111 1111 +4 111 111 + diff --git a/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy b/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy index 2bb24309960054..230332fdf3bbe2 100644 --- a/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy +++ b/regression-test/suites/query_p0/join/test_half_join_nullable_build_side.groovy @@ -286,4 +286,8 @@ suite("test_half_join_nullable_build_side", "query,p0") { left join test_half_join_nullable_build_side_l r on l.v2 <=> r.v2 order by 1, 2, 3; """ + + qt_shortcut """ + select * from test_half_join_nullable_build_side_l l left anti join test_half_join_nullable_build_side_r r on l.v2 <=> r.v2 and r.k1=5 order by 1, 2, 3; + """ } \ No newline at end of file