diff --git a/optd-datafusion-bridge/src/into_optd.rs b/optd-datafusion-bridge/src/into_optd.rs index 0b8103fd..a49d9cc3 100644 --- a/optd-datafusion-bridge/src/into_optd.rs +++ b/optd-datafusion-bridge/src/into_optd.rs @@ -15,28 +15,6 @@ use optd_datafusion_repr::properties::schema::Schema as OPTDSchema; use crate::OptdPlanContext; -// flatten_nested_logical is a helper function to flatten nested logical operators with same op type -// eg. (a AND (b AND c)) => ExprList([a, b, c]) -// (a OR (b OR c)) => ExprList([a, b, c]) -// It assume the children of the input expr_list are already flattened -// and can only be used in bottom up manner -fn flatten_nested_logical(op: LogOpType, expr_list: ExprList) -> ExprList { - // conv_into_optd_expr is building the children bottom up so there is no need to - // call flatten_nested_logical recursively - let mut new_expr_list = Vec::new(); - for child in expr_list.to_vec() { - if let OptRelNodeTyp::LogOp(child_op) = child.typ() { - if child_op == op { - let child_log_op_expr = LogOpExpr::from_rel_node(child.into_rel_node()).unwrap(); - new_expr_list.extend(child_log_op_expr.children().to_vec()); - continue; - } - } - new_expr_list.push(child.clone()); - } - ExprList::new(new_expr_list) -} - impl OptdPlanContext<'_> { fn conv_into_optd_table_scan(&mut self, node: &logical_plan::TableScan) -> Result { let table_name = node.table_name.to_string(); @@ -73,14 +51,16 @@ impl OptdPlanContext<'_> { Operator::And => { let op = LogOpType::And; let expr_list = ExprList::new(vec![left, right]); - let expr_list = flatten_nested_logical(op, expr_list); - return Ok(LogOpExpr::new(op, expr_list).into_expr()); + return Ok( + LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr() + ); } Operator::Or => { let op = LogOpType::Or; let expr_list = ExprList::new(vec![left, right]); - let expr_list = flatten_nested_logical(op, expr_list); - return Ok(LogOpExpr::new(op, expr_list).into_expr()); + return Ok( + LogOpExpr::new_flattened_nested_logical(op, expr_list).into_expr() + ); } _ => {} } @@ -331,13 +311,8 @@ impl OptdPlanContext<'_> { } else { let expr_list = ExprList::new(log_ops); // the expr from filter is already flattened in conv_into_optd_expr - let expr_list = flatten_nested_logical(LogOpType::And, expr_list); - Ok(LogicalJoin::new( - left, - right, - LogOpExpr::new(LogOpType::And, expr_list).into_expr(), - join_type, - )) + let log_op = LogOpExpr::new_flattened_nested_logical(LogOpType::And, expr_list); + Ok(LogicalJoin::new(left, right, log_op.into_expr(), join_type)) } } diff --git a/optd-datafusion-repr/src/lib.rs b/optd-datafusion-repr/src/lib.rs index 131015e2..3a149bc3 100644 --- a/optd-datafusion-repr/src/lib.rs +++ b/optd-datafusion-repr/src/lib.rs @@ -23,8 +23,10 @@ use properties::{ }; use rules::{ EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule, EliminateFilterRule, - EliminateJoinRule, EliminateLimitRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, - PhysicalConversionRule, ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule, + EliminateJoinRule, EliminateLimitRule, FilterAggTransposeRule, FilterCrossJoinTransposeRule, + FilterInnerJoinTransposeRule, FilterMergeRule, FilterProjectTransposeRule, + FilterSortTransposeRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule, + ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule, }; pub use optd_core::rel_node::Value; @@ -34,6 +36,8 @@ mod explain; pub mod plan_nodes; pub mod properties; pub mod rules; +#[cfg(test)] +mod testing; pub struct DatafusionOptimizer { hueristic_optimizer: HeuristicsOptimizer, @@ -92,6 +96,23 @@ impl DatafusionOptimizer { for rule in rules { rule_wrappers.push(RuleWrapper::new_cascades(rule)); } + // add all filter pushdown rules as heuristic rules + rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new( + FilterProjectTransposeRule::new(), + ))); + rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(FilterMergeRule::new()))); + rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new( + FilterCrossJoinTransposeRule::new(), + ))); + rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new( + FilterInnerJoinTransposeRule::new(), + ))); + rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new( + FilterSortTransposeRule::new(), + ))); + rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new( + FilterAggTransposeRule::new(), + ))); rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(HashJoinRule::new()))); // 17 rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinCommuteRule::new()))); // 18 rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(JoinAssocRule::new()))); diff --git a/optd-datafusion-repr/src/plan_nodes.rs b/optd-datafusion-repr/src/plan_nodes.rs index e872b3a9..89a68c22 100644 --- a/optd-datafusion-repr/src/plan_nodes.rs +++ b/optd-datafusion-repr/src/plan_nodes.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::sync::Arc; use arrow_schema::DataType; +use itertools::Itertools; use optd_core::{ cascades::{CascadesOptimizer, GroupId}, rel_node::{RelNode, RelNodeMeta, RelNodeMetaMap, RelNodeRef, RelNodeTyp}, @@ -284,6 +285,56 @@ impl Expr { pub fn child(&self, idx: usize) -> OptRelNodeRef { self.0.child(idx) } + + /// Recursively rewrite all column references in the expression.using a provided + /// function that replaces a column index. + /// The provided function will, given a ColumnRefExpr's index, + /// return either Some(usize) or None. + /// - If it is Some, the column index can be rewritten with the value. + /// - If any of the columns is None, we will return None all the way up + /// the call stack, and no expression will be returned. + pub fn rewrite_column_refs( + &self, + rewrite_fn: &impl Fn(usize) -> Option, + ) -> Option { + assert!(self.typ().is_expression()); + if let OptRelNodeTyp::ColumnRef = self.typ() { + let col_ref = ColumnRefExpr::from_rel_node(self.0.clone()).unwrap(); + let rewritten = rewrite_fn(col_ref.index()); + return if let Some(rewritten_idx) = rewritten { + let new_col_ref = ColumnRefExpr::new(rewritten_idx); + Some(Self(new_col_ref.into_rel_node())) + } else { + None + }; + } + + let children = self.0.children.clone(); + let children = children + .into_iter() + .map(|child| { + if child.typ == OptRelNodeTyp::List { + // TODO: What should we do with List? + return Some(child); + } + Expr::from_rel_node(child.clone()) + .unwrap() + .rewrite_column_refs(rewrite_fn) + .map(|x| x.into_rel_node()) + }) + .collect::>>()?; + Some( + Expr::from_rel_node( + RelNode { + typ: self.0.typ.clone(), + children: children.into_iter().collect_vec(), + data: self.0.data.clone(), + } + .into(), + ) + .unwrap(), + ) + } } impl OptRelNode for Expr { diff --git a/optd-datafusion-repr/src/plan_nodes/expr.rs b/optd-datafusion-repr/src/plan_nodes/expr.rs index 4093388c..7099cb05 100644 --- a/optd-datafusion-repr/src/plan_nodes/expr.rs +++ b/optd-datafusion-repr/src/plan_nodes/expr.rs @@ -608,6 +608,29 @@ impl LogOpExpr { )) } + /// flatten_nested_logical is a helper function to flatten nested logical operators with same op type + /// eg. (a AND (b AND c)) => ExprList([a, b, c]) + /// (a OR (b OR c)) => ExprList([a, b, c]) + /// It assume the children of the input expr_list are already flattened + /// and can only be used in bottom up manner + pub fn new_flattened_nested_logical(op: LogOpType, expr_list: ExprList) -> Self { + // Since we assume that we are building the children bottom up, + // there is no need to call flatten_nested_logical recursively + let mut new_expr_list = Vec::new(); + for child in expr_list.to_vec() { + if let OptRelNodeTyp::LogOp(child_op) = child.typ() { + if child_op == op { + let child_log_op_expr = + LogOpExpr::from_rel_node(child.into_rel_node()).unwrap(); + new_expr_list.extend(child_log_op_expr.children().to_vec()); + continue; + } + } + new_expr_list.push(child.clone()); + } + LogOpExpr::new(op, ExprList::new(new_expr_list)) + } + pub fn children(&self) -> Vec { self.0 .0 diff --git a/optd-datafusion-repr/src/plan_nodes/join.rs b/optd-datafusion-repr/src/plan_nodes/join.rs index c2dfc710..28ed439b 100644 --- a/optd-datafusion-repr/src/plan_nodes/join.rs +++ b/optd-datafusion-repr/src/plan_nodes/join.rs @@ -62,3 +62,20 @@ define_plan_node!( { 3, right_keys: ExprList } ], { join_type: JoinType } ); + +impl LogicalJoin { + /// Takes in left/right schema sizes, and maps a column index to be as if it + /// were pushed down to the left or right side of a join accordingly. + pub fn map_through_join( + col_idx: usize, + left_schema_size: usize, + right_schema_size: usize, + ) -> usize { + assert!(col_idx < left_schema_size + right_schema_size); + if col_idx < left_schema_size { + col_idx + } else { + col_idx - left_schema_size + } + } +} diff --git a/optd-datafusion-repr/src/plan_nodes/projection.rs b/optd-datafusion-repr/src/plan_nodes/projection.rs index 3898733d..61168db0 100644 --- a/optd-datafusion-repr/src/plan_nodes/projection.rs +++ b/optd-datafusion-repr/src/plan_nodes/projection.rs @@ -1,7 +1,7 @@ use super::expr::ExprList; use super::macros::define_plan_node; -use super::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode}; +use super::{ColumnRefExpr, Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode}; #[derive(Clone, Debug)] pub struct LogicalProjection(pub PlanNode); @@ -26,3 +26,74 @@ define_plan_node!( { 1, exprs: ExprList } ] ); + +/// This struct holds the mapping from original columns to projected columns. +/// +/// # Example +/// With the following plan: +/// | Filter (#0 < 5) +/// | +/// |-| Projection [#2, #3] +/// |- Scan [#0, #1, #2, #3] +/// +/// The computed projection mapping is: +/// #2 -> #0 +/// #3 -> #1 +pub struct ProjectionMapping { + forward: Vec, + _backward: Vec>, +} + +impl ProjectionMapping { + pub fn build(mapping: Vec) -> Option { + let mut backward = vec![]; + for (i, &x) in mapping.iter().enumerate() { + if x >= backward.len() { + backward.resize(x + 1, None); + } + backward[x] = Some(i); + } + Some(Self { + forward: mapping, + _backward: backward, + }) + } + + pub fn projection_col_refers_to(&self, col: usize) -> usize { + self.forward[col] + } + + pub fn _original_col_maps_to(&self, col: usize) -> Option { + self._backward[col] + } + + /// Recursively rewrites all ColumnRefs in an Expr to *undo* the projection + /// condition. You might want to do this if you are pushing something + /// through a projection, or pulling a projection up. + /// + /// # Example + /// If we have a projection node, mapping column A to column B (A -> B) + /// All B's in `cond` will be rewritten as A. + pub fn rewrite_condition(&self, cond: Expr, child_schema_len: usize) -> Expr { + let proj_schema_size = self.forward.len(); + cond.rewrite_column_refs(&|idx| { + Some(if idx < proj_schema_size { + self.projection_col_refers_to(idx) + } else { + idx - proj_schema_size + child_schema_len + }) + }) + .unwrap() + } +} + +impl LogicalProjection { + pub fn compute_column_mapping(exprs: &ExprList) -> Option { + let mut mapping = vec![]; + for expr in exprs.to_vec() { + let col_expr = ColumnRefExpr::from_rel_node(expr.into_rel_node())?; + mapping.push(col_expr.index()); + } + ProjectionMapping::build(mapping) + } +} diff --git a/optd-datafusion-repr/src/rules.rs b/optd-datafusion-repr/src/rules.rs index aaebdcef..e02337ca 100644 --- a/optd-datafusion-repr/src/rules.rs +++ b/optd-datafusion-repr/src/rules.rs @@ -2,6 +2,7 @@ mod eliminate_duplicated_expr; mod eliminate_limit; mod filter; +mod filter_pushdown; mod joins; mod macros; mod physical; @@ -12,6 +13,10 @@ pub use eliminate_duplicated_expr::{ }; pub use eliminate_limit::EliminateLimitRule; pub use filter::{EliminateFilterRule, SimplifyFilterRule, SimplifyJoinCondRule}; +pub use filter_pushdown::{ + FilterAggTransposeRule, FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule, + FilterMergeRule, FilterProjectTransposeRule, FilterSortTransposeRule, +}; pub use joins::{ EliminateJoinRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, ProjectionPullUpJoin, }; diff --git a/optd-datafusion-repr/src/rules/filter_pushdown.rs b/optd-datafusion-repr/src/rules/filter_pushdown.rs new file mode 100644 index 00000000..882e8714 --- /dev/null +++ b/optd-datafusion-repr/src/rules/filter_pushdown.rs @@ -0,0 +1,816 @@ +//! This rule is designed to be applied heuristically (read: all the time, blindly). +//! However, pushing a filter is not *always* better (but it usually is). If cost is +//! to be taken into account, each transposition step can be done separately +//! (and are thus all in independent functions). +//! One can even implement each of these helper functions as their own transpose rule, +//! like Calcite does. +//! +//! At a high level, filter pushdown is responsible for pushing the filter node +//! further down the query plan whenever it is possible to do so. + +use std::collections::{HashMap, HashSet}; +use std::vec; + +use optd_core::rules::{Rule, RuleMatcher}; +use optd_core::{optimizer::Optimizer, rel_node::RelNode}; + +use crate::plan_nodes::{ + ColumnRefExpr, Expr, ExprList, JoinType, LogOpExpr, LogOpType, LogicalAgg, LogicalFilter, + LogicalJoin, LogicalProjection, LogicalSort, OptRelNode, OptRelNodeTyp, PlanNode, +}; +use crate::properties::schema::SchemaPropertyBuilder; + +use super::macros::define_rule; + +/// Emits a LogOpExpr AND if the list has more than one element +/// Otherwise, returns the single element +fn and_expr_list_to_expr(exprs: Vec) -> Expr { + if exprs.len() == 1 { + exprs.first().unwrap().clone() + } else { + LogOpExpr::new(LogOpType::And, ExprList::new(exprs)).into_expr() + } +} + +fn merge_conds(first: Expr, second: Expr) -> Expr { + let new_expr_list = ExprList::new(vec![first, second]); + // Flatten nested logical expressions if possible + LogOpExpr::new_flattened_nested_logical(LogOpType::And, new_expr_list).into_expr() +} + +#[derive(Debug, Clone, Copy)] +enum JoinCondDependency { + Left, + Right, + Both, + None, +} + +/// Given a list of expressions (presumably a flattened tree), determine +/// if the expression is dependent on the left child, the right child, both, +/// or neither, by analyzing which columnrefs are used in the expressions. +fn determine_join_cond_dep( + children: &Vec, + left_schema_size: usize, + right_schema_size: usize, +) -> JoinCondDependency { + let mut left_col = false; + let mut right_col = false; + for child in children { + if child.typ() == OptRelNodeTyp::ColumnRef { + let col_ref = ColumnRefExpr::from_rel_node(child.clone().into_rel_node()).unwrap(); + let index = col_ref.index(); + if index < left_schema_size { + left_col = true; + } else if index >= left_schema_size && index < left_schema_size + right_schema_size { + right_col = true; + } + } + } + match (left_col, right_col) { + (true, true) => JoinCondDependency::Both, + (true, false) => JoinCondDependency::Left, + (false, true) => JoinCondDependency::Right, + (false, false) => JoinCondDependency::None, + } +} + +/// This function recurses/loops to the bottom-level of the expression tree, +/// building a list of bottom-level exprs for each separable expr +/// +/// # Arguments +/// * `categorization_fn` - Function, called with a list of each bottom-level +/// expression, along with the top-level expression node that will be +/// categorized. +/// * `cond` - The top-level expression node to begin separating +fn categorize_conds(mut categorization_fn: impl FnMut(Expr, &Vec), cond: Expr) { + fn categorize_conds_helper(cond: Expr, bottom_level_children: &mut Vec) { + assert!(cond.typ().is_expression()); + match cond.typ() { + OptRelNodeTyp::ColumnRef | OptRelNodeTyp::Constant(_) => { + bottom_level_children.push(cond) + } + _ => { + for child in &cond.clone().into_rel_node().children { + if child.typ == OptRelNodeTyp::List { + // TODO: What should we do when we encounter a List? + continue; + } + categorize_conds_helper( + Expr::from_rel_node(child.clone()).unwrap(), + bottom_level_children, + ); + } + } + } + } + + let mut categorize_indep_expr = |cond: Expr| { + let bottom_level_children = &mut vec![]; + categorize_conds_helper(cond.clone(), bottom_level_children); + categorization_fn(cond, bottom_level_children); + }; + match cond.typ() { + OptRelNodeTyp::LogOp(LogOpType::And) => { + for child in &cond.into_rel_node().children { + categorize_indep_expr(Expr::from_rel_node(child.clone()).unwrap()); + } + } + _ => { + categorize_indep_expr(cond); + } + } +} + +define_rule!( + FilterProjectTransposeRule, + apply_filter_project_transpose, + (Filter, (Projection, child, [exprs]), [cond]) +); + +/// Datafusion only pushes filter past project when the project does not contain +/// volatile (i.e. non-deterministic) expressions that are present in the filter +/// Calcite only checks if the projection contains a windowing calculation +/// We check neither of those things and do it always (which may be wrong) +fn apply_filter_project_transpose( + optimizer: &impl Optimizer, + FilterProjectTransposeRulePicks { child, exprs, cond }: FilterProjectTransposeRulePicks, +) -> Vec> { + let child_schema_len = optimizer + .get_property::(child.clone().into(), 0) + .len(); + + let child = PlanNode::from_group(child.into()); + let cond_as_expr = Expr::from_rel_node(cond.into()).unwrap(); + let exprs = ExprList::from_rel_node(exprs.into()).unwrap(); + + let proj_col_map = LogicalProjection::compute_column_mapping(&exprs).unwrap(); + let rewritten_cond = proj_col_map.rewrite_condition(cond_as_expr.clone(), child_schema_len); + + let new_filter_node = LogicalFilter::new(child, rewritten_cond); + let new_proj = LogicalProjection::new(new_filter_node.into_plan_node(), exprs); + vec![new_proj.into_rel_node().as_ref().clone()] +} + +define_rule!( + FilterMergeRule, + apply_filter_merge, + (Filter, (Filter, child, [cond1]), [cond]) +); + +fn apply_filter_merge( + _optimizer: &impl Optimizer, + FilterMergeRulePicks { child, cond1, cond }: FilterMergeRulePicks, +) -> Vec> { + let child = PlanNode::from_group(child.into()); + let curr_cond = Expr::from_rel_node(cond.into()).unwrap(); + let child_cond = Expr::from_rel_node(cond1.into()).unwrap(); + + let merged_cond = merge_conds(curr_cond, child_cond); + + let new_filter = LogicalFilter::new(child, merged_cond); + vec![new_filter.into_rel_node().as_ref().clone()] +} + +// TODO: define_rule! should be able to match on any join type, ideally... +define_rule!( + FilterCrossJoinTransposeRule, + apply_filter_cross_join_transpose, + ( + Filter, + (Join(JoinType::Cross), child_a, child_b, [join_cond]), + [cond] + ) +); + +fn apply_filter_cross_join_transpose( + optimizer: &impl Optimizer, + FilterCrossJoinTransposeRulePicks { + child_a, + child_b, + join_cond, + cond, + }: FilterCrossJoinTransposeRulePicks, +) -> Vec> { + filter_join_transpose( + optimizer, + JoinType::Cross, + child_a, + child_b, + join_cond, + cond, + ) +} + +define_rule!( + FilterInnerJoinTransposeRule, + apply_filter_inner_join_transpose, + ( + Filter, + (Join(JoinType::Inner), child_a, child_b, [join_cond]), + [cond] + ) +); + +fn apply_filter_inner_join_transpose( + optimizer: &impl Optimizer, + FilterInnerJoinTransposeRulePicks { + child_a, + child_b, + join_cond, + cond, + }: FilterInnerJoinTransposeRulePicks, +) -> Vec> { + filter_join_transpose( + optimizer, + JoinType::Inner, + child_a, + child_b, + join_cond, + cond, + ) +} + +/// Cases: +/// - Push down to the left child (only involves keys from the left child) +/// - Push down to the right child (only involves keys from the right child) +/// - Push into the join condition (involves keys from both children) +/// We will consider each part of the conjunction separately, and push down +/// only the relevant parts. +fn filter_join_transpose( + optimizer: &impl Optimizer, + join_typ: JoinType, + join_child_a: RelNode, + join_child_b: RelNode, + join_cond: RelNode, + filter_cond: RelNode, +) -> Vec> { + let left_schema_size = optimizer + .get_property::(join_child_a.clone().into(), 0) + .len(); + let right_schema_size = optimizer + .get_property::(join_child_b.clone().into(), 0) + .len(); + + let join_child_a = PlanNode::from_group(join_child_a.into()); + let join_child_b = PlanNode::from_group(join_child_b.into()); + let join_cond = Expr::from_rel_node(join_cond.into()).unwrap(); + let filter_cond = Expr::from_rel_node(filter_cond.into()).unwrap(); + // TODO: Push existing join conditions down as well + + let mut left_conds = vec![]; + let mut right_conds = vec![]; + let mut join_conds = vec![]; + let mut keep_conds = vec![]; + + let categorization_fn = |expr: Expr, children: &Vec| { + let location = determine_join_cond_dep(children, left_schema_size, right_schema_size); + match location { + JoinCondDependency::Left => left_conds.push(expr), + JoinCondDependency::Right => right_conds.push( + expr.rewrite_column_refs(&|idx| { + Some(LogicalJoin::map_through_join( + idx, + left_schema_size, + right_schema_size, + )) + }) + .unwrap(), + ), + JoinCondDependency::Both => join_conds.push(expr), + JoinCondDependency::None => keep_conds.push(expr), + } + }; + categorize_conds(categorization_fn, filter_cond); + + let new_left = if !left_conds.is_empty() { + let new_filter_node = LogicalFilter::new(join_child_a, and_expr_list_to_expr(left_conds)); + new_filter_node.into_plan_node() + } else { + join_child_a + }; + + let new_right = if !right_conds.is_empty() { + let new_filter_node = LogicalFilter::new(join_child_b, and_expr_list_to_expr(right_conds)); + new_filter_node.into_plan_node() + } else { + join_child_b + }; + + let new_join = match join_typ { + JoinType::Inner => { + let old_cond = join_cond; + let new_conds = merge_conds(and_expr_list_to_expr(join_conds), old_cond); + LogicalJoin::new(new_left, new_right, new_conds, JoinType::Inner) + } + JoinType::Cross => { + if !join_conds.is_empty() { + LogicalJoin::new( + new_left, + new_right, + and_expr_list_to_expr(join_conds), + JoinType::Inner, + ) + } else { + LogicalJoin::new(new_left, new_right, join_cond, JoinType::Cross) + } + } + _ => { + // We don't support modifying the join condition for other join types yet + LogicalJoin::new(new_left, new_right, join_cond, join_typ) + } + }; + + let new_filter = if !keep_conds.is_empty() { + let new_filter_node = + LogicalFilter::new(new_join.into_plan_node(), and_expr_list_to_expr(keep_conds)); + new_filter_node.into_rel_node().as_ref().clone() + } else { + new_join.into_rel_node().as_ref().clone() + }; + + vec![new_filter] +} + +define_rule!( + FilterSortTransposeRule, + apply_filter_sort_transpose, + (Filter, (Sort, child, [exprs]), [cond]) +); + +/// Filter and sort should always be commutable. +fn apply_filter_sort_transpose( + _optimizer: &impl Optimizer, + FilterSortTransposeRulePicks { child, exprs, cond }: FilterSortTransposeRulePicks, +) -> Vec> { + let child = PlanNode::from_group(child.into()); + let exprs = ExprList::from_rel_node(exprs.into()).unwrap(); + + let cond_as_expr = Expr::from_rel_node(cond.into()).unwrap(); + let new_filter_node = LogicalFilter::new(child, cond_as_expr); + // Exprs should be the same, no projections have occurred here. + let new_sort = LogicalSort::new(new_filter_node.into_plan_node(), exprs); + vec![new_sort.into_rel_node().as_ref().clone()] +} + +define_rule!( + FilterAggTransposeRule, + apply_filter_agg_transpose, + (Filter, (Agg, child, [exprs], [groups]), [cond]) +); + +/// Filter is commutable past aggregations when the filter condition only +/// involves the group by columns. We will consider each part of the conjunction +/// separately, and push down only the relevant parts. +fn apply_filter_agg_transpose( + _optimizer: &impl Optimizer, + FilterAggTransposeRulePicks { + child, + exprs, + groups, + cond, + }: FilterAggTransposeRulePicks, +) -> Vec> { + let exprs = ExprList::from_rel_node(exprs.into()).unwrap(); + let groups = ExprList::from_rel_node(groups.into()).unwrap(); + let child = PlanNode::from_group(child.into()); + + // Get top-level group-by columns. Does not cover cases where group-by exprs + // are more complex than a top-level column reference. + let group_cols = groups + .clone() + .into_rel_node() + .children + .iter() + .filter_map(|expr| match expr.typ { + OptRelNodeTyp::ColumnRef => { + Some(ColumnRefExpr::from_rel_node(expr.clone()).unwrap().index()) + } + _ => None, + }) + .collect::>(); + + // Categorize predicates that only use our group-by columns as push-able. + let mut keep_conds = vec![]; + let mut push_conds = vec![]; + + let categorization_fn = |expr: Expr, children: &Vec| { + let mut group_by_cols_only = true; + for child in children { + if child.typ() == OptRelNodeTyp::ColumnRef { + let col_ref = ColumnRefExpr::from_rel_node(child.clone().into_rel_node()).unwrap(); + if !group_cols.contains(&col_ref.index()) { + group_by_cols_only = false; + break; + } + } + } + if group_by_cols_only { + push_conds.push(expr); + } else { + keep_conds.push(expr); + } + }; + categorize_conds(categorization_fn, Expr::from_rel_node(cond.into()).unwrap()); + + let new_child = if !push_conds.is_empty() { + LogicalFilter::new( + child, + LogOpExpr::new_flattened_nested_logical(LogOpType::And, ExprList::new(push_conds)) + .into_expr(), + ) + .into_plan_node() + } else { + child + }; + + let new_agg = LogicalAgg::new(new_child, exprs, groups); + + let new_filter = if !keep_conds.is_empty() { + LogicalFilter::new( + new_agg.into_plan_node(), + LogOpExpr::new_flattened_nested_logical(LogOpType::And, ExprList::new(keep_conds)) + .into_expr(), + ) + .into_rel_node() + .as_ref() + .clone() + } else { + new_agg.into_rel_node().as_ref().clone() + }; + + vec![new_filter] +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use optd_core::optimizer::Optimizer; + + use crate::{ + plan_nodes::{ + BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, ExprList, LogOpExpr, LogOpType, + LogicalAgg, LogicalFilter, LogicalJoin, LogicalProjection, LogicalScan, LogicalSort, + OptRelNode, OptRelNodeTyp, + }, + rules::{ + FilterAggTransposeRule, FilterInnerJoinTransposeRule, FilterMergeRule, + FilterProjectTransposeRule, FilterSortTransposeRule, + }, + testing::new_test_optimizer, + }; + + #[test] + fn push_past_sort() { + let mut test_optimizer = new_test_optimizer(Arc::new(FilterSortTransposeRule::new())); + + let scan = LogicalScan::new("customer".into()); + let sort = LogicalSort::new(scan.into_plan_node(), ExprList::new(vec![])); + + let filter_expr = BinOpExpr::new( + ColumnRefExpr::new(0).into_expr(), + ConstantExpr::int32(5).into_expr(), + BinOpType::Eq, + ) + .into_expr(); + + let filter = LogicalFilter::new(sort.into_plan_node(), filter_expr); + + let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap(); + + assert!(matches!(plan.typ, OptRelNodeTyp::Sort)); + assert!(matches!(plan.child(0).typ, OptRelNodeTyp::Filter)); + } + + #[test] + fn filter_merge() { + // TODO: write advanced proj with more expr that need to be transformed + let mut test_optimizer = new_test_optimizer(Arc::new(FilterMergeRule::new())); + + let scan = LogicalScan::new("customer".into()); + let filter_ch_expr = BinOpExpr::new( + ColumnRefExpr::new(0).into_expr(), + ConstantExpr::int32(1).into_expr(), + BinOpType::Eq, + ) + .into_expr(); + let filter_ch = LogicalFilter::new(scan.into_plan_node(), filter_ch_expr); + + let filter_expr = BinOpExpr::new( + ColumnRefExpr::new(1).into_expr(), + ConstantExpr::int32(6).into_expr(), + BinOpType::Eq, + ) + .into_expr(); + + let filter = LogicalFilter::new(filter_ch.into_plan_node(), filter_expr); + + let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap(); + + assert!(matches!(plan.typ, OptRelNodeTyp::Filter)); + let cond_log_op = LogOpExpr::from_rel_node( + LogicalFilter::from_rel_node(plan.clone()) + .unwrap() + .cond() + .into_rel_node(), + ) + .unwrap(); + assert!(matches!(cond_log_op.op_type(), LogOpType::And)); + + let cond_exprs = cond_log_op.children(); + assert_eq!(cond_exprs.len(), 2); + let expr_1 = BinOpExpr::from_rel_node(cond_exprs[0].clone().into_rel_node()).unwrap(); + let expr_2 = BinOpExpr::from_rel_node(cond_exprs[1].clone().into_rel_node()).unwrap(); + assert!(matches!(expr_1.op_type(), BinOpType::Eq)); + assert!(matches!(expr_2.op_type(), BinOpType::Eq)); + let col_1 = + ColumnRefExpr::from_rel_node(expr_1.left_child().clone().into_rel_node()).unwrap(); + let col_2 = + ConstantExpr::from_rel_node(expr_1.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_1.index(), 1); + assert_eq!(col_2.value().as_i32(), 6); + let col_3 = + ColumnRefExpr::from_rel_node(expr_2.left_child().clone().into_rel_node()).unwrap(); + let col_4 = + ConstantExpr::from_rel_node(expr_2.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_3.index(), 0); + assert_eq!(col_4.value().as_i32(), 1); + } + + #[test] + fn push_past_proj_basic() { + let mut test_optimizer = new_test_optimizer(Arc::new(FilterProjectTransposeRule::new())); + + let scan = LogicalScan::new("customer".into()); + let proj = LogicalProjection::new(scan.into_plan_node(), ExprList::new(vec![])); + + let filter_expr = BinOpExpr::new( + ColumnRefExpr::new(0).into_expr(), + ConstantExpr::int32(5).into_expr(), + BinOpType::Eq, + ) + .into_expr(); + + let filter = LogicalFilter::new(proj.into_plan_node(), filter_expr); + let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap(); + + assert_eq!(plan.typ, OptRelNodeTyp::Projection); + assert!(matches!(plan.child(0).typ, OptRelNodeTyp::Filter)); + } + + #[test] + fn push_past_proj_adv() { + let mut test_optimizer = new_test_optimizer(Arc::new(FilterProjectTransposeRule::new())); + + let scan = LogicalScan::new("customer".into()); + let proj = LogicalProjection::new( + scan.into_plan_node(), + ExprList::new(vec![ + ColumnRefExpr::new(0).into_expr(), + ColumnRefExpr::new(4).into_expr(), + ColumnRefExpr::new(5).into_expr(), + ColumnRefExpr::new(7).into_expr(), + ]), + ); + + let filter_expr = LogOpExpr::new( + LogOpType::And, + ExprList::new(vec![ + BinOpExpr::new( + // This one should be pushed to the left child + ColumnRefExpr::new(1).into_expr(), + ConstantExpr::int32(5).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + BinOpExpr::new( + // This one should be pushed to the right child + ColumnRefExpr::new(3).into_expr(), + ConstantExpr::int32(6).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + ]), + ); + + let filter = LogicalFilter::new(proj.into_plan_node(), filter_expr.into_expr()); + + let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap(); + + assert!(matches!(plan.typ, OptRelNodeTyp::Projection)); + let plan_filter = LogicalFilter::from_rel_node(plan.child(0)).unwrap(); + assert!(matches!(plan_filter.0.typ(), OptRelNodeTyp::Filter)); + let plan_filter_expr = + LogOpExpr::from_rel_node(plan_filter.cond().into_rel_node()).unwrap(); + assert!(matches!(plan_filter_expr.op_type(), LogOpType::And)); + let op_0 = BinOpExpr::from_rel_node(plan_filter_expr.children()[0].clone().into_rel_node()) + .unwrap(); + let col_0 = + ColumnRefExpr::from_rel_node(op_0.left_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_0.index(), 4); + let op_1 = BinOpExpr::from_rel_node(plan_filter_expr.children()[1].clone().into_rel_node()) + .unwrap(); + let col_1 = + ColumnRefExpr::from_rel_node(op_1.left_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_1.index(), 7); + } + + #[test] + fn push_past_join_conjunction() { + // Test pushing a complex filter past a join, where one clause can + // be pushed to the left child, one to the right child, one gets incorporated + // into the join condition, and a constant one remains in the + // original filter. + let mut test_optimizer = new_test_optimizer(Arc::new(FilterInnerJoinTransposeRule::new())); + + let scan1 = LogicalScan::new("customer".into()); + + let scan2 = LogicalScan::new("orders".into()); + + let join = LogicalJoin::new( + scan1.into_plan_node(), + scan2.into_plan_node(), + LogOpExpr::new( + LogOpType::And, + ExprList::new(vec![BinOpExpr::new( + ColumnRefExpr::new(0).into_expr(), + ConstantExpr::int32(1).into_expr(), + BinOpType::Eq, + ) + .into_expr()]), + ) + .into_expr(), + super::JoinType::Inner, + ); + + let filter_expr = LogOpExpr::new( + LogOpType::And, + ExprList::new(vec![ + BinOpExpr::new( + // This one should be pushed to the left child + ColumnRefExpr::new(0).into_expr(), + ConstantExpr::int32(5).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + BinOpExpr::new( + // This one should be pushed to the right child + ColumnRefExpr::new(11).into_expr(), + ConstantExpr::int32(6).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + BinOpExpr::new( + // This one should be pushed to the join condition + ColumnRefExpr::new(2).into_expr(), + ColumnRefExpr::new(8).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + BinOpExpr::new( + // always true, should be removed by other rules + ConstantExpr::int32(2).into_expr(), + ConstantExpr::int32(7).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + ]), + ); + + let filter = LogicalFilter::new(join.into_plan_node(), filter_expr.into_expr()); + + let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap(); + + // Examine original filter + condition + let top_level_filter = LogicalFilter::from_rel_node(plan.clone()).unwrap(); + let bin_op_0 = + BinOpExpr::from_rel_node(top_level_filter.cond().clone().into_rel_node()).unwrap(); + assert!(matches!(bin_op_0.op_type(), BinOpType::Eq)); + let col_0 = + ConstantExpr::from_rel_node(bin_op_0.left_child().clone().into_rel_node()).unwrap(); + let col_1 = + ConstantExpr::from_rel_node(bin_op_0.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_0.value().as_i32(), 2); + assert_eq!(col_1.value().as_i32(), 7); + + // Examine join node + condition + let join_node = + LogicalJoin::from_rel_node(top_level_filter.child().clone().into_rel_node()).unwrap(); + let join_conds = LogOpExpr::from_rel_node(join_node.cond().into_rel_node()).unwrap(); + assert!(matches!(join_conds.op_type(), LogOpType::And)); + assert_eq!(join_conds.children().len(), 2); + let bin_op_1 = + BinOpExpr::from_rel_node(join_conds.children()[0].clone().into_rel_node()).unwrap(); + assert!(matches!(bin_op_1.op_type(), BinOpType::Eq)); + let col_2 = + ColumnRefExpr::from_rel_node(bin_op_1.left_child().clone().into_rel_node()).unwrap(); + let col_3 = + ColumnRefExpr::from_rel_node(bin_op_1.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_2.index(), 2); + assert_eq!(col_3.index(), 8); + + // Examine left child filter + condition + let filter_1 = LogicalFilter::from_rel_node(join_node.left().into_rel_node()).unwrap(); + let bin_op_3 = BinOpExpr::from_rel_node(filter_1.cond().clone().into_rel_node()).unwrap(); + assert!(matches!(bin_op_3.op_type(), BinOpType::Eq)); + let col_6 = + ColumnRefExpr::from_rel_node(bin_op_3.left_child().clone().into_rel_node()).unwrap(); + let col_7 = + ConstantExpr::from_rel_node(bin_op_3.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_6.index(), 0); + assert_eq!(col_7.value().as_i32(), 5); + + // Examine right child filter + condition + let filter_2 = LogicalFilter::from_rel_node(join_node.right().into_rel_node()).unwrap(); + let bin_op_4 = BinOpExpr::from_rel_node(filter_2.cond().clone().into_rel_node()).unwrap(); + assert!(matches!(bin_op_4.op_type(), BinOpType::Eq)); + let col_8 = + ColumnRefExpr::from_rel_node(bin_op_4.left_child().clone().into_rel_node()).unwrap(); + let col_9 = + ConstantExpr::from_rel_node(bin_op_4.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_8.index(), 3); + assert_eq!(col_9.value().as_i32(), 6); + } + + #[test] + fn push_past_agg() { + // Test pushing a filter past an aggregation node, where the filter + // condition has one clause that can be pushed down to the child and + // one that must remain in the filter. + let mut test_optimizer = new_test_optimizer(Arc::new(FilterAggTransposeRule::new())); + + let scan = LogicalScan::new("customer".into()); + + let agg = LogicalAgg::new( + scan.clone().into_plan_node(), + ExprList::new(vec![]), + ExprList::new(vec![ColumnRefExpr::new(0).into_expr()]), + ); + + let filter_expr = LogOpExpr::new( + LogOpType::And, + ExprList::new(vec![ + BinOpExpr::new( + // This one should be pushed to the child + ColumnRefExpr::new(0).into_expr(), + ConstantExpr::int32(5).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + BinOpExpr::new( + // This one should remain in the filter + ColumnRefExpr::new(1).into_expr(), + ConstantExpr::int32(6).into_expr(), + BinOpType::Eq, + ) + .into_expr(), + ]), + ); + + let filter = LogicalFilter::new(agg.into_plan_node(), filter_expr.into_expr()); + + let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap(); + + let plan_filter = LogicalFilter::from_rel_node(plan.clone()).unwrap(); + assert!(matches!(plan_filter.0.typ(), OptRelNodeTyp::Filter)); + let plan_filter_expr = + LogOpExpr::from_rel_node(plan_filter.cond().into_rel_node()).unwrap(); + assert!(matches!(plan_filter_expr.op_type(), LogOpType::And)); + assert_eq!(plan_filter_expr.children().len(), 1); + let op_0 = BinOpExpr::from_rel_node(plan_filter_expr.children()[0].clone().into_rel_node()) + .unwrap(); + let col_0 = + ColumnRefExpr::from_rel_node(op_0.left_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_0.index(), 1); + let col_1 = + ConstantExpr::from_rel_node(op_0.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_1.value().as_i32(), 6); + + let plan_agg = LogicalAgg::from_rel_node(plan.child(0)).unwrap(); + let plan_agg_groups = plan_agg.groups(); + assert_eq!(plan_agg_groups.len(), 1); + let group_col = ColumnRefExpr::from_rel_node(plan_agg_groups.child(0).into_rel_node()) + .unwrap() + .index(); + assert_eq!(group_col, 0); + + let plan_agg_child_filter = + LogicalFilter::from_rel_node(plan_agg.child().into_rel_node()).unwrap(); + let plan_agg_child_filter_expr = + LogOpExpr::from_rel_node(plan_agg_child_filter.cond().into_rel_node()).unwrap(); + assert!(matches!( + plan_agg_child_filter_expr.op_type(), + LogOpType::And + )); + assert_eq!(plan_agg_child_filter_expr.children().len(), 1); + let op_1 = + BinOpExpr::from_rel_node(plan_agg_child_filter_expr.child(0).into_rel_node()).unwrap(); + let col_2 = + ColumnRefExpr::from_rel_node(op_1.left_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_2.index(), 0); + let col_3 = + ConstantExpr::from_rel_node(op_1.right_child().clone().into_rel_node()).unwrap(); + assert_eq!(col_3.value().as_i32(), 5); + } +} diff --git a/optd-datafusion-repr/src/rules/joins.rs b/optd-datafusion-repr/src/rules/joins.rs index 91c31411..d76083ba 100644 --- a/optd-datafusion-repr/src/rules/joins.rs +++ b/optd-datafusion-repr/src/rules/joins.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use std::vec; -use itertools::Itertools; use optd_core::optimizer::Optimizer; use optd_core::rel_node::RelNode; use optd_core::rules::{Rule, RuleMatcher}; @@ -26,46 +25,21 @@ fn apply_join_commute( optimizer: &impl Optimizer, JoinCommuteRulePicks { left, right, cond }: JoinCommuteRulePicks, ) -> Vec> { - fn rewrite_column_refs(expr: Expr, left_size: usize, right_size: usize) -> Expr { - let expr = expr.into_rel_node(); - if let Some(expr) = ColumnRefExpr::from_rel_node(expr.clone()) { - let index = expr.index(); - if index < left_size { - return ColumnRefExpr::new(index + right_size).into_expr(); - } else { - return ColumnRefExpr::new(index - left_size).into_expr(); - } - } - let children = expr.children.clone(); - let children = children - .into_iter() - .map(|x| { - rewrite_column_refs(Expr::from_rel_node(x).unwrap(), left_size, right_size) - .into_rel_node() - }) - .collect_vec(); - Expr::from_rel_node( - RelNode { - typ: expr.typ.clone(), - children, - data: expr.data.clone(), - } - .into(), - ) - .unwrap() - } - let left_schema = optimizer.get_property::(Arc::new(left.clone()), 0); let right_schema = optimizer.get_property::(Arc::new(right.clone()), 0); - let cond = rewrite_column_refs( - Expr::from_rel_node(cond.into()).unwrap(), - left_schema.len(), - right_schema.len(), - ); + let cond = Expr::from_rel_node(cond.into()) + .unwrap() + .rewrite_column_refs(&|idx| { + Some(if idx < left_schema.len() { + idx + right_schema.len() + } else { + idx - left_schema.len() + }) + }); let node = LogicalJoin::new( PlanNode::from_group(right.into()), PlanNode::from_group(left.into()), - cond, + cond.unwrap(), JoinType::Inner, ); let mut proj_expr = Vec::with_capacity(left_schema.len() + right_schema.len()); @@ -88,7 +62,6 @@ define_rule!( /// Eliminate logical join with constant predicates /// True predicates becomes CrossJoin (not yet implemented) -/// False predicates become EmptyRelation (not yet implemented) #[allow(unused_variables)] fn apply_eliminate_join( optimizer: &impl Optimizer, @@ -150,43 +123,22 @@ fn apply_join_assoc( cond2, }: JoinAssocRulePicks, ) -> Vec> { - fn rewrite_column_refs(expr: Expr, a_size: usize) -> Option { - let expr = expr.into_rel_node(); - if let Some(expr) = ColumnRefExpr::from_rel_node(expr.clone()) { - let index = expr.index(); - if index < a_size { - return None; - } else { - return Some(ColumnRefExpr::new(index - a_size).into_expr()); - } - } - let children = expr.children.clone(); - let children = children - .into_iter() - .map(|x| rewrite_column_refs(Expr::from_rel_node(x).unwrap(), a_size)) - .collect::>>()?; - Some( - Expr::from_rel_node( - RelNode { - typ: expr.typ.clone(), - children: children - .into_iter() - .map(|x| x.into_rel_node()) - .collect_vec(), - data: expr.data.clone(), - } - .into(), - ) - .unwrap(), - ) - } let a_schema = optimizer.get_property::(Arc::new(a.clone()), 0); let _b_schema = optimizer.get_property::(Arc::new(b.clone()), 0); let _c_schema = optimizer.get_property::(Arc::new(c.clone()), 0); + let cond2 = Expr::from_rel_node(cond2.into()).unwrap(); - let Some(cond2) = rewrite_column_refs(cond2, a_schema.len()) else { + + let Some(cond2) = cond2.rewrite_column_refs(&|idx| { + if idx < a_schema.len() { + None + } else { + Some(idx - a_schema.len()) + } + }) else { return vec![]; }; + let node = RelNode { typ: OptRelNodeTyp::Join(JoinType::Inner), children: vec![ @@ -370,35 +322,6 @@ define_rule!( ) ); -struct ProjectionMapping { - forward: Vec, - _backward: Vec>, -} - -impl ProjectionMapping { - pub fn build(mapping: Vec) -> Option { - let mut backward = vec![]; - for (i, &x) in mapping.iter().enumerate() { - if x >= backward.len() { - backward.resize(x + 1, None); - } - backward[x] = Some(i); - } - Some(Self { - forward: mapping, - _backward: backward, - }) - } - - pub fn projection_col_refers_to(&self, col: usize) -> usize { - self.forward[col] - } - - pub fn _original_col_maps_to(&self, col: usize) -> Option { - self._backward[col] - } -} - fn apply_projection_pull_up_join( optimizer: &impl Optimizer, ProjectionPullUpJoinPicks { @@ -408,72 +331,19 @@ fn apply_projection_pull_up_join( cond, }: ProjectionPullUpJoinPicks, ) -> Vec> { + let left = Arc::new(left.clone()); + let right = Arc::new(right.clone()); + let list = ExprList::from_rel_node(Arc::new(list)).unwrap(); - fn compute_column_mapping(list: ExprList) -> Option { - let mut mapping = vec![]; - for expr in list.to_vec() { - let col_expr = ColumnRefExpr::from_rel_node(expr.into_rel_node())?; - mapping.push(col_expr.index()); - } - ProjectionMapping::build(mapping) - } + let projection = LogicalProjection::new(PlanNode::from_group(left.clone()), list.clone()); - let Some(mapping) = compute_column_mapping(list.clone()) else { + let Some(mapping) = LogicalProjection::compute_column_mapping(&projection.exprs()) else { return vec![]; }; - fn rewrite_condition( - cond: Expr, - mapping: &ProjectionMapping, - left_schema_size: usize, - projection_schema_size: usize, - ) -> Expr { - if cond.typ() == OptRelNodeTyp::ColumnRef { - let col = ColumnRefExpr::from_rel_node(cond.into_rel_node()).unwrap(); - let idx = col.index(); - if idx < projection_schema_size { - let col = mapping.projection_col_refers_to(col.index()); - return ColumnRefExpr::new(col).into_expr(); - } else { - let col = col.index(); - return ColumnRefExpr::new(col - projection_schema_size + left_schema_size) - .into_expr(); - } - } - let expr = cond.into_rel_node(); - let mut children = Vec::with_capacity(expr.children.len()); - for child in &expr.children { - children.push( - rewrite_condition( - Expr::from_rel_node(child.clone()).unwrap(), - mapping, - left_schema_size, - projection_schema_size, - ) - .into_rel_node(), - ); - } - - Expr::from_rel_node( - RelNode { - typ: expr.typ.clone(), - children, - data: expr.data.clone(), - } - .into(), - ) - .unwrap() - } - - let left = Arc::new(left.clone()); - let right = Arc::new(right.clone()); - // TODO(chi): support capture projection node. - let projection = - LogicalProjection::new(PlanNode::from_group(left.clone()), list.clone()).into_rel_node(); let left_schema = optimizer.get_property::(left.clone(), 0); - let projection_schema = optimizer.get_property::(projection.clone(), 0); let right_schema = optimizer.get_property::(right.clone(), 0); let mut new_projection_exprs = list.to_vec(); for i in 0..right_schema.len() { @@ -484,11 +354,9 @@ fn apply_projection_pull_up_join( LogicalJoin::new( PlanNode::from_group(left), PlanNode::from_group(right), - rewrite_condition( + mapping.rewrite_condition( Expr::from_rel_node(Arc::new(cond)).unwrap(), - &mapping, left_schema.len(), - projection_schema.len(), ), JoinType::Inner, ) diff --git a/optd-datafusion-repr/src/testing.rs b/optd-datafusion-repr/src/testing.rs new file mode 100644 index 00000000..9ce555ac --- /dev/null +++ b/optd-datafusion-repr/src/testing.rs @@ -0,0 +1,27 @@ +mod dummy_cost; +mod tpch_catalog; + +use std::sync::Arc; + +use optd_core::{ + heuristics::{ApplyOrder, HeuristicsOptimizer}, + rules::Rule, +}; + +use crate::{plan_nodes::OptRelNodeTyp, properties::schema::SchemaPropertyBuilder}; + +use self::tpch_catalog::TpchCatalog; + +/// Create a "dummy" optimizer preloaded with the TPC-H catalog for testing +/// Note: Only provides the schema property currently +pub fn new_test_optimizer( + rule: Arc>>, +) -> HeuristicsOptimizer { + let dummy_catalog = Arc::new(TpchCatalog); + + HeuristicsOptimizer::new_with_rules( + vec![rule], + ApplyOrder::TopDown, + Arc::new([Box::new(SchemaPropertyBuilder::new(dummy_catalog))]), + ) +} diff --git a/optd-datafusion-repr/src/testing/dummy_cost.rs b/optd-datafusion-repr/src/testing/dummy_cost.rs new file mode 100644 index 00000000..ea00fcb1 --- /dev/null +++ b/optd-datafusion-repr/src/testing/dummy_cost.rs @@ -0,0 +1,37 @@ +use crate::plan_nodes::OptRelNodeTyp; +use optd_core::{ + cascades::{CascadesOptimizer, RelNodeContext}, + cost::{Cost, CostModel}, + rel_node::{RelNode, Value}, +}; + +/// Dummy cost model that returns a 0 cost in all cases. +/// Intended for testing with the cascades optimizer. +pub struct DummyCostModel; + +impl CostModel for DummyCostModel { + fn compute_cost( + &self, + _node: &OptRelNodeTyp, + _data: &Option, + _children: &[Cost], + _context: Option, + _optimizer: Option<&CascadesOptimizer>, + ) -> Cost { + Cost(vec![0.0]) + } + + fn compute_plan_node_cost(&self, _node: &RelNode) -> Cost { + Cost(vec![0.0]) + } + + fn explain(&self, _node: &Cost) -> String { + "Dummy cost".to_string() + } + + fn accumulate(&self, _total_cost: &mut Cost, _cost: &Cost) {} + + fn zero(&self) -> Cost { + Cost(vec![0.0]) + } +} diff --git a/optd-datafusion-repr/src/testing/tpch_catalog.rs b/optd-datafusion-repr/src/testing/tpch_catalog.rs new file mode 100644 index 00000000..cc5c1389 --- /dev/null +++ b/optd-datafusion-repr/src/testing/tpch_catalog.rs @@ -0,0 +1,120 @@ +use core::panic; + +use crate::{ + plan_nodes::ConstantType, + properties::schema::{Catalog, Field, Schema}, +}; + +pub struct TpchCatalog; + +impl Catalog for TpchCatalog { + fn get(&self, name: &str) -> Schema { + match name { + "customer" => { + // Define the schema for the "customer" table + + Schema { + fields: vec![ + Field { + name: "custkey".to_string(), + typ: ConstantType::Int32, + nullable: false, + }, + Field { + name: "name".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + Field { + name: "address".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + Field { + name: "nationkey".to_string(), + typ: ConstantType::Int32, + nullable: false, + }, + Field { + name: "phone".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + Field { + name: "acctbal".to_string(), + typ: ConstantType::Float64, + nullable: false, + }, + Field { + name: "mktsegment".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + Field { + name: "comment".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + ], + } + } + "orders" => { + // Define the schema for the "orders" table + + Schema { + fields: vec![ + Field { + name: "orderkey".to_string(), + typ: ConstantType::Int32, + nullable: false, + }, + Field { + name: "custkey".to_string(), + typ: ConstantType::Int32, + nullable: false, + }, + Field { + name: "orderstatus".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + Field { + name: "totalprice".to_string(), + typ: ConstantType::Float64, + nullable: false, + }, + Field { + name: "orderdate".to_string(), + typ: ConstantType::Date, + nullable: false, + }, + Field { + name: "orderpriority".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + Field { + name: "clerk".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + Field { + name: "shippriority".to_string(), + typ: ConstantType::Int32, + nullable: false, + }, + Field { + name: "comment".to_string(), + typ: ConstantType::Utf8String, + nullable: false, + }, + ], + } + } + // Add more cases for other tables as needed + _ => { + panic!("Unknown table: {}", name); + } + } + } +} diff --git a/optd-sqlplannertest/tests/filter.planner.sql b/optd-sqlplannertest/tests/filter.planner.sql index 7dd0e300..8ba252fa 100644 --- a/optd-sqlplannertest/tests/filter.planner.sql +++ b/optd-sqlplannertest/tests/filter.planner.sql @@ -70,17 +70,9 @@ LogicalProjection { exprs: [ #0, #1, #2, #3 ] } ├── LogicalScan { table: t1 } └── LogicalScan { table: t2 } PhysicalProjection { exprs: [ #0, #1, #2, #3 ] } -└── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #0 - │ │ └── #2 - │ └── Eq - │ ├── #0 - │ └── #3 - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalScan { table: t1 } - └── PhysicalScan { table: t2 } +└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0, #0 ], right_keys: [ #0, #1 ] } + ├── PhysicalScan { table: t1 } + └── PhysicalScan { table: t2 } */ -- Test SimplifyFilterRule (skip true filter for and) @@ -102,7 +94,8 @@ LogicalProjection { exprs: [ #0, #1, #2, #3 ] } ├── LogicalScan { table: t1 } └── LogicalScan { table: t2 } PhysicalProjection { exprs: [ #0, #1, #2, #3 ] } -└── PhysicalFilter +└── PhysicalNestedLoopJoin + ├── join_type: Inner ├── cond:Or │ ├── Eq │ │ ├── #0 @@ -110,9 +103,8 @@ PhysicalProjection { exprs: [ #0, #1, #2, #3 ] } │ └── Eq │ ├── #0 │ └── #3 - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalScan { table: t1 } - └── PhysicalScan { table: t2 } + ├── PhysicalScan { table: t1 } + └── PhysicalScan { table: t2 } 0 0 0 200 1 1 1 201 2 2 2 202 @@ -171,13 +163,9 @@ LogicalProjection { exprs: [ #0, #1, #2, #3 ] } ├── LogicalScan { table: t1 } └── LogicalScan { table: t2 } PhysicalProjection { exprs: [ #0, #1, #2, #3 ] } -└── PhysicalFilter - ├── cond:Eq - │ ├── #0 - │ └── #2 - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalScan { table: t1 } - └── PhysicalScan { table: t2 } +└── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + ├── PhysicalScan { table: t1 } + └── PhysicalScan { table: t2 } 0 0 0 200 1 1 1 201 2 2 2 202 diff --git a/optd-sqlplannertest/tests/join_enumerate.planner.sql b/optd-sqlplannertest/tests/join_enumerate.planner.sql index a51e77cc..426dfa65 100644 --- a/optd-sqlplannertest/tests/join_enumerate.planner.sql +++ b/optd-sqlplannertest/tests/join_enumerate.planner.sql @@ -16,7 +16,14 @@ insert into t3 values (0, 300), (1, 301), (2, 302); select * from t2, t1, t3 where t1v1 = t2v1 and t1v2 = t3v2; /* +(Join t2 (Join t1 t3)) +(Join t2 (Join t3 t1)) +(Join t3 (Join t1 t2)) +(Join t3 (Join t2 t1)) +(Join (Join t1 t2) t3) +(Join (Join t1 t3) t2) (Join (Join t2 t1) t3) +(Join (Join t3 t1) t2) 0 200 0 0 0 300 1 201 1 1 1 301 diff --git a/optd-sqlplannertest/tests/tpch.planner.sql b/optd-sqlplannertest/tests/tpch.planner.sql index 1e5dc603..c9730a42 100644 --- a/optd-sqlplannertest/tests/tpch.planner.sql +++ b/optd-sqlplannertest/tests/tpch.planner.sql @@ -374,13 +374,13 @@ PhysicalLimit { skip: 0, fetch: 100 } │ │ │ ├── PhysicalProjection { exprs: [ #0, #1, #3, #4 ] } │ │ │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } │ │ │ │ ├── PhysicalProjection { exprs: [ #0, #1 ] } - │ │ │ │ │ └── PhysicalFilter - │ │ │ │ │ ├── cond:And - │ │ │ │ │ │ ├── Eq - │ │ │ │ │ │ │ ├── #3 - │ │ │ │ │ │ │ └── 4 - │ │ │ │ │ │ └── Like { expr: #2, pattern: "%TIN", negated: false, case_insensitive: false } - │ │ │ │ │ └── PhysicalProjection { exprs: [ #0, #2, #4, #5 ] } + │ │ │ │ │ └── PhysicalProjection { exprs: [ #0, #2, #4, #5 ] } + │ │ │ │ │ └── PhysicalFilter + │ │ │ │ │ ├── cond:And + │ │ │ │ │ │ ├── Eq + │ │ │ │ │ │ │ ├── #5 + │ │ │ │ │ │ │ └── 4 + │ │ │ │ │ │ └── Like { expr: #4, pattern: "%TIN", negated: false, case_insensitive: false } │ │ │ │ │ └── PhysicalScan { table: part } │ │ │ │ └── PhysicalProjection { exprs: [ #0, #1, #3 ] } │ │ │ │ └── PhysicalScan { table: partsupp } @@ -389,11 +389,11 @@ PhysicalLimit { skip: 0, fetch: 100 } │ │ └── PhysicalProjection { exprs: [ #0, #1, #2 ] } │ │ └── PhysicalScan { table: nation } │ └── PhysicalProjection { exprs: [ #0 ] } - │ └── PhysicalFilter - │ ├── cond:Eq - │ │ ├── #1 - │ │ └── "AFRICA" - │ └── PhysicalProjection { exprs: [ #0, #1 ] } + │ └── PhysicalProjection { exprs: [ #0, #1 ] } + │ └── PhysicalFilter + │ ├── cond:Eq + │ │ ├── #1 + │ │ └── "AFRICA" │ └── PhysicalScan { table: region } └── PhysicalProjection { exprs: [ #1, #0 ] } └── PhysicalAgg @@ -413,11 +413,11 @@ PhysicalLimit { skip: 0, fetch: 100 } │ └── PhysicalProjection { exprs: [ #0, #2 ] } │ └── PhysicalScan { table: nation } └── PhysicalProjection { exprs: [ #0 ] } - └── PhysicalFilter - ├── cond:Eq - │ ├── #1 - │ └── "AFRICA" - └── PhysicalProjection { exprs: [ #0, #1 ] } + └── PhysicalProjection { exprs: [ #0, #1 ] } + └── PhysicalFilter + ├── cond:Eq + │ ├── #1 + │ └── "AFRICA" └── PhysicalScan { table: region } */ @@ -515,24 +515,24 @@ PhysicalLimit { skip: 0, fetch: 10 } ├── PhysicalProjection { exprs: [ #1, #3, #4 ] } │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } │ ├── PhysicalProjection { exprs: [ #0 ] } - │ │ └── PhysicalFilter - │ │ ├── cond:Eq - │ │ │ ├── #1 - │ │ │ └── "FURNITURE" - │ │ └── PhysicalProjection { exprs: [ #0, #6 ] } + │ │ └── PhysicalProjection { exprs: [ #0, #6 ] } + │ │ └── PhysicalFilter + │ │ ├── cond:Eq + │ │ │ ├── #6 + │ │ │ └── "FURNITURE" │ │ └── PhysicalScan { table: customer } - │ └── PhysicalFilter - │ ├── cond:Lt - │ │ ├── #2 - │ │ └── 9218 - │ └── PhysicalProjection { exprs: [ #0, #1, #4, #7 ] } + │ └── PhysicalProjection { exprs: [ #0, #1, #4, #7 ] } + │ └── PhysicalFilter + │ ├── cond:Lt + │ │ ├── #4 + │ │ └── 9218 │ └── PhysicalScan { table: orders } └── PhysicalProjection { exprs: [ #0, #1, #2 ] } - └── PhysicalFilter - ├── cond:Gt - │ ├── #3 - │ └── 9218 - └── PhysicalProjection { exprs: [ #0, #5, #6, #10 ] } + └── PhysicalProjection { exprs: [ #0, #5, #6, #10 ] } + └── PhysicalFilter + ├── cond:Gt + │ ├── #10 + │ └── 9218 └── PhysicalScan { table: lineitem } */ @@ -627,46 +627,29 @@ PhysicalSort │ ├── Cast { cast_to: Decimal128(20, 0), expr: 1 } │ └── #23 ├── groups: [ #41 ] - └── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #0 - │ │ └── #9 - │ ├── Eq - │ │ ├── #17 - │ │ └── #8 - │ ├── Eq - │ │ ├── #19 - │ │ └── #33 - │ ├── Eq - │ │ ├── #3 - │ │ └── #36 - │ ├── Eq - │ │ ├── #36 - │ │ └── #40 - │ ├── Eq - │ │ ├── #42 - │ │ └── #44 - │ ├── Eq - │ │ ├── #45 - │ │ └── "Asia" - │ ├── Geq - │ │ ├── #12 - │ │ └── Cast { cast_to: Date32, expr: "2023-01-01" } - │ └── Lt - │ ├── #12 - │ └── Cast { cast_to: Date32, expr: "2024-01-01" } - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ ├── PhysicalScan { table: customer } - │ │ │ │ └── PhysicalScan { table: orders } - │ │ │ └── PhysicalScan { table: lineitem } - │ │ └── PhysicalScan { table: supplier } - │ └── PhysicalScan { table: nation } - └── PhysicalScan { table: region } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #36 ], right_keys: [ #0 ] } + ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #19, #3 ], right_keys: [ #0, #3 ] } + │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } + │ │ ├── PhysicalScan { table: customer } + │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + │ │ ├── PhysicalFilter + │ │ │ ├── cond:And + │ │ │ │ ├── Geq + │ │ │ │ │ ├── #4 + │ │ │ │ │ └── Cast { cast_to: Date32, expr: "2023-01-01" } + │ │ │ │ └── Lt + │ │ │ │ ├── #4 + │ │ │ │ └── Cast { cast_to: Date32, expr: "2024-01-01" } + │ │ │ └── PhysicalScan { table: orders } + │ │ └── PhysicalScan { table: lineitem } + │ └── PhysicalScan { table: supplier } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #2 ], right_keys: [ #0 ] } + ├── PhysicalScan { table: nation } + └── PhysicalFilter + ├── cond:Eq + │ ├── #1 + │ └── "Asia" + └── PhysicalScan { table: region } */ -- TPC-H Q6 @@ -857,50 +840,38 @@ PhysicalSort │ └── Sub │ ├── Cast { cast_to: Decimal128(20, 0), expr: 1 } │ └── #13 - └── PhysicalFilter + └── PhysicalNestedLoopJoin + ├── join_type: Inner ├── cond:And │ ├── Eq - │ │ ├── #0 - │ │ └── #9 - │ ├── Eq - │ │ ├── #23 - │ │ └── #7 - │ ├── Eq - │ │ ├── #32 - │ │ └── #24 - │ ├── Eq - │ │ ├── #3 - │ │ └── #40 - │ ├── Eq │ │ ├── #35 │ │ └── #44 - │ ├── Or - │ │ ├── And - │ │ │ ├── Eq - │ │ │ │ ├── #41 - │ │ │ │ └── "FRANCE" - │ │ │ └── Eq - │ │ │ ├── #45 - │ │ │ └── "GERMANY" - │ │ └── And - │ │ ├── Eq - │ │ │ ├── #41 - │ │ │ └── "GERMANY" - │ │ └── Eq - │ │ ├── #45 - │ │ └── "FRANCE" - │ └── Between { expr: #17, lower: Cast { cast_to: Date32, expr: "1995-01-01" }, upper: Cast { cast_to: Date32, expr: "1996-12-31" } } - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ ├── PhysicalScan { table: supplier } - │ │ │ │ └── PhysicalScan { table: lineitem } - │ │ │ └── PhysicalScan { table: orders } - │ │ └── PhysicalScan { table: customer } - │ └── PhysicalScan { table: nation } - └── PhysicalScan { table: nation } + │ └── Or + │ ├── And + │ │ ├── Eq + │ │ │ ├── #41 + │ │ │ └── "FRANCE" + │ │ └── Eq + │ │ ├── #45 + │ │ └── "GERMANY" + │ └── And + │ ├── Eq + │ │ ├── #41 + │ │ └── "GERMANY" + │ └── Eq + │ ├── #45 + │ └── "FRANCE" + ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #3 ], right_keys: [ #0 ] } + │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #2 ] } + │ │ ├── PhysicalScan { table: supplier } + │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + │ │ ├── PhysicalFilter { cond: Between { expr: #10, lower: Cast { cast_to: Date32, expr: "1995-01-01" }, upper: Cast { cast_to: Date32, expr: "1996-12-31" } } } + │ │ │ └── PhysicalScan { table: lineitem } + │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ] } + │ │ ├── PhysicalScan { table: orders } + │ │ └── PhysicalScan { table: customer } + │ └── PhysicalScan { table: nation } + └── PhysicalScan { table: nation } */ -- TPC-H Q8 without top-most limit node @@ -1052,50 +1023,29 @@ PhysicalSort │ │ ├── Cast { cast_to: Decimal128(20, 0), expr: 1 } │ │ └── #22 │ └── #54 - └── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #0 - │ │ └── #17 - │ ├── Eq - │ │ ├── #9 - │ │ └── #18 - │ ├── Eq - │ │ ├── #16 - │ │ └── #32 - │ ├── Eq - │ │ ├── #33 - │ │ └── #41 - │ ├── Eq - │ │ ├── #44 - │ │ └── #49 - │ ├── Eq - │ │ ├── #51 - │ │ └── #57 - │ ├── Eq - │ │ ├── #58 - │ │ └── "AMERICA" - │ ├── Eq - │ │ ├── #12 - │ │ └── #53 - │ ├── Between { expr: #36, lower: Cast { cast_to: Date32, expr: "1995-01-01" }, upper: Cast { cast_to: Date32, expr: "1996-12-31" } } - │ └── Eq - │ ├── #4 - │ └── "ECONOMY ANODIZED STEEL" - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ │ │ ├── PhysicalScan { table: part } - │ │ │ │ │ │ └── PhysicalScan { table: supplier } - │ │ │ │ │ └── PhysicalScan { table: lineitem } - │ │ │ │ └── PhysicalScan { table: orders } - │ │ │ └── PhysicalScan { table: customer } - │ │ └── PhysicalScan { table: nation } - │ └── PhysicalScan { table: nation } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #51 ], right_keys: [ #0 ] } + ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #12 ], right_keys: [ #0 ] } + │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0, #9 ], right_keys: [ #1, #2 ] } + │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } + │ │ │ ├── PhysicalFilter + │ │ │ │ ├── cond:Eq + │ │ │ │ │ ├── #4 + │ │ │ │ │ └── "ECONOMY ANODIZED STEEL" + │ │ │ │ └── PhysicalScan { table: part } + │ │ │ └── PhysicalScan { table: supplier } + │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + │ │ ├── PhysicalScan { table: lineitem } + │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ] } + │ │ ├── PhysicalFilter { cond: Between { expr: #4, lower: Cast { cast_to: Date32, expr: "1995-01-01" }, upper: Cast { cast_to: Date32, expr: "1996-12-31" } } } + │ │ │ └── PhysicalScan { table: orders } + │ │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #3 ], right_keys: [ #0 ] } + │ │ ├── PhysicalScan { table: customer } + │ │ └── PhysicalScan { table: nation } + │ └── PhysicalScan { table: nation } + └── PhysicalFilter + ├── cond:Eq + │ ├── #1 + │ └── "AMERICA" └── PhysicalScan { table: region } */ @@ -1216,38 +1166,18 @@ PhysicalSort │ └── Mul │ ├── #35 │ └── #20 - └── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #9 - │ │ └── #18 - │ ├── Eq - │ │ ├── #33 - │ │ └── #18 - │ ├── Eq - │ │ ├── #32 - │ │ └── #17 - │ ├── Eq - │ │ ├── #0 - │ │ └── #17 - │ ├── Eq - │ │ ├── #37 - │ │ └── #16 - │ ├── Eq - │ │ ├── #12 - │ │ └── #46 - │ └── Like { expr: #1, pattern: "%green%", negated: false, case_insensitive: false } - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ ├── PhysicalScan { table: part } - │ │ │ │ └── PhysicalScan { table: supplier } - │ │ │ └── PhysicalScan { table: lineitem } - │ │ └── PhysicalScan { table: partsupp } - │ └── PhysicalScan { table: orders } - └── PhysicalScan { table: nation } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #12 ], right_keys: [ #0 ] } + ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #9, #0 ], right_keys: [ #2, #1 ] } + │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } + │ │ ├── PhysicalFilter { cond: Like { expr: #1, pattern: "%green%", negated: false, case_insensitive: false } } + │ │ │ └── PhysicalScan { table: part } + │ │ └── PhysicalScan { table: supplier } + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #2, #1 ], right_keys: [ #1, #0 ] } + │ │ ├── PhysicalScan { table: lineitem } + │ │ └── PhysicalScan { table: partsupp } + │ └── PhysicalScan { table: orders } + └── PhysicalScan { table: nation } */ -- TPC-H Q9 @@ -1367,38 +1297,18 @@ PhysicalSort │ └── Mul │ ├── #35 │ └── #20 - └── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #9 - │ │ └── #18 - │ ├── Eq - │ │ ├── #33 - │ │ └── #18 - │ ├── Eq - │ │ ├── #32 - │ │ └── #17 - │ ├── Eq - │ │ ├── #0 - │ │ └── #17 - │ ├── Eq - │ │ ├── #37 - │ │ └── #16 - │ ├── Eq - │ │ ├── #12 - │ │ └── #46 - │ └── Like { expr: #1, pattern: "%green%", negated: false, case_insensitive: false } - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ │ │ ├── PhysicalScan { table: part } - │ │ │ │ └── PhysicalScan { table: supplier } - │ │ │ └── PhysicalScan { table: lineitem } - │ │ └── PhysicalScan { table: partsupp } - │ └── PhysicalScan { table: orders } - └── PhysicalScan { table: nation } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #12 ], right_keys: [ #0 ] } + ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #9, #0 ], right_keys: [ #2, #1 ] } + │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } + │ │ ├── PhysicalFilter { cond: Like { expr: #1, pattern: "%green%", negated: false, case_insensitive: false } } + │ │ │ └── PhysicalScan { table: part } + │ │ └── PhysicalScan { table: supplier } + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + │ ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #2, #1 ], right_keys: [ #1, #0 ] } + │ │ ├── PhysicalScan { table: lineitem } + │ │ └── PhysicalScan { table: partsupp } + │ └── PhysicalScan { table: orders } + └── PhysicalScan { table: nation } */ -- TPC-H Q10 @@ -1491,35 +1401,27 @@ PhysicalLimit { skip: 0, fetch: 20 } │ ├── Cast { cast_to: Decimal128(20, 0), expr: 1 } │ └── #23 ├── groups: [ #0, #1, #5, #4, #34, #2, #7 ] - └── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #0 - │ │ └── #9 - │ ├── Eq - │ │ ├── #17 - │ │ └── #8 - │ ├── Geq - │ │ ├── #12 - │ │ └── Cast { cast_to: Date32, expr: "1993-07-01" } - │ ├── Lt - │ │ ├── #12 - │ │ └── Add - │ │ ├── Cast { cast_to: Date32, expr: "1993-07-01" } - │ │ └── INTERVAL_MONTH_DAY_NANO (3, 0, 0) - │ ├── Eq - │ │ ├── #25 - │ │ └── "R" - │ └── Eq - │ ├── #3 - │ └── #33 - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ ├── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - │ │ ├── PhysicalScan { table: customer } - │ │ └── PhysicalScan { table: orders } - │ └── PhysicalScan { table: lineitem } - └── PhysicalScan { table: nation } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #3 ], right_keys: [ #0 ] } + ├── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #1 ] } + │ ├── PhysicalScan { table: customer } + │ └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + │ ├── PhysicalFilter + │ │ ├── cond:And + │ │ │ ├── Geq + │ │ │ │ ├── #4 + │ │ │ │ └── Cast { cast_to: Date32, expr: "1993-07-01" } + │ │ │ └── Lt + │ │ │ ├── #4 + │ │ │ └── Add + │ │ │ ├── Cast { cast_to: Date32, expr: "1993-07-01" } + │ │ │ └── INTERVAL_MONTH_DAY_NANO (3, 0, 0) + │ │ └── PhysicalScan { table: orders } + │ └── PhysicalFilter + │ ├── cond:Eq + │ │ ├── #8 + │ │ └── "R" + │ └── PhysicalScan { table: lineitem } + └── PhysicalScan { table: nation } */ -- TPC-H Q11 @@ -1658,11 +1560,11 @@ PhysicalSort │ │ └── PhysicalProjection { exprs: [ #0, #3 ] } │ │ └── PhysicalScan { table: supplier } │ └── PhysicalProjection { exprs: [ #0 ] } - │ └── PhysicalFilter - │ ├── cond:Eq - │ │ ├── #1 - │ │ └── "CHINA" - │ └── PhysicalProjection { exprs: [ #0, #1 ] } + │ └── PhysicalProjection { exprs: [ #0, #1 ] } + │ └── PhysicalFilter + │ ├── cond:Eq + │ │ ├── #1 + │ │ └── "CHINA" │ └── PhysicalScan { table: nation } └── PhysicalAgg ├── aggrs:Agg(Sum) @@ -1679,11 +1581,11 @@ PhysicalSort │ └── PhysicalProjection { exprs: [ #0, #3 ] } │ └── PhysicalScan { table: supplier } └── PhysicalProjection { exprs: [ #0 ] } - └── PhysicalFilter - ├── cond:Eq - │ ├── #1 - │ └── "CHINA" - └── PhysicalProjection { exprs: [ #0, #1 ] } + └── PhysicalProjection { exprs: [ #0, #1 ] } + └── PhysicalFilter + ├── cond:Eq + │ ├── #1 + │ └── "CHINA" └── PhysicalScan { table: nation } */ @@ -1795,26 +1697,23 @@ PhysicalSort │ ├── 1 │ └── 0 ├── groups: [ #23 ] - └── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #0 - │ │ └── #9 - │ ├── InList { expr: #23, list: [ "MAIL", "SHIP" ], negated: false } - │ ├── Lt - │ │ ├── #20 - │ │ └── #21 - │ ├── Lt - │ │ ├── #19 - │ │ └── #20 - │ ├── Geq - │ │ ├── #21 - │ │ └── Cast { cast_to: Date32, expr: "1994-01-01" } - │ └── Lt - │ ├── #21 - │ └── Cast { cast_to: Date32, expr: "1995-01-01" } - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalScan { table: orders } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #0 ], right_keys: [ #0 ] } + ├── PhysicalScan { table: orders } + └── PhysicalFilter + ├── cond:And + │ ├── InList { expr: #14, list: [ "MAIL", "SHIP" ], negated: false } + │ ├── Lt + │ │ ├── #11 + │ │ └── #12 + │ ├── Lt + │ │ ├── #10 + │ │ └── #11 + │ ├── Geq + │ │ ├── #12 + │ │ └── Cast { cast_to: Date32, expr: "1994-01-01" } + │ └── Lt + │ ├── #12 + │ └── Cast { cast_to: Date32, expr: "1995-01-01" } └── PhysicalScan { table: lineitem } */ @@ -1898,22 +1797,19 @@ PhysicalProjection │ ├── Cast { cast_to: Decimal128(20, 0), expr: 1 } │ └── #6 ├── groups: [] - └── PhysicalFilter - ├── cond:And - │ ├── Eq - │ │ ├── #1 - │ │ └── #16 - │ ├── Geq - │ │ ├── #10 - │ │ └── Cast { cast_to: Date32, expr: "1995-09-01" } - │ └── Lt - │ ├── #10 - │ └── Add - │ ├── Cast { cast_to: Date32, expr: "1995-09-01" } - │ └── INTERVAL_MONTH_DAY_NANO (1, 0, 0) - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalScan { table: lineitem } - └── PhysicalScan { table: part } + └── PhysicalHashJoin { join_type: Inner, left_keys: [ #1 ], right_keys: [ #0 ] } + ├── PhysicalFilter + │ ├── cond:And + │ │ ├── Geq + │ │ │ ├── #10 + │ │ │ └── Cast { cast_to: Date32, expr: "1995-09-01" } + │ │ └── Lt + │ │ ├── #10 + │ │ └── Add + │ │ ├── Cast { cast_to: Date32, expr: "1995-09-01" } + │ │ └── INTERVAL_MONTH_DAY_NANO (1, 0, 0) + │ └── PhysicalScan { table: lineitem } + └── PhysicalScan { table: part } */ -- TPC-H Q15 @@ -2038,15 +1934,15 @@ PhysicalSort │ │ └── #2 │ ├── groups: [ #0 ] │ └── PhysicalProjection { exprs: [ #0, #1, #2 ] } - │ └── PhysicalFilter - │ ├── cond:And - │ │ ├── Geq - │ │ │ ├── #3 - │ │ │ └── 8401 - │ │ └── Lt - │ │ ├── #3 - │ │ └── 8491 - │ └── PhysicalProjection { exprs: [ #2, #5, #6, #10 ] } + │ └── PhysicalProjection { exprs: [ #2, #5, #6, #10 ] } + │ └── PhysicalFilter + │ ├── cond:And + │ │ ├── Geq + │ │ │ ├── #10 + │ │ │ └── 8401 + │ │ └── Lt + │ │ ├── #10 + │ │ └── 8491 │ └── PhysicalScan { table: lineitem } └── PhysicalAgg ├── aggrs:Agg(Sum) @@ -2057,15 +1953,15 @@ PhysicalSort │ └── #2 ├── groups: [ #0 ] └── PhysicalProjection { exprs: [ #0, #1, #2 ] } - └── PhysicalFilter - ├── cond:And - │ ├── Geq - │ │ ├── #3 - │ │ └── 8401 - │ └── Lt - │ ├── #3 - │ └── 8491 - └── PhysicalProjection { exprs: [ #2, #5, #6, #10 ] } + └── PhysicalProjection { exprs: [ #2, #5, #6, #10 ] } + └── PhysicalFilter + ├── cond:And + │ ├── Geq + │ │ ├── #10 + │ │ └── 8401 + │ └── Lt + │ ├── #10 + │ └── 8491 └── PhysicalScan { table: lineitem } */ @@ -2170,15 +2066,15 @@ PhysicalProjection │ ├── PhysicalProjection { exprs: [ #1, #4, #5 ] } │ │ └── PhysicalScan { table: lineitem } │ └── PhysicalProjection { exprs: [ #0 ] } - │ └── PhysicalFilter - │ ├── cond:And - │ │ ├── Eq - │ │ │ ├── #1 - │ │ │ └── "Brand#13" - │ │ └── Eq - │ │ ├── #2 - │ │ └── "JUMBO PKG" - │ └── PhysicalProjection { exprs: [ #0, #3, #6 ] } + │ └── PhysicalProjection { exprs: [ #0, #3, #6 ] } + │ └── PhysicalFilter + │ ├── cond:And + │ │ ├── Eq + │ │ │ ├── #3 + │ │ │ └── "Brand#13" + │ │ └── Eq + │ │ ├── #6 + │ │ └── "JUMBO PKG" │ └── PhysicalScan { table: part } └── PhysicalProjection ├── exprs: @@ -2311,7 +2207,8 @@ PhysicalProjection { exprs: [ #0 ] } │ ├── Cast { cast_to: Decimal128(20, 0), expr: 1 } │ └── #6 ├── groups: [] - └── PhysicalFilter + └── PhysicalNestedLoopJoin + ├── join_type: Inner ├── cond:Or │ ├── And │ │ ├── Eq @@ -2370,8 +2267,7 @@ PhysicalProjection { exprs: [ #0 ] } │ └── Eq │ ├── #13 │ └── "DELIVER IN PERSON" - └── PhysicalNestedLoopJoin { join_type: Cross, cond: true } - ├── PhysicalScan { table: lineitem } - └── PhysicalScan { table: part } + ├── PhysicalScan { table: lineitem } + └── PhysicalScan { table: part } */