Skip to content

Commit

Permalink
[refactor](mem_reuse) refactor mem_reuse in MutableBlock (#21564)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Jul 20, 2023
1 parent f3d9a84 commit 6875ef4
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 106 deletions.
9 changes: 2 additions & 7 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,8 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_projection_timer);
using namespace vectorized;
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
is_mem_reuse ? MutableBlock(output_block)
: MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
*_output_row_descriptor));
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
auto rows = origin_block->rows();

if (rows != 0) {
Expand All @@ -552,9 +549,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
}

if (!is_mem_reuse) output_block->swap(mutable_block.to_block());
DCHECK(output_block->rows() == rows);
DCHECK(mutable_block.rows() == rows);
}

return Status::OK();
Expand Down
11 changes: 3 additions & 8 deletions be/src/vec/common/sort/partition_sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,9 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
const auto& sorted_block = _state->get_sorted_block()[0];
size_t num_columns = sorted_block.columns();
bool mem_reuse = output_block->mem_reuse();
MutableColumns merged_columns =
mem_reuse ? output_block->mutate_columns() : sorted_block.clone_empty_columns();

MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;
auto& priority_queue = _state->get_priority_queue();

Expand Down Expand Up @@ -189,10 +188,6 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
}
}

if (!mem_reuse) {
Block merge_block = sorted_block.clone_with_columns(std::move(merged_columns));
merge_block.swap(*output_block);
}
_output_total_rows += output_block->rows();
if (current_output_rows == 0 || get_enough_data == true) {
*eos = true;
Expand Down
12 changes: 4 additions & 8 deletions be/src/vec/common/sort/sorter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
#include "runtime/thread_context.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
#include "vec/core/block_spill_reader.h"
#include "vec/core/block_spill_writer.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/sort_block.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/utils/util.hpp"

namespace doris {
class RowDescriptor;
Expand Down Expand Up @@ -160,9 +162,8 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
doris::vectorized::Block* block, bool* eos) {
size_t num_columns = sorted_blocks_[0].columns();

bool mem_reuse = block->mem_reuse();
MutableColumns merged_columns =
mem_reuse ? block->mutate_columns() : sorted_blocks_[0].clone_empty_columns();
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
MutableColumns& merged_columns = m_block.mutable_columns();

/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
Expand Down Expand Up @@ -191,11 +192,6 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
return Status::OK();
}

if (!mem_reuse) {
Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns));
merge_block.swap(*block);
}

return Status::OK();
}

Expand Down
27 changes: 5 additions & 22 deletions be/src/vec/exec/vrepeat_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/utils/util.hpp"

namespace doris::vectorized {
VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
Expand Down Expand Up @@ -102,17 +104,10 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl

size_t child_column_size = child_block->columns();
size_t column_size = _output_slots.size();
bool mem_reuse = output_block->mem_reuse();
DCHECK_LT(child_column_size, column_size);
std::vector<vectorized::MutableColumnPtr> columns(column_size);
for (size_t i = 0; i < column_size; i++) {
if (mem_reuse) {
columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
} else {
columns[i] = _output_slots[i]->get_empty_mutable_column();
}
}

MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
MutableColumns& columns = m_block.mutable_columns();
/* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2));
* insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1);
* slot_id_set_list=[[0],[1]],repeat_id_idx=0,
Expand Down Expand Up @@ -173,18 +168,6 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl

DCHECK_EQ(cur_col, column_size);

if (!columns.empty() && !columns[0]->empty()) {
auto n_columns = 0;
if (!mem_reuse) {
for (const auto slot_desc : _output_slots) {
output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
} else {
columns.clear();
}
}
return Status::OK();
}

Expand Down
31 changes: 6 additions & 25 deletions be/src/vec/exec/vtable_function_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
#include <string>
#include <utility>

#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/exprs/table_function/table_function_factory.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/utils/util.hpp"

namespace doris {
class ObjectPool;
Expand Down Expand Up @@ -153,18 +156,9 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos

Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block,
bool* eos) {
size_t column_size = _output_slots.size();
bool mem_reuse = output_block->mem_reuse();

std::vector<MutableColumnPtr> columns(column_size);
for (size_t i = 0; i < column_size; i++) {
if (mem_reuse) {
columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
} else {
columns[i] = _output_slots[i]->get_empty_mutable_column();
}
}

MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
MutableColumns& columns = m_block.mutable_columns();
for (int i = 0; i < _fn_num; i++) {
if (columns[i + _child_slots.size()]->is_nullable()) {
_fns[i]->set_nullable();
Expand Down Expand Up @@ -222,19 +216,6 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
columns[index]->insert_many_defaults(row_size - columns[index]->size());
}

if (!columns.empty() && !columns[0]->empty()) {
auto n_columns = 0;
if (!mem_reuse) {
for (const auto slot_desc : _output_slots) {
output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
} else {
columns.clear();
}
}

// 3. eval conjuncts
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));

Expand Down
31 changes: 4 additions & 27 deletions be/src/vec/exec/vunion_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
DCHECK(!reached_limit());
DCHECK_LT(_child_idx, _children.size());

bool mem_reuse = block->mem_reuse();
MutableBlock mblock =
mem_reuse ? MutableBlock::build_mutable_block(block)
: MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
_row_descriptor)));
MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);

Block child_block;
while (has_more_materialized() && mblock.rows() <= state->batch_size()) {
Expand Down Expand Up @@ -202,10 +198,6 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
}
}

if (!mem_reuse) {
block->swap(mblock.to_block());
}

DCHECK_LE(_child_idx, _children.size());
return Status::OK();
}
Expand All @@ -214,11 +206,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
DCHECK_EQ(state->per_fragment_instance_idx(), 0);
DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size());

bool mem_reuse = block->mem_reuse();
MutableBlock mblock =
mem_reuse ? MutableBlock::build_mutable_block(block)
: MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
_row_descriptor)));
MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);
for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size();
++_const_expr_list_idx) {
Block tmp_block;
Expand All @@ -237,10 +225,6 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
}
}

if (!mem_reuse) {
block->swap(mblock.to_block());
}

// some insert query like "insert into string_test select 1, repeat('a', 1024 * 1024);"
// the const expr will be in output expr cause the union node return a empty block. so here we
// need add one row to make sure the union node exec const expr return at least one row
Expand All @@ -257,19 +241,12 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id,
vectorized::Block* output_block) {
DCHECK_LT(child_id, _children.size());
DCHECK(!is_child_passthrough(child_id));
bool mem_reuse = output_block->mem_reuse();
MutableBlock mblock =
mem_reuse ? MutableBlock::build_mutable_block(output_block)
: MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
_row_descriptor)));

if (input_block->rows() > 0) {
MutableBlock mblock =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, _row_descriptor);
Block res;
RETURN_IF_ERROR(materialize_block(input_block, child_id, &res));
RETURN_IF_ERROR(mblock.merge(res));
if (!mem_reuse) {
output_block->swap(mblock.to_block());
}
}
return Status::OK();
}
Expand Down
12 changes: 4 additions & 8 deletions be/src/vec/runtime/vsorted_run_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/utils/util.hpp"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -129,9 +130,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
}
} else {
size_t num_columns = _empty_block.columns();
bool mem_reuse = output_block->mem_reuse();
MutableColumns merged_columns =
mem_reuse ? output_block->mutate_columns() : _empty_block.clone_empty_columns();
MutableBlock m_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block);
MutableColumns& merged_columns = m_block.mutable_columns();

/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
Expand All @@ -154,11 +155,6 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
*eos = true;
return Status::OK();
}

if (!mem_reuse) {
Block merge_block = _empty_block.clone_with_columns(std::move(merged_columns));
merge_block.swap(*output_block);
}
}

_num_rows_returned += output_block->rows();
Expand Down
33 changes: 32 additions & 1 deletion be/src/vec/utils/util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <boost/shared_ptr.hpp>

#include "runtime/descriptors.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
Expand All @@ -34,7 +35,37 @@ class VectorizedUtils {
// Block block;
return create_columns_with_type_and_name(row_desc);
}

static MutableBlock build_mutable_mem_reuse_block(Block* block, const RowDescriptor& row_desc) {
if (!block->mem_reuse()) {
MutableBlock tmp(VectorizedUtils::create_columns_with_type_and_name(row_desc));
block->swap(tmp.to_block());
}
return MutableBlock::build_mutable_block(block);
}
static MutableBlock build_mutable_mem_reuse_block(Block* block, const Block& other) {
if (!block->mem_reuse()) {
MutableBlock tmp(other.clone_empty());
block->swap(tmp.to_block());
}
return MutableBlock::build_mutable_block(block);
}
static MutableBlock build_mutable_mem_reuse_block(Block* block,
std::vector<SlotDescriptor*>& slots) {
if (!block->mem_reuse()) {
size_t column_size = slots.size();
MutableColumns columns(column_size);
for (size_t i = 0; i < column_size; i++) {
columns[i] = slots[i]->get_empty_mutable_column();
}
int n_columns = 0;
for (const auto slot_desc : slots) {
block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
}
return MutableBlock(block);
}
static ColumnsWithTypeAndName create_columns_with_type_and_name(
const RowDescriptor& row_desc, bool ignore_trivial_slot = true) {
ColumnsWithTypeAndName columns_with_type_and_name;
Expand Down

0 comments on commit 6875ef4

Please sign in to comment.