Skip to content

Commit

Permalink
[feature](java-udtf) support java-udtf (apache#33595)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed May 11, 2024
1 parent e9f5388 commit 72e9edd
Show file tree
Hide file tree
Showing 45 changed files with 3,250 additions and 30 deletions.
12 changes: 7 additions & 5 deletions be/src/pipeline/exec/table_function_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ Status TableFunctionLocalState::open(RuntimeState* state) {
for (size_t i = 0; i < _vfn_ctxs.size(); i++) {
RETURN_IF_ERROR(p._vfn_ctxs[i]->clone(state, _vfn_ctxs[i]));

const std::string& tf_name = _vfn_ctxs[i]->root()->fn().name.function_name;
vectorized::TableFunction* fn = nullptr;
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name, state->obj_pool(), &fn));
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(_vfn_ctxs[i]->root()->fn(),
state->obj_pool(), &fn));
fn->set_expr_context(_vfn_ctxs[i]);
_fns.push_back(fn);
}

for (auto* fn : _fns) {
RETURN_IF_ERROR(fn->open());
}
_cur_child_offset = -1;
return Status::OK();
}
Expand Down Expand Up @@ -138,6 +140,7 @@ bool TableFunctionLocalState::_roll_table_functions(int last_eos_idx) {
bool TableFunctionLocalState::_is_inner_and_empty() {
for (int i = 0; i < _parent->cast<TableFunctionOperatorX>()._fn_num; i++) {
// if any table function is not outer and has empty result, go to next child row
// if it's outer function, will be insert into one row NULL
if (!_fns[i]->is_outer() && _fns[i]->current_empty()) {
return true;
}
Expand Down Expand Up @@ -269,9 +272,8 @@ Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
_vfn_ctxs.push_back(ctx);

auto root = ctx->root();
const std::string& tf_name = root->fn().name.function_name;
vectorized::TableFunction* fn = nullptr;
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name, _pool, &fn));
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(root->fn(), _pool, &fn));
fn->set_expr_context(ctx);
_fns.push_back(fn);
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/pipeline/exec/table_function_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class TableFunctionLocalState final : public PipelineXLocalState<> {
~TableFunctionLocalState() override = default;

Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override {
for (auto* fn : _fns) {
RETURN_IF_ERROR(fn->close());
}
RETURN_IF_ERROR(PipelineXLocalState<>::close(state));
return Status::OK();
}
void process_next_child_row();
Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block, bool* eos);

Expand All @@ -74,7 +81,7 @@ class TableFunctionLocalState final : public PipelineXLocalState<> {

std::vector<vectorized::TableFunction*> _fns;
vectorized::VExprContextSPtrs _vfn_ctxs;
int64_t _cur_child_offset = 0;
int64_t _cur_child_offset = -1;
std::unique_ptr<vectorized::Block> _child_block;
int _current_row_insert_times = 0;
bool _child_eos = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ void ColumnArray::insert(const Field& x) {
}

void ColumnArray::insert_from(const IColumn& src_, size_t n) {
DCHECK(n < src_.size());
DCHECK_LT(n, src_.size());
const ColumnArray& src = assert_cast<const ColumnArray&>(src_);
size_t size = src.size_at(n);
size_t offset = src.offset_at(n);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vtable_function_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
_vfn_ctxs.push_back(ctx);

auto root = ctx->root();
const std::string& tf_name = root->fn().name.function_name;
TableFunction* fn = nullptr;
RETURN_IF_ERROR(TableFunctionFactory::get_fn(tf_name, _pool, &fn));
RETURN_IF_ERROR(TableFunctionFactory::get_fn(root->fn(), _pool, &fn));
fn->set_expr_context(ctx);
_fns.push_back(fn);
}
Expand Down Expand Up @@ -93,6 +92,7 @@ Status VTableFunctionNode::_prepare_output_slot_ids(const TPlanNode& tnode) {
bool VTableFunctionNode::_is_inner_and_empty() {
for (int i = 0; i < _fn_num; i++) {
// if any table function is not outer and has empty result, go to next child row
// if it's outer function, will be insert into NULL
if (!_fns[i]->is_outer() && _fns[i]->current_empty()) {
return true;
}
Expand Down
11 changes: 9 additions & 2 deletions be/src/vec/exec/vtable_function_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ class VTableFunctionNode final : public ExecNode {
Status alloc_resource(RuntimeState* state) override {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
return VExpr::open(_vfn_ctxs, state);
RETURN_IF_ERROR(VExpr::open(_vfn_ctxs, state));
for (auto* fn : _fns) {
RETURN_IF_ERROR(fn->open());
}
return Status::OK();
}
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
bool need_more_input_data() const { return !_child_block->rows() && !_child_eos; }
Expand All @@ -67,6 +71,9 @@ class VTableFunctionNode final : public ExecNode {
if (_num_rows_filtered_counter != nullptr) {
COUNTER_SET(_num_rows_filtered_counter, static_cast<int64_t>(_num_rows_filtered));
}
for (auto* fn : _fns) {
static_cast<void>(fn->close());
}
ExecNode::release_resource(state);
}

Expand Down Expand Up @@ -145,7 +152,7 @@ class VTableFunctionNode final : public ExecNode {
std::shared_ptr<Block> _child_block;
std::vector<SlotDescriptor*> _child_slots;
std::vector<SlotDescriptor*> _output_slots;
int64_t _cur_child_offset = 0;
int64_t _cur_child_offset = -1;

VExprContextSPtrs _vfn_ctxs;

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exprs/table_function/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TableFunction {
virtual int get_value(MutableColumnPtr& column, int max_step) {
max_step = std::max(1, std::min(max_step, (int)(_cur_size - _cur_offset)));
int i = 0;
// TODO: this for loop maybe could refactor, and call once get_value function, it's could insert into max_step value once
for (; i < max_step && !eos(); i++) {
get_value(column);
forward();
Expand Down
35 changes: 23 additions & 12 deletions be/src/vec/exprs/table_function/table_function_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

#include "vec/exprs/table_function/table_function_factory.h"

#include <gen_cpp/Types_types.h>

#include <string_view>
#include <utility>

#include "common/object_pool.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/exprs/table_function/udf_table_function.h"
#include "vec/exprs/table_function/vexplode.h"
#include "vec/exprs/table_function/vexplode_bitmap.h"
#include "vec/exprs/table_function/vexplode_json_array.h"
Expand Down Expand Up @@ -65,23 +69,30 @@ const std::unordered_map<std::string, std::function<std::unique_ptr<TableFunctio
{"explode_map", TableFunctionCreator<VExplodeMapTableFunction> {}},
{"explode", TableFunctionCreator<VExplodeTableFunction> {}}};

Status TableFunctionFactory::get_fn(const std::string& fn_name_raw, ObjectPool* pool,
TableFunction** fn) {
bool is_outer = match_suffix(fn_name_raw, COMBINATOR_SUFFIX_OUTER);
std::string fn_name_real =
is_outer ? remove_suffix(fn_name_raw, COMBINATOR_SUFFIX_OUTER) : fn_name_raw;

auto fn_iterator = _function_map.find(fn_name_real);
if (fn_iterator != _function_map.end()) {
*fn = pool->add(fn_iterator->second().release());
Status TableFunctionFactory::get_fn(const TFunction& t_fn, ObjectPool* pool, TableFunction** fn) {
bool is_outer = match_suffix(t_fn.name.function_name, COMBINATOR_SUFFIX_OUTER);
if (t_fn.binary_type == TFunctionBinaryType::JAVA_UDF) {
*fn = pool->add(UDFTableFunction::create_unique(t_fn).release());
if (is_outer) {
(*fn)->set_outer();
}

return Status::OK();
}
} else {
const std::string& fn_name_raw = t_fn.name.function_name;
const std::string& fn_name_real =
is_outer ? remove_suffix(fn_name_raw, COMBINATOR_SUFFIX_OUTER) : fn_name_raw;

return Status::NotSupported("Table function {} is not support", fn_name_raw);
auto fn_iterator = _function_map.find(fn_name_real);
if (fn_iterator != _function_map.end()) {
*fn = pool->add(fn_iterator->second().release());
if (is_outer) {
(*fn)->set_outer();
}

return Status::OK();
}
}
return Status::NotSupported("Table function {} is not support", t_fn.name.function_name);
}

} // namespace doris::vectorized
4 changes: 3 additions & 1 deletion be/src/vec/exprs/table_function/table_function_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/Types_types.h>

#include <functional>
#include <memory>
#include <string>
Expand All @@ -33,7 +35,7 @@ class TableFunction;
class TableFunctionFactory {
public:
TableFunctionFactory() = delete;
static Status get_fn(const std::string& fn_name_raw, ObjectPool* pool, TableFunction** fn);
static Status get_fn(const TFunction& t_fn, ObjectPool* pool, TableFunction** fn);

const static std::unordered_map<std::string, std::function<std::unique_ptr<TableFunction>()>>
_function_map;
Expand Down
Loading

0 comments on commit 72e9edd

Please sign in to comment.