Skip to content

Commit

Permalink
Revert "[Improvement](sort) Free sort blocks if this block is exhaust…
Browse files Browse the repository at this point in the history
…ed (#39306)" (#40211)

Reverts #39956
  • Loading branch information
yiguolei authored Aug 31, 2024
1 parent 44b87cb commit 1768169
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 112 deletions.
42 changes: 22 additions & 20 deletions be/src/vec/common/sort/partition_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,20 @@ Status PartitionSorter::append_block(Block* input_block) {
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
DCHECK(input_block->columns() == sorted_block.columns());
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
return Status::OK();
}

Status PartitionSorter::prepare_for_read() {
auto& cursors = _state->get_cursors();
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
for (auto& block : blocks) {
priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
cursors.emplace_back(block, _sort_description);
}
for (auto& cursor : cursors) {
priority_queue.push(MergeSortCursor(&cursor));
}
blocks.clear();
return Status::OK();
}

Expand All @@ -81,30 +84,29 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
}

Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
if (_state->get_priority_queue().empty()) {
*eos = true;
} else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
block->swap(*_state->get_priority_queue().top().impl->block);
block->set_num_rows(_partition_inner_limit);
if (_state->get_sorted_block().empty()) {
*eos = true;
} else {
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
auto& sorted_block = _state->get_sorted_block()[0];
block->swap(sorted_block);
block->set_num_rows(_partition_inner_limit);
*eos = true;
} else {
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
}
}
return Status::OK();
}

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
auto& priority_queue = _state->get_priority_queue();
if (priority_queue.empty()) {
*eos = true;
return Status::OK();
}
const auto& sorted_block = priority_queue.top().impl->block;
size_t num_columns = sorted_block->columns();
const auto& sorted_block = _state->get_sorted_block()[0];
size_t num_columns = sorted_block.columns();
MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;
auto& priority_queue = _state->get_priority_queue();

bool get_enough_data = false;
while (!priority_queue.empty()) {
Expand All @@ -119,7 +121,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
//1 row_number no need to check distinct, just output partition_inner_limit row
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
}
} else {
//rows has get enough
Expand Down Expand Up @@ -153,7 +155,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
}
}
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
}
break;
}
Expand All @@ -178,7 +180,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
*_previous_row = current;
}
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
}
current_output_rows++;
break;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/common/sort/partition_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct SortCursorCmp {
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}

void reset() {
impl->reset();
impl = nullptr;
row = 0;
}
bool compare_two_rows(const MergeSortCursor& rhs) const {
Expand All @@ -67,7 +67,7 @@ struct SortCursorCmp {
return true;
}
int row = 0;
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
MergeSortCursorImpl* impl = nullptr;
};

class PartitionSorter final : public Sorter {
Expand Down
71 changes: 34 additions & 37 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,44 +59,48 @@ namespace doris::vectorized {
void MergeSorterState::reset() {
auto empty_queue = std::priority_queue<MergeSortCursor>();
priority_queue_.swap(empty_queue);
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
std::vector<std::shared_ptr<Block>> empty_blocks(0);
std::vector<MergeSortCursorImpl> empty_cursors(0);
cursors_.swap(empty_cursors);
std::vector<Block> empty_blocks(0);
sorted_blocks_.swap(empty_blocks);
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
in_mem_sorted_bocks_size_ = 0;
}

void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
auto rows = block->rows();
Status MergeSorterState::add_sorted_block(Block& block) {
auto rows = block.rows();
if (0 == rows) {
return;
return Status::OK();
}
in_mem_sorted_bocks_size_ += block->bytes();
sorted_blocks_.emplace_back(block);
in_mem_sorted_bocks_size_ += block.bytes();
sorted_blocks_.emplace_back(std::move(block));
num_rows_ += rows;
return Status::OK();
}

Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
for (auto& block : sorted_blocks_) {
priority_queue_.emplace(
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
cursors_.emplace_back(block, sort_description);
}

if (sorted_blocks_.size() > 1) {
for (auto& cursor : cursors_) {
priority_queue_.emplace(&cursor);
}
}

sorted_blocks_.clear();
return Status::OK();
}

Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
bool* eos) {
DCHECK(sorted_blocks_.empty());
DCHECK(unsorted_block_->empty());
if (priority_queue_.empty()) {
if (sorted_blocks_.empty()) {
*eos = true;
} else if (priority_queue_.size() == 1) {
} else if (sorted_blocks_.size() == 1) {
if (offset_ != 0) {
priority_queue_.top().impl->block->skip_num_rows(offset_);
sorted_blocks_[0].skip_num_rows(offset_);
}
block->swap(*priority_queue_.top().impl->block);
block->swap(sorted_blocks_[0]);
*eos = true;
} else {
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
Expand All @@ -106,14 +110,9 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba

Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
bool* eos) {
if (priority_queue_.empty()) {
*eos = true;
return Status::OK();
}
size_t num_columns = priority_queue_.top().impl->block->columns();
size_t num_columns = sorted_blocks_[0].columns();

MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
block, *priority_queue_.top().impl->block);
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
MutableColumns& merged_columns = m_block.mutable_columns();

/// Take rows from queue in right order and push to 'merged'.
Expand All @@ -124,7 +123,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized

if (offset_ == 0) {
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
++merged_rows;
} else {
offset_--;
Expand All @@ -135,9 +134,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
priority_queue_.push(current);
}

if (merged_rows == batch_size) {
break;
}
if (merged_rows == batch_size) break;
}
block->set_columns(std::move(merged_columns));

Expand Down Expand Up @@ -264,22 +261,22 @@ Status FullSorter::_do_sort() {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows() < _offset + _limit) {
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
static_cast<void>(_state->add_sorted_block(desc_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
} else {
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
Block::create_shared(std::move(desc_block)), _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
auto tmp_cursor_impl =
std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->add_sorted_block(tmp_cursor_impl->block);
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
static_cast<void>(_state->add_sorted_block(desc_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
}
}
} else {
// dispose normal sort logic
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
static_cast<void>(_state->add_sorted_block(desc_block));
}
return Status::OK();
}
Expand Down
12 changes: 8 additions & 4 deletions be/src/vec/common/sort/sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class MergeSorterState {

~MergeSorterState() = default;

void add_sorted_block(std::shared_ptr<Block> block);
Status add_sorted_block(Block& block);

Status build_merge_tree(const SortDescription& sort_description);

Expand All @@ -72,19 +72,23 @@ class MergeSorterState {

uint64_t num_rows() const { return num_rows_; }

std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }
Block& last_sorted_block() { return sorted_blocks_.back(); }

std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
void reset();

std::unique_ptr<Block> unsorted_block_;

private:
int _calc_spill_blocks_to_merge() const;

Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);

std::priority_queue<MergeSortCursor> priority_queue_;
std::vector<std::shared_ptr<Block>> sorted_blocks_;
std::vector<MergeSortCursorImpl> cursors_;
std::vector<Block> sorted_blocks_;
size_t in_mem_sorted_bocks_size_ = 0;
uint64_t num_rows_ = 0;

Expand Down
17 changes: 9 additions & 8 deletions be/src/vec/common/sort/topn_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ Status TopNSorter::_do_sort(Block* block) {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows() < _offset + _limit) {
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
} else {
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
Block::create_shared(std::move(sorted_block)), _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
auto tmp_cursor_impl =
std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->add_sorted_block(block_cursor.impl->block);
_block_priority_queue.emplace(tmp_cursor_impl);
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
}
}
} else {
Expand Down
Loading

0 comments on commit 1768169

Please sign in to comment.