Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](java-udtf) support java-udtf #33595

Merged
merged 9 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions be/src/pipeline/exec/table_function_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ Status TableFunctionLocalState::open(RuntimeState* state) {
for (size_t i = 0; i < _vfn_ctxs.size(); i++) {
RETURN_IF_ERROR(p._vfn_ctxs[i]->clone(state, _vfn_ctxs[i]));

const std::string& tf_name = _vfn_ctxs[i]->root()->fn().name.function_name;
vectorized::TableFunction* fn = nullptr;
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name, state->obj_pool(), &fn));
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(_vfn_ctxs[i]->root()->fn(),
state->obj_pool(), &fn));
fn->set_expr_context(_vfn_ctxs[i]);
_fns.push_back(fn);
}

for (auto* fn : _fns) {
RETURN_IF_ERROR(fn->open());
}
_cur_child_offset = -1;
return Status::OK();
}
Expand Down Expand Up @@ -138,6 +140,7 @@ bool TableFunctionLocalState::_roll_table_functions(int last_eos_idx) {
bool TableFunctionLocalState::_is_inner_and_empty() {
for (int i = 0; i < _parent->cast<TableFunctionOperatorX>()._fn_num; i++) {
// if any table function is not outer and has empty result, go to next child row
// if it's outer function, will be insert into one row NULL
if (!_fns[i]->is_outer() && _fns[i]->current_empty()) {
return true;
}
Expand Down Expand Up @@ -269,9 +272,8 @@ Status TableFunctionOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
_vfn_ctxs.push_back(ctx);

auto root = ctx->root();
const std::string& tf_name = root->fn().name.function_name;
vectorized::TableFunction* fn = nullptr;
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(tf_name, _pool, &fn));
RETURN_IF_ERROR(vectorized::TableFunctionFactory::get_fn(root->fn(), _pool, &fn));
fn->set_expr_context(ctx);
_fns.push_back(fn);
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/pipeline/exec/table_function_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class TableFunctionLocalState final : public PipelineXLocalState<> {
~TableFunctionLocalState() override = default;

Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override {
zhangstar333 marked this conversation as resolved.
Show resolved Hide resolved
zhangstar333 marked this conversation as resolved.
Show resolved Hide resolved
for (auto* fn : _fns) {
RETURN_IF_ERROR(fn->close());
}
RETURN_IF_ERROR(PipelineXLocalState<>::close(state));
return Status::OK();
}
void process_next_child_row();
Status get_expanded_block(RuntimeState* state, vectorized::Block* output_block, bool* eos);

Expand All @@ -74,7 +81,7 @@ class TableFunctionLocalState final : public PipelineXLocalState<> {

std::vector<vectorized::TableFunction*> _fns;
vectorized::VExprContextSPtrs _vfn_ctxs;
int64_t _cur_child_offset = 0;
int64_t _cur_child_offset = -1;
std::unique_ptr<vectorized::Block> _child_block;
int _current_row_insert_times = 0;
bool _child_eos = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/columns/column_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ void ColumnArray::insert(const Field& x) {
}

void ColumnArray::insert_from(const IColumn& src_, size_t n) {
DCHECK(n < src_.size());
DCHECK_LT(n, src_.size());
const ColumnArray& src = assert_cast<const ColumnArray&>(src_);
size_t size = src.size_at(n);
size_t offset = src.offset_at(n);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vtable_function_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ Status VTableFunctionNode::init(const TPlanNode& tnode, RuntimeState* state) {
_vfn_ctxs.push_back(ctx);

auto root = ctx->root();
const std::string& tf_name = root->fn().name.function_name;
TableFunction* fn = nullptr;
RETURN_IF_ERROR(TableFunctionFactory::get_fn(tf_name, _pool, &fn));
RETURN_IF_ERROR(TableFunctionFactory::get_fn(root->fn(), _pool, &fn));
fn->set_expr_context(ctx);
_fns.push_back(fn);
}
Expand Down Expand Up @@ -93,6 +92,7 @@ Status VTableFunctionNode::_prepare_output_slot_ids(const TPlanNode& tnode) {
bool VTableFunctionNode::_is_inner_and_empty() {
for (int i = 0; i < _fn_num; i++) {
// if any table function is not outer and has empty result, go to next child row
// if it's outer function, will be insert into NULL
if (!_fns[i]->is_outer() && _fns[i]->current_empty()) {
return true;
}
Expand Down
11 changes: 9 additions & 2 deletions be/src/vec/exec/vtable_function_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ class VTableFunctionNode final : public ExecNode {
Status alloc_resource(RuntimeState* state) override {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
return VExpr::open(_vfn_ctxs, state);
RETURN_IF_ERROR(VExpr::open(_vfn_ctxs, state));
for (auto* fn : _fns) {
RETURN_IF_ERROR(fn->open());
}
return Status::OK();
}
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
bool need_more_input_data() const { return !_child_block->rows() && !_child_eos; }
Expand All @@ -67,6 +71,9 @@ class VTableFunctionNode final : public ExecNode {
if (_num_rows_filtered_counter != nullptr) {
COUNTER_SET(_num_rows_filtered_counter, static_cast<int64_t>(_num_rows_filtered));
}
for (auto* fn : _fns) {
static_cast<void>(fn->close());
}
ExecNode::release_resource(state);
}

Expand Down Expand Up @@ -145,7 +152,7 @@ class VTableFunctionNode final : public ExecNode {
std::shared_ptr<Block> _child_block;
std::vector<SlotDescriptor*> _child_slots;
std::vector<SlotDescriptor*> _output_slots;
int64_t _cur_child_offset = 0;
int64_t _cur_child_offset = -1;

VExprContextSPtrs _vfn_ctxs;

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exprs/table_function/table_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TableFunction {
virtual int get_value(MutableColumnPtr& column, int max_step) {
max_step = std::max(1, std::min(max_step, (int)(_cur_size - _cur_offset)));
int i = 0;
// TODO: this for loop maybe could refactor, and call once get_value function, it's could insert into max_step value once
for (; i < max_step && !eos(); i++) {
get_value(column);
forward();
Expand Down
35 changes: 23 additions & 12 deletions be/src/vec/exprs/table_function/table_function_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

#include "vec/exprs/table_function/table_function_factory.h"

#include <gen_cpp/Types_types.h>

#include <string_view>
#include <utility>

#include "common/object_pool.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/exprs/table_function/udf_table_function.h"
#include "vec/exprs/table_function/vexplode.h"
#include "vec/exprs/table_function/vexplode_bitmap.h"
#include "vec/exprs/table_function/vexplode_json_array.h"
Expand Down Expand Up @@ -65,23 +69,30 @@ const std::unordered_map<std::string, std::function<std::unique_ptr<TableFunctio
{"explode_map", TableFunctionCreator<VExplodeMapTableFunction> {}},
{"explode", TableFunctionCreator<VExplodeTableFunction> {}}};

Status TableFunctionFactory::get_fn(const std::string& fn_name_raw, ObjectPool* pool,
TableFunction** fn) {
bool is_outer = match_suffix(fn_name_raw, COMBINATOR_SUFFIX_OUTER);
std::string fn_name_real =
is_outer ? remove_suffix(fn_name_raw, COMBINATOR_SUFFIX_OUTER) : fn_name_raw;

auto fn_iterator = _function_map.find(fn_name_real);
if (fn_iterator != _function_map.end()) {
*fn = pool->add(fn_iterator->second().release());
Status TableFunctionFactory::get_fn(const TFunction& t_fn, ObjectPool* pool, TableFunction** fn) {
zhangstar333 marked this conversation as resolved.
Show resolved Hide resolved
bool is_outer = match_suffix(t_fn.name.function_name, COMBINATOR_SUFFIX_OUTER);
if (t_fn.binary_type == TFunctionBinaryType::JAVA_UDF) {
*fn = pool->add(UDFTableFunction::create_unique(t_fn).release());
if (is_outer) {
(*fn)->set_outer();
}

return Status::OK();
}
} else {
const std::string& fn_name_raw = t_fn.name.function_name;
const std::string& fn_name_real =
is_outer ? remove_suffix(fn_name_raw, COMBINATOR_SUFFIX_OUTER) : fn_name_raw;

return Status::NotSupported("Table function {} is not support", fn_name_raw);
auto fn_iterator = _function_map.find(fn_name_real);
if (fn_iterator != _function_map.end()) {
*fn = pool->add(fn_iterator->second().release());
if (is_outer) {
(*fn)->set_outer();
}

return Status::OK();
}
}
return Status::NotSupported("Table function {} is not support", t_fn.name.function_name);
}

} // namespace doris::vectorized
4 changes: 3 additions & 1 deletion be/src/vec/exprs/table_function/table_function_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include <gen_cpp/Types_types.h>
zhangstar333 marked this conversation as resolved.
Show resolved Hide resolved

#include <functional>
#include <memory>
#include <string>
Expand All @@ -33,7 +35,7 @@ class TableFunction;
class TableFunctionFactory {
public:
TableFunctionFactory() = delete;
static Status get_fn(const std::string& fn_name_raw, ObjectPool* pool, TableFunction** fn);
static Status get_fn(const TFunction& t_fn, ObjectPool* pool, TableFunction** fn);

const static std::unordered_map<std::string, std::function<std::unique_ptr<TableFunction>()>>
_function_map;
Expand Down
Loading
Loading