diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index 84bb4ca656..5bba024c5b 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -76,7 +76,6 @@ def file_read( Yield a plan to read those filenames. """ - materializations: deque[SingleOutputPartitionTask[PartitionT]] = deque() output_partition_index = 0 diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py index 2f6a3a41cd..fe65687549 100644 --- a/daft/execution/rust_physical_plan_shim.py +++ b/daft/execution/rust_physical_plan_shim.py @@ -22,7 +22,11 @@ def tabular_scan( - schema: PySchema, file_info_table: PyTable, file_format_config: FileFormatConfig, limit: int + schema: PySchema, + columns_to_read: list[str], + file_info_table: PyTable, + file_format_config: FileFormatConfig, + limit: int, ) -> physical_plan.InProgressPhysicalPlan[PartitionT]: # TODO(Clark): Fix this Ray runner hack. part = Table._from_pytable(file_info_table) @@ -36,8 +40,15 @@ def tabular_scan( file_info_iter = physical_plan.partition_read(iter(parts_t)) filepaths_column_name = get_context().runner().runner_io().FS_LISTING_PATH_COLUMN_NAME + pyschema = Schema._from_pyschema(schema) return physical_plan.file_read( - file_info_iter, limit, Schema._from_pyschema(schema), None, None, file_format_config, filepaths_column_name + child_plan=file_info_iter, + limit_rows=limit, + schema=pyschema, + fs=None, + columns_to_read=columns_to_read, + file_format_config=file_format_config, + filepaths_column_name=filepaths_column_name, ) diff --git a/src/daft-core/src/schema.rs b/src/daft-core/src/schema.rs index a300e2b0c6..24e96f2c5e 100644 --- a/src/daft-core/src/schema.rs +++ b/src/daft-core/src/schema.rs @@ -133,29 +133,31 @@ impl Eq for Schema {} impl Hash for Schema { fn hash(&self, state: &mut H) { - // Must preserve x == y --> hash(x) == hash(y). - // Since IndexMap implements order-independent equality, we must implement an order-independent hashing scheme. - // We achieve this by combining the hashes of key-value pairs with an associative + commutative operation so - // order does not matter, i.e. (u64, *, 0) must form a commutative monoid. This is satisfied by * = u64::wrapping_add. - // - // Moreover, the hashing of each individual element must be independent of the hashing of other elements, so we hash - // each element with a fresh state (hasher). - // - // NOTE: This is a relatively weak hash function, but should be fine for our current hashing use case, which is detecting - // logical optimization cycles in the optimizer. - state.write_u64( - self.fields - .iter() - .map(|kv| { - let mut h = DefaultHasher::new(); - kv.hash(&mut h); - h.finish() - }) - .fold(0, u64::wrapping_add), - ) + state.write_u64(hash_index_map(&self.fields)) } } +pub fn hash_index_map(indexmap: &indexmap::IndexMap) -> u64 { + // Must preserve x == y --> hash(x) == hash(y). + // Since IndexMap implements order-independent equality, we must implement an order-independent hashing scheme. + // We achieve this by combining the hashes of key-value pairs with an associative + commutative operation so + // order does not matter, i.e. (u64, *, 0) must form a commutative monoid. This is satisfied by * = u64::wrapping_add. + // + // Moreover, the hashing of each individual element must be independent of the hashing of other elements, so we hash + // each element with a fresh state (hasher). 
+ // + // NOTE: This is a relatively weak hash function, but should be fine for our current hashing use case, which is detecting + // logical optimization cycles in the optimizer. + indexmap + .iter() + .map(|kv| { + let mut h = DefaultHasher::new(); + kv.hash(&mut h); + h.finish() + }) + .fold(0, u64::wrapping_add) +} + impl Default for Schema { fn default() -> Self { Self::empty() diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 5b27a0157d..901e1daa4c 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -43,6 +43,7 @@ impl LogicalPlanBuilder { partition_spec: &PartitionSpec, ) -> PyResult { let source_info = SourceInfo::InMemoryInfo(InMemoryInfo::new( + schema.schema.clone(), partition_key.into(), cache_entry.to_object(cache_entry.py()), )); @@ -225,7 +226,7 @@ impl LogicalPlanBuilder { ))); } let logical_plan: LogicalPlan = - ops::Concat::new(other.plan.clone(), self.plan.clone()).into(); + ops::Concat::new(self.plan.clone(), other.plan.clone()).into(); Ok(logical_plan.into()) } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 4c3a08fd80..c08ef4e9e0 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -2,6 +2,8 @@ use std::{cmp::max, num::NonZeroUsize, sync::Arc}; use common_error::DaftError; use daft_core::schema::SchemaRef; +use daft_dsl::{optimization::get_required_columns, Expr}; +use indexmap::IndexSet; use snafu::Snafu; use crate::{display::TreeDisplay, ops::*, PartitionScheme, PartitionSpec}; @@ -26,7 +28,7 @@ pub enum LogicalPlan { impl LogicalPlan { pub fn schema(&self) -> SchemaRef { match self { - Self::Source(Source { schema, .. }) => schema.clone(), + Self::Source(Source { output_schema, .. }) => output_schema.clone(), Self::Project(Project { projected_schema, .. }) => projected_schema.clone(), @@ -46,6 +48,80 @@ impl LogicalPlan { } } + pub fn required_columns(&self) -> Vec> { + // TODO: https://github.com/Eventual-Inc/Daft/pull/1288#discussion_r1307820697 + match self { + Self::Limit(..) | Self::Coalesce(..) => vec![IndexSet::new()], + Self::Concat(..) 
=> vec![IndexSet::new(), IndexSet::new()], + Self::Project(projection) => { + let res = projection + .projection + .iter() + .flat_map(get_required_columns) + .collect(); + vec![res] + } + Self::Filter(filter) => { + vec![get_required_columns(&filter.predicate) + .iter() + .cloned() + .collect()] + } + Self::Sort(sort) => { + let res = sort.sort_by.iter().flat_map(get_required_columns).collect(); + vec![res] + } + Self::Repartition(repartition) => { + let res = repartition + .partition_by + .iter() + .flat_map(get_required_columns) + .collect(); + vec![res] + } + Self::Explode(explode) => { + let res = explode + .to_explode + .iter() + .flat_map(get_required_columns) + .collect(); + vec![res] + } + Self::Distinct(distinct) => { + let res = distinct + .input + .schema() + .fields + .iter() + .map(|(name, _)| name) + .cloned() + .collect(); + vec![res] + } + Self::Aggregate(aggregate) => { + let res = aggregate + .aggregations + .iter() + .map(|agg| get_required_columns(&Expr::Agg(agg.clone()))) + .chain(aggregate.groupby.iter().map(get_required_columns)) + .flatten() + .collect(); + vec![res] + } + Self::Join(join) => { + let left = join.left_on.iter().flat_map(get_required_columns).collect(); + let right = join + .right_on + .iter() + .flat_map(get_required_columns) + .collect(); + vec![left, right] + } + Self::Source(_) => todo!(), + Self::Sink(_) => todo!(), + } + } + pub fn partition_spec(&self) -> Arc { match self { Self::Source(Source { partition_spec, .. }) => partition_spec.clone(), @@ -147,7 +223,7 @@ impl LogicalPlan { }, [input1, input2] => match self { Self::Source(_) => panic!("Source nodes don't have children, with_new_children() should never be called for Source ops"), - Self::Concat(_) => Self::Concat(Concat::new(input2.clone(), input1.clone())), + Self::Concat(_) => Self::Concat(Concat::new(input1.clone(), input2.clone())), Self::Join(Join { left_on, right_on, join_type, .. }) => Self::Join(Join::try_new(input1.clone(), input2.clone(), left_on.clone(), right_on.clone(), *join_type).unwrap()), _ => panic!("Logical op {} has one input, but got two", self), }, diff --git a/src/daft-plan/src/ops/concat.rs b/src/daft-plan/src/ops/concat.rs index dc05584293..0b69aedfdf 100644 --- a/src/daft-plan/src/ops/concat.rs +++ b/src/daft-plan/src/ops/concat.rs @@ -4,13 +4,13 @@ use crate::LogicalPlan; #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub struct Concat { - pub other: Arc, - // Upstream node. + // Upstream nodes. pub input: Arc, + pub other: Arc, } impl Concat { - pub(crate) fn new(other: Arc, input: Arc) -> Self { - Self { other, input } + pub(crate) fn new(input: Arc, other: Arc) -> Self { + Self { input, other } } } diff --git a/src/daft-plan/src/ops/join.rs b/src/daft-plan/src/ops/join.rs index 3584c900e5..a3c8e538b8 100644 --- a/src/daft-plan/src/ops/join.rs +++ b/src/daft-plan/src/ops/join.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, sync::Arc}; -use daft_core::schema::{Schema, SchemaRef}; +use daft_core::schema::{hash_index_map, Schema, SchemaRef}; use daft_dsl::Expr; use snafu::ResultExt; @@ -9,7 +9,7 @@ use crate::{ JoinType, LogicalPlan, }; -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Join { // Upstream nodes. pub input: Arc, @@ -17,8 +17,24 @@ pub struct Join { pub left_on: Vec, pub right_on: Vec, - pub output_schema: SchemaRef, pub join_type: JoinType, + pub output_schema: SchemaRef, + + // Joins may rename columns from the right input; this struct tracks those renames. 
+ // Output name -> Original name + pub right_input_mapping: indexmap::IndexMap, +} + +impl std::hash::Hash for Join { + fn hash(&self, state: &mut H) { + std::hash::Hash::hash(&self.input, state); + std::hash::Hash::hash(&self.right, state); + std::hash::Hash::hash(&self.left_on, state); + std::hash::Hash::hash(&self.right_on, state); + std::hash::Hash::hash(&self.join_type, state); + std::hash::Hash::hash(&self.output_schema, state); + state.write_u64(hash_index_map(&self.right_input_mapping)) + } } impl Join { @@ -29,6 +45,7 @@ impl Join { right_on: Vec, join_type: JoinType, ) -> logical_plan::Result { + let mut right_input_mapping = indexmap::IndexMap::new(); // Schema inference ported from existing behaviour for parity, // but contains bug https://github.com/Eventual-Inc/Daft/issues/1294 let output_schema = { @@ -44,11 +61,14 @@ impl Join { .cloned() .chain(right.schema().fields.iter().filter_map(|(rname, rfield)| { if left_join_keys.contains(rname.as_str()) { + right_input_mapping.insert(rname.clone(), rname.clone()); None } else if left_schema.contains_key(rname) { let new_name = format!("right.{}", rname); + right_input_mapping.insert(new_name.clone(), rname.clone()); Some(rfield.rename(new_name)) } else { + right_input_mapping.insert(rname.clone(), rname.clone()); Some(rfield.clone()) } })) @@ -60,8 +80,9 @@ impl Join { right, left_on, right_on, - output_schema, join_type, + output_schema, + right_input_mapping, }) } diff --git a/src/daft-plan/src/ops/source.rs b/src/daft-plan/src/ops/source.rs index 9dddcc04bd..c0feae5585 100644 --- a/src/daft-plan/src/ops/source.rs +++ b/src/daft-plan/src/ops/source.rs @@ -12,7 +12,7 @@ use crate::{ pub struct Source { /// The schema of the output of this node (the source data schema). /// May be a subset of the source data schema; executors should push down this projection if possible. - pub schema: SchemaRef, + pub output_schema: SchemaRef, /// Information about the source data location. pub source_info: Arc, @@ -27,12 +27,12 @@ pub struct Source { impl Source { pub(crate) fn new( - schema: SchemaRef, + output_schema: SchemaRef, source_info: Arc, partition_spec: Arc, ) -> Self { Self { - schema, + output_schema, source_info, partition_spec, filters: vec![], // Will be populated by plan optimizer. 
@@ -42,7 +42,7 @@ impl Source { pub fn with_limit(&self, limit: Option) -> Self { Self { - schema: self.schema.clone(), + output_schema: self.output_schema.clone(), source_info: self.source_info.clone(), partition_spec: self.partition_spec.clone(), filters: self.filters.clone(), @@ -52,7 +52,7 @@ impl Source { pub fn with_filters(&self, filters: Vec) -> Self { Self { - schema: self.schema.clone(), + output_schema: self.output_schema.clone(), source_info: self.source_info.clone(), partition_spec: self.partition_spec.clone(), filters, @@ -65,7 +65,7 @@ impl Source { match self.source_info.as_ref() { SourceInfo::ExternalInfo(ExternalInfo { - schema, + source_schema, file_info, file_format_config, }) => { @@ -73,13 +73,16 @@ impl Source { for fp in file_info.file_paths.iter() { res.push(format!("File paths = {}", fp)); } - res.push(format!("File schema = {}", schema.short_string())); + res.push(format!("File schema = {}", source_schema.short_string())); res.push(format!("Format-specific config = {:?}", file_format_config)); } #[cfg(feature = "python")] SourceInfo::InMemoryInfo(_) => {} } - res.push(format!("Output schema = {}", self.schema.short_string())); + res.push(format!( + "Output schema = {}", + self.output_schema.short_string() + )); if !self.filters.is_empty() { res.push(format!("Filters = {:?}", self.filters)); } diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index bb1a133e34..c09fd184b0 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -6,7 +6,9 @@ use crate::LogicalPlan; use super::{ logical_plan_tracker::LogicalPlanTracker, - rules::{ApplyOrder, OptimizerRule, PushDownFilter, PushDownLimit, Transformed}, + rules::{ + ApplyOrder, OptimizerRule, PushDownFilter, PushDownLimit, PushDownProjection, Transformed, + }, }; /// Config for optimizer. @@ -106,6 +108,7 @@ impl Optimizer { let rule_batches: Vec = vec![RuleBatch::new( vec![ Box::new(PushDownFilter::new()), + Box::new(PushDownProjection::new()), Box::new(PushDownLimit::new()), ], RuleExecutionStrategy::Once, @@ -238,7 +241,7 @@ impl Optimizer { ) -> DaftResult>> { // Fold over the rules, applying each rule to this plan node sequentially. 
rules.iter().try_fold(Transformed::No(plan), |plan, rule| { - rule.try_optimize(plan.unwrap().clone()) + Ok(rule.try_optimize(plan.unwrap().clone())?.or(plan)) }) } diff --git a/src/daft-plan/src/optimization/rules/mod.rs b/src/daft-plan/src/optimization/rules/mod.rs index ea3fe266e9..6e3aeb93f4 100644 --- a/src/daft-plan/src/optimization/rules/mod.rs +++ b/src/daft-plan/src/optimization/rules/mod.rs @@ -1,8 +1,10 @@ mod push_down_filter; mod push_down_limit; +mod push_down_projection; mod rule; mod utils; pub use push_down_filter::PushDownFilter; pub use push_down_limit::PushDownLimit; +pub use push_down_projection::PushDownProjection; pub use rule::{ApplyOrder, OptimizerRule, Transformed}; diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index b73d5be0c9..4b1d7ac445 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -147,7 +147,7 @@ impl OptimizerRule for PushDownFilter { let new_other: LogicalPlan = Filter::new(filter.predicate.clone(), other.clone()).into(); let new_concat: LogicalPlan = - Concat::new(new_other.into(), new_input.into()).into(); + Concat::new(new_input.into(), new_other.into()).into(); new_concat.into() } LogicalPlan::Join(child_join) => { @@ -418,7 +418,7 @@ mod tests { ]; let source1: LogicalPlan = dummy_scan_node(fields.clone()).into(); let source2: LogicalPlan = dummy_scan_node(fields).into(); - let concat: LogicalPlan = Concat::new(source2.into(), source1.into()).into(); + let concat: LogicalPlan = Concat::new(source1.into(), source2.into()).into(); let filter: LogicalPlan = Filter::new(col("a").lt(&lit(2)), concat.into()).into(); let expected = "\ Concat\ diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 001695d168..146a858cd8 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -151,9 +151,11 @@ mod tests { #[cfg(feature = "python")] fn limit_does_not_push_into_in_memory_source() -> DaftResult<()> { let py_obj = Python::with_gil(|py| py.None()); + let schema: Arc = Schema::new(vec![Field::new("a", DataType::Int64)])?.into(); let source: LogicalPlan = Source::new( - Schema::new(vec![Field::new("a", DataType::Int64)])?.into(), - SourceInfo::InMemoryInfo(InMemoryInfo::new("foo".to_string(), py_obj)).into(), + schema.clone(), + SourceInfo::InMemoryInfo(InMemoryInfo::new(schema.clone(), "foo".to_string(), py_obj)) + .into(), Default::default(), ) .into(); diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs new file mode 100644 index 0000000000..7be94b591a --- /dev/null +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -0,0 +1,402 @@ +use std::sync::Arc; + +use common_error::DaftResult; + +use daft_core::schema::Schema; +use daft_dsl::Expr; +use indexmap::IndexSet; + +use crate::{ + ops::{Aggregate, Project, Source}, + LogicalPlan, +}; + +use super::{ApplyOrder, OptimizerRule, Transformed}; + +#[derive(Default)] +pub struct PushDownProjection {} + +impl PushDownProjection { + pub fn new() -> Self { + Self {} + } + + fn try_optimize_project( + &self, + projection: &Project, + plan: Arc, + ) -> DaftResult>> { + let upstream_plan = &projection.input; + let upstream_schema = upstream_plan.schema(); + + // First, drop this projection if it is a 
no-op + // (selecting exactly all parent columns in the same order and nothing else). + let projection_is_noop = { + // Short circuit early if the projection length is different (obviously not a no-op). + upstream_schema.names().len() == projection.projection.len() + && projection + .projection + .iter() + .zip(upstream_schema.names().iter()) + .all(|(expr, upstream_col)| match expr { + Expr::Column(colname) => colname.as_ref() == upstream_col, + _ => false, + }) + }; + if projection_is_noop { + // Projection discarded but new root node has not been looked at; + // look at the new root node. + let new_plan = self + .try_optimize(upstream_plan.clone())? + .or(Transformed::Yes(upstream_plan.clone())); + return Ok(new_plan); + } + + match upstream_plan.as_ref() { + LogicalPlan::Source(source) => { + // Prune unnecessary columns directly from the source. + let [required_columns] = &plan.required_columns()[..] else { + panic!() + }; + if required_columns.len() < upstream_schema.names().len() { + let pruned_upstream_schema = upstream_schema + .fields + .iter() + .filter_map(|(name, field)| { + required_columns.contains(name).then(|| field.clone()) + }) + .collect::>(); + let schema = Schema::new(pruned_upstream_schema)?; + let new_source: LogicalPlan = Source::new( + schema.into(), + source.source_info.clone(), + source.partition_spec.clone(), + ) + .into(); + + let new_plan = plan.with_new_children(&[new_source.into()]); + // Retry optimization now that the upstream node is different. + let new_plan = self + .try_optimize(new_plan.clone())? + .or(Transformed::Yes(new_plan)); + Ok(new_plan) + } else { + Ok(Transformed::No(plan)) + } + } + LogicalPlan::Project(upstream_projection) => { + // Prune unnecessary columns from the child projection. + let required_columns = &plan.required_columns()[0]; + if required_columns.len() < upstream_schema.names().len() { + let pruned_upstream_projections = upstream_projection + .projection + .iter() + .filter_map(|e| { + required_columns + .contains(e.name().unwrap()) + .then(|| e.clone()) + }) + .collect::>(); + + let new_upstream: LogicalPlan = Project::try_new( + upstream_projection.input.clone(), + pruned_upstream_projections, + upstream_projection.resource_request.clone(), + )? + .into(); + + let new_plan = plan.with_new_children(&[new_upstream.into()]); + // Retry optimization now that the upstream node is different. + let new_plan = self + .try_optimize(new_plan.clone())? + .or(Transformed::Yes(new_plan)); + Ok(new_plan) + } else { + Ok(Transformed::No(plan)) + } + } + LogicalPlan::Aggregate(aggregate) => { + // Prune unnecessary columns from the child aggregate. + let required_columns = &plan.required_columns()[0]; + let pruned_aggregate_exprs = aggregate + .aggregations + .iter() + .filter_map(|e| { + required_columns + .contains(e.name().unwrap()) + .then(|| e.clone()) + }) + .collect::>(); + + if pruned_aggregate_exprs.len() < aggregate.aggregations.len() { + let new_upstream: LogicalPlan = Aggregate::try_new( + aggregate.input.clone(), + pruned_aggregate_exprs, + aggregate.groupby.clone(), + )? + .into(); + + let new_plan = plan.with_new_children(&[new_upstream.into()]); + // Retry optimization now that the upstream node is different. + let new_plan = self + .try_optimize(new_plan.clone())? + .or(Transformed::Yes(new_plan)); + Ok(new_plan) + } else { + Ok(Transformed::No(plan)) + } + } + LogicalPlan::Sort(..) + | LogicalPlan::Repartition(..) + | LogicalPlan::Coalesce(..) + | LogicalPlan::Limit(..) + | LogicalPlan::Filter(..) 
+ | LogicalPlan::Explode(..) => { + // Get required columns from projection and upstream. + let combined_dependencies = plan + .required_columns() + .iter() + .flatten() + .chain(upstream_plan.required_columns().iter().flatten()) + .cloned() + .collect::>(); + + // Skip optimization if no columns would be pruned. + let grand_upstream_plan = upstream_plan.children()[0]; + let grand_upstream_columns = grand_upstream_plan.schema().names(); + if grand_upstream_columns.len() == combined_dependencies.len() { + return Ok(Transformed::No(plan)); + } + + let new_subprojection: LogicalPlan = { + let pushdown_column_exprs = combined_dependencies + .into_iter() + .map(|s| Expr::Column(s.into())) + .collect::>(); + + Project::try_new( + grand_upstream_plan.clone(), + pushdown_column_exprs, + Default::default(), + )? + .into() + }; + + let new_upstream = upstream_plan.with_new_children(&[new_subprojection.into()]); + let new_plan = plan.with_new_children(&[new_upstream]); + // Retry optimization now that the upstream node is different. + let new_plan = self + .try_optimize(new_plan.clone())? + .or(Transformed::Yes(new_plan)); + Ok(new_plan) + } + LogicalPlan::Concat(concat) => { + // Get required columns from projection and upstream. + let combined_dependencies = plan + .required_columns() + .iter() + .flatten() + .chain(upstream_plan.required_columns().iter().flatten()) + .cloned() + .collect::>(); + + // Skip optimization if no columns would be pruned. + let grand_upstream_plan = upstream_plan.children()[0]; + let grand_upstream_columns = grand_upstream_plan.schema().names(); + if grand_upstream_columns.len() == combined_dependencies.len() { + return Ok(Transformed::No(plan)); + } + + let pushdown_column_exprs = combined_dependencies + .into_iter() + .map(|s| Expr::Column(s.into())) + .collect::>(); + let new_left_subprojection: LogicalPlan = { + Project::try_new( + concat.input.clone(), + pushdown_column_exprs.clone(), + Default::default(), + )? + .into() + }; + let new_right_subprojection: LogicalPlan = { + Project::try_new( + concat.other.clone(), + pushdown_column_exprs.clone(), + Default::default(), + )? + .into() + }; + + let new_upstream = upstream_plan.with_new_children(&[ + new_left_subprojection.into(), + new_right_subprojection.into(), + ]); + let new_plan = plan.with_new_children(&[new_upstream]); + // Retry optimization now that the upstream node is different. + let new_plan = self + .try_optimize(new_plan.clone())? + .or(Transformed::Yes(new_plan)); + Ok(new_plan) + } + LogicalPlan::Join(join) => { + // Get required columns from projection and both upstreams. + let [projection_required_columns] = &plan.required_columns()[..] else { + panic!() + }; + let [left_dependencies, right_dependencies] = &upstream_plan.required_columns()[..] + else { + panic!() + }; + + let left_upstream_names = join + .input + .schema() + .names() + .iter() + .cloned() + .collect::>(); + let right_upstream_names = join + .right + .schema() + .names() + .iter() + .cloned() + .collect::>(); + + let right_combined_dependencies = projection_required_columns + .iter() + .filter_map(|colname| join.right_input_mapping.get(colname)) + .chain(right_dependencies.iter()) + .cloned() + .collect::>(); + + let left_combined_dependencies = projection_required_columns + .iter() + .filter_map(|colname| left_upstream_names.get(colname)) + .chain(left_dependencies.iter()) + // We also have to keep any name conflict columns referenced by the right side. + // E.g. 
if the user wants "right.c", left must also provide "c", or "right.c" disappears. + // This is mostly an artifact of https://github.com/Eventual-Inc/Daft/issues/1303 + .chain( + right_combined_dependencies + .iter() + .filter_map(|rname| left_upstream_names.get(rname)), + ) + .cloned() + .collect::>(); + + // For each upstream, see if a non-vacuous pushdown is possible. + let maybe_new_left_upstream: Option> = { + if left_combined_dependencies.len() < left_upstream_names.len() { + let pushdown_column_exprs = left_combined_dependencies + .into_iter() + .map(|s| Expr::Column(s.into())) + .collect::>(); + let new_project: LogicalPlan = Project::try_new( + join.input.clone(), + pushdown_column_exprs, + Default::default(), + )? + .into(); + Some(new_project.into()) + } else { + None + } + }; + + let maybe_new_right_upstream: Option> = { + if right_combined_dependencies.len() < right_upstream_names.len() { + let pushdown_column_exprs = right_combined_dependencies + .into_iter() + .map(|s| Expr::Column(s.into())) + .collect::>(); + let new_project: LogicalPlan = Project::try_new( + join.right.clone(), + pushdown_column_exprs, + Default::default(), + )? + .into(); + Some(new_project.into()) + } else { + None + } + }; + + // If either pushdown is possible, create a new Join node. + if maybe_new_left_upstream.is_some() || maybe_new_right_upstream.is_some() { + let new_left_upstream = maybe_new_left_upstream.unwrap_or(join.input.clone()); + let new_right_upstream = maybe_new_right_upstream.unwrap_or(join.right.clone()); + let new_join = + upstream_plan.with_new_children(&[new_left_upstream, new_right_upstream]); + let new_plan = plan.with_new_children(&[new_join]); + // Retry optimization now that the upstream node is different. + let new_plan = self + .try_optimize(new_plan.clone())? + .or(Transformed::Yes(new_plan)); + Ok(new_plan) + } else { + Ok(Transformed::No(plan)) + } + } + LogicalPlan::Distinct(_) => { + // Cannot push down past a Distinct, + // since Distinct implicitly requires all parent columns. + Ok(Transformed::No(plan)) + } + LogicalPlan::Sink(_) => { + panic!("Bad projection due to upstream sink node: {:?}", projection) + } + } + } + + fn try_optimize_aggregation( + &self, + aggregation: &Aggregate, + plan: Arc, + ) -> DaftResult>> { + // If this aggregation prunes columns from its upstream, + // then explicitly create a projection to do so. + let upstream_plan = &aggregation.input; + let upstream_schema = upstream_plan.schema(); + + let aggregation_required_cols = &plan.required_columns()[0]; + if aggregation_required_cols.len() < upstream_schema.names().len() { + let new_subprojection: LogicalPlan = { + let pushdown_column_exprs = aggregation_required_cols + .iter() + .map(|s| Expr::Column(s.clone().into())) + .collect::>(); + + Project::try_new( + upstream_plan.clone(), + pushdown_column_exprs, + Default::default(), + )? 
+ .into() + }; + + let new_aggregation = plan.with_new_children(&[new_subprojection.into()]); + Ok(Transformed::Yes(new_aggregation)) + } else { + Ok(Transformed::No(plan)) + } + } +} + +impl OptimizerRule for PushDownProjection { + fn apply_order(&self) -> ApplyOrder { + ApplyOrder::TopDown + } + + fn try_optimize(&self, plan: Arc) -> DaftResult>> { + match plan.as_ref() { + LogicalPlan::Project(projection) => self.try_optimize_project(projection, plan.clone()), + // Aggregations also do column projection + LogicalPlan::Aggregate(aggregation) => { + self.try_optimize_aggregation(aggregation, plan.clone()) + } + _ => Ok(Transformed::No(plan)), + } + } +} diff --git a/src/daft-plan/src/optimization/rules/rule.rs b/src/daft-plan/src/optimization/rules/rule.rs index cb92381f20..17232a1468 100644 --- a/src/daft-plan/src/optimization/rules/rule.rs +++ b/src/daft-plan/src/optimization/rules/rule.rs @@ -29,6 +29,7 @@ pub trait OptimizerRule { } /// An enum indicating whether or not the wrapped data has been transformed. +#[derive(Debug)] pub enum Transformed { // Yes, the data has been transformed. Yes(T), diff --git a/src/daft-plan/src/physical_ops/csv.rs b/src/daft-plan/src/physical_ops/csv.rs index 12fd42e4b4..3435a03eb6 100644 --- a/src/daft-plan/src/physical_ops/csv.rs +++ b/src/daft-plan/src/physical_ops/csv.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize)] pub struct TabularScanCsv { - pub schema: SchemaRef, + pub projection_schema: SchemaRef, pub external_info: ExternalInfo, pub partition_spec: Arc, pub limit: Option, @@ -20,14 +20,14 @@ pub struct TabularScanCsv { impl TabularScanCsv { pub(crate) fn new( - schema: SchemaRef, + projection_schema: SchemaRef, external_info: ExternalInfo, partition_spec: Arc, limit: Option, filters: Vec, ) -> Self { Self { - schema, + projection_schema, external_info, partition_spec, limit, diff --git a/src/daft-plan/src/physical_ops/json.rs b/src/daft-plan/src/physical_ops/json.rs index a042770361..0d2142e856 100644 --- a/src/daft-plan/src/physical_ops/json.rs +++ b/src/daft-plan/src/physical_ops/json.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub struct TabularScanJson { - pub schema: SchemaRef, + pub projection_schema: SchemaRef, pub external_info: ExternalInfo, pub partition_spec: Arc, pub limit: Option, @@ -20,14 +20,14 @@ pub struct TabularScanJson { impl TabularScanJson { pub(crate) fn new( - schema: SchemaRef, + projection_schema: SchemaRef, external_info: ExternalInfo, partition_spec: Arc, limit: Option, filters: Vec, ) -> Self { Self { - schema, + projection_schema, external_info, partition_spec, limit, diff --git a/src/daft-plan/src/physical_ops/parquet.rs b/src/daft-plan/src/physical_ops/parquet.rs index 1a42a7fbc5..1d4140f325 100644 --- a/src/daft-plan/src/physical_ops/parquet.rs +++ b/src/daft-plan/src/physical_ops/parquet.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] pub struct TabularScanParquet { - pub schema: SchemaRef, + pub projection_schema: SchemaRef, pub external_info: ExternalSourceInfo, pub partition_spec: Arc, pub limit: Option, @@ -20,14 +20,14 @@ pub struct TabularScanParquet { impl TabularScanParquet { pub(crate) fn new( - schema: SchemaRef, + projection_schema: SchemaRef, external_info: ExternalSourceInfo, partition_spec: Arc, limit: Option, filters: Vec, ) -> Self { Self { - schema, + projection_schema, external_info, partition_spec, limit, diff --git 
a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 64ecf2f54c..91cbbbca8b 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -70,7 +70,11 @@ impl PhysicalPlanScheduler { // Create dummy inner PhysicalPlan, to be overridden by __setstate__. 0 => Ok(Arc::new(PhysicalPlan::InMemoryScan(InMemoryScan::new( Default::default(), - InMemoryInfo::new("".to_string(), args.py().None()), + InMemoryInfo::new( + daft_core::schema::Schema::new(vec![])?.into(), + "".to_string(), + args.py().None(), + ), Default::default(), ))) .into()), @@ -117,17 +121,25 @@ impl PartitionIterator { #[cfg(feature = "python")] fn tabular_scan( py: Python<'_>, - schema: &SchemaRef, + source_schema: &SchemaRef, + projection_schema: &SchemaRef, file_info: &Arc, file_format_config: &Arc, limit: &Option, ) -> PyResult { let file_info_table: PyTable = file_info.to_table()?.into(); + let columns_to_read = projection_schema + .fields + .iter() + .map(|(name, _)| name) + .cloned() + .collect::>(); let py_iter = py .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "tabular_scan"))? .call1(( - PySchema::from(schema.clone()), + PySchema::from(source_schema.clone()), + columns_to_read, file_info_table, PyFileFormatConfig::from(file_format_config.clone()), *limit, @@ -187,38 +199,62 @@ impl PhysicalPlan { Ok(py_iter.into()) } PhysicalPlan::TabularScanParquet(TabularScanParquet { - schema, + projection_schema, external_info: ExternalInfo { + source_schema, file_info, file_format_config, .. }, limit, .. - }) => tabular_scan(py, schema, file_info, file_format_config, limit), + }) => tabular_scan( + py, + source_schema, + projection_schema, + file_info, + file_format_config, + limit, + ), PhysicalPlan::TabularScanCsv(TabularScanCsv { - schema, + projection_schema, external_info: ExternalInfo { + source_schema, file_info, file_format_config, .. }, limit, .. - }) => tabular_scan(py, schema, file_info, file_format_config, limit), + }) => tabular_scan( + py, + source_schema, + projection_schema, + file_info, + file_format_config, + limit, + ), PhysicalPlan::TabularScanJson(TabularScanJson { - schema, + projection_schema, external_info: ExternalInfo { + source_schema, file_info, file_format_config, .. }, limit, .. 
- }) => tabular_scan(py, schema, file_info, file_format_config, limit), + }) => tabular_scan( + py, + source_schema, + projection_schema, + file_info, + file_format_config, + limit, + ), PhysicalPlan::Project(Project { input, projection, diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index 3718a743a5..4bef419a45 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -24,7 +24,7 @@ use crate::physical_ops::InMemoryScan; pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { match logical_plan { LogicalPlan::Source(Source { - schema, + output_schema, source_info, partition_spec, limit, @@ -37,7 +37,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { ) => match file_format_config.as_ref() { FileFormatConfig::Parquet(_) => { Ok(PhysicalPlan::TabularScanParquet(TabularScanParquet::new( - schema.clone(), + output_schema.clone(), ext_info.clone(), partition_spec.clone(), *limit, @@ -45,7 +45,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { ))) } FileFormatConfig::Csv(_) => Ok(PhysicalPlan::TabularScanCsv(TabularScanCsv::new( - schema.clone(), + output_schema.clone(), ext_info.clone(), partition_spec.clone(), *limit, @@ -53,7 +53,7 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { ))), FileFormatConfig::Json(_) => { Ok(PhysicalPlan::TabularScanJson(TabularScanJson::new( - schema.clone(), + output_schema.clone(), ext_info.clone(), partition_spec.clone(), *limit, @@ -62,9 +62,24 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { } }, #[cfg(feature = "python")] - SourceInfo::InMemoryInfo(mem_info) => Ok(PhysicalPlan::InMemoryScan( - InMemoryScan::new(schema.clone(), mem_info.clone(), partition_spec.clone()), - )), + SourceInfo::InMemoryInfo(mem_info) => { + let scan = PhysicalPlan::InMemoryScan(InMemoryScan::new( + mem_info.source_schema.clone(), + mem_info.clone(), + partition_spec.clone(), + )); + let plan = if output_schema.fields.len() < mem_info.source_schema.fields.len() { + let projection = output_schema + .fields + .iter() + .map(|(name, _)| Expr::Column(name.clone().into())) + .collect::>(); + PhysicalPlan::Project(Project::new(projection, Default::default(), scan.into())) + } else { + scan + }; + Ok(plan) + } }, LogicalPlan::Project(LogicalProject { input, diff --git a/src/daft-plan/src/source_info.rs b/src/daft-plan/src/source_info.rs index a533ecb7e7..6b84accd43 100644 --- a/src/daft-plan/src/source_info.rs +++ b/src/daft-plan/src/source_info.rs @@ -101,6 +101,7 @@ where #[cfg(feature = "python")] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct InMemoryInfo { + pub source_schema: SchemaRef, pub cache_key: String, #[serde( serialize_with = "serialize_py_object", @@ -111,8 +112,9 @@ pub struct InMemoryInfo { #[cfg(feature = "python")] impl InMemoryInfo { - pub fn new(cache_key: String, cache_entry: PyObject) -> Self { + pub fn new(source_schema: SchemaRef, cache_key: String, cache_entry: PyObject) -> Self { Self { + source_schema, cache_key, cache_entry, } @@ -151,19 +153,19 @@ impl Hash for InMemoryInfo { #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct ExternalInfo { - pub schema: SchemaRef, + pub source_schema: SchemaRef, pub file_info: Arc, pub file_format_config: Arc, } impl ExternalInfo { pub fn new( - schema: SchemaRef, + source_schema: SchemaRef, file_info: Arc, file_format_config: Arc, ) -> Self { Self { - schema, + source_schema, file_info, file_format_config, }
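
Below is a minimal, self-contained sketch (std only, not Daft code) of the order-independent hashing scheme that the new hash_index_map helper in src/daft-core/src/schema.rs describes: each entry is hashed with a fresh DefaultHasher and the per-entry hashes are folded with u64::wrapping_add, so two maps with the same entries in different orders hash identically. The name order_independent_hash is illustrative only.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Combine per-entry hashes with wrapping_add (associative + commutative),
// so the result does not depend on entry order; each entry is hashed with
// a fresh hasher so entries do not influence one another.
fn order_independent_hash<K: Hash, V: Hash>(entries: &[(K, V)]) -> u64 {
    entries
        .iter()
        .map(|kv| {
            let mut h = DefaultHasher::new();
            kv.hash(&mut h);
            h.finish()
        })
        .fold(0u64, u64::wrapping_add)
}

fn main() {
    let a = [("a", 1), ("b", 2), ("c", 3)];
    let b = [("c", 3), ("a", 1), ("b", 2)]; // same entries, different order
    assert_eq!(order_independent_hash(&a), order_independent_hash(&b));
}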
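
The optimizer fold fix (Ok(rule.try_optimize(plan.unwrap().clone())?.or(plan))) is easiest to see with a toy model. The sketch below is not the daft-plan implementation: Transformed, unwrap, and or are re-declared locally with the semantics implied by their usage in this diff (or keeps self when it is Yes, otherwise falls back to the accumulator), and rules are plain functions over strings rather than Arc<LogicalPlan> returning DaftResult.

// Stand-ins for daft-plan's Transformed wrapper and rule passes.
#[derive(Debug, PartialEq)]
enum Transformed<T> {
    Yes(T),
    No(T),
}

impl<T> Transformed<T> {
    fn unwrap(&self) -> &T {
        match self {
            Transformed::Yes(v) | Transformed::No(v) => v,
        }
    }

    // Assumed semantics, matching how .or(...) is used in the diff:
    // keep self if it recorded a change, otherwise keep the accumulator.
    fn or(self, other: Transformed<T>) -> Transformed<T> {
        match self {
            Transformed::Yes(_) => self,
            Transformed::No(_) => other,
        }
    }
}

fn push_down_projection(plan: &str) -> Transformed<String> {
    Transformed::Yes(format!("{plan} + pushdown"))
}

fn no_op_rule(plan: &str) -> Transformed<String> {
    Transformed::No(plan.to_string())
}

fn main() {
    let rules: [fn(&str) -> Transformed<String>; 2] = [push_down_projection, no_op_rule];

    // Folding with .or(acc) preserves the Yes flag from earlier rules even
    // when a later rule leaves the plan unchanged.
    let result = rules.iter().fold(
        Transformed::No("scan".to_string()),
        |acc, rule| {
            let current = rule(acc.unwrap().as_str());
            current.or(acc)
        },
    );
    assert_eq!(result, Transformed::Yes("scan + pushdown".to_string()));
}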
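
A rough illustration of how Join's new right_input_mapping (output name -> original right-side name) lets PushDownProjection translate required output columns into columns of the right input. This uses std HashMap/HashSet instead of indexmap, and build_right_input_mapping is a hypothetical helper; the renaming rules mirror the schema-inference loop in Join::try_new above.

use std::collections::{HashMap, HashSet};

fn build_right_input_mapping(
    left_columns: &HashSet<String>,
    left_join_keys: &HashSet<String>,
    right_columns: &[String],
) -> HashMap<String, String> {
    let mut mapping = HashMap::new();
    for rname in right_columns {
        if left_join_keys.contains(rname) {
            // Join keys are merged into the left side but still read from
            // the right input under their original name.
            mapping.insert(rname.clone(), rname.clone());
        } else if left_columns.contains(rname) {
            // Name clash with a non-key left column: the output is renamed.
            mapping.insert(format!("right.{rname}"), rname.clone());
        } else {
            mapping.insert(rname.clone(), rname.clone());
        }
    }
    mapping
}

fn main() {
    let left: HashSet<String> = ["id", "a"].iter().map(|s| s.to_string()).collect();
    let keys: HashSet<String> = ["id"].iter().map(|s| s.to_string()).collect();
    let right = vec!["id".to_string(), "a".to_string(), "b".to_string()];
    let mapping = build_right_input_mapping(&left, &keys, &right);

    // A projection needing ["right.a", "b"] only requires ["a", "b"] from
    // the right input, so a pruned Project can be pushed below the join.
    let required = ["right.a", "b"];
    let right_required: Vec<String> = required
        .iter()
        .filter_map(|c| mapping.get(*c).cloned())
        .collect();
    assert_eq!(right_required, vec!["a".to_string(), "b".to_string()]);
}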
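
Finally, a small stand-alone version of the no-op projection check at the top of try_optimize_project, assuming a simplified Expr enum in place of daft_dsl::Expr: a projection that selects exactly the upstream columns, in the same order, is dropped so optimization can continue at its child.

// Stand-in for daft_dsl::Expr, just enough to show the check.
#[allow(dead_code)]
enum Expr {
    Column(String),
    Literal(i64),
}

fn projection_is_noop(projection: &[Expr], upstream_columns: &[String]) -> bool {
    projection.len() == upstream_columns.len()
        && projection
            .iter()
            .zip(upstream_columns.iter())
            .all(|(expr, upstream_col)| match expr {
                Expr::Column(name) => name == upstream_col,
                _ => false,
            })
}

fn main() {
    let upstream = vec!["a".to_string(), "b".to_string()];
    let noop = vec![Expr::Column("a".into()), Expr::Column("b".into())];
    let reordered = vec![Expr::Column("b".into()), Expr::Column("a".into())];
    assert!(projection_is_noop(&noop, &upstream));
    assert!(!projection_is_noop(&reordered, &upstream));
}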