Skip to content

Commit

Permalink
feat: Eliminate Duplicated Expr Rule (#67)
Browse files Browse the repository at this point in the history
# Major Changes
Add an eliminate duplicate expression rule which:
1. Removes duplicate sort expressions
2. Removes duplicate aggregate group bys

Also, this PR adds derive traits for Hash, PartialEq, and Eq for
RelNode.

Examples:
`select * from t1 order by id, name, id desc, id asc, name desc` becomes
`select * from t1 order by id, name`

`select * from t1 group by id, name, id` becomes 
`select * from t1 group by id, name`

## Rule Type
Heuristics (always apply), Transformation Rule (logical to logical)
  • Loading branch information
Sweetsuro authored Feb 16, 2024
1 parent e846c58 commit d34c22c
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 3 deletions.
2 changes: 1 addition & 1 deletion optd-core/src/rel_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ impl Value {
}

/// A RelNode is consisted of a plan node type and some children.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct RelNode<T: RelNodeTyp> {
pub typ: T,
pub children: Vec<RelNodeRef<T>>,
Expand Down
7 changes: 5 additions & 2 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use properties::{
schema::{Catalog, SchemaPropertyBuilder},
};
use rules::{
EliminateFilterRule, EliminateJoinRule, EliminateLimitRule, HashJoinRule, JoinAssocRule,
JoinCommuteRule, PhysicalConversionRule, ProjectionPullUpJoin,
EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule, EliminateFilterRule,
EliminateJoinRule, EliminateLimitRule, HashJoinRule, JoinAssocRule, JoinCommuteRule,
PhysicalConversionRule, ProjectionPullUpJoin,
};

pub use adaptive::PhysicalCollector;
Expand Down Expand Up @@ -53,6 +54,8 @@ impl DatafusionOptimizer {
rules.push(Arc::new(EliminateJoinRule::new()));
rules.push(Arc::new(EliminateFilterRule::new()));
rules.push(Arc::new(EliminateLimitRule::new()));
rules.push(Arc::new(EliminateDuplicatedSortExprRule::new()));
rules.push(Arc::new(EliminateDuplicatedAggExprRule::new()));

let cost_model = AdaptiveCostModel::new(50);
Self {
Expand Down
4 changes: 4 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl ExprList {
.map(|x| Expr::from_rel_node(x.clone()).unwrap())
.collect_vec()
}

pub fn from_group(rel_node: OptRelNodeRef) -> Self {
Self(rel_node)
}
}

impl OptRelNode for ExprList {
Expand Down
4 changes: 4 additions & 0 deletions optd-datafusion-repr/src/plan_nodes/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use super::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
#[derive(Clone, Debug)]
pub struct LogicalSort(pub PlanNode);

// each expression in ExprList is represented as a SortOrderExpr
// 1. nulls_first is not included from DF
// 2. node type defines sort order per expression
// 3. actual expr is stored as a child of this node
define_plan_node!(
LogicalSort : PlanNode,
Sort, [
Expand Down
4 changes: 4 additions & 0 deletions optd-datafusion-repr/src/rules.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// mod filter_join;
mod eliminate_duplicated_expr;
mod eliminate_filter;
mod eliminate_limit;
mod joins;
mod macros;
mod physical;

// pub use filter_join::FilterJoinPullUpRule;
pub use eliminate_duplicated_expr::{
EliminateDuplicatedAggExprRule, EliminateDuplicatedSortExprRule,
};
pub use eliminate_filter::EliminateFilterRule;
pub use eliminate_limit::EliminateLimitRule;
pub use joins::{
Expand Down
117 changes: 117 additions & 0 deletions optd-datafusion-repr/src/rules/eliminate_duplicated_expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use itertools::Itertools;
use optd_core::rules::{Rule, RuleMatcher};
use optd_core::{optimizer::Optimizer, rel_node::RelNode};

use crate::plan_nodes::{
Expr, ExprList, LogicalAgg, LogicalSort, OptRelNode, OptRelNodeTyp, PlanNode, SortOrderExpr,
SortOrderType,
};

use super::macros::define_rule;

define_rule!(
EliminateDuplicatedSortExprRule,
apply_eliminate_duplicated_sort_expr,
(Sort, child, [exprs])
);

/// Removes duplicate sort expressions
/// For exmaple:
/// select *
/// from t1
/// order by id desc, id, name, id asc
/// becomes
/// select *
/// from t1
/// order by id desc, name
fn apply_eliminate_duplicated_sort_expr(
_optimizer: &impl Optimizer<OptRelNodeTyp>,
EliminateDuplicatedSortExprRulePicks { child, exprs }: EliminateDuplicatedSortExprRulePicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
let sort_keys: Vec<Expr> = exprs
.children
.iter()
.map(|x| Expr::from_rel_node(x.clone()).unwrap())
.collect_vec();

let normalized_sort_keys: Vec<Arc<RelNode<OptRelNodeTyp>>> = exprs
.children
.iter()
.map(|x| match x.typ {
OptRelNodeTyp::SortOrder(_) => SortOrderExpr::new(
SortOrderType::Asc,
SortOrderExpr::from_rel_node(x.clone()).unwrap().child(),
)
.into_rel_node(),
_ => x.clone(),
})
.collect_vec();

let mut dedup_expr: Vec<Expr> = Vec::new();
let mut dedup_set: HashSet<Arc<RelNode<OptRelNodeTyp>>> = HashSet::new();

sort_keys
.iter()
.zip(normalized_sort_keys.iter())
.for_each(|(expr, normalized_expr)| {
if !dedup_set.contains(normalized_expr) {
dedup_expr.push(expr.clone());
dedup_set.insert(normalized_expr.to_owned());
}
});

if dedup_expr.len() != sort_keys.len() {
let node = LogicalSort::new(
PlanNode::from_group(child.into()),
ExprList::new(dedup_expr),
);
return vec![node.into_rel_node().as_ref().clone()];
}
vec![]
}

define_rule!(
EliminateDuplicatedAggExprRule,
apply_eliminate_duplicated_agg_expr,
(Agg, child, exprs, [groups])
);

/// Removes duplicate group by expressions
/// For exmaple:
/// select *
/// from t1
/// group by id, name, id, id
/// becomes
/// select *
/// from t1
/// group by id, name
fn apply_eliminate_duplicated_agg_expr(
_optimizer: &impl Optimizer<OptRelNodeTyp>,
EliminateDuplicatedAggExprRulePicks {
child,
exprs,
groups,
}: EliminateDuplicatedAggExprRulePicks,
) -> Vec<RelNode<OptRelNodeTyp>> {
let mut dedup_expr: Vec<Expr> = Vec::new();
let mut dedup_set: HashSet<Arc<RelNode<OptRelNodeTyp>>> = HashSet::new();
groups.children.iter().for_each(|expr| {
if !dedup_set.contains(expr) {
dedup_expr.push(Expr::from_rel_node(expr.clone()).unwrap());
dedup_set.insert(expr.clone());
}
});

if dedup_expr.len() != groups.children.len() {
let node = LogicalAgg::new(
PlanNode::from_group(child.into()),
ExprList::from_group(exprs.into()),
ExprList::new(dedup_expr),
);
return vec![node.into_rel_node().as_ref().clone()];
}
vec![]
}
108 changes: 108 additions & 0 deletions optd-sqlplannertest/tests/eliminate_duplicated_expr.planner.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
-- (no id or description)
create table t1(v1 int, v2 int);
insert into t1 values (0, 0), (1, 1), (5, 2), (2, 4), (0, 2);

/*
5
*/

-- Test without sorts/aggs.
select * from t1;

/*
LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalScan { table: t1 }
PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
0 0
1 1
5 2
2 4
0 2
*/

-- Test whether the optimizer handles duplicate sort expressions correctly.
select * from t1 order by v1, v2, v1 desc, v2 desc, v1 asc;

/*
LogicalSort
├── exprs:
│ ┌── SortOrder { order: Asc }
│ │ └── #0
│ ├── SortOrder { order: Asc }
│ │ └── #1
│ ├── SortOrder { order: Desc }
│ │ └── #0
│ ├── SortOrder { order: Desc }
│ │ └── #1
│ └── SortOrder { order: Asc }
│ └── #0
└── LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalScan { table: t1 }
PhysicalSort
├── exprs:
│ ┌── SortOrder { order: Asc }
│ │ └── #0
│ └── SortOrder { order: Asc }
│ └── #1
└── PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
0 0
0 2
1 1
2 4
5 2
*/

-- Test whether the optimizer handles duplicate agg expressions correctly.
select * from t1 group by v1, v2, v1;

/*
LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalAgg { exprs: [], groups: [ #0, #1, #0 ] }
└── LogicalScan { table: t1 }
PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
0 0
1 1
5 2
2 4
0 2
*/

-- Test whether the optimizer handles duplicate sort and agg expressions correctly.
select * from t1 group by v1, v2, v1, v2, v2 order by v1, v2, v1 desc, v2 desc, v1 asc;

/*
LogicalSort
├── exprs:
│ ┌── SortOrder { order: Asc }
│ │ └── #0
│ ├── SortOrder { order: Asc }
│ │ └── #1
│ ├── SortOrder { order: Desc }
│ │ └── #0
│ ├── SortOrder { order: Desc }
│ │ └── #1
│ └── SortOrder { order: Asc }
│ └── #0
└── LogicalProjection { exprs: [ #0, #1 ] }
└── LogicalAgg { exprs: [], groups: [ #0, #1, #0, #1, #1 ] }
└── LogicalScan { table: t1 }
PhysicalSort
├── exprs:
│ ┌── SortOrder { order: Asc }
│ │ └── #0
│ └── SortOrder { order: Asc }
│ └── #1
└── PhysicalProjection { exprs: [ #0, #1 ] }
└── PhysicalAgg { aggrs: [], groups: [ #0, #1 ] }
└── PhysicalScan { table: t1 }
0 0
0 2
1 1
2 4
5 2
*/

29 changes: 29 additions & 0 deletions optd-sqlplannertest/tests/eliminate_duplicated_expr.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
- sql: |
create table t1(v1 int, v2 int);
insert into t1 values (0, 0), (1, 1), (5, 2), (2, 4), (0, 2);
tasks:
- execute
- sql: |
select * from t1;
desc: Test without sorts/aggs.
tasks:
- explain:logical_optd,physical_optd
- execute
- sql: |
select * from t1 order by v1, v2, v1 desc, v2 desc, v1 asc;
desc: Test whether the optimizer handles duplicate sort expressions correctly.
tasks:
- explain:logical_optd,physical_optd
- execute
- sql: |
select * from t1 group by v1, v2, v1;
desc: Test whether the optimizer handles duplicate agg expressions correctly.
tasks:
- explain:logical_optd,physical_optd
- execute
- sql: |
select * from t1 group by v1, v2, v1, v2, v2 order by v1, v2, v1 desc, v2 desc, v1 asc;
desc: Test whether the optimizer handles duplicate sort and agg expressions correctly.
tasks:
- explain:logical_optd,physical_optd
- execute

0 comments on commit d34c22c

Please sign in to comment.