From d93304655142b83fc5b94beee4dbebde1505aad3 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Wed, 27 Mar 2024 19:44:07 +0800 Subject: [PATCH] [fix](hash_join) uninited hash table probe caused by short circui (#32901) --- be/src/vec/exec/join/vhash_join_node.cpp | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index b16ef0cc5048f7..2b0b388c9dbec7 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -550,7 +550,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ /// `_has_null_in_build_side` means have null value in build side. /// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join). - if (_has_null_in_build_side && _short_circuit_for_null_in_build_side && _is_mark_join) { + if (_has_null_in_build_side && _short_circuit_for_null_in_build_side) { /// We need to create a column as mark with all rows set to NULL. auto block_rows = _probe_block.rows(); if (block_rows == 0) { @@ -563,10 +563,13 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_ for (int i = 0; i < _left_output_slot_flags.size(); ++i) { temp_block.insert(_probe_block.get_by_position(i)); } - auto mark_column = ColumnNullable::create(ColumnUInt8::create(block_rows, 0), - ColumnUInt8::create(block_rows, 1)); - temp_block.insert( - {std::move(mark_column), make_nullable(std::make_shared()), ""}); + + if (_is_mark_join) { + auto mark_column = ColumnNullable::create(ColumnUInt8::create(block_rows, 0), + ColumnUInt8::create(block_rows, 1)); + temp_block.insert( + {std::move(mark_column), make_nullable(std::make_shared()), ""}); + } { SCOPED_TIMER(_join_filter_timer); @@ -905,6 +908,13 @@ Status HashJoinNode::_materialize_build_side(RuntimeState* state) { } RETURN_IF_ERROR(sink(state, &block, eos)); } + + // For broadcast join, if `sink` is not called with eos, + // other instances will not be signaled. + if (!eos) { + Block tmp_block; + RETURN_IF_ERROR(sink(state, &tmp_block, true)); + } RETURN_IF_ERROR(child(1)->close(state)); } else { RETURN_IF_ERROR(child(1)->close(state));