diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index ed032d0976700e..63b88aa9de2b92 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -89,6 +89,18 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl _output_row_descriptor = std::make_unique( descs, std::vector {tnode.output_tuple_id}, std::vector {true}); } + if (!tnode.intermediate_output_tuple_id_list.empty()) { + DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id"; + // common subexpression elimination + DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(), + tnode.intermediate_projections_list.size()); + _intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size()); + for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) { + _intermediate_output_row_descriptor.push_back( + RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true})); + } + } + _query_statistics = std::make_shared(); } @@ -114,7 +126,15 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { DCHECK(tnode.__isset.output_tuple_id); RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections)); } - + if (!tnode.intermediate_projections_list.empty()) { + DCHECK(tnode.__isset.projections) << "no final projections"; + _intermediate_projections.reserve(tnode.intermediate_projections_list.size()); + for (const auto& tnode_projections : tnode.intermediate_projections_list) { + vectorized::VExprContextSPtrs projections; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections)); + _intermediate_projections.push_back(projections); + } + } return Status::OK(); } @@ -143,7 +163,12 @@ Status ExecNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc())); } - RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc())); + for (int i = 0; i < _intermediate_projections.size(); i++) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state, + intermediate_row_desc(i))); + } + + RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc())); for (auto& i : _children) { RETURN_IF_ERROR(i->prepare(state)); @@ -155,6 +180,9 @@ Status ExecNode::alloc_resource(RuntimeState* state) { for (auto& conjunct : _conjuncts) { RETURN_IF_ERROR(conjunct->open(state)); } + for (auto& projections : _intermediate_projections) { + RETURN_IF_ERROR(vectorized::VExpr::open(projections, state)); + } RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state)); return Status::OK(); } @@ -514,6 +542,22 @@ std::string ExecNode::get_name() { Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { SCOPED_TIMER(_exec_timer); SCOPED_TIMER(_projection_timer); + const size_t rows = origin_block->rows(); + if (rows == 0) { + return Status::OK(); + } + vectorized::Block input_block = *origin_block; + + std::vector result_column_ids; + for (auto& projections : _intermediate_projections) { + result_column_ids.resize(projections.size()); + for (int i = 0; i < projections.size(); i++) { + RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i])); + } + input_block.shuffle_columns(result_column_ids); + } + + DCHECK_EQ(rows, input_block.rows()); auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { if (to->is_nullable() && !from->is_nullable()) { if (_keep_origin || !from->is_exclusive()) { @@ -535,29 +579,26 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo using namespace vectorized; MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); - auto rows = origin_block->rows(); - if (rows != 0) { - auto& mutable_columns = mutable_block.mutable_columns(); + auto& mutable_columns = mutable_block.mutable_columns(); - if (mutable_columns.size() != _projections.size()) { - return Status::InternalError( - "Logical error during processing {}, output of projections {} mismatches with " - "exec node output {}", - this->get_name(), _projections.size(), mutable_columns.size()); - } + if (mutable_columns.size() != _projections.size()) { + return Status::InternalError( + "Logical error during processing {}, output of projections {} mismatches with " + "exec node output {}", + this->get_name(), _projections.size(), mutable_columns.size()); + } - for (int i = 0; i < mutable_columns.size(); ++i) { - auto result_column_id = -1; - RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id)); - auto column_ptr = origin_block->get_by_position(result_column_id) - .column->convert_to_full_column_if_const(); - //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - insert_column_datas(mutable_columns[i], column_ptr, rows); - } - DCHECK(mutable_block.rows() == rows); - output_block->set_columns(std::move(mutable_columns)); + for (int i = 0; i < mutable_columns.size(); ++i) { + auto result_column_id = -1; + RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id)); + auto column_ptr = input_block.get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it + insert_column_datas(mutable_columns[i], column_ptr, rows); } + DCHECK(mutable_block.rows() == rows); + output_block->set_columns(std::move(mutable_columns)); return Status::OK(); } diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index f2303068437b2f..10b035835d7a7f 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -220,6 +220,26 @@ class ExecNode { return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor; } virtual const RowDescriptor& intermediate_row_desc() const { return _row_descriptor; } + + // input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr + // prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor + + [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) { + if (idx == 0) { + return intermediate_row_desc(); + } + DCHECK((idx - 1) < _intermediate_output_row_descriptor.size()); + return _intermediate_output_row_descriptor[idx - 1]; + } + + [[nodiscard]] const RowDescriptor& projections_row_desc() const { + if (_intermediate_output_row_descriptor.empty()) { + return intermediate_row_desc(); + } else { + return _intermediate_output_row_descriptor.back(); + } + } + int64_t rows_returned() const { return _num_rows_returned; } int64_t limit() const { return _limit; } bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; } @@ -270,6 +290,10 @@ class ExecNode { std::unique_ptr _output_row_descriptor; vectorized::VExprContextSPtrs _projections; + std::vector _intermediate_output_row_descriptor; + // Used in common subexpression elimination to compute intermediate results. + std::vector _intermediate_projections; + /// Resource information sent from the frontend. const TBackendResourceProfile _resource_profile; diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 989b1ee00a517d..4a16cb65a014be 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -23,6 +23,8 @@ #include #include "common/logging.h" +#include "common/status.h" +#include "exec/exec_node.h" #include "pipeline/exec/aggregation_sink_operator.h" #include "pipeline/exec/aggregation_source_operator.h" #include "pipeline/exec/analytic_sink_operator.h" @@ -123,10 +125,20 @@ Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) { } // create the projections expr + if (tnode.__isset.projections) { DCHECK(tnode.__isset.output_tuple_id); RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections)); } + if (!tnode.intermediate_projections_list.empty()) { + DCHECK(tnode.__isset.projections) << "no final projections"; + _intermediate_projections.reserve(tnode.intermediate_projections_list.size()); + for (const auto& tnode_projections : tnode.intermediate_projections_list) { + vectorized::VExprContextSPtrs projections; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections)); + _intermediate_projections.push_back(projections); + } + } return Status::OK(); } @@ -134,8 +146,11 @@ Status OperatorXBase::prepare(RuntimeState* state) { for (auto& conjunct : _conjuncts) { RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc())); } - - RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc())); + for (int i = 0; i < _intermediate_projections.size(); i++) { + RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state, + intermediate_row_desc(i))); + } + RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc())); if (_child_x && !is_source()) { RETURN_IF_ERROR(_child_x->prepare(state)); @@ -149,6 +164,9 @@ Status OperatorXBase::open(RuntimeState* state) { RETURN_IF_ERROR(conjunct->open(state)); } RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state)); + for (auto& projections : _intermediate_projections) { + RETURN_IF_ERROR(vectorized::VExpr::open(projections, state)); + } if (_child_x && !is_source()) { RETURN_IF_ERROR(_child_x->open(state)); } @@ -175,7 +193,22 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori auto* local_state = state->get_local_state(operator_id()); SCOPED_TIMER(local_state->exec_time_counter()); SCOPED_TIMER(local_state->_projection_timer); + const size_t rows = origin_block->rows(); + if (rows == 0) { + return Status::OK(); + } + vectorized::Block input_block = *origin_block; + std::vector result_column_ids; + for (const auto& projections : _intermediate_projections) { + result_column_ids.resize(projections.size()); + for (int i = 0; i < projections.size(); i++) { + RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i])); + } + input_block.shuffle_columns(result_column_ids); + } + + DCHECK_EQ(rows, input_block.rows()); auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { if (to->is_nullable() && !from->is_nullable()) { if (_keep_origin || !from->is_exclusive()) { @@ -198,15 +231,13 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori vectorized::MutableBlock mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); - auto rows = origin_block->rows(); - if (rows != 0) { auto& mutable_columns = mutable_block.mutable_columns(); DCHECK(mutable_columns.size() == local_state->_projections.size()); for (int i = 0; i < mutable_columns.size(); ++i) { auto result_column_id = -1; - RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id)); - auto column_ptr = origin_block->get_by_position(result_column_id) + RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, &result_column_id)); + auto column_ptr = input_block.get_by_position(result_column_id) .column->convert_to_full_column_if_const(); insert_column_datas(mutable_columns[i], column_ptr, rows); } @@ -365,6 +396,15 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState for (size_t i = 0; i < _projections.size(); i++) { RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i])); } + _intermediate_projections.resize(_parent->_intermediate_projections.size()); + for (int i = 0; i < _parent->_intermediate_projections.size(); i++) { + _intermediate_projections[i].resize(_parent->_intermediate_projections[i].size()); + for (int j = 0; j < _parent->_intermediate_projections[i].size(); j++) { + RETURN_IF_ERROR(_parent->_intermediate_projections[i][j]->clone( + state, _intermediate_projections[i][j])); + } + } + _rows_returned_counter = ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1); _blocks_returned_counter = diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index b1667affe614e8..aa2bf7aa5e0e27 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -135,6 +135,9 @@ class PipelineXLocalStateBase { RuntimeState* _state = nullptr; vectorized::VExprContextSPtrs _conjuncts; vectorized::VExprContextSPtrs _projections; + // Used in common subexpression elimination to compute intermediate results. + std::vector _intermediate_projections; + bool _closed = false; vectorized::Block _origin_block; }; @@ -155,6 +158,22 @@ class OperatorXBase : public OperatorBase { if (tnode.__isset.output_tuple_id) { _output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true})); } + if (tnode.__isset.output_tuple_id) { + _output_row_descriptor = std::make_unique( + descs, std::vector {tnode.output_tuple_id}, std::vector {true}); + } + if (!tnode.intermediate_output_tuple_id_list.empty()) { + DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id"; + // common subexpression elimination + DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(), + tnode.intermediate_projections_list.size()); + _intermediate_output_row_descriptor.reserve( + tnode.intermediate_output_tuple_id_list.size()); + for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) { + _intermediate_output_row_descriptor.push_back( + RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true})); + } + } } OperatorXBase(ObjectPool* pool, int node_id, int operator_id) @@ -247,6 +266,25 @@ class OperatorXBase : public OperatorBase { return _row_descriptor; } + // input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr + // prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor + + [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) { + if (idx == 0) { + return intermediate_row_desc(); + } + DCHECK((idx - 1) < _intermediate_output_row_descriptor.size()); + return _intermediate_output_row_descriptor[idx - 1]; + } + + [[nodiscard]] const RowDescriptor& projections_row_desc() const { + if (_intermediate_output_row_descriptor.empty()) { + return intermediate_row_desc(); + } else { + return _intermediate_output_row_descriptor.back(); + } + } + [[nodiscard]] std::string debug_string() const override { return ""; } virtual std::string debug_string(int indentation_level = 0) const; @@ -318,6 +356,10 @@ class OperatorXBase : public OperatorBase { std::unique_ptr _output_row_descriptor = nullptr; vectorized::VExprContextSPtrs _projections; + std::vector _intermediate_output_row_descriptor; + // Used in common subexpression elimination to compute intermediate results. + std::vector _intermediate_projections; + /// Resource information sent from the frontend. const TBackendResourceProfile _resource_profile; diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index c93bfb11f09d6d..1d8d3e838015c9 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -719,6 +719,15 @@ void Block::swap(Block&& other) noexcept { row_same_bit = std::move(other.row_same_bit); } +void Block::shuffle_columns(const std::vector& result_column_ids) { + Container tmp_data; + tmp_data.reserve(result_column_ids.size()); + for (const int result_column_id : result_column_ids) { + tmp_data.push_back(data[result_column_id]); + } + swap(Block {tmp_data}); +} + void Block::update_hash(SipHash& hash) const { for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) { for (const auto& col : data) { diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h index ce32cc5cf39189..d6567de0a44211 100644 --- a/be/src/vec/core/block.h +++ b/be/src/vec/core/block.h @@ -234,6 +234,9 @@ class Block { void swap(Block& other) noexcept; void swap(Block&& other) noexcept; + // Shuffle columns in place based on the result_column_ids + void shuffle_columns(const std::vector& result_column_ids); + // Default column size = -1 means clear all column in block // Else clear column [0, column_size) delete column [column_size, data.size) void clear_column_data(int column_size = -1) noexcept; diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp index 7354b9e085fd85..de0b6b4569156f 100644 --- a/be/src/vec/exec/scan/vscanner.cpp +++ b/be/src/vec/exec/scan/vscanner.cpp @@ -20,6 +20,7 @@ #include #include "common/config.h" +#include "exec/exec_node.h" #include "pipeline/exec/scan_operator.h" #include "runtime/descriptors.h" #include "util/defer_op.h" @@ -69,6 +70,19 @@ Status VScanner::prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts } } + const auto& intermediate_projections = + _parent ? _parent->_intermediate_projections : _local_state->_intermediate_projections; + if (!intermediate_projections.empty()) { + _intermediate_projections.resize(intermediate_projections.size()); + for (int i = 0; i < intermediate_projections.size(); i++) { + _intermediate_projections[i].resize(intermediate_projections[i].size()); + for (int j = 0; j < intermediate_projections[i].size(); j++) { + RETURN_IF_ERROR(intermediate_projections[i][j]->clone( + state, _intermediate_projections[i][j])); + } + } + } + return Status::OK(); } @@ -172,42 +186,55 @@ Status VScanner::_filter_output_block(Block* block) { } Status VScanner::_do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { - auto projection_timer = _parent ? _parent->_projection_timer : _local_state->_projection_timer; - auto exec_timer = _parent ? _parent->_exec_timer : _local_state->_exec_timer; + auto& projection_timer = _parent ? _parent->_projection_timer : _local_state->_projection_timer; + auto& exec_timer = _parent ? _parent->_exec_timer : _local_state->_exec_timer; SCOPED_TIMER(exec_timer); SCOPED_TIMER(projection_timer); + const size_t rows = origin_block->rows(); + if (rows == 0) { + return Status::OK(); + } + vectorized::Block input_block = *origin_block; + + std::vector result_column_ids; + for (auto& projections : _intermediate_projections) { + result_column_ids.resize(projections.size()); + for (int i = 0; i < projections.size(); i++) { + RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i])); + } + input_block.shuffle_columns(result_column_ids); + } + + DCHECK_EQ(rows, input_block.rows()); MutableBlock mutable_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); - auto rows = origin_block->rows(); - if (rows != 0) { - auto& mutable_columns = mutable_block.mutable_columns(); + auto& mutable_columns = mutable_block.mutable_columns(); - if (mutable_columns.size() != _projections.size()) { - return Status::InternalError( - "Logical error in scanner, output of projections {} mismatches with " - "scanner output {}", - _projections.size(), mutable_columns.size()); - } + if (mutable_columns.size() != _projections.size()) { + return Status::InternalError( + "Logical error in scanner, output of projections {} mismatches with " + "scanner output {}", + _projections.size(), mutable_columns.size()); + } - for (int i = 0; i < mutable_columns.size(); ++i) { - auto result_column_id = -1; - RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id)); - auto column_ptr = origin_block->get_by_position(result_column_id) - .column->convert_to_full_column_if_const(); - //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it - if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { - DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); - reinterpret_cast(mutable_columns[i].get()) - ->insert_range_from_not_nullable(*column_ptr, 0, rows); - } else { - mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); - } + for (int i = 0; i < mutable_columns.size(); ++i) { + auto result_column_id = -1; + RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id)); + auto column_ptr = input_block.get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it + if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) { + DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable()); + reinterpret_cast(mutable_columns[i].get()) + ->insert_range_from_not_nullable(*column_ptr, 0, rows); + } else { + mutable_columns[i]->insert_range_from(*column_ptr, 0, rows); } - DCHECK(mutable_block.rows() == rows); - output_block->set_columns(std::move(mutable_columns)); } + DCHECK(mutable_block.rows() == rows); + output_block->set_columns(std::move(mutable_columns)); return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index cf91446c4f652b..ba953192507c8e 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -203,6 +203,8 @@ class VScanner { // It includes predicate in SQL and runtime filters. VExprContextSPtrs _conjuncts; VExprContextSPtrs _projections; + // Used in common subexpression elimination to compute intermediate results. + std::vector _intermediate_projections; vectorized::Block _origin_block; VExprContextSPtrs _common_expr_ctxs_push_down; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 2fadcdae538795..d88ab993363352 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1294,10 +1294,13 @@ struct TPlanNode { 49: optional i64 push_down_count 50: optional list> distribute_expr_lists - + // projections is final projections, which means projecting into results and materializing them into the output block. 101: optional list projections 102: optional Types.TTupleId output_tuple_id 103: optional TPartitionSortNode partition_sort_node + // Intermediate projections will not materialize into the output block. + 104: optional list> intermediate_projections_list + 105: optional list intermediate_output_tuple_id_list } // A flattened representation of a tree of PlanNodes, obtained by depth-first