[FEAT] [New Query Planner] Groupby support, aggregation fixes, support for remaining aggregation ops (#1272)

This PR adds support for `df.groupby()`, fixes miscellaneous issues with aggregations, and adds support for the remaining (non-sum) aggregation ops.
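
As a rough usage sketch (the column names and the `(column, op)` tuple form of `agg` below are assumptions for illustration, not taken from this PR; Daft's exact API may differ by version), several of the newly supported ops can be combined in one grouped aggregation:

```python
import daft

df = daft.from_pydict({"group": ["a", "a", "b"], "value": [1, 2, 3]})

# Multiple aggregation ops over a single groupby; the (column, op) tuple form is assumed.
stats = df.groupby("group").agg(
    [
        ("value", "sum"),
        ("value", "mean"),
        ("value", "max"),
    ]
)
stats.show()
```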

This PR builds on #1257, where @xcharleslin implemented the core of this work; this PR mainly wires things together and fixes a few minor issues. From that PR's description:

"Ported over the logic in our existing AggregationPlanBuilder.
Groupby-aggregates should now be fully supported (including
multi-partition).

Additionally, this PR improves on Daft's existing aggregation logic by
using semantic IDs in intermediate results, so that redundant
intermediates are not computed.

E.g. before, getting the Sum and Mean of a column would compute and
carry around two copies of the intermediate sum, one for the Sum and one
for the Mean. Now, all stages address their required intermediates by
semantic ID, eliminating these duplicates."
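
To make the semantic-ID idea concrete, here is a minimal standalone sketch (hypothetical helper names, not Daft's actual planner code): each intermediate is keyed by an ID derived from the expression it computes, so two final aggregations that need the same partial result share one intermediate.

```python
def semantic_id(op: str, column: str) -> str:
    # A stable, composable ID for "apply `op` to `column`".
    return f"{op}({column})"

def plan_intermediates(requested: list[tuple[str, str]]) -> dict[str, tuple[str, str]]:
    """Map each intermediate's semantic ID to the (op, column) that produces it.

    `requested` holds (column, final_op) pairs, e.g. [("value", "sum"), ("value", "mean")].
    Sum and Mean both need the intermediate sum(value); Mean also needs count(value).
    Keying by semantic ID deduplicates the shared intermediate automatically.
    """
    intermediates: dict[str, tuple[str, str]] = {}
    for column, final_op in requested:
        if final_op in ("sum", "mean"):
            intermediates.setdefault(semantic_id("sum", column), ("sum", column))
        if final_op in ("mean", "count"):
            intermediates.setdefault(semantic_id("count", column), ("count", column))
    return intermediates

# sum(value) is computed once even though both Sum and Mean require it.
print(list(plan_intermediates([("value", "sum"), ("value", "mean")])))
# -> ['sum(value)', 'count(value)']
```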

---------

Co-authored-by: Xiayue Charles Lin <[email protected]>
clarkzinzow and Xiayue Charles Lin committed Aug 15, 2023
1 parent c43c76c commit a5c702b
Showing 11 changed files with 283 additions and 141 deletions.
34 changes: 17 additions & 17 deletions daft/execution/rust_physical_plan_shim.py
@@ -21,23 +21,6 @@
PartitionT = TypeVar("PartitionT")


def local_aggregate(
input: physical_plan.InProgressPhysicalPlan[PartitionT],
agg_exprs: list[PyExpr],
group_by: list[PyExpr],
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
aggregation_step = execution_step.Aggregate(
to_agg=[Expression._from_pyexpr(pyexpr) for pyexpr in agg_exprs],
group_by=ExpressionsProjection([Expression._from_pyexpr(pyexpr) for pyexpr in group_by]),
)

return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=aggregation_step,
resource_request=ResourceRequest(),
)


def tabular_scan(
schema: PySchema, file_info_table: PyTable, file_format_config: FileFormatConfig, limit: int
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
@@ -94,6 +77,23 @@ def explode(
)


def local_aggregate(
input: physical_plan.InProgressPhysicalPlan[PartitionT],
agg_exprs: list[PyExpr],
group_by: list[PyExpr],
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
aggregation_step = execution_step.Aggregate(
to_agg=[Expression._from_pyexpr(pyexpr) for pyexpr in agg_exprs],
group_by=ExpressionsProjection([Expression._from_pyexpr(pyexpr) for pyexpr in group_by]),
)

return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=aggregation_step,
resource_request=ResourceRequest(),
)


def sort(
input: physical_plan.InProgressPhysicalPlan[PartitionT],
sort_by: list[PyExpr],
18 changes: 16 additions & 2 deletions daft/logical/rust_logical_plan.py
@@ -174,10 +174,24 @@ def agg(
for expr, op in to_agg:
if op == "sum":
exprs.append(expr._sum())
elif op == "count":
exprs.append(expr._count())
elif op == "min":
exprs.append(expr._min())
elif op == "max":
exprs.append(expr._max())
elif op == "mean":
exprs.append(expr._mean())
elif op == "list":
exprs.append(expr._agg_list())
elif op == "concat":
exprs.append(expr._agg_concat())
else:
raise NotImplementedError()
raise NotImplementedError(f"Aggregation {op} is not implemented.")

builder = self._builder.aggregate([expr._expr for expr in exprs])
builder = self._builder.aggregate(
[expr._expr for expr in exprs], group_by.to_inner_py_exprs() if group_by is not None else []
)
return RustLogicalPlanBuilder(builder)

def join( # type: ignore[override]
8 changes: 5 additions & 3 deletions src/daft-core/src/datatypes/field.rs
@@ -19,17 +19,19 @@ pub struct Field {

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize, Hash)]
pub struct FieldID {
pub id: String,
pub id: Arc<str>,
}

impl FieldID {
pub fn new<S: Into<String>>(id: S) -> Self {
pub fn new<S: Into<Arc<str>>>(id: S) -> Self {
Self { id: id.into() }
}

/// Create a Field ID directly from a real column name.
/// Performs sanitization on the name so it can be composed.
pub fn from_name(name: String) -> Self {
pub fn from_name<S: Into<String>>(name: S) -> Self {
let name: String = name.into();

// Escape parentheses within a string,
// since we will use parentheses as delimiters in our semantic expression IDs.
let sanitized = name
2 changes: 1 addition & 1 deletion src/daft-dsl/src/expr.rs
@@ -293,7 +293,7 @@ impl Expr {
Function { func, inputs } => {
let inputs = inputs
.iter()
.map(|expr| expr.semantic_id(schema).id)
.map(|expr| expr.semantic_id(schema).id.to_string())
.collect::<Vec<String>>()
.join(", ");
// TODO: check for function idempotency here.
32 changes: 30 additions & 2 deletions src/daft-plan/src/builder.rs
@@ -1,5 +1,7 @@
use std::sync::Arc;

use common_error::DaftResult;

use crate::{logical_plan::LogicalPlan, ResourceRequest};

#[cfg(feature = "python")]
@@ -177,7 +179,11 @@ impl LogicalPlanBuilder {
Ok(logical_plan_builder)
}

pub fn aggregate(&self, agg_exprs: Vec<PyExpr>) -> PyResult<LogicalPlanBuilder> {
pub fn aggregate(
&self,
agg_exprs: Vec<PyExpr>,
groupby_exprs: Vec<PyExpr>,
) -> PyResult<LogicalPlanBuilder> {
use crate::ops::Aggregate;
let agg_exprs = agg_exprs
.iter()
@@ -189,7 +195,29 @@
))),
})
.collect::<PyResult<Vec<daft_dsl::AggExpr>>>()?;
let logical_plan: LogicalPlan = Aggregate::new(agg_exprs, self.plan.clone()).into();
let groupby_exprs = groupby_exprs
.iter()
.map(|expr| expr.clone().into())
.collect::<Vec<Expr>>();

let input_schema = self.plan.schema();
let fields = groupby_exprs
.iter()
.map(|expr| expr.to_field(&input_schema))
.chain(
agg_exprs
.iter()
.map(|agg_expr| agg_expr.to_field(&input_schema)),
)
.collect::<DaftResult<Vec<Field>>>()?;
let output_schema = Schema::new(fields)?;
let logical_plan: LogicalPlan = Aggregate::new(
agg_exprs,
groupby_exprs,
output_schema.into(),
self.plan.clone(),
)
.into();
let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
Ok(logical_plan_builder)
}
23 changes: 14 additions & 9 deletions src/daft-plan/src/ops/agg.rs
@@ -11,20 +11,25 @@ pub struct Aggregate {
pub aggregations: Vec<AggExpr>,

/// Grouping to apply.
pub group_by: Vec<Expr>,
pub groupby: Vec<Expr>,

pub output_schema: SchemaRef,

// Upstream node.
pub input: Arc<LogicalPlan>,
}

impl Aggregate {
pub(crate) fn new(aggregations: Vec<AggExpr>, input: Arc<LogicalPlan>) -> Self {
// TEMP: No groupbys supported for now.
let group_by: Vec<Expr> = vec![];

pub(crate) fn new(
aggregations: Vec<AggExpr>,
groupby: Vec<Expr>,
output_schema: SchemaRef,
input: Arc<LogicalPlan>,
) -> Self {
Self {
aggregations,
group_by,
groupby,
output_schema,
input,
}
}
@@ -33,7 +38,7 @@ impl Aggregate {
let source_schema = self.input.schema();

let fields = self
.group_by
.groupby
.iter()
.map(|expr| expr.to_field(&source_schema).unwrap())
.chain(
@@ -48,8 +53,8 @@
pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
res.push(format!("Aggregation: {:?}", self.aggregations));
if !self.group_by.is_empty() {
res.push(format!(" Group by: {:?}", self.group_by));
if !self.groupby.is_empty() {
res.push(format!(" Group by: {:?}", self.groupby));
}
res.push(format!(" Output schema: {}", self.schema().short_string()));
res
6 changes: 3 additions & 3 deletions src/daft-plan/src/physical_ops/agg.rs
@@ -11,7 +11,7 @@ pub struct Aggregate {
pub aggregations: Vec<AggExpr>,

/// Grouping to apply.
pub group_by: Vec<Expr>,
pub groupby: Vec<Expr>,

// Upstream node.
pub input: Arc<PhysicalPlan>,
@@ -21,11 +21,11 @@ impl Aggregate {
pub(crate) fn new(
input: Arc<PhysicalPlan>,
aggregations: Vec<AggExpr>,
group_by: Vec<Expr>,
groupby: Vec<Expr>,
) -> Self {
Self {
aggregations,
group_by,
groupby,
input,
}
}
4 changes: 2 additions & 2 deletions src/daft-plan/src/physical_plan.rs
@@ -373,7 +373,7 @@ impl PhysicalPlan {
}
PhysicalPlan::Aggregate(Aggregate {
aggregations,
group_by,
groupby,
input,
..
}) => {
@@ -382,7 +382,7 @@
.iter()
.map(|agg_expr| PyExpr::from(Expr::Agg(agg_expr.clone())))
.collect();
let groupbys_as_pyexprs: Vec<PyExpr> = group_by
let groupbys_as_pyexprs: Vec<PyExpr> = groupby
.iter()
.map(|expr| PyExpr::from(expr.clone()))
.collect();