Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[Improvement](sort) Free sort blocks if this block is exhausted (#39306)" #40211

Merged
merged 1 commit into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions be/src/vec/common/sort/partition_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,20 @@ Status PartitionSorter::append_block(Block* input_block) {
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
DCHECK(input_block->columns() == sorted_block.columns());
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
return Status::OK();
}

Status PartitionSorter::prepare_for_read() {
auto& cursors = _state->get_cursors();
auto& blocks = _state->get_sorted_block();
auto& priority_queue = _state->get_priority_queue();
for (auto& block : blocks) {
priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
cursors.emplace_back(block, _sort_description);
}
for (auto& cursor : cursors) {
priority_queue.push(MergeSortCursor(&cursor));
}
blocks.clear();
return Status::OK();
}

Expand All @@ -81,30 +84,29 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
}

Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
if (_state->get_priority_queue().empty()) {
*eos = true;
} else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
block->swap(*_state->get_priority_queue().top().impl->block);
block->set_num_rows(_partition_inner_limit);
if (_state->get_sorted_block().empty()) {
*eos = true;
} else {
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
auto& sorted_block = _state->get_sorted_block()[0];
block->swap(sorted_block);
block->set_num_rows(_partition_inner_limit);
*eos = true;
} else {
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
}
}
return Status::OK();
}

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
auto& priority_queue = _state->get_priority_queue();
if (priority_queue.empty()) {
*eos = true;
return Status::OK();
}
const auto& sorted_block = priority_queue.top().impl->block;
size_t num_columns = sorted_block->columns();
const auto& sorted_block = _state->get_sorted_block()[0];
size_t num_columns = sorted_block.columns();
MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;
auto& priority_queue = _state->get_priority_queue();

bool get_enough_data = false;
while (!priority_queue.empty()) {
Expand All @@ -119,7 +121,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
// 1. row_number: no need to check for distinct values, just output partition_inner_limit rows
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
}
} else {
// enough rows have already been collected
Expand Down Expand Up @@ -153,7 +155,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
}
}
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
}
break;
}
Expand All @@ -178,7 +180,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
*_previous_row = current;
}
for (size_t i = 0; i < num_columns; ++i) {
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
}
current_output_rows++;
break;
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/common/sort/partition_sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct SortCursorCmp {
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}

void reset() {
impl->reset();
impl = nullptr;
row = 0;
}
bool compare_two_rows(const MergeSortCursor& rhs) const {
Expand All @@ -67,7 +67,7 @@ struct SortCursorCmp {
return true;
}
int row = 0;
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
MergeSortCursorImpl* impl = nullptr;
};

class PartitionSorter final : public Sorter {
Expand Down
71 changes: 34 additions & 37 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,44 +59,48 @@ namespace doris::vectorized {
void MergeSorterState::reset() {
auto empty_queue = std::priority_queue<MergeSortCursor>();
priority_queue_.swap(empty_queue);
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
std::vector<std::shared_ptr<Block>> empty_blocks(0);
std::vector<MergeSortCursorImpl> empty_cursors(0);
cursors_.swap(empty_cursors);
std::vector<Block> empty_blocks(0);
sorted_blocks_.swap(empty_blocks);
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
in_mem_sorted_bocks_size_ = 0;
}

void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
auto rows = block->rows();
Status MergeSorterState::add_sorted_block(Block& block) {
auto rows = block.rows();
if (0 == rows) {
return;
return Status::OK();
}
in_mem_sorted_bocks_size_ += block->bytes();
sorted_blocks_.emplace_back(block);
in_mem_sorted_bocks_size_ += block.bytes();
sorted_blocks_.emplace_back(std::move(block));
num_rows_ += rows;
return Status::OK();
}

Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
for (auto& block : sorted_blocks_) {
priority_queue_.emplace(
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
cursors_.emplace_back(block, sort_description);
}

if (sorted_blocks_.size() > 1) {
for (auto& cursor : cursors_) {
priority_queue_.emplace(&cursor);
}
}

sorted_blocks_.clear();
return Status::OK();
}

Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
bool* eos) {
DCHECK(sorted_blocks_.empty());
DCHECK(unsorted_block_->empty());
if (priority_queue_.empty()) {
if (sorted_blocks_.empty()) {
*eos = true;
} else if (priority_queue_.size() == 1) {
} else if (sorted_blocks_.size() == 1) {
if (offset_ != 0) {
priority_queue_.top().impl->block->skip_num_rows(offset_);
sorted_blocks_[0].skip_num_rows(offset_);
}
block->swap(*priority_queue_.top().impl->block);
block->swap(sorted_blocks_[0]);
*eos = true;
} else {
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
Expand All @@ -106,14 +110,9 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba

Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
bool* eos) {
if (priority_queue_.empty()) {
*eos = true;
return Status::OK();
}
size_t num_columns = priority_queue_.top().impl->block->columns();
size_t num_columns = sorted_blocks_[0].columns();

MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
block, *priority_queue_.top().impl->block);
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
MutableColumns& merged_columns = m_block.mutable_columns();

/// Take rows from queue in right order and push to 'merged'.
Expand All @@ -124,7 +123,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized

if (offset_ == 0) {
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
++merged_rows;
} else {
offset_--;
Expand All @@ -135,9 +134,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
priority_queue_.push(current);
}

if (merged_rows == batch_size) {
break;
}
if (merged_rows == batch_size) break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: statement should be inside braces [readability-braces-around-statements]

Suggested change
if (merged_rows == batch_size) break;
if (merged_rows == batch_size) { break;
}

}
block->set_columns(std::move(merged_columns));

Expand Down Expand Up @@ -264,22 +261,22 @@ Status FullSorter::_do_sort() {
// if a block is totally greater than the heap top of _block_priority_queue,
// we can discard the block data directly.
if (_state->num_rows() < _offset + _limit) {
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
static_cast<void>(_state->add_sorted_block(desc_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
} else {
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
Block::create_shared(std::move(desc_block)), _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
auto tmp_cursor_impl =
std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->add_sorted_block(tmp_cursor_impl->block);
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
static_cast<void>(_state->add_sorted_block(desc_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
}
}
} else {
// handle the normal sort logic
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
static_cast<void>(_state->add_sorted_block(desc_block));
}
return Status::OK();
}
Expand Down
12 changes: 8 additions & 4 deletions be/src/vec/common/sort/sorter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class MergeSorterState {

~MergeSorterState() = default;

void add_sorted_block(std::shared_ptr<Block> block);
Status add_sorted_block(Block& block);

Status build_merge_tree(const SortDescription& sort_description);

Expand All @@ -72,19 +72,23 @@ class MergeSorterState {

uint64_t num_rows() const { return num_rows_; }

std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }
Block& last_sorted_block() { return sorted_blocks_.back(); }

std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
void reset();

std::unique_ptr<Block> unsorted_block_;

private:
int _calc_spill_blocks_to_merge() const;

Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);

std::priority_queue<MergeSortCursor> priority_queue_;
std::vector<std::shared_ptr<Block>> sorted_blocks_;
std::vector<MergeSortCursorImpl> cursors_;
std::vector<Block> sorted_blocks_;
size_t in_mem_sorted_bocks_size_ = 0;
uint64_t num_rows_ = 0;

Expand Down
17 changes: 9 additions & 8 deletions be/src/vec/common/sort/topn_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ Status TopNSorter::_do_sort(Block* block) {
// if a block is totally greater than the heap top of _block_priority_queue,
// we can discard the block data directly.
if (_state->num_rows() < _offset + _limit) {
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
_state->last_sorted_block(), _sort_description));
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
} else {
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
Block::create_shared(std::move(sorted_block)), _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
auto tmp_cursor_impl =
std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description);
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->add_sorted_block(block_cursor.impl->block);
_block_priority_queue.emplace(tmp_cursor_impl);
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
}
}
} else {
Expand Down
Loading
Loading