From 56efa5470655a059192b4ac06ff9730663080a6d Mon Sep 17 00:00:00 2001 From: TCeason <33082201+TCeason@users.noreply.github.com> Date: Thu, 19 Sep 2024 09:48:12 +0800 Subject: [PATCH] feat(query): Procedure Part2 support arguments (#16453) * feat(query): Procedure Part2 support arguments * fix conversation * refactor var_to_ast with parse_expr --- Cargo.lock | 1 + src/meta/app/Cargo.toml | 1 + src/meta/app/src/principal/procedure.rs | 12 +- .../src/procedure_from_to_protobuf_impl.rs | 2 + src/meta/proto-conv/src/util.rs | 3 +- src/meta/proto-conv/tests/it/main.rs | 1 + .../proto-conv/tests/it/v108_procedure.rs | 9 +- .../tests/it/v109_procedure_with_args.rs | 44 +++ src/meta/protos/proto/procedure.proto | 1 + src/query/ast/src/ast/statements/procedure.rs | 11 +- src/query/ast/src/parser/statement.rs | 6 +- src/query/ast/tests/it/parser.rs | 1 + src/query/ast/tests/it/testdata/stmt.txt | 45 +++ src/query/script/src/executor.rs | 4 +- src/query/script/tests/it/main.rs | 2 +- .../interpreters/access/privilege_access.rs | 1 + .../interpreter_execute_immediate.rs | 312 +----------------- .../src/interpreters/interpreter_factory.rs | 5 + .../interpreter_procedure_call.rs | 134 ++++++++ src/query/service/src/interpreters/mod.rs | 1 + src/query/service/src/interpreters/util.rs | 123 +++++++ src/query/sql/src/planner/binder/binder.rs | 4 +- .../sql/src/planner/binder/ddl/procedure.rs | 65 +++- .../sql/src/planner/format/display_plan.rs | 1 + .../sql/src/planner/plans/ddl/procedure.rs | 15 + src/query/sql/src/planner/plans/plan.rs | 3 + .../base/15_procedure/15_0002_procedure.test | 23 +- 27 files changed, 481 insertions(+), 349 deletions(-) create mode 100644 src/meta/proto-conv/tests/it/v109_procedure_with_args.rs create mode 100644 src/query/service/src/interpreters/interpreter_procedure_call.rs diff --git a/Cargo.lock b/Cargo.lock index ce483f0ec8f9..7a6a243e7bf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3756,6 +3756,7 @@ dependencies = [ "chrono-tz 0.8.6", "cron", "databend-common-ast", + "databend-common-base", "databend-common-building", "databend-common-exception", "databend-common-expression", diff --git a/src/meta/app/Cargo.toml b/src/meta/app/Cargo.toml index c23211545cc4..d88de76ebe0b 100644 --- a/src/meta/app/Cargo.toml +++ b/src/meta/app/Cargo.toml @@ -19,6 +19,7 @@ chrono = { workspace = true } chrono-tz = { workspace = true } cron = "0.12.0" databend-common-ast = { workspace = true } +databend-common-base = { workspace = true } databend-common-exception = { workspace = true } databend-common-expression = { workspace = true } databend-common-io = { workspace = true } diff --git a/src/meta/app/src/principal/procedure.rs b/src/meta/app/src/principal/procedure.rs index 003d3a01dcad..e6d693edb017 100644 --- a/src/meta/app/src/principal/procedure.rs +++ b/src/meta/app/src/principal/procedure.rs @@ -19,6 +19,7 @@ use std::ops::Deref; use chrono::DateTime; use chrono::Utc; +use databend_common_base::display::display_slice::DisplaySliceExt; use databend_common_expression::types::DataType; use crate::principal::procedure_id_ident::ProcedureIdIdent; @@ -45,6 +46,7 @@ pub struct ProcedureIdent { #[derive(Clone, Debug, PartialEq)] pub struct ProcedureMeta { pub return_types: Vec, + pub arg_names: Vec, pub created_on: DateTime, pub updated_on: DateTime, pub script: String, @@ -56,6 +58,7 @@ impl Default for ProcedureMeta { fn default() -> Self { ProcedureMeta { return_types: vec![], + arg_names: vec![], created_on: Utc::now(), updated_on: Utc::now(), script: "".to_string(), @@ -69,8 +72,13 @@ impl Display for ProcedureMeta { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!( f, - "Lanuage: {:?}, return_type: {:?}, CreatedOn: {:?}, Script: {:?}, Comment: {:?}", - self.procedure_language, self.return_types, self.created_on, self.script, self.comment + "Lanuage: {:?}, args {} return_type: {}, CreatedOn: {:?}, Script: {}, Comment: {:?}", + self.procedure_language, + self.arg_names.display_n::<1000>(), + self.return_types.display_n::<1000>(), + self.created_on, + self.script, + self.comment ) } } diff --git a/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs index 4554585a0988..75955c1d2248 100644 --- a/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs @@ -70,6 +70,7 @@ impl FromToProto for mt::principal::ProcedureMeta { let v = Self { return_types, + arg_names: p.arg_names.clone(), created_on: DateTime::::from_pb(p.created_on)?, updated_on: DateTime::::from_pb(p.updated_on)?, script: p.script, @@ -94,6 +95,7 @@ impl FromToProto for mt::principal::ProcedureMeta { ver: VER, min_reader_ver: MIN_READER_VER, return_types, + arg_names: self.arg_names.clone(), created_on: self.created_on.to_pb()?, updated_on: self.updated_on.to_pb()?, script: self.script.to_string(), diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index ca7380806287..d685d83b22ba 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -137,7 +137,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (105, "2024-08-05: Add: add Dictionary meta"), (106, "2024-08-08: Add: add QueryTokenInfo"), (107, "2024-08-09: Add: datatype.proto/DataType Geography type"), - (108, "2024-08-29: Add: procedure.proto: ProcedureMeta and ProcedureIdentity") + (108, "2024-08-29: Add: procedure.proto: ProcedureMeta and ProcedureIdentity"), + (109, "2024-08-29: Refactor: ProcedureMeta add arg_names"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 0ef441368036..5d705b6aa305 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -106,3 +106,4 @@ mod v105_dictionary_meta; mod v106_query_token; mod v107_geography_datatype; mod v108_procedure; +mod v109_procedure_with_args; diff --git a/src/meta/proto-conv/tests/it/v108_procedure.rs b/src/meta/proto-conv/tests/it/v108_procedure.rs index 442397c9146a..f5ea50f002d0 100644 --- a/src/meta/proto-conv/tests/it/v108_procedure.rs +++ b/src/meta/proto-conv/tests/it/v108_procedure.rs @@ -22,15 +22,16 @@ use crate::common; #[test] fn v108_procedure_meta() -> anyhow::Result<()> { - let procedure_meta_v108: Vec = vec![ - 34, 9, 146, 2, 0, 160, 6, 108, 168, 6, 24, 82, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, + let procedure_meta_v108 = vec![ + 34, 9, 146, 2, 0, 160, 6, 109, 168, 6, 24, 82, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 90, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 57, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 98, 7, 102, 111, 111, 32, 98, - 97, 114, 114, 3, 83, 81, 76, 160, 6, 108, 168, 6, 24, + 97, 114, 114, 3, 83, 81, 76, 160, 6, 109, 168, 6, 24, ]; let want = || mt::ProcedureMeta { return_types: vec![DataType::String], + arg_names: vec![], created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 9).unwrap(), script: "".to_string(), @@ -39,7 +40,7 @@ fn v108_procedure_meta() -> anyhow::Result<()> { }; common::test_pb_from_to(func_name!(), want())?; - common::test_load_old(func_name!(), procedure_meta_v108.as_slice(), 108, want()) + common::test_load_old(func_name!(), procedure_meta_v108.as_slice(), 109, want()) } #[test] diff --git a/src/meta/proto-conv/tests/it/v109_procedure_with_args.rs b/src/meta/proto-conv/tests/it/v109_procedure_with_args.rs new file mode 100644 index 000000000000..513e922cfa1b --- /dev/null +++ b/src/meta/proto-conv/tests/it/v109_procedure_with_args.rs @@ -0,0 +1,44 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::TimeZone; +use chrono::Utc; +use databend_common_expression::types::DataType; +use databend_common_meta_app::principal as mt; +use fastrace::func_name; + +use crate::common; + +#[test] +fn v109_procedure_meta() -> anyhow::Result<()> { + let procedure_meta_v109 = vec![ + 34, 9, 146, 2, 0, 160, 6, 109, 168, 6, 24, 42, 7, 109, 101, 115, 115, 97, 103, 101, 82, 23, + 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, + 90, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 57, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, + 84, 67, 98, 7, 102, 111, 111, 32, 98, 97, 114, 114, 3, 83, 81, 76, 160, 6, 109, 168, 6, 24, + ]; + + let want = || mt::ProcedureMeta { + return_types: vec![DataType::String], + arg_names: vec!["message".to_string()], + created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), + updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 9).unwrap(), + script: "".to_string(), + comment: "foo bar".to_string(), + procedure_language: "SQL".to_string(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), procedure_meta_v109.as_slice(), 109, want()) +} diff --git a/src/meta/protos/proto/procedure.proto b/src/meta/protos/proto/procedure.proto index 6824c88d18c9..35f6145ab253 100644 --- a/src/meta/protos/proto/procedure.proto +++ b/src/meta/protos/proto/procedure.proto @@ -37,6 +37,7 @@ message ProcedureMeta { // Procedure return type repeated DataType return_types = 4; + repeated string arg_names = 5; // The time database created. string created_on = 10; diff --git a/src/query/ast/src/ast/statements/procedure.rs b/src/query/ast/src/ast/statements/procedure.rs index 1ebf8e82ab7a..336649120ed4 100644 --- a/src/query/ast/src/ast/statements/procedure.rs +++ b/src/query/ast/src/ast/statements/procedure.rs @@ -19,8 +19,8 @@ use derive_visitor::Drive; use derive_visitor::DriveMut; use crate::ast::write_comma_separated_list; -use crate::ast::write_comma_separated_string_list; use crate::ast::CreateOption; +use crate::ast::Expr; use crate::ast::TypeName; #[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] @@ -169,16 +169,17 @@ impl Display for DescProcedureStmt { } } -#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct CallProcedureStmt { pub name: String, - pub args: Vec, + pub args: Vec, } impl Display for CallProcedureStmt { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "CALL PROCEDURE {}(", self.name)?; - write_comma_separated_string_list(f, self.args.clone())?; + let CallProcedureStmt { name, args } = self; + write!(f, "CALL PROCEDURE {}(", name)?; + write_comma_separated_list(f, args)?; write!(f, ")")?; Ok(()) } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index b4f9c5bf61ce..428ef7ecf766 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -2125,12 +2125,12 @@ pub fn statement_body(i: Input) -> IResult { let call_procedure = map( rule! { - CALL ~ PROCEDURE ~ #ident ~ "(" ~ ")" + CALL ~ PROCEDURE ~ #ident ~ "(" ~ #comma_separated_list0(subexpr(0))? ~ ")" }, - |(_, _, name, _, _)| { + |(_, _, name, _, opt_args, _)| { Statement::CallProcedure(CallProcedureStmt { name: name.to_string(), - args: vec![], + args: opt_args.unwrap_or_default(), }) }, ); diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index c0be807a150b..005a68fb413e 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -831,6 +831,7 @@ fn test_statement() { r#"drop PROCEDURE p1()"#, r#"drop PROCEDURE p1(int, string)"#, r#"call PROCEDURE p1()"#, + r#"call PROCEDURE p1(1, 'x', '2022-02-02'::Date)"#, r#"show PROCEDURES like 'p1%'"#, r#"create PROCEDURE p1() returns string not null language sql comment = 'test' as $$ BEGIN diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 52e504ccf9ea..25e029ec6d15 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -23066,6 +23066,51 @@ CallProcedure( ) +---------- Input ---------- +call PROCEDURE p1(1, 'x', '2022-02-02'::Date) +---------- Output --------- +CALL PROCEDURE p1(1, 'x', '2022-02-02'::DATE) +---------- AST ------------ +CallProcedure( + CallProcedureStmt { + name: "p1", + args: [ + Literal { + span: Some( + 18..19, + ), + value: UInt64( + 1, + ), + }, + Literal { + span: Some( + 21..24, + ), + value: String( + "x", + ), + }, + Cast { + span: Some( + 38..44, + ), + expr: Literal { + span: Some( + 26..38, + ), + value: String( + "2022-02-02", + ), + }, + target_type: Date, + pg_style: true, + }, + ], + }, +) + + ---------- Input ---------- show PROCEDURES like 'p1%' ---------- Output --------- diff --git a/src/query/script/src/executor.rs b/src/query/script/src/executor.rs index 2041fec37a46..a9ee96cf1fe1 100644 --- a/src/query/script/src/executor.rs +++ b/src/query/script/src/executor.rs @@ -35,7 +35,7 @@ pub trait Client { fn var_to_ast(&self, scalar: &Self::Var) -> Result; fn read_from_set(&self, block: &Self::Set, row: usize, col: &ColumnAccess) -> Result; - fn set_len(&self, block: &Self::Set) -> usize; + fn num_rows(&self, block: &Self::Set) -> usize; fn is_true(&self, scalar: &Self::Var) -> Result; } @@ -132,7 +132,7 @@ impl Executor { let cursor = Cursor { set: set.clone(), row: 0, - len: self.client.set_len(block), + len: self.client.num_rows(block), }; self.iters.insert(to_iter.clone(), cursor); } diff --git a/src/query/script/tests/it/main.rs b/src/query/script/tests/it/main.rs index af781da277f2..1489dde9bf16 100644 --- a/src/query/script/tests/it/main.rs +++ b/src/query/script/tests/it/main.rs @@ -689,7 +689,7 @@ impl Client for MockClient { Ok(var) } - fn set_len(&self, set: &Self::Set) -> usize { + fn num_rows(&self, set: &Self::Set) -> usize { set.data.len() } diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index 37013d5a0e70..e0577cea1669 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -1260,6 +1260,7 @@ impl AccessChecker for PrivilegeAccess { Plan::DescDatamaskPolicy(_) => {} Plan::Begin => {} Plan::ExecuteImmediate(_) + | Plan::CallProcedure(_) | Plan::CreateProcedure(_) | Plan::DropProcedure(_) /*| Plan::ShowCreateProcedure(_) diff --git a/src/query/service/src/interpreters/interpreter_execute_immediate.rs b/src/query/service/src/interpreters/interpreter_execute_immediate.rs index 3b3ce89f83b8..94799c782d09 100644 --- a/src/query/service/src/interpreters/interpreter_execute_immediate.rs +++ b/src/query/service/src/interpreters/interpreter_execute_immediate.rs @@ -15,42 +15,24 @@ use std::sync::Arc; use databend_common_ast::ast::DeclareItem; -use databend_common_ast::ast::Expr; -use databend_common_ast::ast::FunctionCall; -use databend_common_ast::ast::Identifier; -use databend_common_ast::ast::Literal; use databend_common_ast::ast::ScriptStatement; -use databend_common_ast::ast::TypeName; use databend_common_ast::parser::run_parser; use databend_common_ast::parser::script::script_block; use databend_common_ast::parser::tokenize_sql; use databend_common_ast::parser::ParseMode; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::block_debug::box_render; -use databend_common_expression::types::decimal::DecimalScalar; -use databend_common_expression::types::decimal::MAX_DECIMAL256_PRECISION; -use databend_common_expression::types::NumberScalar; use databend_common_expression::types::StringType; -use databend_common_expression::with_integer_mapped_type; use databend_common_expression::DataBlock; -use databend_common_expression::DataSchemaRef; use databend_common_expression::FromData; -use databend_common_expression::Scalar; use databend_common_script::compile; -use databend_common_script::ir::ColumnAccess; -use databend_common_script::Client; use databend_common_script::Executor; use databend_common_script::ReturnValue; use databend_common_sql::plans::ExecuteImmediatePlan; -use databend_common_sql::Planner; use databend_common_storages_fuse::TableContext; -use futures::TryStreamExt; -use itertools::Itertools; -use serde_json::Value as JsonValue; +use crate::interpreters::util::ScriptClient; use crate::interpreters::Interpreter; -use crate::interpreters::InterpreterFactory; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -139,295 +121,3 @@ impl Interpreter for ExecuteImmediateInterpreter { res.map_err(|err| err.display_with_sql(&self.plan.script)) } } - -#[derive(Debug, Clone)] -struct QueryResult { - schema: DataSchemaRef, - block: DataBlock, -} - -struct ScriptClient { - ctx: Arc, -} - -impl Client for ScriptClient { - type Var = Scalar; - type Set = QueryResult; - - async fn query(&self, query: &str) -> Result { - let ctx = self - .ctx - .get_current_session() - .create_query_context() - .await?; - - let mut planner = Planner::new(ctx.clone()); - let (plan, _) = planner.plan_sql(query).await?; - let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; - let stream = interpreter.execute(ctx.clone()).await?; - let blocks = stream.try_collect::>().await?; - let schema = plan.schema(); - - let block = match blocks.len() { - 0 => DataBlock::empty_with_schema(schema.clone()), - 1 => blocks[0].clone(), - _ => DataBlock::concat(&blocks)?, - }; - - Ok(QueryResult { schema, block }) - } - - fn var_to_ast(&self, scalar: &Self::Var) -> Result { - let ast = match scalar { - Scalar::Number(v) => with_integer_mapped_type!(|NUM_TYPE| match v { - NumberScalar::NUM_TYPE(v) => Expr::Literal { - span: None, - value: Literal::Decimal256 { - value: (*v).into(), - precision: MAX_DECIMAL256_PRECISION, - scale: 0, - }, - }, - NumberScalar::Float32(v) => Expr::Literal { - span: None, - value: Literal::Float64(v.into_inner() as f64), - }, - NumberScalar::Float64(v) => Expr::Literal { - span: None, - value: Literal::Float64(v.into_inner()), - }, - }), - Scalar::Boolean(v) => Expr::Literal { - span: None, - value: Literal::Boolean(*v), - }, - Scalar::String(v) => Expr::Literal { - span: None, - value: Literal::String(v.clone()), - }, - Scalar::Tuple(v) => Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "tuple"), - args: v - .iter() - .map(|x| self.var_to_ast(&x.to_owned())) - .collect::>>()?, - params: vec![], - window: None, - lambda: None, - }, - }, - Scalar::Array(v) => Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "array"), - args: v - .iter() - .map(|x| self.var_to_ast(&x.to_owned())) - .collect::>>()?, - params: vec![], - window: None, - lambda: None, - }, - }, - Scalar::Decimal(DecimalScalar::Decimal128(v, size)) => Expr::Literal { - span: None, - value: Literal::Decimal256 { - value: (*v).into(), - precision: size.precision, - scale: size.scale, - }, - }, - Scalar::Decimal(DecimalScalar::Decimal256(v, size)) => Expr::Literal { - span: None, - value: Literal::Decimal256 { - value: *v, - precision: size.precision, - scale: size.scale, - }, - }, - Scalar::Map(v) => { - let col = v.as_tuple().unwrap(); - let keys = col[0] - .iter() - .map(|x| self.var_to_ast(&x.to_owned())) - .collect::>>()?; - let vals = col[1] - .iter() - .map(|x| self.var_to_ast(&x.to_owned())) - .collect::>>()?; - Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "map"), - args: vec![ - Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "array"), - args: keys, - params: vec![], - window: None, - lambda: None, - }, - }, - Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "array"), - args: vals, - params: vec![], - window: None, - lambda: None, - }, - }, - ], - params: vec![], - window: None, - lambda: None, - }, - } - } - Scalar::Variant(v) => { - let value = jsonb::from_slice(v).unwrap(); - let json = JsonValue::from(value).to_string(); - Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "parse_json"), - args: vec![Expr::Literal { - span: None, - value: Literal::String(json), - }], - params: vec![], - window: None, - lambda: None, - }, - } - } - Scalar::EmptyArray => Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "array"), - args: vec![], - params: vec![], - window: None, - lambda: None, - }, - }, - Scalar::EmptyMap => Expr::FunctionCall { - span: None, - func: FunctionCall { - distinct: false, - name: Identifier::from_name(None, "map"), - args: vec![], - params: vec![], - window: None, - lambda: None, - }, - }, - Scalar::Date(v) => Expr::Cast { - span: None, - expr: Box::new(Expr::Literal { - span: None, - value: Literal::Decimal256 { - value: (*v).into(), - precision: MAX_DECIMAL256_PRECISION, - scale: 0, - }, - }), - target_type: TypeName::Date, - pg_style: false, - }, - Scalar::Timestamp(v) => Expr::Cast { - span: None, - expr: Box::new(Expr::Literal { - span: None, - value: Literal::Decimal256 { - value: (*v).into(), - precision: MAX_DECIMAL256_PRECISION, - scale: 0, - }, - }), - target_type: TypeName::Timestamp, - pg_style: false, - }, - Scalar::Null => Expr::Literal { - span: None, - value: Literal::Null, - }, - Scalar::Bitmap(_) | Scalar::Binary(_) | Scalar::Geometry(_) | Scalar::Geography(_) => { - return Err(ErrorCode::Unimplemented(format!( - "variable of type {} is not supported yet", - scalar.as_ref().infer_data_type() - ))); - } - }; - - Ok(ast) - } - - fn read_from_set(&self, set: &Self::Set, row: usize, col: &ColumnAccess) -> Result { - let offset = match col { - ColumnAccess::Position(offset) => *offset, - // TODO(andylokandy): name resolution - ColumnAccess::Name(name) => set - .schema - .fields() - .iter() - .position(|f| f.name() == name) - .ok_or_else(|| { - ErrorCode::ScriptExecutionError(format!( - "cannot find column with name {} in block, available columns: {}", - name, - set.schema - .fields() - .iter() - .map(|f| format!("'{}'", f.name())) - .join(", ") - )) - })?, - }; - let col = set.block.columns().get(offset).ok_or_else(|| { - ErrorCode::ScriptExecutionError(format!( - "cannot read column at offset {} from block with {} columns", - offset, - set.block.num_columns() - )) - })?; - let cell = col - .value - .index(row) - .ok_or_else(|| { - ErrorCode::ScriptExecutionError(format!( - "cannot read value at row {} from column with {} rows", - row, - set.block.num_rows(), - )) - })? - .to_owned(); - - Ok(cell) - } - - fn set_len(&self, set: &Self::Set) -> usize { - set.block.num_rows() - } - - fn is_true(&self, scalar: &Self::Var) -> Result { - match scalar { - Scalar::Boolean(v) => Ok(*v), - _ => Err(ErrorCode::ScriptExecutionError(format!( - "`is_true` called on non-boolean value {scalar}", - ))), - } - } -} diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index 7a082574f04e..d90edac923a9 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -51,6 +51,7 @@ use crate::interpreters::interpreter_notification_create::CreateNotificationInte use crate::interpreters::interpreter_notification_desc::DescNotificationInterpreter; use crate::interpreters::interpreter_notification_drop::DropNotificationInterpreter; use crate::interpreters::interpreter_presign::PresignInterpreter; +use crate::interpreters::interpreter_procedure_call::CallProcedureInterpreter; use crate::interpreters::interpreter_procedure_create::CreateProcedureInterpreter; use crate::interpreters::interpreter_procedure_drop::DropProcedureInterpreter; use crate::interpreters::interpreter_role_show::ShowRolesInterpreter; @@ -596,6 +597,10 @@ impl InterpreterFactory { ctx, *p.clone(), )?)), + Plan::CallProcedure(p) => Ok(Arc::new(CallProcedureInterpreter::try_create( + ctx, + *p.clone(), + )?)), // Plan::ShowCreateProcedure(_) => {} // // Plan::RenameProcedure(p) => Ok(Arc::new(RenameProcedureInterpreter::try_create( diff --git a/src/query/service/src/interpreters/interpreter_procedure_call.rs b/src/query/service/src/interpreters/interpreter_procedure_call.rs new file mode 100644 index 000000000000..8e264910c8eb --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_procedure_call.rs @@ -0,0 +1,134 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_ast::ast::DeclareItem; +use databend_common_ast::ast::DeclareVar; +use databend_common_ast::ast::Identifier; +use databend_common_ast::ast::ScriptStatement; +use databend_common_ast::parser::run_parser; +use databend_common_ast::parser::script::script_block; +use databend_common_ast::parser::tokenize_sql; +use databend_common_ast::parser::ParseMode; +use databend_common_exception::Result; +use databend_common_expression::block_debug::box_render; +use databend_common_expression::types::StringType; +use databend_common_expression::DataBlock; +use databend_common_expression::FromData; +use databend_common_script::compile; +use databend_common_script::Executor; +use databend_common_script::ReturnValue; +use databend_common_sql::plans::CallProcedurePlan; +use databend_common_storages_fuse::TableContext; + +use crate::interpreters::util::ScriptClient; +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; + +#[derive(Debug)] +pub struct CallProcedureInterpreter { + ctx: Arc, + plan: CallProcedurePlan, +} + +impl CallProcedureInterpreter { + pub fn try_create(ctx: Arc, plan: CallProcedurePlan) -> Result { + Ok(CallProcedureInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for CallProcedureInterpreter { + fn name(&self) -> &str { + "ProcedureCall" + } + + fn is_ddl(&self) -> bool { + false + } + + #[fastrace::trace] + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + let res: Result<_> = try { + let mut src = vec![]; + for (arg, arg_name) in self.plan.args.iter().zip(self.plan.arg_names.iter()) { + src.push(ScriptStatement::LetVar { + declare: DeclareVar { + span: None, + name: Identifier::from_name(None, arg_name), + default: arg.clone(), + }, + }); + } + let settings = self.ctx.get_settings(); + let sql_dialect = settings.get_sql_dialect()?; + let tokens = tokenize_sql(&self.plan.script)?; + let mut ast = run_parser( + &tokens, + sql_dialect, + ParseMode::Template, + false, + script_block, + )?; + + for declare in ast.declares { + match declare { + DeclareItem::Var(declare) => src.push(ScriptStatement::LetVar { declare }), + DeclareItem::Set(declare) => { + src.push(ScriptStatement::LetStatement { declare }) + } + } + } + src.append(&mut ast.body); + let compiled = compile(&src)?; + + let client = ScriptClient { + ctx: self.ctx.clone(), + }; + let mut executor = Executor::load(ast.span, client, compiled); + let script_max_steps = settings.get_script_max_steps()?; + let result = executor.run(script_max_steps as usize).await?; + + match result { + Some(ReturnValue::Var(scalar)) => { + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ + StringType::from_data(vec![scalar.to_string()]), + ])])? + } + Some(ReturnValue::Set(set)) => { + let rendered_table = box_render( + &set.schema, + &[set.block.clone()], + usize::MAX, + usize::MAX, + usize::MAX, + true, + )?; + let lines = rendered_table.lines().map(|x| x.to_string()).collect(); + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ + StringType::from_data(lines), + ])])? + } + None => PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ + StringType::from_data(Vec::::new()), + ])])?, + } + }; + + res.map_err(|err| err.display_with_sql(&self.plan.script)) + } +} diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index 1c2b738d48e0..d46cc641b8a5 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -72,6 +72,7 @@ mod interpreter_password_policy_drop; mod interpreter_presign; mod interpreter_privilege_grant; mod interpreter_privilege_revoke; +mod interpreter_procedure_call; mod interpreter_procedure_create; mod interpreter_procedure_drop; mod interpreter_replace; diff --git a/src/query/service/src/interpreters/util.rs b/src/query/service/src/interpreters/util.rs index 4a094d24554c..26b7e6789a57 100644 --- a/src/query/service/src/interpreters/util.rs +++ b/src/query/service/src/interpreters/util.rs @@ -12,9 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + +use databend_common_ast::ast::Expr; +use databend_common_ast::parser::parse_expr; +use databend_common_ast::parser::tokenize_sql; +use databend_common_ast::parser::Dialect; +use databend_common_exception::ErrorCode; use databend_common_expression::ComputedExpr; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; +use databend_common_script::ir::ColumnAccess; +use databend_common_script::Client; +use databend_common_sql::Planner; +use futures_util::TryStreamExt; +use itertools::Itertools; + +use crate::interpreters::InterpreterFactory; +use crate::sessions::QueryContext; #[allow(clippy::type_complexity)] pub fn generate_desc_schema( @@ -61,3 +78,109 @@ pub fn generate_desc_schema( } (names, types, nulls, default_exprs, extras) } + +#[derive(Debug, Clone)] +pub struct QueryResult { + pub(crate) schema: DataSchemaRef, + pub(crate) block: DataBlock, +} + +pub struct ScriptClient { + pub(crate) ctx: Arc, +} + +impl Client for ScriptClient { + type Var = Scalar; + type Set = QueryResult; + + async fn query(&self, query: &str) -> databend_common_exception::Result { + let ctx = self + .ctx + .get_current_session() + .create_query_context() + .await?; + + let mut planner = Planner::new(ctx.clone()); + let (plan, _) = planner.plan_sql(query).await?; + let interpreter = InterpreterFactory::get(ctx.clone(), &plan).await?; + let stream = interpreter.execute(ctx.clone()).await?; + let blocks = stream.try_collect::>().await?; + let schema = plan.schema(); + + let block = match blocks.len() { + 0 => DataBlock::empty_with_schema(schema.clone()), + 1 => blocks[0].clone(), + _ => DataBlock::concat(&blocks)?, + }; + + Ok(QueryResult { schema, block }) + } + + fn var_to_ast(&self, scalar: &Self::Var) -> databend_common_exception::Result { + let scalar = scalar.to_string(); + let ast = parse_expr(&tokenize_sql(&scalar)?, Dialect::PostgreSQL)?; + + Ok(ast) + } + + fn read_from_set( + &self, + set: &Self::Set, + row: usize, + col: &ColumnAccess, + ) -> databend_common_exception::Result { + let offset = match col { + ColumnAccess::Position(offset) => *offset, + // TODO(andylokandy): name resolution + ColumnAccess::Name(name) => set + .schema + .fields() + .iter() + .position(|f| f.name() == name) + .ok_or_else(|| { + ErrorCode::ScriptExecutionError(format!( + "cannot find column with name {} in block, available columns: {}", + name, + set.schema + .fields() + .iter() + .map(|f| format!("'{}'", f.name())) + .join(", ") + )) + })?, + }; + let col = set.block.columns().get(offset).ok_or_else(|| { + ErrorCode::ScriptExecutionError(format!( + "cannot read column at offset {} from block with {} columns", + offset, + set.block.num_columns() + )) + })?; + let cell = col + .value + .index(row) + .ok_or_else(|| { + ErrorCode::ScriptExecutionError(format!( + "cannot read value at row {} from column with {} rows", + row, + set.block.num_rows(), + )) + })? + .to_owned(); + + Ok(cell) + } + + fn num_rows(&self, set: &Self::Set) -> usize { + set.block.num_rows() + } + + fn is_true(&self, scalar: &Self::Var) -> databend_common_exception::Result { + match scalar { + Scalar::Boolean(v) => Ok(*v), + _ => Err(ErrorCode::ScriptExecutionError(format!( + "`is_true` called on non-boolean value {scalar}", + ))), + } + } +} diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index b1c3cb070b9a..fc80aa40f37c 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -617,9 +617,9 @@ impl<'a> Binder { } } Statement::CallProcedure(stmt) => { if self.ctx.get_settings().get_enable_experimental_procedure()? { - self.bind_call_procedure(stmt).await? + self.bind_call_procedure(bind_context, stmt).await? } else { - return Err(ErrorCode::SyntaxException("DESC PROCEDURE, set enable_experimental_procedure=1")); + return Err(ErrorCode::SyntaxException("CALL PROCEDURE, set enable_experimental_procedure=1")); } } }; diff --git a/src/query/sql/src/planner/binder/ddl/procedure.rs b/src/query/sql/src/planner/binder/ddl/procedure.rs index fc5c5be5d4c6..69e32716ae04 100644 --- a/src/query/sql/src/planner/binder/ddl/procedure.rs +++ b/src/query/sql/src/planner/binder/ddl/procedure.rs @@ -30,14 +30,18 @@ use databend_common_meta_app::principal::ProcedureNameIdent; use databend_common_users::UserApiProvider; use crate::binder::show::get_show_options; +use crate::plans::CallProcedurePlan; use crate::plans::CreateProcedurePlan; use crate::plans::DropProcedurePlan; use crate::plans::ExecuteImmediatePlan; use crate::plans::Plan; use crate::plans::RewriteKind; +use crate::plans::SubqueryType; use crate::resolve_type_name; use crate::BindContext; use crate::Binder; +use crate::ScalarExpr; +use crate::TypeChecker; impl Binder { #[async_backtrace::framed] @@ -56,7 +60,7 @@ impl Binder { create_option, name, language, - args: _args, + args, return_type, comment, script, @@ -67,7 +71,7 @@ impl Binder { // 1. need parser name: ProcedureNameIdent = name + args // 2. need check script's return type and stmt.return_type - let meta = self.procedure_meta(return_type, script, comment, language)?; + let meta = self.procedure_meta(return_type, script, comment, language, args)?; Ok(Plan::CreateProcedure(Box::new(CreateProcedurePlan { create_option: create_option.clone().into(), tenant: tenant.to_owned(), @@ -107,22 +111,55 @@ impl Binder { .await } - pub async fn bind_call_procedure(&mut self, stmt: &CallProcedureStmt) -> Result { - let CallProcedureStmt { name, args } = stmt; + pub async fn bind_call_procedure( + &mut self, + bind_context: &mut BindContext, + stmt: &CallProcedureStmt, + ) -> Result { + let CallProcedureStmt { + name, + args: arguments, + } = stmt; let tenant = self.ctx.get_tenant(); - // TODO: ProcedureNameIdent = name + args_type. Need to get type in here. + let mut type_checker = TypeChecker::try_create( + bind_context, + self.ctx.clone(), + &self.name_resolution_ctx, + self.metadata.clone(), + &[], + true, + )?; + let mut arg_types = vec![]; + for argument in arguments { + let box (arg, mut arg_type) = type_checker.resolve(argument)?; + if let ScalarExpr::SubqueryExpr(subquery) = &arg { + if subquery.typ == SubqueryType::Scalar && !arg.data_type()?.is_nullable() { + arg_type = arg_type.wrap_nullable(); + } + } + arg_types.push(arg_type.to_string()); + } let req = GetProcedureReq { inner: ProcedureNameIdent::new( tenant.clone(), - ProcedureIdentity::new(name, args.join(",")), + ProcedureIdentity::new(name, arg_types.join(",")), ), }; + let procedure = UserApiProvider::instance() .get_procedure(&tenant, req) .await?; - Ok(Plan::ExecuteImmediate(Box::new(ExecuteImmediatePlan { - script: procedure.procedure_meta.script, - }))) + if arg_types.is_empty() { + Ok(Plan::ExecuteImmediate(Box::new(ExecuteImmediatePlan { + script: procedure.procedure_meta.script, + }))) + } else { + Ok(Plan::CallProcedure(Box::new(CallProcedurePlan { + script: procedure.procedure_meta.script, + arg_names: procedure.procedure_meta.arg_names, + args: arguments.clone(), + }))) + } } fn procedure_meta( @@ -131,7 +168,16 @@ impl Binder { script: &str, comment: &Option, language: &ProcedureLanguage, + args: &Option>, ) -> Result { + let mut arg_names = vec![]; + if let Some(args) = args { + for arg in args { + if let Some(name) = &arg.name { + arg_names.push(name.to_string()); + } + } + } let mut return_types = Vec::with_capacity(return_type.len()); for arg_type in return_type { return_types.push(DataType::from(&resolve_type_name( @@ -142,6 +188,7 @@ impl Binder { Ok(ProcedureMeta { return_types, + arg_names, created_on: Utc::now(), updated_on: Utc::now(), script: script.to_string(), diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index 801cde7a8a55..4f180e6b2959 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -194,6 +194,7 @@ impl Plan { Plan::ExecuteImmediate(_) => Ok("ExecuteImmediate".to_string()), Plan::CreateProcedure(_) => Ok("CreateProcedure".to_string()), Plan::DropProcedure(_) => Ok("DropProcedure".to_string()), + Plan::CallProcedure(_) => Ok("CallProcedure".to_string()), // Plan::ShowCreateProcedure(_) => Ok("ShowCreateProcedure".to_string()), // Plan::RenameProcedure(_) => Ok("ProcedureDatabase".to_string()), diff --git a/src/query/sql/src/planner/plans/ddl/procedure.rs b/src/query/sql/src/planner/plans/ddl/procedure.rs index b8ab6e4de2d3..ca38ad6e3d94 100644 --- a/src/query/sql/src/planner/plans/ddl/procedure.rs +++ b/src/query/sql/src/planner/plans/ddl/procedure.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_ast::ast::Expr; use databend_common_expression::types::DataType; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; @@ -86,3 +87,17 @@ impl From<&DropProcedurePlan> for DropProcedureReq { } } } + +#[derive(Clone, Debug, PartialEq)] +pub struct CallProcedurePlan { + pub script: String, + pub arg_names: Vec, + pub args: Vec, +} + +impl CallProcedurePlan { + pub fn schema(&self) -> DataSchemaRef { + // TODO: schema generated by plan.arg_names + DataSchemaRefExt::create(vec![DataField::new("Result", DataType::String)]) + } +} diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index 8655d7f66d62..f3b25ff99d15 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -41,6 +41,7 @@ use crate::plans::AlterUserPlan; use crate::plans::AlterViewPlan; use crate::plans::AlterVirtualColumnPlan; use crate::plans::AnalyzeTablePlan; +use crate::plans::CallProcedurePlan; use crate::plans::CopyIntoTableMode; use crate::plans::CopyIntoTablePlan; use crate::plans::CreateCatalogPlan; @@ -355,6 +356,7 @@ pub enum Plan { // ShowCreateProcedure(Box), DropProcedure(Box), CreateProcedure(Box), + CallProcedure(Box), // RenameProcedure(Box), // sequence @@ -478,6 +480,7 @@ impl Plan { Plan::DescConnection(plan) => plan.schema(), Plan::ShowConnections(plan) => plan.schema(), Plan::ExecuteImmediate(plan) => plan.schema(), + Plan::CallProcedure(plan) => plan.schema(), Plan::InsertMultiTable(plan) => plan.schema(), _ => Arc::new(DataSchema::empty()), diff --git a/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test index fc7a94df4f62..8bc52f539ab9 100644 --- a/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test +++ b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test @@ -19,12 +19,9 @@ call procedure p1(); ---- 2 - statement ok -CREATE PROCEDURE p1(a int, b int) RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ +CREATE PROCEDURE p1(x UInt8, sum UInt8) RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ BEGIN - LET x := -1; - LET sum := 0; FOR x IN x TO x + 3 DO sum := sum + x; END FOR; @@ -33,10 +30,8 @@ END; $$; statement error 3131 -CREATE PROCEDURE p1(a int, b int) RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ +CREATE PROCEDURE p1(x UInt8, sum UInt8) RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ BEGIN - LET x := 0; - LET sum := 0; FOR x IN x TO x + 3 DO sum := sum + x; END FOR; @@ -49,17 +44,27 @@ call procedure p1(); ---- 2 +query T +call procedure p1(0, 0); +---- +6 + +query T +call procedure p1(1,10); +---- +20 + query T select name, arguments from system.procedures where name = 'p1'; ---- p1 p1() RETURN (Int32) -p1 p1(Int32,Int32) RETURN (Int32) +p1 p1(UInt8,UInt8) RETURN (Int32) statement ok drop procedure p1(); statement ok -drop procedure p1(int, int); +drop procedure p1(UInt8, UInt8); query T select count(name) from system.procedures