From 1768169b9a034204491cbc3368d101977d55ec7a Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Sat, 31 Aug 2024 15:58:55 +0800 Subject: [PATCH] Revert "[Improvement](sort) Free sort blocks if this block is exhausted (#39306)" (#40211) Reverts apache/doris#39956 --- be/src/vec/common/sort/partition_sorter.cpp | 42 ++++++------ be/src/vec/common/sort/partition_sorter.h | 4 +- be/src/vec/common/sort/sorter.cpp | 71 ++++++++++----------- be/src/vec/common/sort/sorter.h | 12 ++-- be/src/vec/common/sort/topn_sorter.cpp | 17 ++--- be/src/vec/core/sort_cursor.h | 68 ++++++++++++-------- be/src/vec/runtime/vsorted_run_merger.cpp | 34 +++++++--- be/src/vec/runtime/vsorted_run_merger.h | 13 ++-- 8 files changed, 149 insertions(+), 112 deletions(-) diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index c363a41d1c772e..1ea7c6de6a8a77 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -58,17 +58,20 @@ Status PartitionSorter::append_block(Block* input_block) { Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); DCHECK(input_block->columns() == sorted_block.columns()); RETURN_IF_ERROR(partial_sort(*input_block, sorted_block)); - _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); + RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); return Status::OK(); } Status PartitionSorter::prepare_for_read() { + auto& cursors = _state->get_cursors(); auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description)); + cursors.emplace_back(block, _sort_description); + } + for (auto& cursor : cursors) { + priority_queue.push(MergeSortCursor(&cursor)); } - blocks.clear(); return Status::OK(); } @@ -81,30 +84,29 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { } Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - if (_state->get_priority_queue().empty()) { - *eos = true; - } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) { - block->swap(*_state->get_priority_queue().top().impl->block); - block->set_num_rows(_partition_inner_limit); + if (_state->get_sorted_block().empty()) { *eos = true; } else { - RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); + if (_state->get_sorted_block().size() == 1 && _has_global_limit) { + auto& sorted_block = _state->get_sorted_block()[0]; + block->swap(sorted_block); + block->set_num_rows(_partition_inner_limit); + *eos = true; + } else { + RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); + } } return Status::OK(); } Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { - auto& priority_queue = _state->get_priority_queue(); - if (priority_queue.empty()) { - *eos = true; - return Status::OK(); - } - const auto& sorted_block = priority_queue.top().impl->block; - size_t num_columns = sorted_block->columns(); + const auto& sorted_block = _state->get_sorted_block()[0]; + size_t num_columns = sorted_block.columns(); MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block); + VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block); MutableColumns& merged_columns = m_block.mutable_columns(); size_t current_output_rows = 0; + auto& priority_queue = _state->get_priority_queue(); bool get_enough_data = false; while (!priority_queue.empty()) { @@ -119,7 +121,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int //1 row_number no need to check distinct, just output partition_inner_limit row if ((current_output_rows + _output_total_rows) < _partition_inner_limit) { for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + merged_columns[i]->insert_from(*current->all_columns[i], current->pos); } } else { //rows has get enough @@ -153,7 +155,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + merged_columns[i]->insert_from(*current->all_columns[i], current->pos); } break; } @@ -178,7 +180,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int *_previous_row = current; } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + merged_columns[i]->insert_from(*current->all_columns[i], current->pos); } current_output_rows++; break; diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index 01e009d200db8b..77dcb68371131c 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -50,7 +50,7 @@ struct SortCursorCmp { SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {} void reset() { - impl->reset(); + impl = nullptr; row = 0; } bool compare_two_rows(const MergeSortCursor& rhs) const { @@ -67,7 +67,7 @@ struct SortCursorCmp { return true; } int row = 0; - std::shared_ptr impl = nullptr; + MergeSortCursorImpl* impl = nullptr; }; class PartitionSorter final : public Sorter { diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 89f1c7d73f1c58..eca7e15626b2eb 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -59,44 +59,48 @@ namespace doris::vectorized { void MergeSorterState::reset() { auto empty_queue = std::priority_queue(); priority_queue_.swap(empty_queue); - std::vector> empty_cursors(0); - std::vector> empty_blocks(0); + std::vector empty_cursors(0); + cursors_.swap(empty_cursors); + std::vector empty_blocks(0); sorted_blocks_.swap(empty_blocks); unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); in_mem_sorted_bocks_size_ = 0; } -void MergeSorterState::add_sorted_block(std::shared_ptr block) { - auto rows = block->rows(); +Status MergeSorterState::add_sorted_block(Block& block) { + auto rows = block.rows(); if (0 == rows) { - return; + return Status::OK(); } - in_mem_sorted_bocks_size_ += block->bytes(); - sorted_blocks_.emplace_back(block); + in_mem_sorted_bocks_size_ += block.bytes(); + sorted_blocks_.emplace_back(std::move(block)); num_rows_ += rows; + return Status::OK(); } Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { for (auto& block : sorted_blocks_) { - priority_queue_.emplace( - MergeSortCursorImpl::create_shared(std::move(block), sort_description)); + cursors_.emplace_back(block, sort_description); + } + + if (sorted_blocks_.size() > 1) { + for (auto& cursor : cursors_) { + priority_queue_.emplace(&cursor); + } } - sorted_blocks_.clear(); return Status::OK(); } Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos) { - DCHECK(sorted_blocks_.empty()); - DCHECK(unsorted_block_->empty()); - if (priority_queue_.empty()) { + if (sorted_blocks_.empty()) { *eos = true; - } else if (priority_queue_.size() == 1) { + } else if (sorted_blocks_.size() == 1) { if (offset_ != 0) { - priority_queue_.top().impl->block->skip_num_rows(offset_); + sorted_blocks_[0].skip_num_rows(offset_); } - block->swap(*priority_queue_.top().impl->block); + block->swap(sorted_blocks_[0]); *eos = true; } else { RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos)); @@ -106,14 +110,9 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos) { - if (priority_queue_.empty()) { - *eos = true; - return Status::OK(); - } - size_t num_columns = priority_queue_.top().impl->block->columns(); + size_t num_columns = sorted_blocks_[0].columns(); - MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( - block, *priority_queue_.top().impl->block); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]); MutableColumns& merged_columns = m_block.mutable_columns(); /// Take rows from queue in right order and push to 'merged'. @@ -124,7 +123,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized if (offset_ == 0) { for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + merged_columns[i]->insert_from(*current->all_columns[i], current->pos); ++merged_rows; } else { offset_--; @@ -135,9 +134,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized priority_queue_.push(current); } - if (merged_rows == batch_size) { - break; - } + if (merged_rows == batch_size) break; } block->set_columns(std::move(merged_columns)); @@ -264,22 +261,22 @@ Status FullSorter::_do_sort() { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - _state->add_sorted_block(Block::create_shared(std::move(desc_block))); - _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( - _state->last_sorted_block(), _sort_description)); + static_cast(_state->add_sorted_block(desc_block)); + _block_priority_queue.emplace(_pool->add( + new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); } else { - auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( - Block::create_shared(std::move(desc_block)), _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl); + auto tmp_cursor_impl = + std::make_unique(desc_block, _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - _state->add_sorted_block(tmp_cursor_impl->block); - _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( - _state->last_sorted_block(), _sort_description)); + static_cast(_state->add_sorted_block(desc_block)); + _block_priority_queue.emplace(_pool->add( + new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); } } } else { // dispose normal sort logic - _state->add_sorted_block(Block::create_shared(std::move(desc_block))); + static_cast(_state->add_sorted_block(desc_block)); } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index daa871f5d48008..2525ca8c0c11b9 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -59,7 +59,7 @@ class MergeSorterState { ~MergeSorterState() = default; - void add_sorted_block(std::shared_ptr block); + Status add_sorted_block(Block& block); Status build_merge_tree(const SortDescription& sort_description); @@ -72,19 +72,23 @@ class MergeSorterState { uint64_t num_rows() const { return num_rows_; } - std::shared_ptr last_sorted_block() { return sorted_blocks_.back(); } + Block& last_sorted_block() { return sorted_blocks_.back(); } - std::vector>& get_sorted_block() { return sorted_blocks_; } + std::vector& get_sorted_block() { return sorted_blocks_; } std::priority_queue& get_priority_queue() { return priority_queue_; } + std::vector& get_cursors() { return cursors_; } void reset(); std::unique_ptr unsorted_block_; private: + int _calc_spill_blocks_to_merge() const; + Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos); std::priority_queue priority_queue_; - std::vector> sorted_blocks_; + std::vector cursors_; + std::vector sorted_blocks_; size_t in_mem_sorted_bocks_size_ = 0; uint64_t num_rows_ = 0; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 1f24fb14c950a9..58c3cd2dd0cfad 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -72,16 +72,17 @@ Status TopNSorter::_do_sort(Block* block) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); - _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( - _state->last_sorted_block(), _sort_description)); + RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); + _block_priority_queue.emplace(_pool->add( + new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); } else { - auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( - Block::create_shared(std::move(sorted_block)), _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl); + auto tmp_cursor_impl = + std::make_unique(sorted_block, _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - _state->add_sorted_block(block_cursor.impl->block); - _block_priority_queue.emplace(tmp_cursor_impl); + RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); + _block_priority_queue.emplace(_pool->add( + new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); } } } else { diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 8b627f50af77c8..e565819c9d6639 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -120,8 +120,7 @@ struct HeapSortCursorImpl { * It is used in priority queue. */ struct MergeSortCursorImpl { - ENABLE_FACTORY_CREATOR(MergeSortCursorImpl); - std::shared_ptr block; + ColumnRawPtrs all_columns; ColumnRawPtrs sort_columns; SortDescription desc; size_t sort_columns_size = 0; @@ -131,30 +130,37 @@ struct MergeSortCursorImpl { MergeSortCursorImpl() = default; virtual ~MergeSortCursorImpl() = default; - MergeSortCursorImpl(std::shared_ptr block_, const SortDescription& desc_) - : block(block_), desc(desc_), sort_columns_size(desc.size()) { - reset(); + MergeSortCursorImpl(Block& block, const SortDescription& desc_) + : desc(desc_), sort_columns_size(desc.size()) { + reset(block); } MergeSortCursorImpl(const SortDescription& desc_) - : block(Block::create_shared()), desc(desc_), sort_columns_size(desc.size()) {} + : desc(desc_), sort_columns_size(desc.size()) {} bool empty() const { return rows == 0; } /// Set the cursor to the beginning of the new block. - void reset() { + void reset(Block& block) { + all_columns.clear(); sort_columns.clear(); - auto columns = block->get_columns_and_convert(); + auto columns = block.get_columns_and_convert(); + size_t num_columns = columns.size(); + + for (size_t j = 0; j < num_columns; ++j) { + all_columns.push_back(columns[j].get()); + } + for (size_t j = 0, size = desc.size(); j < size; ++j) { auto& column_desc = desc[j]; size_t column_number = !column_desc.column_name.empty() - ? block->get_position_by_name(column_desc.column_name) + ? block.get_position_by_name(column_desc.column_name) : column_desc.column_number; sort_columns.push_back(columns[column_number].get()); } pos = 0; - rows = block->rows(); + rows = all_columns[0]->size(); } bool is_first() const { return pos == 0; } @@ -168,13 +174,11 @@ struct MergeSortCursorImpl { using BlockSupplier = std::function; struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { - ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl); BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first) : _ordering_expr(ordering_expr), _block_supplier(block_supplier) { - block = Block::create_shared(); sort_columns_size = ordering_expr.size(); desc.resize(ordering_expr.size()); @@ -191,21 +195,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { } bool has_next_block() override { - block->clear(); + _block.clear(); Status status; do { - status = _block_supplier(block.get(), &_is_eof); - } while (block->empty() && !_is_eof && status.ok()); + status = _block_supplier(&_block, &_is_eof); + } while (_block.empty() && !_is_eof && status.ok()); // If status not ok, upper callers could not detect whether it is eof or error. // So that fatal here, and should throw exception in the future. - if (status.ok() && !block->empty()) { + if (status.ok() && !_block.empty()) { if (_ordering_expr.size() > 0) { for (int i = 0; status.ok() && i < desc.size(); ++i) { // TODO yiguolei: throw exception if status not ok in the future - status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number); + status = _ordering_expr[i]->execute(&_block, &desc[i].column_number); } } - MergeSortCursorImpl::reset(); + MergeSortCursorImpl::reset(_block); return status.ok(); } else if (!status.ok()) { throw std::runtime_error(status.msg()); @@ -217,21 +221,32 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return nullptr; } - return block.get(); + return &_block; + } + + size_t columns_num() const { return all_columns.size(); } + + Block create_empty_blocks() const { + size_t num_columns = columns_num(); + MutableColumns columns(num_columns); + for (size_t i = 0; i < num_columns; ++i) { + columns[i] = all_columns[i]->clone_empty(); + } + return _block.clone_with_columns(std::move(columns)); } VExprContextSPtrs _ordering_expr; + Block _block; BlockSupplier _block_supplier {}; bool _is_eof = false; }; /// For easy copying. struct MergeSortCursor { - ENABLE_FACTORY_CREATOR(MergeSortCursor); - std::shared_ptr impl; + MergeSortCursorImpl* impl; - MergeSortCursor(std::shared_ptr impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl.get(); } + MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl; } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const { @@ -271,11 +286,10 @@ struct MergeSortCursor { /// For easy copying. struct MergeSortBlockCursor { - ENABLE_FACTORY_CREATOR(MergeSortBlockCursor); - std::shared_ptr impl = nullptr; + MergeSortCursorImpl* impl = nullptr; - MergeSortBlockCursor(std::shared_ptr impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl.get(); } + MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl; } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const { diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index f321622012fd7e..3b17f957debaa6 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -28,6 +28,14 @@ #include "vec/core/column_with_type_and_name.h" #include "vec/utils/util.hpp" +namespace doris { +namespace vectorized { +class VExprContext; +} // namespace vectorized +} // namespace doris + +using std::vector; + namespace doris::vectorized { VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr, @@ -60,14 +68,13 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) { _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock"); } -Status VSortedRunMerger::prepare(const std::vector& input_runs) { +Status VSortedRunMerger::prepare(const vector& input_runs) { try { for (const auto& supplier : input_runs) { if (_use_sort_desc) { - _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, _desc)); + _cursors.emplace_back(supplier, _desc); } else { - _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared( - supplier, _ordering_expr, _is_asc_order, _nulls_first)); + _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first); } } } catch (const std::exception& e) { @@ -75,8 +82,15 @@ Status VSortedRunMerger::prepare(const std::vector& input_runs) { } for (auto& _cursor : _cursors) { - if (!_cursor->_is_eof) { - _priority_queue.push(MergeSortCursor(_cursor)); + if (!_cursor._is_eof) { + _priority_queue.push(MergeSortCursor(&_cursor)); + } + } + + for (const auto& cursor : _cursors) { + if (!cursor._is_eof) { + _empty_block = cursor.create_empty_blocks(); + break; } } @@ -131,7 +145,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } else { if (current->block_ptr() != nullptr) { - for (int i = 0; i < current->block->columns(); i++) { + for (int i = 0; i < current->all_columns.size(); i++) { auto& column_with_type = current->block_ptr()->get_by_position(i); column_with_type.column = column_with_type.column->cut( current->pos, current->rows - current->pos); @@ -148,9 +162,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } } else { - size_t num_columns = _priority_queue.top().impl->block->columns(); - MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( - output_block, *_priority_queue.top().impl->block); + size_t num_columns = _empty_block.columns(); + MutableBlock m_block = + VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block); MutableColumns& merged_columns = m_block.mutable_columns(); if (num_columns != merged_columns.size()) { diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 844704fd130296..943956d8c3878f 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -30,7 +30,9 @@ #include "vec/core/sort_description.h" #include "vec/exprs/vexpr_fwd.h" -namespace doris::vectorized { +namespace doris { + +namespace vectorized { // VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is a sorted // sequence of blocks, which are fetched from a BlockSupplier function object. @@ -76,12 +78,14 @@ class VSortedRunMerger { bool _pipeline_engine_enabled = false; - std::vector> _cursors; + std::vector _cursors; std::priority_queue _priority_queue; /// In pipeline engine, if a cursor needs to read one more block from supplier, /// we make it as a pending cursor until the supplier is readable. - std::shared_ptr _pending_cursor = nullptr; + MergeSortCursorImpl* _pending_cursor = nullptr; + + Block _empty_block; // Times calls to get_next(). RuntimeProfile::Counter* _get_next_timer = nullptr; @@ -101,4 +105,5 @@ class VSortedRunMerger { bool has_next_block(MergeSortCursor& current); }; -} // namespace doris::vectorized +} // namespace vectorized +} // namespace doris