Skip to content

Commit

Permalink
[Improvement](shuffle) Reduce memory consumption in data stream sender
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Oct 10, 2024
1 parent a9caf05 commit 656146a
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 18 deletions.
18 changes: 4 additions & 14 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
return Status::OK();
}

void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
void VDataStreamRecvr::SenderQueue::add_block(Block* block) {
if (block->rows() == 0) {
return;
}
Expand All @@ -222,17 +222,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
}
}
BlockUPtr nblock = Block::create_unique(block->get_columns_with_type_and_name());

// local exchange should copy the block contented if use move == false
if (use_move) {
block->clear();
} else {
auto rows = block->rows();
for (int i = 0; i < nblock->columns(); ++i) {
nblock->get_by_position(i).column =
nblock->get_by_position(i).column->clone_resized(rows);
}
}
block->clear();
materialize_block_inplace(*nblock);

auto block_mem_size = nblock->allocated_bytes();
Expand Down Expand Up @@ -405,9 +395,9 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_n
wait_for_worker, time_to_find_recvr);
}

void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
void VDataStreamRecvr::add_block(Block* block, int sender_id) {
int use_sender_id = _is_merging ? sender_id : 0;
_sender_queues[use_sender_id]->add_block(block, use_move);
_sender_queues[use_sender_id]->add_block(block);
}

std::shared_ptr<pipeline::Dependency> VDataStreamRecvr::get_local_channel_dependency(
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class VDataStreamRecvr : public HasTaskExecutionCtx {
::google::protobuf::Closure** done, const int64_t wait_for_worker,
const uint64_t time_to_find_recvr);

void add_block(Block* block, int sender_id, bool use_move);
void add_block(Block* block, int sender_id);

Status get_next(Block* block, bool* eos);

Expand Down Expand Up @@ -183,7 +183,7 @@ class VDataStreamRecvr::SenderQueue {
::google::protobuf::Closure** done, const int64_t wait_for_worker,
const uint64_t time_to_find_recvr);

void add_block(Block* block, bool use_move);
void add_block(Block* block);

void decrement_senders(int sender_id);

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}

_local_recvr->add_block(&block, _parent->sender_id(), true);
_local_recvr->add_block(&block, _parent->sender_id());
if (eos) {
_local_recvr->remove_sender(_parent->sender_id(), _be_number, exec_status);
}
Expand All @@ -218,7 +218,7 @@ Status Channel<Parent>::send_local_block(Block* block) {
COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
}
_local_recvr->add_block(block, _parent->sender_id(), false);
_local_recvr->add_block(block, _parent->sender_id());
return Status::OK();
} else {
return _receiver_status;
Expand Down

0 comments on commit 656146a

Please sign in to comment.