Skip to content

Commit

Permalink
feat: implment column ref as logical property (#66)
Browse files Browse the repository at this point in the history
Derive base table column ref for logical nodes, so that we can know
which columns the `ColumnRef` objects in a predicate refers to.

Tested manually on the two demos. Not sure how to unit test it.
  • Loading branch information
Gun9niR authored Feb 13, 2024
1 parent 2d91160 commit f5e8f10
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 12 deletions.
2 changes: 1 addition & 1 deletion datafusion-optd-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ pub async fn main() -> Result<()> {
state = state.with_physical_optimizer_rules(vec![]);
}
// use optd-bridge query planner
let optimizer = DatafusionOptimizer::new_physical(Box::new(DatafusionCatalog::new(
let optimizer = DatafusionOptimizer::new_physical(Arc::new(DatafusionCatalog::new(
state.catalog_list(),
)));
state = state.with_query_planner(Arc::new(OptdQueryPlanner::new(optimizer)));
Expand Down
4 changes: 2 additions & 2 deletions optd-adaptive-demo/src/bin/optd-adaptive-three-join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async fn main() -> Result<()> {
let runtime_env = RuntimeEnv::new(rn_config.clone())?;
let mut state =
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
let mut optimizer: DatafusionOptimizer = DatafusionOptimizer::new_physical(Box::new(
let mut optimizer: DatafusionOptimizer = DatafusionOptimizer::new_physical(Arc::new(
DatafusionCatalog::new(state.catalog_list()),
));
optimizer.optd_optimizer_mut().prop.partial_explore_iter = None;
Expand All @@ -45,7 +45,7 @@ async fn main() -> Result<()> {
let mut state =
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
let mut optimizer: DatafusionOptimizer =
DatafusionOptimizer::new_alternative_physical_for_demo(Box::new(
DatafusionOptimizer::new_alternative_physical_for_demo(Arc::new(
DatafusionCatalog::new(state.catalog_list()),
));
optimizer.optd_optimizer_mut().prop.partial_explore_iter = None;
Expand Down
2 changes: 1 addition & 1 deletion optd-adaptive-demo/src/bin/optd-adaptive-tpch-q8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() -> Result<()> {
let mut ctx = {
let mut state =
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
let optimizer = DatafusionOptimizer::new_physical(Box::new(DatafusionCatalog::new(
let optimizer = DatafusionOptimizer::new_physical(Arc::new(DatafusionCatalog::new(
state.catalog_list(),
)));
// clean up optimizer rules so that we can plug in our own optimizer
Expand Down
19 changes: 14 additions & 5 deletions optd-datafusion-repr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use anyhow::Result;
use cost::{AdaptiveCostModel, RuntimeAdaptionStorage};
use optd_core::cascades::{CascadesOptimizer, GroupId, OptimizerProperties};
use plan_nodes::{OptRelNode, OptRelNodeRef, OptRelNodeTyp, PlanNode};
use properties::schema::{Catalog, SchemaPropertyBuilder};
use properties::{
column_ref::ColumnRefPropertyBuilder,
schema::{Catalog, SchemaPropertyBuilder},
};
use rules::{
EliminateFilterRule, EliminateJoinRule, HashJoinRule, JoinAssocRule, JoinCommuteRule,
PhysicalConversionRule, ProjectionPullUpJoin,
Expand Down Expand Up @@ -41,7 +44,7 @@ impl DatafusionOptimizer {
}

/// Create an optimizer with default settings: adaptive + partial explore.
pub fn new_physical(catalog: Box<dyn Catalog>) -> Self {
pub fn new_physical(catalog: Arc<dyn Catalog>) -> Self {
let mut rules = PhysicalConversionRule::all_conversions();
rules.push(Arc::new(HashJoinRule::new()));
rules.push(Arc::new(JoinCommuteRule::new()));
Expand All @@ -56,7 +59,10 @@ impl DatafusionOptimizer {
optimizer: CascadesOptimizer::new_with_prop(
rules,
Box::new(cost_model),
vec![Box::new(SchemaPropertyBuilder::new(catalog))],
vec![
Box::new(SchemaPropertyBuilder::new(catalog.clone())),
Box::new(ColumnRefPropertyBuilder::new(catalog)),
],
OptimizerProperties {
partial_explore_iter: Some(1 << 20),
partial_explore_space: Some(1 << 10),
Expand All @@ -67,7 +73,7 @@ impl DatafusionOptimizer {
}

/// The optimizer settings for three-join demo as a perfect optimizer.
pub fn new_alternative_physical_for_demo(catalog: Box<dyn Catalog>) -> Self {
pub fn new_alternative_physical_for_demo(catalog: Arc<dyn Catalog>) -> Self {
let mut rules = PhysicalConversionRule::all_conversions();
rules.push(Arc::new(HashJoinRule::new()));
rules.insert(0, Arc::new(JoinCommuteRule::new()));
Expand All @@ -80,7 +86,10 @@ impl DatafusionOptimizer {
let optimizer = CascadesOptimizer::new(
rules,
Box::new(cost_model),
vec![Box::new(SchemaPropertyBuilder::new(catalog))],
vec![
Box::new(SchemaPropertyBuilder::new(catalog.clone())),
Box::new(ColumnRefPropertyBuilder::new(catalog)),
],
);
Self {
runtime_statistics,
Expand Down
1 change: 1 addition & 0 deletions optd-datafusion-repr/src/properties.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod column_ref;
pub mod schema;
116 changes: 116 additions & 0 deletions optd-datafusion-repr/src/properties/column_ref.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use std::{ops::Deref, sync::Arc};

use optd_core::property::PropertyBuilder;

use crate::plan_nodes::OptRelNodeTyp;

use super::schema::Catalog;

#[derive(Clone, Debug)]
pub enum ColumnRef {
BaseTableColumnRef { table: String, col_idx: usize },
ChildColumnRef { col_idx: usize },
Derived,
}

pub type GroupColumnRefs = Vec<ColumnRef>;

pub struct ColumnRefPropertyBuilder {
catalog: Arc<dyn Catalog>,
}

impl ColumnRefPropertyBuilder {
pub fn new(catalog: Arc<dyn Catalog>) -> Self {
Self { catalog }
}

fn concat_children_properties(children: &[&GroupColumnRefs]) -> GroupColumnRefs {
children.iter().flat_map(Deref::deref).cloned().collect()
}
}

impl PropertyBuilder<OptRelNodeTyp> for ColumnRefPropertyBuilder {
type Prop = GroupColumnRefs;

fn derive(
&self,
typ: OptRelNodeTyp,
data: Option<optd_core::rel_node::Value>,
children: &[&Self::Prop],
) -> Self::Prop {
match typ {
// Should account for PhysicalScan.
OptRelNodeTyp::Scan => {
let table_name = data.unwrap().as_str().to_string();
let schema = self.catalog.get(&table_name);
let column_cnt = schema.fields.len();
(0..column_cnt)
.map(|i| ColumnRef::BaseTableColumnRef {
table: table_name.clone(),
col_idx: i,
})
.collect()
}
OptRelNodeTyp::ColumnRef => {
let col_idx = data.unwrap().as_i64();
vec![ColumnRef::ChildColumnRef {
col_idx: col_idx as usize,
}]
}
OptRelNodeTyp::List => {
// Concatentate the children properties.
Self::concat_children_properties(children)
}
OptRelNodeTyp::Projection => children[1]
.iter()
.map(|p| match p {
ColumnRef::ChildColumnRef { col_idx } => children[0][*col_idx].clone(),
ColumnRef::Derived => ColumnRef::Derived,
_ => panic!("projection expr must be Derived or ChildColumnRef"),
})
.collect(),
// Should account for all physical join types.
OptRelNodeTyp::Join(_) => {
// Concatenate left and right children properties.
Self::concat_children_properties(&children[0..2])
}
OptRelNodeTyp::Agg => {
// Group by columns first.
let mut group_by_col_refs: Vec<_> = children[2]
.iter()
.map(|p| {
let col_idx = match p {
ColumnRef::ChildColumnRef { col_idx } => *col_idx,
_ => panic!("group by expr must be ColumnRef"),
};
children[0][col_idx].clone()
})
.collect();
// Then the aggregate expressions. These columns, (e.g. SUM, COUNT, etc.) are derived columns.
let agg_expr_cnt = children[1].len();
group_by_col_refs.extend((0..agg_expr_cnt).map(|_| ColumnRef::Derived));
group_by_col_refs
}
OptRelNodeTyp::Filter
| OptRelNodeTyp::Sort
| OptRelNodeTyp::Limit
| OptRelNodeTyp::SortOrder(_) => children[0].clone(),
OptRelNodeTyp::Cast => {
// FIXME: we just assume the column value does not change.
children[0].clone()
}
OptRelNodeTyp::Constant(_)
| OptRelNodeTyp::Func(_)
| OptRelNodeTyp::BinOp(_)
| OptRelNodeTyp::Between
| OptRelNodeTyp::EmptyRelation => {
vec![ColumnRef::Derived]
}
_ => unimplemented!("Unsupported rel node type {:?}", typ),
}
}

fn property_name(&self) -> &'static str {
"column_ref"
}
}
6 changes: 4 additions & 2 deletions optd-datafusion-repr/src/properties/schema.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use optd_core::property::PropertyBuilder;

use crate::plan_nodes::{ConstantType, OptRelNodeTyp};
Expand Down Expand Up @@ -28,11 +30,11 @@ pub trait Catalog: Send + Sync + 'static {
}

pub struct SchemaPropertyBuilder {
catalog: Box<dyn Catalog>,
catalog: Arc<dyn Catalog>,
}

impl SchemaPropertyBuilder {
pub fn new(catalog: Box<dyn Catalog>) -> Self {
pub fn new(catalog: Arc<dyn Catalog>) -> Self {
Self { catalog }
}
}
Expand Down
2 changes: 1 addition & 1 deletion optd-sqlplannertest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl DatafusionDb {
let ctx = {
let mut state =
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
let optimizer = DatafusionOptimizer::new_physical(Box::new(DatafusionCatalog::new(
let optimizer = DatafusionOptimizer::new_physical(Arc::new(DatafusionCatalog::new(
state.catalog_list(),
)));
if !with_logical {
Expand Down

0 comments on commit f5e8f10

Please sign in to comment.