From 7d3fdfafbb14d7315d6595c86dca6d8ded13b79e Mon Sep 17 00:00:00 2001 From: xcharleslin <4212216+xcharleslin@users.noreply.github.com> Date: Thu, 10 Aug 2023 11:10:24 -0700 Subject: [PATCH] [FEAT] Multi-partition aggregate; Coalesce (#1249) planner.rs now translates LogicalPlan::Aggregation into a two-phase Physical Agg->Coalesce->Agg when there is more than one input partition. As part of this, PhysicalPlan::Coalesce was also implemented. TODO (next PR): handle groupbys. --------- Co-authored-by: Xiayue Charles Lin --- daft/logical/logical_plan.py | 3 + src/daft-dsl/src/expr.rs | 14 +-- src/daft-plan/src/logical_plan.rs | 2 +- src/daft-plan/src/ops/agg.rs | 37 ++++--- src/daft-plan/src/physical_ops/agg.rs | 6 -- src/daft-plan/src/physical_ops/coalesce.rs | 23 +++++ src/daft-plan/src/physical_ops/mod.rs | 2 + src/daft-plan/src/physical_plan.rs | 15 ++- src/daft-plan/src/planner.rs | 108 ++++++++++++++++++--- 9 files changed, 161 insertions(+), 49 deletions(-) create mode 100644 src/daft-plan/src/physical_ops/coalesce.rs diff --git a/daft/logical/logical_plan.py b/daft/logical/logical_plan.py index 58b41ee958..0e06341c57 100644 --- a/daft/logical/logical_plan.py +++ b/daft/logical/logical_plan.py @@ -38,6 +38,9 @@ class PyLogicalPlanBuilder(LogicalPlanBuilder): def __init__(self, plan: LogicalPlan): self._plan = plan + def __repr__(self) -> str: + return self._plan.pretty_print() + def to_planner(self) -> PyQueryPlanner: from daft.planner.py_planner import PyQueryPlanner diff --git a/src/daft-dsl/src/expr.rs b/src/daft-dsl/src/expr.rs index ba473ffda5..e54cbf6ba5 100644 --- a/src/daft-dsl/src/expr.rs +++ b/src/daft-dsl/src/expr.rs @@ -81,31 +81,31 @@ impl AggExpr { match self { Count(expr) => { let child_id = expr.semantic_id(schema); - FieldID::new(format!("local_count({child_id})")) + FieldID::new(format!("{child_id}.local_count()")) } Sum(expr) => { let child_id = expr.semantic_id(schema); - FieldID::new(format!("local_sum({child_id})")) + FieldID::new(format!("{child_id}.local_sum()")) } Mean(expr) => { let child_id = expr.semantic_id(schema); - FieldID::new(format!("local_mean({child_id})")) + FieldID::new(format!("{child_id}.local_mean()")) } Min(expr) => { let child_id = expr.semantic_id(schema); - FieldID::new(format!("local_min({child_id})")) + FieldID::new(format!("{child_id}.local_min()")) } Max(expr) => { let child_id = expr.semantic_id(schema); - FieldID::new(format!("local_max({child_id})")) + FieldID::new(format!("{child_id}.local_max()")) } List(expr) => { let child_id = expr.semantic_id(schema); - FieldID::new(format!("local_list({child_id})")) + FieldID::new(format!("{child_id}.local_list()")) } Concat(expr) => { let child_id = expr.semantic_id(schema); - FieldID::new(format!("local_concat({child_id})")) + FieldID::new(format!("{child_id}.local_concat()")) } } } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index cb29f6093e..ec6c3c5f7b 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -24,7 +24,7 @@ impl LogicalPlan { Self::Sort(Sort { input, .. }) => input.schema(), Self::Repartition(Repartition { input, .. }) => input.schema(), Self::Distinct(Distinct { input, .. }) => input.schema(), - Self::Aggregate(Aggregate { schema, .. }) => schema.clone(), + Self::Aggregate(aggregate) => aggregate.schema(), } } diff --git a/src/daft-plan/src/ops/agg.rs b/src/daft-plan/src/ops/agg.rs index ec7539813e..641629c9a0 100644 --- a/src/daft-plan/src/ops/agg.rs +++ b/src/daft-plan/src/ops/agg.rs @@ -7,9 +7,6 @@ use crate::LogicalPlan; #[derive(Clone, Debug)] pub struct Aggregate { - /// The schema of the output of this node. - pub schema: SchemaRef, - /// Aggregations to apply. pub aggregations: Vec, @@ -25,36 +22,36 @@ impl Aggregate { // TEMP: No groupbys supported for now. let group_by: Vec = vec![]; - // Resolve the schema from the expressions. - let schema = { - let source_schema = input.schema(); - - let fields = group_by - .iter() - .map(|expr| expr.to_field(&source_schema).unwrap()) - .chain( - aggregations - .iter() - .map(|agg_expr| agg_expr.to_field(&source_schema).unwrap()), - ) - .collect(); - Schema::new(fields).unwrap().into() - }; Self { - schema, aggregations, group_by, input, } } + pub(crate) fn schema(&self) -> SchemaRef { + let source_schema = self.input.schema(); + + let fields = self + .group_by + .iter() + .map(|expr| expr.to_field(&source_schema).unwrap()) + .chain( + self.aggregations + .iter() + .map(|agg_expr| agg_expr.to_field(&source_schema).unwrap()), + ) + .collect(); + Schema::new(fields).unwrap().into() + } + pub fn multiline_display(&self) -> Vec { let mut res = vec![]; res.push(format!("Aggregation: {:?}", self.aggregations)); if !self.group_by.is_empty() { res.push(format!(" Group by: {:?}", self.group_by)); } - res.push(format!(" Output schema: {}", self.schema.short_string())); + res.push(format!(" Output schema: {}", self.schema().short_string())); res } } diff --git a/src/daft-plan/src/physical_ops/agg.rs b/src/daft-plan/src/physical_ops/agg.rs index a62b202290..7ab5c5ac7f 100644 --- a/src/daft-plan/src/physical_ops/agg.rs +++ b/src/daft-plan/src/physical_ops/agg.rs @@ -1,15 +1,11 @@ use std::sync::Arc; -use daft_core::schema::SchemaRef; use daft_dsl::{AggExpr, Expr}; use crate::physical_plan::PhysicalPlan; #[derive(Clone, Debug)] pub struct Aggregate { - /// The schema of the output of this node. - pub schema: SchemaRef, - /// Aggregations to apply. pub aggregations: Vec, @@ -25,10 +21,8 @@ impl Aggregate { input: Arc, aggregations: Vec, group_by: Vec, - schema: SchemaRef, ) -> Self { Self { - schema, aggregations, group_by, input, diff --git a/src/daft-plan/src/physical_ops/coalesce.rs b/src/daft-plan/src/physical_ops/coalesce.rs new file mode 100644 index 0000000000..b9ff95932e --- /dev/null +++ b/src/daft-plan/src/physical_ops/coalesce.rs @@ -0,0 +1,23 @@ +use std::sync::Arc; + +use crate::physical_plan::PhysicalPlan; + +#[derive(Clone, Debug)] +pub struct Coalesce { + // Upstream node. + pub input: Arc, + + // Number of partitions to coalesce from and to. + pub num_from: usize, + pub num_to: usize, +} + +impl Coalesce { + pub(crate) fn new(input: Arc, num_from: usize, num_to: usize) -> Self { + Self { + input, + num_from, + num_to, + } + } +} diff --git a/src/daft-plan/src/physical_ops/mod.rs b/src/daft-plan/src/physical_ops/mod.rs index ae6b2e29d4..7333f1bbca 100644 --- a/src/daft-plan/src/physical_ops/mod.rs +++ b/src/daft-plan/src/physical_ops/mod.rs @@ -1,4 +1,5 @@ mod agg; +mod coalesce; mod csv; mod fanout; mod filter; @@ -12,6 +13,7 @@ mod sort; mod split; pub use agg::Aggregate; +pub use coalesce::Coalesce; pub use csv::TabularScanCsv; pub use fanout::{FanoutByHash, FanoutByRange, FanoutRandom}; pub use filter::Filter; diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index b27024216a..f92d0cb89e 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -30,6 +30,7 @@ pub enum PhysicalPlan { FanoutByRange(FanoutByRange), ReduceMerge(ReduceMerge), Aggregate(Aggregate), + Coalesce(Coalesce), } #[cfg(feature = "python")] @@ -166,7 +167,7 @@ impl PhysicalPlan { .call1((upstream_iter, *limit))?; let global_limit_iter = py_physical_plan .getattr(pyo3::intern!(py, "global_limit"))? - .call1((local_limit_iter, *limit, *num_partitions as i64))?; + .call1((local_limit_iter, *limit, *num_partitions))?; Ok(global_limit_iter.into()) } PhysicalPlan::Sort(Sort { @@ -262,6 +263,18 @@ impl PhysicalPlan { .call1((upstream_iter, aggs_as_pyexprs, groupbys_as_pyexprs))?; Ok(py_iter.into()) } + PhysicalPlan::Coalesce(Coalesce { + input, + num_from, + num_to, + }) => { + let upstream_iter = input.to_partition_tasks(py, psets)?; + let py_iter = py + .import(pyo3::intern!(py, "daft.execution.physical_plan"))? + .getattr(pyo3::intern!(py, "coalesce"))? + .call1((upstream_iter, *num_from, *num_to))?; + Ok(py_iter.into()) + } } } } diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index d310ad41a7..e892a36c1e 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -8,10 +8,7 @@ use crate::ops::{ Aggregate as LogicalAggregate, Distinct as LogicalDistinct, Filter as LogicalFilter, Limit as LogicalLimit, Repartition as LogicalRepartition, Sort as LogicalSort, Source, }; -use crate::physical_ops::{ - Aggregate, FanoutByHash, FanoutRandom, Filter, Limit, ReduceMerge, Sort, Split, TabularScanCsv, - TabularScanJson, TabularScanParquet, -}; +use crate::physical_ops::*; use crate::physical_plan::PhysicalPlan; use crate::source_info::{ExternalInfo, FileFormatConfig, SourceInfo}; use crate::PartitionScheme; @@ -136,7 +133,6 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { input_physical.into(), vec![], col_exprs.clone(), - input.schema(), )); let num_partitions = logical_plan.partition_spec().num_partitions; if num_partitions > 1 { @@ -150,25 +146,109 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { reduce_op.into(), vec![], col_exprs, - input.schema(), ))) } else { Ok(agg_op) } } LogicalPlan::Aggregate(LogicalAggregate { - schema, aggregations, group_by, input, }) => { - let input_physical = plan(input)?; - Ok(PhysicalPlan::Aggregate(Aggregate::new( - input_physical.into(), - aggregations.clone(), - group_by.clone(), - schema.clone(), - ))) + use daft_dsl::AggExpr::*; + let result_plan = plan(input)?; + + if !group_by.is_empty() { + unimplemented!("{:?}", group_by); + } + + let num_input_partitions = logical_plan.partition_spec().num_partitions; + + let result_plan = match num_input_partitions { + 1 => PhysicalPlan::Aggregate(Aggregate::new( + result_plan.into(), + aggregations.clone(), + vec![], + )), + _ => { + // Resolve and assign intermediate names for the aggregations. + let schema = logical_plan.schema(); + let intermediate_names: Vec = aggregations + .iter() + .map(|agg_expr| agg_expr.semantic_id(&schema)) + .collect(); + + let first_stage_aggs: Vec = aggregations + .iter() + .zip(intermediate_names.iter()) + .map(|(agg_expr, field_id)| match agg_expr { + Count(e) => Count(e.alias(field_id.id.clone()).into()), + Sum(e) => Sum(e.alias(field_id.id.clone()).into()), + Mean(e) => Mean(e.alias(field_id.id.clone()).into()), + Min(e) => Min(e.alias(field_id.id.clone()).into()), + Max(e) => Max(e.alias(field_id.id.clone()).into()), + List(e) => List(e.alias(field_id.id.clone()).into()), + Concat(e) => Concat(e.alias(field_id.id.clone()).into()), + }) + .collect(); + + let second_stage_aggs: Vec = intermediate_names + .iter() + .zip(schema.fields.keys()) + .zip(aggregations.iter()) + .map(|((field_id, original_name), agg_expr)| match agg_expr { + Count(_) => Count( + daft_dsl::Expr::Column(field_id.id.clone().into()) + .alias(&**original_name) + .into(), + ), + Sum(_) => Sum(daft_dsl::Expr::Column(field_id.id.clone().into()) + .alias(&**original_name) + .into()), + Mean(_) => Mean( + daft_dsl::Expr::Column(field_id.id.clone().into()) + .alias(&**original_name) + .into(), + ), + Min(_) => Min(daft_dsl::Expr::Column(field_id.id.clone().into()) + .alias(&**original_name) + .into()), + Max(_) => Max(daft_dsl::Expr::Column(field_id.id.clone().into()) + .alias(&**original_name) + .into()), + List(_) => List( + daft_dsl::Expr::Column(field_id.id.clone().into()) + .alias(&**original_name) + .into(), + ), + Concat(_) => Concat( + daft_dsl::Expr::Column(field_id.id.clone().into()) + .alias(&**original_name) + .into(), + ), + }) + .collect(); + + let result_plan = PhysicalPlan::Aggregate(Aggregate::new( + result_plan.into(), + first_stage_aggs, + vec![], + )); + let result_plan = PhysicalPlan::Coalesce(Coalesce::new( + result_plan.into(), + num_input_partitions, + 1, + )); + PhysicalPlan::Aggregate(Aggregate::new( + result_plan.into(), + second_stage_aggs, + vec![], + )) + } + }; + + Ok(result_plan) } } }