Skip to content

Commit

Permalink
[Bug](expr) execute expr should use local states instead of operators
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Aug 30, 2024
1 parent 36ab3e0 commit 9681fb9
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 17 deletions.
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,8 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* blo
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
// Apply the HAVING clause here; it should not be executed in pre-streaming agg.
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
local_state.do_agg_limit(block, eos);
return Status::OK();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
}
COUNTER_SET(local_state.rows_returned_counter(), local_state.num_rows_returned());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
return Status::OK();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/assert_num_rows_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class AssertNumRowsLocalState final : public PipelineXLocalState<FakeSharedState
AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<FakeSharedState>(state, parent) {}
~AssertNumRowsLocalState() = default;

private:
friend class AssertNumRowsOperatorX;
};

class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLocalState> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
local_state._make_nullable_output_key(block);
if (!_is_streaming_preagg) {
// Apply the HAVING clause here; it should not be executed in pre-streaming agg.
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
}
local_state.add_num_rows_returned(block->rows());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<Mul
}

private:
friend class MultiCastDataStreamerSourceOperatorX;
vectorized::VExprContextSPtrs _output_expr_contexts;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;

Expand Down Expand Up @@ -95,8 +96,8 @@ class MultiCastDataStreamerSourceOperatorX final

if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts,
_conjuncts));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, _row_desc()));
conjuncts()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state, _row_desc()));
}
return Status::OK();
}
Expand All @@ -107,7 +108,7 @@ class MultiCastDataStreamerSourceOperatorX final
RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts, state));
}
if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
RETURN_IF_ERROR(vectorized::VExpr::open(conjuncts(), state));
}
return Status::OK();
}
Expand Down
15 changes: 8 additions & 7 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -753,17 +753,18 @@ class OperatorXBase : public OperatorBase {
TPlanNodeType::type _type;
ObjectPool* _pool = nullptr;
std::vector<TupleId> _tuple_ids;

private:
// The operator's exprs are kept private because they cannot be executed concurrently;
// callers should use the local state's exprs instead.
vectorized::VExprContextSPtrs _conjuncts;

RowDescriptor _row_descriptor;

std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
vectorized::VExprContextSPtrs _projections;

std::vector<RowDescriptor> _intermediate_output_row_descriptor;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
protected:
RowDescriptor _row_descriptor;
std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
std::vector<RowDescriptor> _intermediate_output_row_descriptor;


/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/repeat_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
}
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
}
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, output_block,
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
output_block->columns()));
*eos = _child_eos && _child_block.rows() == 0;
local_state.reached_limit(output_block, eos);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1285,8 +1285,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block
RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state, block, eos));
local_state.make_nullable_output_key(block);
// Apply the HAVING clause here; it should not be executed in pre-streaming agg.
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block,
block->columns()));
}
local_state.reached_limit(block, eos);

Expand Down
3 changes: 3 additions & 0 deletions regression-test/data/javaudf_p0/test_javaudf_string.out
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,6 @@ ab***fg7 ab***fg7
ab***fg8 ab***fg8
ab***fg9 ab***fg9

-- !select_5 --
0

13 changes: 13 additions & 0 deletions regression-test/suites/javaudf_p0/test_javaudf_string.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,22 @@ suite("test_javaudf_string") {
); """
exception "does not support type"
}
sql """DROP TABLE IF EXISTS tbl1"""
sql """create table tbl1(k1 int, k2 string) distributed by hash(k1) buckets 1 properties("replication_num" = "1");"""
sql """ insert into tbl1 values(1, "5");"""
Integer count = 0;
Integer maxCount = 20;
while (count < maxCount) {
sql """ insert into tbl1 select * from tbl1;"""
count++
sleep(100);
}
sql """ insert into tbl1 select random()%10000 * 10000, "5" from tbl1;"""
qt_select_5 """ select count(0) from (select k1, max(k2) as k2 from tbl1 group by k1)v where java_udf_string_test(k2, 0, 1) = "asd" """;
} finally {
try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);")
try_sql("DROP TABLE IF EXISTS ${tableName}")
try_sql("DROP TABLE IF EXISTS tbl1")
try_sql("DROP TABLE IF EXISTS test_javaudf_string_2")
}
}

0 comments on commit 9681fb9

Please sign in to comment.