Skip to content

Commit

Permalink
[featrue](expr) support common subexpression elimination be part (#32673
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Mryange authored Apr 1, 2024
1 parent e834d80 commit b42285f
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 54 deletions.
83 changes: 62 additions & 21 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector {true});
}
if (!tnode.intermediate_output_tuple_id_list.empty()) {
DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
// common subexpression elimination
DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
tnode.intermediate_projections_list.size());
_intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size());
for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
_intermediate_output_row_descriptor.push_back(
RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true}));
}
}

_query_statistics = std::make_shared<QueryStatistics>();
}

Expand All @@ -114,7 +126,15 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
}

if (!tnode.intermediate_projections_list.empty()) {
DCHECK(tnode.__isset.projections) << "no final projections";
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
for (const auto& tnode_projections : tnode.intermediate_projections_list) {
vectorized::VExprContextSPtrs projections;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections));
_intermediate_projections.push_back(projections);
}
}
return Status::OK();
}

Expand Down Expand Up @@ -143,7 +163,12 @@ Status ExecNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state));
Expand All @@ -155,6 +180,9 @@ Status ExecNode::alloc_resource(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
}
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
return Status::OK();
}
Expand Down Expand Up @@ -514,6 +542,22 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
const size_t rows = origin_block->rows();
if (rows == 0) {
return Status::OK();
}
vectorized::Block input_block = *origin_block;

std::vector<int> result_column_ids;
for (auto& projections : _intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
}
input_block.shuffle_columns(result_column_ids);
}

DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
Expand All @@ -535,29 +579,26 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
auto rows = origin_block->rows();

if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
auto& mutable_columns = mutable_block.mutable_columns();

if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}
if (mutable_columns.size() != _projections.size()) {
return Status::InternalError(
"Logical error during processing {}, output of projections {} mismatches with "
"exec node output {}",
this->get_name(), _projections.size(), mutable_columns.size());
}

for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id));
auto column_ptr = input_block.get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));

return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,26 @@ class ExecNode {
return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
}
virtual const RowDescriptor& intermediate_row_desc() const { return _row_descriptor; }

// input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr
// prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor

[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
if (idx == 0) {
return intermediate_row_desc();
}
DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
return _intermediate_output_row_descriptor[idx - 1];
}

[[nodiscard]] const RowDescriptor& projections_row_desc() const {
if (_intermediate_output_row_descriptor.empty()) {
return intermediate_row_desc();
} else {
return _intermediate_output_row_descriptor.back();
}
}

int64_t rows_returned() const { return _num_rows_returned; }
int64_t limit() const { return _limit; }
bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; }
Expand Down Expand Up @@ -270,6 +290,10 @@ class ExecNode {
std::unique_ptr<RowDescriptor> _output_row_descriptor;
vectorized::VExprContextSPtrs _projections;

std::vector<RowDescriptor> _intermediate_output_row_descriptor;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;

Expand Down
52 changes: 46 additions & 6 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <string>

#include "common/logging.h"
#include "common/status.h"
#include "exec/exec_node.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "pipeline/exec/analytic_sink_operator.h"
Expand Down Expand Up @@ -123,19 +125,32 @@ Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
}

// create the projections expr

if (tnode.__isset.projections) {
DCHECK(tnode.__isset.output_tuple_id);
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
}
if (!tnode.intermediate_projections_list.empty()) {
DCHECK(tnode.__isset.projections) << "no final projections";
_intermediate_projections.reserve(tnode.intermediate_projections_list.size());
for (const auto& tnode_projections : tnode.intermediate_projections_list) {
vectorized::VExprContextSPtrs projections;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections));
_intermediate_projections.push_back(projections);
}
}
return Status::OK();
}

Status OperatorXBase::prepare(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));
for (int i = 0; i < _intermediate_projections.size(); i++) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state,
intermediate_row_desc(i)));
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->prepare(state));
Expand All @@ -149,6 +164,9 @@ Status OperatorXBase::open(RuntimeState* state) {
RETURN_IF_ERROR(conjunct->open(state));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
for (auto& projections : _intermediate_projections) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
if (_child_x && !is_source()) {
RETURN_IF_ERROR(_child_x->open(state));
}
Expand All @@ -175,7 +193,22 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
auto* local_state = state->get_local_state(operator_id());
SCOPED_TIMER(local_state->exec_time_counter());
SCOPED_TIMER(local_state->_projection_timer);
const size_t rows = origin_block->rows();
if (rows == 0) {
return Status::OK();
}
vectorized::Block input_block = *origin_block;

std::vector<int> result_column_ids;
for (const auto& projections : _intermediate_projections) {
result_column_ids.resize(projections.size());
for (int i = 0; i < projections.size(); i++) {
RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
}
input_block.shuffle_columns(result_column_ids);
}

DCHECK_EQ(rows, input_block.rows());
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
if (to->is_nullable() && !from->is_nullable()) {
if (_keep_origin || !from->is_exclusive()) {
Expand All @@ -198,15 +231,13 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
vectorized::MutableBlock mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
auto rows = origin_block->rows();

if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
DCHECK(mutable_columns.size() == local_state->_projections.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, &result_column_id));
auto column_ptr = input_block.get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
Expand Down Expand Up @@ -365,6 +396,15 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
for (size_t i = 0; i < _projections.size(); i++) {
RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i]));
}
_intermediate_projections.resize(_parent->_intermediate_projections.size());
for (int i = 0; i < _parent->_intermediate_projections.size(); i++) {
_intermediate_projections[i].resize(_parent->_intermediate_projections[i].size());
for (int j = 0; j < _parent->_intermediate_projections[i].size(); j++) {
RETURN_IF_ERROR(_parent->_intermediate_projections[i][j]->clone(
state, _intermediate_projections[i][j]));
}
}

_rows_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1);
_blocks_returned_counter =
Expand Down
42 changes: 42 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class PipelineXLocalStateBase {
RuntimeState* _state = nullptr;
vectorized::VExprContextSPtrs _conjuncts;
vectorized::VExprContextSPtrs _projections;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

bool _closed = false;
vectorized::Block _origin_block;
};
Expand All @@ -155,6 +158,22 @@ class OperatorXBase : public OperatorBase {
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true}));
}
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector {true});
}
if (!tnode.intermediate_output_tuple_id_list.empty()) {
DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
// common subexpression elimination
DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
tnode.intermediate_projections_list.size());
_intermediate_output_row_descriptor.reserve(
tnode.intermediate_output_tuple_id_list.size());
for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
_intermediate_output_row_descriptor.push_back(
RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true}));
}
}
}

OperatorXBase(ObjectPool* pool, int node_id, int operator_id)
Expand Down Expand Up @@ -247,6 +266,25 @@ class OperatorXBase : public OperatorBase {
return _row_descriptor;
}

// input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr
// prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor

[[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
if (idx == 0) {
return intermediate_row_desc();
}
DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
return _intermediate_output_row_descriptor[idx - 1];
}

[[nodiscard]] const RowDescriptor& projections_row_desc() const {
if (_intermediate_output_row_descriptor.empty()) {
return intermediate_row_desc();
} else {
return _intermediate_output_row_descriptor.back();
}
}

[[nodiscard]] std::string debug_string() const override { return ""; }

virtual std::string debug_string(int indentation_level = 0) const;
Expand Down Expand Up @@ -318,6 +356,10 @@ class OperatorXBase : public OperatorBase {
std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
vectorized::VExprContextSPtrs _projections;

std::vector<RowDescriptor> _intermediate_output_row_descriptor;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;

Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/core/block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,15 @@ void Block::swap(Block&& other) noexcept {
row_same_bit = std::move(other.row_same_bit);
}

void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
Container tmp_data;
tmp_data.reserve(result_column_ids.size());
for (const int result_column_id : result_column_ids) {
tmp_data.push_back(data[result_column_id]);
}
swap(Block {tmp_data});
}

void Block::update_hash(SipHash& hash) const {
for (size_t row_no = 0, num_rows = rows(); row_no < num_rows; ++row_no) {
for (const auto& col : data) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/core/block.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ class Block {
void swap(Block& other) noexcept;
void swap(Block&& other) noexcept;

// Shuffle columns in place based on the result_column_ids
void shuffle_columns(const std::vector<int>& result_column_ids);

// Default column size = -1 means clear all column in block
// Else clear column [0, column_size) delete column [column_size, data.size)
void clear_column_data(int column_size = -1) noexcept;
Expand Down
Loading

0 comments on commit b42285f

Please sign in to comment.