Skip to content

Commit

Permalink
[Exec](mem) Reduce the memory usage during JOIN operations (apache#41388
Browse files Browse the repository at this point in the history
)

## Proposed changes

Reduce peak memory usage during JOIN execution by releasing build-side
columns that are not needed for output once the hash table has been built.
For example, in a query such as:

```
select t1.a from t1, t2 where t1.a = t2.a
```

only the join-key columns of the build side must be kept for probing; the
remaining build-side column memory can be freed early.

Peak memory before this change: 38 GB

Peak memory after this change: 30.8 GB
  • Loading branch information
HappenLee authored and eldenmoon committed Oct 10, 2024
1 parent 50c4667 commit f1c0558
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 10 deletions.
32 changes: 31 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,20 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (_should_build_hash_table) {
// The build side hash key column maybe no need output, but we need to keep the column in block
// because it is used to compare with probe side hash key column
if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
p._should_keep_column_flags[_build_col_ids[0]] = true;
}

if (_shared_state->build_block) {
// release the memory of unused column in probe stage
_shared_state->build_block->clear_column_mem_not_keep(
p._should_keep_column_flags, bool(p._shared_hashtable_controller));
}
}

if (_should_build_hash_table && p._shared_hashtable_controller) {
p._shared_hashtable_controller->signal_finish(p.node_id());
}
Expand Down Expand Up @@ -386,7 +400,9 @@ void HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
default:
_shared_state->hash_table_variants
->emplace<vectorized::SerializedHashTableContext>();
return;
}
p._should_keep_hash_key_column = true;
return;
}

Expand Down Expand Up @@ -433,6 +449,10 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* st
RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);

if (tnode.hash_join_node.__isset.hash_output_slot_ids) {
_hash_output_slot_ids = tnode.hash_join_node.hash_output_slot_ids;
}

const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
_join_op == TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::RIGHT_ANTI_JOIN;
Expand Down Expand Up @@ -494,6 +514,17 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
_shared_hash_table_context = _shared_hashtable_controller->get_context(node_id());
}
}
auto init_keep_column_flags = [&](auto& tuple_descs, auto& output_slot_flags) {
for (const auto& tuple_desc : tuple_descs) {
for (const auto& slot_desc : tuple_desc->slots()) {
output_slot_flags.emplace_back(
_hash_output_slot_ids.empty() ||
std::find(_hash_output_slot_ids.begin(), _hash_output_slot_ids.end(),
slot_desc->id()) != _hash_output_slot_ids.end());
}
}
};
init_keep_column_flags(row_desc().tuple_descriptors(), _should_keep_column_flags);
RETURN_IF_ERROR(vectorized::VExpr::prepare(_build_expr_ctxs, state, _child->row_desc()));
return vectorized::VExpr::open(_build_expr_ctxs, state);
}
Expand Down Expand Up @@ -565,7 +596,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
_shared_hash_table_context->build_indexes_null =
local_state._shared_state->build_indexes_null;
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
_shared_hashtable_controller->signal(node_id());
}
} else if (!local_state._should_build_hash_table) {
DCHECK(_shared_hashtable_controller != nullptr);
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ class HashJoinBuildSinkOperatorX final
const std::vector<TExpr> _partition_exprs;

const bool _need_local_merge;

std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _should_keep_column_flags;
bool _should_keep_hash_key_column = false;
};

template <class HashTableContext>
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,9 +640,13 @@ Status ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(
mcol.size(), _right_col_len, _right_col_idx);
}
for (size_t j = 0; j < _right_col_len; ++j) {
const auto& column = *_build_block->safe_get_by_position(j).column;
mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
_build_indexs.data() + block_size);
if (_right_output_slot_flags->at(j)) {
const auto& column = *_build_block->safe_get_by_position(j).column;
mcol[j + _right_col_idx]->insert_indices_from(column, _build_indexs.data(),
_build_indexs.data() + block_size);
} else {
mcol[j + _right_col_idx]->resize(block_size);
}
}

// just resize the left table column in case with other conjunct to make block size is not zero
Expand Down
18 changes: 18 additions & 0 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,24 @@ void Block::erase_tmp_columns() noexcept {
}
}

// Release the memory of columns that are no longer needed after the build
// phase of a hash join. Every column whose keep-flag is false is replaced by
// an empty clone, dropping its payload while keeping the block structure
// (column names/types) intact.
//
// @param column_keep_flags per-column flags; column_keep_flags[i] == false
//        means column i may be cleared. If the flags vector is shorter than
//        the block, trailing columns are left untouched; if it is longer,
//        the call is a no-op (indexing would be out of range).
// @param need_keep_first when true (e.g. a shared hash table context still
//        references this block), the first column keeps the original row
//        count so that rows() stays correct, but its data is still released.
void Block::clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
                                      bool need_keep_first) {
    // A flags vector longer than the block would index out of range below.
    if (data.size() < column_keep_flags.size()) {
        return;
    }
    const auto origin_rows = rows();
    for (size_t i = 0; i < column_keep_flags.size(); ++i) {
        if (!column_keep_flags[i]) {
            data[i].column = data[i].column->clone_empty();
        }
    }

    // BUGFIX: guard against an empty flags vector before reading flags[0] —
    // the original code unconditionally evaluated column_keep_flags[0] here,
    // which is undefined behavior when the caller passes no flags.
    if (need_keep_first && !column_keep_flags.empty() && !column_keep_flags[0]) {
        // Preserve the row count on the first column (empty clone + resize)
        // so rows() still reports the original size after the data is freed.
        auto first_column = data[0].column->clone_empty();
        first_column->resize(origin_rows);
        data[0].column = std::move(first_column);
    }
}

void Block::swap(Block& other) noexcept {
SCOPED_SKIP_MEMORY_CHECK();
data.swap(other.data);
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ class Block {
// we built some temporary columns into block
void erase_tmp_columns() noexcept;

void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
bool need_keep_first);

private:
void erase_impl(size_t position);
};
Expand Down
6 changes: 1 addition & 5 deletions be/src/vec/runtime/shared_hash_table_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ SharedHashTableContextPtr SharedHashTableController::get_context(int my_node_id)
return _shared_contexts[my_node_id];
}

void SharedHashTableController::signal(int my_node_id) {
void SharedHashTableController::signal_finish(int my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
auto it = _shared_contexts.find(my_node_id);
if (it != _shared_contexts.cend()) {
Expand All @@ -52,10 +52,6 @@ void SharedHashTableController::signal(int my_node_id) {
for (auto& dep : _dependencies[my_node_id]) {
dep->set_ready();
}
}

void SharedHashTableController::signal_finish(int my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
for (auto& dep : _finish_dependencies[my_node_id]) {
dep->set_ready();
}
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ class SharedHashTableController {
void set_builder_and_consumers(TUniqueId builder, int node_id);
TUniqueId get_builder_fragment_instance_id(int my_node_id);
SharedHashTableContextPtr get_context(int my_node_id);
void signal(int my_node_id);
void signal_finish(int my_node_id);
void append_dependency(int node_id, std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> finish_dep) {
Expand Down

0 comments on commit f1c0558

Please sign in to comment.