diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 8f7b176a979a4db..47d86b78d124f01 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -113,6 +113,17 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { auto p = _parent->cast(); Defer defer {[&]() { + if (_should_build_hash_table) { + // The build side hash key column maybe no need output, but we need to keep the column in block + // because it is used to compare with probe side hash key column + if (p._should_keep_hash_key_column) { + DCHECK_EQ(_build_col_ids.size(), 1); + p._should_keep_column_flags[_build_col_ids[0]] = true; + } + // release the memory of unused column in probe stage + _shared_state->build_block->clear_column_mem_not_keep(p._should_keep_column_flags); + } + if (_should_build_hash_table && p._shared_hashtable_controller) { p._shared_hashtable_controller->signal_finish(p.node_id()); } @@ -386,7 +397,9 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) { default: _shared_state->hash_table_variants ->emplace(); + return; } + p._should_keep_hash_key_column = true; return; } @@ -433,6 +446,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); + if (tnode.hash_join_node.__isset.hash_output_slot_ids) { + _hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids; + } + const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN; @@ -494,6 +511,17 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) { _shared_hash_table_context = _shared_hashtable_controller->get_context(node_id()); } } + auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) { + for (const auto& tuple_desc : tuple_descs) { + for (const auto& slot_desc : tuple_desc->slots()) { + output_slot_flags.emplace_back( + _hash_output_slot_ids.empty() || + std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(), + slot_desc->id()) != _hash_output_slot_ids.end()); + } + } + }; + init_keep_column_flags(row_desc().tuple_descriptors(), _should_keep_column_flags); RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc())); return vectorized::VExpr::open(_build_expr_ctxs, state); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index cf677833fb5b64e..a544cdcf4563a49 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -173,6 +173,10 @@ class HashJoinBuildSinkOperatorX final const std::vector _partition_exprs; const bool _need_local_merge; + + std::vector _hash_output_slot_ids; + std::vector _should_keep_column_flags; + bool _should_keep_hash_key_column = false; }; template diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index d4644fca4898d8b..7dc0e4dbe2535fc 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -753,6 +753,15 @@ void Block::erase_tmp_columns() noexcept { } } +void Block::clear_column_mem_not_keep(const std::vector& column_keep_flags) { + DCHECK_LE(column_keep_flags.size(), data.size()); + for (int i = 0; i < column_keep_flags.size(); ++i) { + if (!column_keep_flags[i]) { + data[i].column = data[i].column->clone_empty(); + } + } +} + void Block::swap(Block& other) noexcept { SCOPED_SKIP_MEMORY_CHECK(); data.swap(other.data); diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index 108cb5e1c9fdf55..dace7073f6085f2 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -404,6 +404,8 @@ class Block { // we built some temporary columns into block void erase_tmp_columns() noexcept; + void clear_column_mem_not_keep(const std::vector& column_keep_flags); + private: void erase_impl(size_t position); };