Skip to content

Commit

Permalink
[FEAT] Multi-partition aggregate; Coalesce (#1249)
Browse files Browse the repository at this point in the history
planner.rs now translates LogicalPlan::Aggregate into a two-phase physical plan
(Aggregate -> Coalesce -> Aggregate) when there is more than one input partition.

As part of this, PhysicalPlan::Coalesce was also implemented.

TODO (next PR): handle groupbys.

---------

Co-authored-by: Xiayue Charles Lin <[email protected]>
  • Loading branch information
xcharleslin and Xiayue Charles Lin committed Aug 10, 2023
1 parent 753aeaf commit 7d3fdfa
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 49 deletions.
3 changes: 3 additions & 0 deletions daft/logical/logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class PyLogicalPlanBuilder(LogicalPlanBuilder):
def __init__(self, plan: LogicalPlan):
self._plan = plan

def __repr__(self) -> str:
return self._plan.pretty_print()

def to_planner(self) -> PyQueryPlanner:
from daft.planner.py_planner import PyQueryPlanner

Expand Down
14 changes: 7 additions & 7 deletions src/daft-dsl/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,31 @@ impl AggExpr {
match self {
Count(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("local_count({child_id})"))
FieldID::new(format!("{child_id}.local_count()"))
}
Sum(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("local_sum({child_id})"))
FieldID::new(format!("{child_id}.local_sum()"))
}
Mean(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("local_mean({child_id})"))
FieldID::new(format!("{child_id}.local_mean()"))
}
Min(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("local_min({child_id})"))
FieldID::new(format!("{child_id}.local_min()"))
}
Max(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("local_max({child_id})"))
FieldID::new(format!("{child_id}.local_max()"))
}
List(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("local_list({child_id})"))
FieldID::new(format!("{child_id}.local_list()"))
}
Concat(expr) => {
let child_id = expr.semantic_id(schema);
FieldID::new(format!("local_concat({child_id})"))
FieldID::new(format!("{child_id}.local_concat()"))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/daft-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl LogicalPlan {
Self::Sort(Sort { input, .. }) => input.schema(),
Self::Repartition(Repartition { input, .. }) => input.schema(),
Self::Distinct(Distinct { input, .. }) => input.schema(),
Self::Aggregate(Aggregate { schema, .. }) => schema.clone(),
Self::Aggregate(aggregate) => aggregate.schema(),
}
}

Expand Down
37 changes: 17 additions & 20 deletions src/daft-plan/src/ops/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ use crate::LogicalPlan;

#[derive(Clone, Debug)]
pub struct Aggregate {
/// The schema of the output of this node.
pub schema: SchemaRef,

/// Aggregations to apply.
pub aggregations: Vec<AggExpr>,

Expand All @@ -25,36 +22,36 @@ impl Aggregate {
// TEMP: No groupbys supported for now.
let group_by: Vec<Expr> = vec![];

// Resolve the schema from the expressions.
let schema = {
let source_schema = input.schema();

let fields = group_by
.iter()
.map(|expr| expr.to_field(&source_schema).unwrap())
.chain(
aggregations
.iter()
.map(|agg_expr| agg_expr.to_field(&source_schema).unwrap()),
)
.collect();
Schema::new(fields).unwrap().into()
};
Self {
schema,
aggregations,
group_by,
input,
}
}

/// Derives the output schema of this aggregation from its input's schema.
///
/// Fields are emitted in order: one per group-by expression, followed by one
/// per aggregation expression. The schema is rebuilt on every call.
///
/// # Panics
///
/// Panics if an expression cannot be resolved to a field against the input
/// schema, or if `Schema::new` rejects the resulting field list.
pub(crate) fn schema(&self) -> SchemaRef {
    let input_schema = self.input.schema();

    // Both iterators are lazy; chaining keeps group-by fields ahead of
    // aggregation fields and resolves them in that order.
    let key_fields = self
        .group_by
        .iter()
        .map(|key| key.to_field(&input_schema).unwrap());
    let agg_fields = self
        .aggregations
        .iter()
        .map(|agg| agg.to_field(&input_schema).unwrap());

    let output_fields = key_fields.chain(agg_fields).collect();
    Schema::new(output_fields).unwrap().into()
}

pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
res.push(format!("Aggregation: {:?}", self.aggregations));
if !self.group_by.is_empty() {
res.push(format!(" Group by: {:?}", self.group_by));
}
res.push(format!(" Output schema: {}", self.schema.short_string()));
res.push(format!(" Output schema: {}", self.schema().short_string()));
res
}
}
6 changes: 0 additions & 6 deletions src/daft-plan/src/physical_ops/agg.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::sync::Arc;

use daft_core::schema::SchemaRef;
use daft_dsl::{AggExpr, Expr};

use crate::physical_plan::PhysicalPlan;

#[derive(Clone, Debug)]
pub struct Aggregate {
/// The schema of the output of this node.
pub schema: SchemaRef,

/// Aggregations to apply.
pub aggregations: Vec<AggExpr>,

Expand All @@ -25,10 +21,8 @@ impl Aggregate {
input: Arc<PhysicalPlan>,
aggregations: Vec<AggExpr>,
group_by: Vec<Expr>,
schema: SchemaRef,
) -> Self {
Self {
schema,
aggregations,
group_by,
input,
Expand Down
23 changes: 23 additions & 0 deletions src/daft-plan/src/physical_ops/coalesce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::sync::Arc;

use crate::physical_plan::PhysicalPlan;

/// Physical plan node that coalesces the partitions of its input.
#[derive(Clone, Debug)]
pub struct Coalesce {
/// Upstream node.
pub input: Arc<PhysicalPlan>,

/// Number of partitions to coalesce from.
pub num_from: usize,
/// Number of partitions to coalesce to.
pub num_to: usize,
}

impl Coalesce {
pub(crate) fn new(input: Arc<PhysicalPlan>, num_from: usize, num_to: usize) -> Self {
Self {
input,
num_from,
num_to,
}
}
}
2 changes: 2 additions & 0 deletions src/daft-plan/src/physical_ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod agg;
mod coalesce;
mod csv;
mod fanout;
mod filter;
Expand All @@ -12,6 +13,7 @@ mod sort;
mod split;

pub use agg::Aggregate;
pub use coalesce::Coalesce;
pub use csv::TabularScanCsv;
pub use fanout::{FanoutByHash, FanoutByRange, FanoutRandom};
pub use filter::Filter;
Expand Down
15 changes: 14 additions & 1 deletion src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum PhysicalPlan {
FanoutByRange(FanoutByRange),
ReduceMerge(ReduceMerge),
Aggregate(Aggregate),
Coalesce(Coalesce),
}

#[cfg(feature = "python")]
Expand Down Expand Up @@ -166,7 +167,7 @@ impl PhysicalPlan {
.call1((upstream_iter, *limit))?;
let global_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "global_limit"))?
.call1((local_limit_iter, *limit, *num_partitions as i64))?;
.call1((local_limit_iter, *limit, *num_partitions))?;
Ok(global_limit_iter.into())
}
PhysicalPlan::Sort(Sort {
Expand Down Expand Up @@ -262,6 +263,18 @@ impl PhysicalPlan {
.call1((upstream_iter, aggs_as_pyexprs, groupbys_as_pyexprs))?;
Ok(py_iter.into())
}
PhysicalPlan::Coalesce(Coalesce {
input,
num_from,
num_to,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets)?;
let py_iter = py
.import(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "coalesce"))?
.call1((upstream_iter, *num_from, *num_to))?;
Ok(py_iter.into())
}
}
}
}
108 changes: 94 additions & 14 deletions src/daft-plan/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use crate::ops::{
Aggregate as LogicalAggregate, Distinct as LogicalDistinct, Filter as LogicalFilter,
Limit as LogicalLimit, Repartition as LogicalRepartition, Sort as LogicalSort, Source,
};
use crate::physical_ops::{
Aggregate, FanoutByHash, FanoutRandom, Filter, Limit, ReduceMerge, Sort, Split, TabularScanCsv,
TabularScanJson, TabularScanParquet,
};
use crate::physical_ops::*;
use crate::physical_plan::PhysicalPlan;
use crate::source_info::{ExternalInfo, FileFormatConfig, SourceInfo};
use crate::PartitionScheme;
Expand Down Expand Up @@ -136,7 +133,6 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
input_physical.into(),
vec![],
col_exprs.clone(),
input.schema(),
));
let num_partitions = logical_plan.partition_spec().num_partitions;
if num_partitions > 1 {
Expand All @@ -150,25 +146,109 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
reduce_op.into(),
vec![],
col_exprs,
input.schema(),
)))
} else {
Ok(agg_op)
}
}
LogicalPlan::Aggregate(LogicalAggregate {
schema,
aggregations,
group_by,
input,
}) => {
let input_physical = plan(input)?;
Ok(PhysicalPlan::Aggregate(Aggregate::new(
input_physical.into(),
aggregations.clone(),
group_by.clone(),
schema.clone(),
)))
use daft_dsl::AggExpr::*;
let result_plan = plan(input)?;

if !group_by.is_empty() {
unimplemented!("{:?}", group_by);
}

let num_input_partitions = logical_plan.partition_spec().num_partitions;

let result_plan = match num_input_partitions {
1 => PhysicalPlan::Aggregate(Aggregate::new(
result_plan.into(),
aggregations.clone(),
vec![],
)),
_ => {
// Resolve and assign intermediate names for the aggregations.
let schema = logical_plan.schema();
let intermediate_names: Vec<daft_core::datatypes::FieldID> = aggregations
.iter()
.map(|agg_expr| agg_expr.semantic_id(&schema))
.collect();

let first_stage_aggs: Vec<daft_dsl::AggExpr> = aggregations
.iter()
.zip(intermediate_names.iter())
.map(|(agg_expr, field_id)| match agg_expr {
Count(e) => Count(e.alias(field_id.id.clone()).into()),
Sum(e) => Sum(e.alias(field_id.id.clone()).into()),
Mean(e) => Mean(e.alias(field_id.id.clone()).into()),
Min(e) => Min(e.alias(field_id.id.clone()).into()),
Max(e) => Max(e.alias(field_id.id.clone()).into()),
List(e) => List(e.alias(field_id.id.clone()).into()),
Concat(e) => Concat(e.alias(field_id.id.clone()).into()),
})
.collect();

let second_stage_aggs: Vec<daft_dsl::AggExpr> = intermediate_names
.iter()
.zip(schema.fields.keys())
.zip(aggregations.iter())
.map(|((field_id, original_name), agg_expr)| match agg_expr {
Count(_) => Count(
daft_dsl::Expr::Column(field_id.id.clone().into())
.alias(&**original_name)
.into(),
),
Sum(_) => Sum(daft_dsl::Expr::Column(field_id.id.clone().into())
.alias(&**original_name)
.into()),
Mean(_) => Mean(
daft_dsl::Expr::Column(field_id.id.clone().into())
.alias(&**original_name)
.into(),
),
Min(_) => Min(daft_dsl::Expr::Column(field_id.id.clone().into())
.alias(&**original_name)
.into()),
Max(_) => Max(daft_dsl::Expr::Column(field_id.id.clone().into())
.alias(&**original_name)
.into()),
List(_) => List(
daft_dsl::Expr::Column(field_id.id.clone().into())
.alias(&**original_name)
.into(),
),
Concat(_) => Concat(
daft_dsl::Expr::Column(field_id.id.clone().into())
.alias(&**original_name)
.into(),
),
})
.collect();

let result_plan = PhysicalPlan::Aggregate(Aggregate::new(
result_plan.into(),
first_stage_aggs,
vec![],
));
let result_plan = PhysicalPlan::Coalesce(Coalesce::new(
result_plan.into(),
num_input_partitions,
1,
));
PhysicalPlan::Aggregate(Aggregate::new(
result_plan.into(),
second_stage_aggs,
vec![],
))
}
};

Ok(result_plan)
}
}
}

0 comments on commit 7d3fdfa

Please sign in to comment.