Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: projection merge and projection filter transpose rule #162

Merged
merged 72 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
8f09e0e
Add recursive logic, and add filter/sort transpose logic
jurplel Feb 23, 2024
e84a576
change a couple notes
jurplel Feb 23, 2024
097029a
Modularize projection rewriting code from ProjectionPullUpJoin rule (…
jurplel Mar 12, 2024
b5e881f
Hardcoded filter/sort test
jurplel Mar 17, 2024
67e9632
push past projection w/ broken test
jurplel Mar 17, 2024
330cdac
Implement filter merge
jurplel Mar 17, 2024
64d5572
More prog on join
jurplel Mar 18, 2024
6020a31
merge
jurplel Mar 18, 2024
bc100ed
Publicize flattened_nested_logical
jurplel Mar 18, 2024
abbd6b7
Fix filter merge
jurplel Mar 18, 2024
f78f37b
Join progress
jurplel Mar 18, 2024
9c9c515
Add dummy cost model
jurplel Mar 21, 2024
f4a8506
dummmy catalog + dummy optimizer method
jurplel Mar 21, 2024
e7d4c35
Join test start
jurplel Mar 22, 2024
609233d
Filter pushdown is beginning to work
jurplel Mar 23, 2024
bb3d1a2
Working testing infra + all tests working!
jurplel Mar 25, 2024
9cb0894
Fix projection mistake
jurplel Mar 25, 2024
038db19
restructure predicate traversal
jurplel Mar 25, 2024
5023c2a
Push filter past agg
jurplel Mar 25, 2024
dacc205
Merge w/ main
jurplel Mar 25, 2024
924649b
Remove print
jurplel Mar 25, 2024
16650a3
two stage heuristic
AveryQi115 Mar 26, 2024
72e1484
Merge remote-tracking branch 'origin/main' into bowad/filter_pushdown
jurplel Mar 26, 2024
7d1768e
merge w/ two-stage
jurplel Mar 26, 2024
5d71f67
Revise testing infra and port filter pushdown to it
jurplel Mar 26, 2024
927717f
Make categorize_conds_helper more flexible
jurplel Mar 26, 2024
2e906c8
Fix crashes related to List
jurplel Mar 26, 2024
88397f4
Improve generation of singleton expressions
jurplel Mar 26, 2024
7f2f942
Update tpc-h sqlplannertest with filter pushdown run
jurplel Mar 26, 2024
52f9408
non-merging merge
jurplel Mar 26, 2024
c701b14
More polish + rewrite a join helper as rewrite_column_refs
jurplel Mar 27, 2024
73e4e23
rewrite one more thing as rewrite_column_refs
jurplel Mar 27, 2024
4b4322a
clippy fix
jurplel Mar 27, 2024
b407d8a
fmt fix
jurplel Mar 27, 2024
db56f27
Fmt fix again
jurplel Mar 27, 2024
e60e3eb
Additional documentation for rewrite_column_refs
jurplel Mar 27, 2024
5cae5b3
Remove old TODOs in filter_pushdown.rs
jurplel Mar 27, 2024
9fdaa25
Separate filter pushdown rules (includes helper modifications)
jurplel Mar 28, 2024
e84ffd9
merge for some reason
jurplel Mar 28, 2024
6119dbb
Get children from groups, now
jurplel Mar 28, 2024
53098e6
Change rules to cost-based
jurplel Mar 28, 2024
d3b1882
Missed a spot with group conversions
jurplel Mar 28, 2024
a4e0600
add rules to lib and rules files
Sweetsuro Mar 29, 2024
3ede496
Merge remote-tracking branch 'origin/bowad/filter_pushdown' into swee…
Sweetsuro Mar 29, 2024
427fc9b
project remove rule
Sweetsuro Mar 29, 2024
6078cc5
rename some stuff
Sweetsuro Mar 29, 2024
bea15b9
rename some other stuff
Sweetsuro Mar 29, 2024
8119158
broken project filter transpose
Sweetsuro Mar 29, 2024
8a28b2c
working project filter transpose rule
Sweetsuro Mar 29, 2024
5e9b482
Address most comments
jurplel Mar 30, 2024
f04e4cb
fix for heuristic rule wrapper
AveryQi115 Apr 2, 2024
c057b1e
working project filter transpose
Sweetsuro Apr 3, 2024
3d142b3
Merge remote-tracking branch 'origin/bowad/filter_pushdown' into swee…
Sweetsuro Apr 3, 2024
7fe1fd0
heuristic rule wrapper fix
AveryQi115 Apr 8, 2024
487f1e2
still merge group
AveryQi115 Apr 8, 2024
0c6d314
Merge branch 'main' into heuristic_wrapper_fix
AveryQi115 Apr 8, 2024
1d4cd5b
fmt
AveryQi115 Apr 8, 2024
9c45e69
Merge branch 'main' into heuristic_wrapper_fix
AveryQi115 Apr 15, 2024
1814b84
tpc-h result changed due to search space limit after the fix
AveryQi115 Apr 15, 2024
8b10c94
massive refactor
Sweetsuro Apr 15, 2024
82c18f0
Merge branch 'main' into sweetsuro/project-remove
Sweetsuro Apr 15, 2024
2532254
Merge branch 'heuristic_wrapper_fix' into sweetsuro/project-remove
Sweetsuro Apr 15, 2024
fc18944
project merge as default hueristic
Sweetsuro Apr 16, 2024
e92271d
move filter_project test cases
Sweetsuro Apr 16, 2024
37bfc37
project filter unit tests
Sweetsuro Apr 17, 2024
804ef96
project merge tests
Sweetsuro Apr 17, 2024
facd914
move filter project into project filter file
Sweetsuro Apr 17, 2024
42bb85b
fix fmt and clippy
Sweetsuro Apr 17, 2024
3c6ccb3
Merge branch 'main' into sweetsuro/project-remove
Sweetsuro Apr 17, 2024
68ca683
removed unecessecary file
Sweetsuro Apr 18, 2024
b1b6898
Merge branch 'main' into sweetsuro/project-remove
Sweetsuro Apr 25, 2024
ab888da
Merge branch 'main' into sweetsuro/project-remove
Sweetsuro May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use rules::{
EliminateJoinRule, EliminateLimitRule, FilterAggTransposeRule, FilterCrossJoinTransposeRule,
FilterInnerJoinTransposeRule, FilterMergeRule, FilterProjectTransposeRule,
FilterSortTransposeRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, PhysicalConversionRule,
ProjectionPullUpJoin, SimplifyFilterRule, SimplifyJoinCondRule,
ProjectFilterTransposeRule, ProjectMergeRule, ProjectionPullUpJoin, SimplifyFilterRule,
SimplifyJoinCondRule,
};

pub use optd_core::rel_node::Value;
Expand Down Expand Up @@ -87,6 +88,8 @@ impl DatafusionOptimizer {
Arc::new(EliminateLimitRule::new()),
Arc::new(EliminateDuplicatedSortExprRule::new()),
Arc::new(EliminateDuplicatedAggExprRule::new()),
Arc::new(ProjectMergeRule::new()),
Arc::new(FilterMergeRule::new()),
]
}

Expand All @@ -97,11 +100,14 @@ impl DatafusionOptimizer {
for rule in rules {
rule_wrappers.push(RuleWrapper::new_cascades(rule));
}
// project transpose rules
rule_wrappers.push(RuleWrapper::new_cascades(Arc::new(
ProjectFilterTransposeRule::new(),
)));
// add all filter pushdown rules as heuristic rules
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterProjectTransposeRule::new(),
)));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(FilterMergeRule::new())));
rule_wrappers.push(RuleWrapper::new_heuristic(Arc::new(
FilterCrossJoinTransposeRule::new(),
)));
Expand Down
24 changes: 24 additions & 0 deletions optd-datafusion-repr/src/plan_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,30 @@ impl Expr {
.unwrap(),
)
}

/// Recursively retrieves all column references in the expression
/// using a provided function.
/// The provided function will, given a ColumnRefExpr's index,
/// return a Vec<Expr> including the expr in col ref.
pub fn get_column_refs(&self) -> Vec<Expr> {
assert!(self.typ().is_expression());
if let OptRelNodeTyp::ColumnRef = self.typ() {
let col_ref = Expr::from_rel_node(self.0.clone()).unwrap();
return vec![col_ref];
}

let children = self.0.children.clone();
let children = children.into_iter().map(|child| {
if child.typ == OptRelNodeTyp::List {
// TODO: What should we do with List?
return vec![];
}
Expr::from_rel_node(child.clone())
.unwrap()
.get_column_refs()
});
children.collect_vec().concat()
}
}

impl OptRelNode for Expr {
Expand Down
73 changes: 1 addition & 72 deletions optd-datafusion-repr/src/plan_nodes/projection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::expr::ExprList;
use super::macros::define_plan_node;

use super::{ColumnRefExpr, Expr, OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
use super::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};

#[derive(Clone, Debug)]
pub struct LogicalProjection(pub PlanNode);
Expand All @@ -26,74 +26,3 @@ define_plan_node!(
{ 1, exprs: ExprList }
]
);

/// This struct holds the mapping from original columns to projected columns.
///
/// # Example
/// With the following plan:
/// | Filter (#0 < 5)
/// |
/// |-| Projection [#2, #3]
/// |- Scan [#0, #1, #2, #3]
///
/// The computed projection mapping is:
/// #2 -> #0
/// #3 -> #1
pub struct ProjectionMapping {
forward: Vec<usize>,
_backward: Vec<Option<usize>>,
}

impl ProjectionMapping {
pub fn build(mapping: Vec<usize>) -> Option<Self> {
let mut backward = vec![];
for (i, &x) in mapping.iter().enumerate() {
if x >= backward.len() {
backward.resize(x + 1, None);
}
backward[x] = Some(i);
}
Some(Self {
forward: mapping,
_backward: backward,
})
}

pub fn projection_col_refers_to(&self, col: usize) -> usize {
self.forward[col]
}

pub fn _original_col_maps_to(&self, col: usize) -> Option<usize> {
self._backward[col]
}

/// Recursively rewrites all ColumnRefs in an Expr to *undo* the projection
/// condition. You might want to do this if you are pushing something
/// through a projection, or pulling a projection up.
///
/// # Example
/// If we have a projection node, mapping column A to column B (A -> B)
/// All B's in `cond` will be rewritten as A.
pub fn rewrite_condition(&self, cond: Expr, child_schema_len: usize) -> Expr {
let proj_schema_size = self.forward.len();
cond.rewrite_column_refs(&|idx| {
Some(if idx < proj_schema_size {
self.projection_col_refers_to(idx)
} else {
idx - proj_schema_size + child_schema_len
})
})
.unwrap()
}
}

impl LogicalProjection {
pub fn compute_column_mapping(exprs: &ExprList) -> Option<ProjectionMapping> {
let mut mapping = vec![];
for expr in exprs.to_vec() {
let col_expr = ColumnRefExpr::from_rel_node(expr.into_rel_node())?;
mapping.push(col_expr.index());
}
ProjectionMapping::build(mapping)
}
}
12 changes: 8 additions & 4 deletions optd-datafusion-repr/src/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod filter_pushdown;
mod joins;
mod macros;
mod physical;
mod project_transpose;

// pub use filter_join::FilterJoinPullUpRule;
pub use eliminate_duplicated_expr::{
Expand All @@ -15,9 +16,12 @@ pub use eliminate_limit::EliminateLimitRule;
pub use filter::{EliminateFilterRule, SimplifyFilterRule, SimplifyJoinCondRule};
pub use filter_pushdown::{
FilterAggTransposeRule, FilterCrossJoinTransposeRule, FilterInnerJoinTransposeRule,
FilterMergeRule, FilterProjectTransposeRule, FilterSortTransposeRule,
};
pub use joins::{
EliminateJoinRule, HashJoinRule, JoinAssocRule, JoinCommuteRule, ProjectionPullUpJoin,
FilterMergeRule, FilterSortTransposeRule,
};
pub use joins::{EliminateJoinRule, HashJoinRule, JoinAssocRule, JoinCommuteRule};
pub use physical::PhysicalConversionRule;
pub use project_transpose::{
project_filter_transpose::{FilterProjectTransposeRule, ProjectFilterTransposeRule},
project_join_transpose::ProjectionPullUpJoin,
project_merge::ProjectMergeRule,
};
116 changes: 4 additions & 112 deletions optd-datafusion-repr/src/rules/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use optd_core::{optimizer::Optimizer, rel_node::RelNode};

use crate::plan_nodes::{
ColumnRefExpr, Expr, ExprList, JoinType, LogOpExpr, LogOpType, LogicalAgg, LogicalFilter,
LogicalJoin, LogicalProjection, LogicalSort, OptRelNode, OptRelNodeTyp, PlanNode,
LogicalJoin, LogicalSort, OptRelNode, OptRelNodeTyp, PlanNode,
};
use crate::properties::schema::SchemaPropertyBuilder;

Expand Down Expand Up @@ -122,36 +122,6 @@ fn categorize_conds(mut categorization_fn: impl FnMut(Expr, &Vec<Expr>), cond: E
}
}

define_rule!(
FilterProjectTransposeRule,
apply_filter_project_transpose,
(Filter, (Projection, child, [exprs]), [cond])
);

/// Datafusion only pushes filter past project when the project does not contain
/// volatile (i.e. non-deterministic) expressions that are present in the filter
/// Calcite only checks if the projection contains a windowing calculation
/// We check neither of those things and do it always (which may be wrong)
fn apply_filter_project_transpose(
optimizer: &impl Optimizer<OptRelNodeTyp>,
FilterProjectTransposeRulePicks { child, exprs, cond }: FilterProjectTransposeRulePicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
let child_schema_len = optimizer
.get_property::<SchemaPropertyBuilder>(child.clone().into(), 0)
.len();

let child = PlanNode::from_group(child.into());
let cond_as_expr = Expr::from_rel_node(cond.into()).unwrap();
let exprs = ExprList::from_rel_node(exprs.into()).unwrap();

let proj_col_map = LogicalProjection::compute_column_mapping(&exprs).unwrap();
let rewritten_cond = proj_col_map.rewrite_condition(cond_as_expr.clone(), child_schema_len);

let new_filter_node = LogicalFilter::new(child, rewritten_cond);
let new_proj = LogicalProjection::new(new_filter_node.into_plan_node(), exprs);
vec![new_proj.into_rel_node().as_ref().clone()]
}

define_rule!(
FilterMergeRule,
apply_filter_merge,
Expand Down Expand Up @@ -451,12 +421,12 @@ mod tests {
use crate::{
plan_nodes::{
BinOpExpr, BinOpType, ColumnRefExpr, ConstantExpr, ExprList, LogOpExpr, LogOpType,
LogicalAgg, LogicalFilter, LogicalJoin, LogicalProjection, LogicalScan, LogicalSort,
OptRelNode, OptRelNodeTyp,
LogicalAgg, LogicalFilter, LogicalJoin, LogicalScan, LogicalSort, OptRelNode,
OptRelNodeTyp,
},
rules::{
FilterAggTransposeRule, FilterInnerJoinTransposeRule, FilterMergeRule,
FilterProjectTransposeRule, FilterSortTransposeRule,
FilterSortTransposeRule,
},
testing::new_test_optimizer,
};
Expand Down Expand Up @@ -538,84 +508,6 @@ mod tests {
assert_eq!(col_4.value().as_i32(), 1);
}

#[test]
fn push_past_proj_basic() {
let mut test_optimizer = new_test_optimizer(Arc::new(FilterProjectTransposeRule::new()));

let scan = LogicalScan::new("customer".into());
let proj = LogicalProjection::new(scan.into_plan_node(), ExprList::new(vec![]));

let filter_expr = BinOpExpr::new(
ColumnRefExpr::new(0).into_expr(),
ConstantExpr::int32(5).into_expr(),
BinOpType::Eq,
)
.into_expr();

let filter = LogicalFilter::new(proj.into_plan_node(), filter_expr);
let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap();

assert_eq!(plan.typ, OptRelNodeTyp::Projection);
assert!(matches!(plan.child(0).typ, OptRelNodeTyp::Filter));
}

#[test]
fn push_past_proj_adv() {
let mut test_optimizer = new_test_optimizer(Arc::new(FilterProjectTransposeRule::new()));

let scan = LogicalScan::new("customer".into());
let proj = LogicalProjection::new(
scan.into_plan_node(),
ExprList::new(vec![
ColumnRefExpr::new(0).into_expr(),
ColumnRefExpr::new(4).into_expr(),
ColumnRefExpr::new(5).into_expr(),
ColumnRefExpr::new(7).into_expr(),
]),
);

let filter_expr = LogOpExpr::new(
LogOpType::And,
ExprList::new(vec![
BinOpExpr::new(
// This one should be pushed to the left child
ColumnRefExpr::new(1).into_expr(),
ConstantExpr::int32(5).into_expr(),
BinOpType::Eq,
)
.into_expr(),
BinOpExpr::new(
// This one should be pushed to the right child
ColumnRefExpr::new(3).into_expr(),
ConstantExpr::int32(6).into_expr(),
BinOpType::Eq,
)
.into_expr(),
]),
);

let filter = LogicalFilter::new(proj.into_plan_node(), filter_expr.into_expr());

let plan = test_optimizer.optimize(filter.into_rel_node()).unwrap();

assert!(matches!(plan.typ, OptRelNodeTyp::Projection));
let plan_filter = LogicalFilter::from_rel_node(plan.child(0)).unwrap();
assert!(matches!(plan_filter.0.typ(), OptRelNodeTyp::Filter));
let plan_filter_expr =
LogOpExpr::from_rel_node(plan_filter.cond().into_rel_node()).unwrap();
assert!(matches!(plan_filter_expr.op_type(), LogOpType::And));
let op_0 = BinOpExpr::from_rel_node(plan_filter_expr.children()[0].clone().into_rel_node())
.unwrap();
let col_0 =
ColumnRefExpr::from_rel_node(op_0.left_child().clone().into_rel_node()).unwrap();
assert_eq!(col_0.index(), 4);
let op_1 = BinOpExpr::from_rel_node(plan_filter_expr.children()[1].clone().into_rel_node())
.unwrap();
let col_1 =
ColumnRefExpr::from_rel_node(op_1.left_child().clone().into_rel_node()).unwrap();
assert_eq!(col_1.index(), 7);
}

#[test]
fn push_past_join_conjunction() {
// Test pushing a complex filter past a join, where one clause can
Expand Down
56 changes: 0 additions & 56 deletions optd-datafusion-repr/src/rules/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,59 +309,3 @@ fn apply_hash_join(
}
vec![]
}

// (Proj A) join B -> (Proj (A join B))
define_rule!(
ProjectionPullUpJoin,
apply_projection_pull_up_join,
(
Join(JoinType::Inner),
(Projection, left, [list]),
right,
[cond]
)
);

fn apply_projection_pull_up_join(
optimizer: &impl Optimizer<OptRelNodeTyp>,
ProjectionPullUpJoinPicks {
left,
right,
list,
cond,
}: ProjectionPullUpJoinPicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
let left = Arc::new(left.clone());
let right = Arc::new(right.clone());

let list = ExprList::from_rel_node(Arc::new(list)).unwrap();

let projection = LogicalProjection::new(PlanNode::from_group(left.clone()), list.clone());

let Some(mapping) = LogicalProjection::compute_column_mapping(&projection.exprs()) else {
return vec![];
};

// TODO(chi): support capture projection node.
let left_schema = optimizer.get_property::<SchemaPropertyBuilder>(left.clone(), 0);
let right_schema = optimizer.get_property::<SchemaPropertyBuilder>(right.clone(), 0);
let mut new_projection_exprs = list.to_vec();
for i in 0..right_schema.len() {
let col: Expr = ColumnRefExpr::new(i + left_schema.len()).into_expr();
new_projection_exprs.push(col);
}
let node = LogicalProjection::new(
LogicalJoin::new(
PlanNode::from_group(left),
PlanNode::from_group(right),
mapping.rewrite_condition(
Expr::from_rel_node(Arc::new(cond)).unwrap(),
left_schema.len(),
),
JoinType::Inner,
)
.into_plan_node(),
ExprList::new(new_projection_exprs),
);
vec![node.into_rel_node().as_ref().clone()]
}
4 changes: 4 additions & 0 deletions optd-datafusion-repr/src/rules/project_transpose.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod project_filter_transpose;
pub mod project_join_transpose;
pub mod project_merge;
pub mod project_transpose_common;
Loading
Loading