diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py
index 9cbd102b63..7e2176d563 100644
--- a/daft/execution/physical_plan.py
+++ b/daft/execution/physical_plan.py
@@ -528,7 +528,6 @@ def coalesce(
 
 def reduce(
     fanout_plan: InProgressPhysicalPlan[PartitionT],
-    num_partitions: int,
     reduce_instruction: ReduceInstruction,
 ) -> InProgressPhysicalPlan[PartitionT]:
     """Reduce the result of fanout_plan.
@@ -656,7 +655,6 @@ def sort(
     # Execute a sorting reduce on it.
     yield from reduce(
         fanout_plan=range_fanout_plan,
-        num_partitions=num_partitions,
         reduce_instruction=execution_step.ReduceMergeAndSort(
             sort_by=sort_by,
             descending=descending,
diff --git a/daft/execution/physical_plan_factory.py b/daft/execution/physical_plan_factory.py
index 08cb644211..1fe1ee08ce 100644
--- a/daft/execution/physical_plan_factory.py
+++ b/daft/execution/physical_plan_factory.py
@@ -144,7 +144,6 @@ def _get_physical_plan(node: LogicalPlan, psets: dict[str, list[PartitionT]]) ->
         # Do the reduce.
         return physical_plan.reduce(
             fanout_plan=fanout_plan,
-            num_partitions=node.num_partitions(),
             reduce_instruction=execution_step.ReduceMerge(),
         )
 
diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py
index 5b927ad5fa..5c413db3ea 100644
--- a/daft/execution/rust_physical_plan_shim.py
+++ b/daft/execution/rust_physical_plan_shim.py
@@ -39,3 +39,42 @@ def tabular_scan(
     return physical_plan.file_read(
         file_info_iter, limit, Schema._from_pyschema(schema), None, None, file_format_config, filepaths_column_name
     )
+
+
+def sort(
+    input: physical_plan.InProgressPhysicalPlan[PartitionT],
+    sort_by: list[PyExpr],
+    descending: list[bool],
+    num_partitions: int,
+) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
+    expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in sort_by])
+    return physical_plan.sort(
+        child_plan=input,
+        sort_by=expr_projection,
+        descending=descending,
+        num_partitions=num_partitions,
+    )
+
+
+def split_by_hash(
+    input: physical_plan.InProgressPhysicalPlan[PartitionT],
+    num_partitions: int,
+    partition_by: list[PyExpr],
+) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
+    expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in partition_by])
+    fanout_instruction = execution_step.FanoutHash(
+        _num_outputs=num_partitions,
+        partition_by=expr_projection,
+    )
+    return physical_plan.pipeline_instruction(
+        input,
+        fanout_instruction,
+        ResourceRequest(),  # TODO(Clark): Propagate resource request.
+    )
+
+
+def reduce_merge(
+    input: physical_plan.InProgressPhysicalPlan[PartitionT],
+) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
+    reduce_instruction = execution_step.ReduceMerge()
+    return physical_plan.reduce(input, reduce_instruction)
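The three shims above are the Python-side entry points that the Rust physical plan calls back into (see to_partition_tasks in src/daft-plan/src/physical_plan.rs further down). A minimal sketch of DataFrame-level code that ends up driving them, assuming the public daft API that the tests at the end of this diff also use:

    import daft
    from daft.expressions import col

    df = daft.from_pydict({"id": [3, 1, 2, 1]})
    # Hash repartition: planned as FanoutByHash -> ReduceMerge, which
    # execute through split_by_hash() and reduce_merge() above.
    df = df.repartition(2, col("id"))
    # Sort: planned as a Sort op, which executes through sort() above.
    df = df.sort(col("id"), desc=True)
    print(df.to_pydict())
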
diff --git a/daft/logical/rust_logical_plan.py b/daft/logical/rust_logical_plan.py
index cc49705eea..195baa1194 100644
--- a/daft/logical/rust_logical_plan.py
+++ b/daft/logical/rust_logical_plan.py
@@ -5,10 +5,12 @@
 
 import fsspec
 
+from daft import DataType
 from daft.context import get_context
 from daft.daft import FileFormat, FileFormatConfig
 from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
 from daft.daft import PartitionScheme, PartitionSpec
+from daft.errors import ExpressionTypeError
 from daft.expressions.expressions import Expression, ExpressionsProjection
 from daft.logical.builder import JoinType, LogicalPlanBuilder
 from daft.logical.schema import Schema
@@ -110,15 +112,29 @@ def count(self) -> RustLogicalPlanBuilder:
         raise NotImplementedError("not implemented")
 
     def distinct(self) -> RustLogicalPlanBuilder:
-        raise NotImplementedError("not implemented")
+        builder = self._builder.distinct()
+        return RustLogicalPlanBuilder(builder)
 
     def sort(self, sort_by: ExpressionsProjection, descending: list[bool] | bool = False) -> RustLogicalPlanBuilder:
-        raise NotImplementedError("not implemented")
+        # Disallow sorting by null, binary, and boolean columns.
+        # TODO(Clark): This is a port of an existing constraint, we should look at relaxing this.
+        resolved_sort_by_schema = sort_by.resolve_schema(self.schema())
+        for f, sort_by_expr in zip(resolved_sort_by_schema, sort_by):
+            if f.dtype == DataType.null() or f.dtype == DataType.binary() or f.dtype == DataType.bool():
+                raise ExpressionTypeError(f"Cannot sort on expression {sort_by_expr} with type: {f.dtype}")
+
+        sort_by_exprs = [expr._expr for expr in sort_by]
+        if not isinstance(descending, list):
+            descending = [descending] * len(sort_by_exprs)
+        builder = self._builder.sort(sort_by_exprs, descending)
+        return RustLogicalPlanBuilder(builder)
 
     def repartition(
         self, num_partitions: int, partition_by: ExpressionsProjection, scheme: PartitionScheme
     ) -> RustLogicalPlanBuilder:
-        raise NotImplementedError("not implemented")
+        partition_by_exprs = [expr._expr for expr in partition_by]
+        builder = self._builder.repartition(num_partitions, partition_by_exprs, scheme)
+        return RustLogicalPlanBuilder(builder)
 
     def coalesce(self, num_partitions: int) -> RustLogicalPlanBuilder:
         raise NotImplementedError("not implemented")
@@ -147,8 +163,6 @@ def agg(
         builder = self._builder.aggregate([expr._expr for expr in exprs])
         return RustLogicalPlanBuilder(builder)
 
-        raise NotImplementedError("not implemented")
-
     def concat(self, other: LogicalPlanBuilder) -> RustLogicalPlanBuilder:
         raise NotImplementedError("not implemented")
 
diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs
index 18db21b4e3..db80c3c3be 100644
--- a/src/daft-plan/src/builder.rs
+++ b/src/daft-plan/src/builder.rs
@@ -89,6 +89,45 @@ impl LogicalPlanBuilder {
         Ok(logical_plan_builder)
     }
 
+    pub fn sort(
+        &self,
+        sort_by: Vec<PyExpr>,
+        descending: Vec<bool>,
+    ) -> PyResult<LogicalPlanBuilder> {
+        let sort_by_exprs: Vec<Expr> = sort_by.iter().map(|expr| expr.clone().into()).collect();
+        let logical_plan: LogicalPlan =
+            ops::Sort::new(sort_by_exprs, descending, self.plan.clone()).into();
+        let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
+        Ok(logical_plan_builder)
+    }
+
+    pub fn repartition(
+        &self,
+        num_partitions: usize,
+        partition_by: Vec<PyExpr>,
+        scheme: PartitionScheme,
+    ) -> PyResult<LogicalPlanBuilder> {
+        let partition_by_exprs: Vec<Expr> = partition_by
+            .iter()
+            .map(|expr| expr.clone().into())
+            .collect();
+        let logical_plan: LogicalPlan = ops::Repartition::new(
+            num_partitions,
+            partition_by_exprs,
+            scheme,
+            self.plan.clone(),
+        )
+        .into();
+        let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
+        Ok(logical_plan_builder)
+    }
+
+    pub fn distinct(&self) -> PyResult<LogicalPlanBuilder> {
+        let logical_plan: LogicalPlan = ops::Distinct::new(self.plan.clone()).into();
+        let logical_plan_builder = LogicalPlanBuilder::new(logical_plan.into());
+        Ok(logical_plan_builder)
+    }
+
     pub fn aggregate(&self, agg_exprs: Vec<PyExpr>) -> PyResult<LogicalPlanBuilder> {
         use crate::ops::Aggregate;
         let agg_exprs = agg_exprs
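Note the eager key validation in RustLogicalPlanBuilder.sort: null, binary, and boolean keys are rejected when the plan is built, not at execution time. A small sketch of the user-visible behavior, with an illustrative error message:

    import daft
    from daft.errors import ExpressionTypeError

    df = daft.from_pydict({"flag": [True, False, True]})
    try:
        df.sort("flag")  # boolean sort key is disallowed at plan-build time
    except ExpressionTypeError as err:
        print(err)  # e.g. "Cannot sort on expression col(flag) with type: Boolean"
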
diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs
index 586e9bbd91..cb29f6093e 100644
--- a/src/daft-plan/src/logical_plan.rs
+++ b/src/daft-plan/src/logical_plan.rs
@@ -2,13 +2,16 @@ use std::sync::Arc;
 
 use daft_core::schema::SchemaRef;
 
-use crate::{ops::*, PartitionSpec};
+use crate::{ops::*, PartitionScheme, PartitionSpec};
 
 #[derive(Clone, Debug)]
 pub enum LogicalPlan {
     Source(Source),
     Filter(Filter),
     Limit(Limit),
+    Sort(Sort),
+    Repartition(Repartition),
+    Distinct(Distinct),
     Aggregate(Aggregate),
 }
 
@@ -18,6 +21,9 @@ impl LogicalPlan {
             Self::Source(Source { schema, .. }) => schema.clone(),
             Self::Filter(Filter { input, .. }) => input.schema(),
             Self::Limit(Limit { input, .. }) => input.schema(),
+            Self::Sort(Sort { input, .. }) => input.schema(),
+            Self::Repartition(Repartition { input, .. }) => input.schema(),
+            Self::Distinct(Distinct { input, .. }) => input.schema(),
             Self::Aggregate(Aggregate { schema, .. }) => schema.clone(),
         }
     }
@@ -27,6 +33,24 @@ impl LogicalPlan {
             Self::Source(Source { partition_spec, .. }) => partition_spec.clone(),
             Self::Filter(Filter { input, .. }) => input.partition_spec(),
             Self::Limit(Limit { input, .. }) => input.partition_spec(),
+            Self::Sort(Sort { input, sort_by, .. }) => PartitionSpec::new_internal(
+                PartitionScheme::Range,
+                input.partition_spec().num_partitions,
+                Some(sort_by.clone()),
+            )
+            .into(),
+            Self::Repartition(Repartition {
+                num_partitions,
+                partition_by,
+                scheme,
+                ..
+            }) => PartitionSpec::new_internal(
+                scheme.clone(),
+                *num_partitions,
+                Some(partition_by.clone()),
+            )
+            .into(),
+            Self::Distinct(Distinct { input, .. }) => input.partition_spec(),
             Self::Aggregate(Aggregate { input, .. }) => input.partition_spec(), // TODO
         }
     }
@@ -36,6 +60,9 @@ impl LogicalPlan {
             Self::Source(..) => vec![],
             Self::Filter(Filter { input, .. }) => vec![input],
             Self::Limit(Limit { input, .. }) => vec![input],
+            Self::Sort(Sort { input, .. }) => vec![input],
+            Self::Repartition(Repartition { input, .. }) => vec![input],
+            Self::Distinct(Distinct { input, .. }) => vec![input],
             Self::Aggregate(Aggregate { input, .. }) => vec![input],
         }
     }
@@ -45,6 +72,9 @@ impl LogicalPlan {
             Self::Source(source) => source.multiline_display(),
             Self::Filter(Filter { predicate, .. }) => vec![format!("Filter: {predicate}")],
             Self::Limit(Limit { limit, .. }) => vec![format!("Limit: {limit}")],
+            Self::Sort(sort) => sort.multiline_display(),
+            Self::Repartition(repartition) => repartition.multiline_display(),
+            Self::Distinct(_) => vec!["Distinct".to_string()],
             Self::Aggregate(aggregate) => aggregate.multiline_display(),
         }
     }
@@ -68,5 +98,8 @@ macro_rules! impl_from_data_struct_for_logical_plan {
 
 impl_from_data_struct_for_logical_plan!(Source);
 impl_from_data_struct_for_logical_plan!(Filter);
-impl_from_data_struct_for_logical_plan!(Aggregate);
 impl_from_data_struct_for_logical_plan!(Limit);
+impl_from_data_struct_for_logical_plan!(Sort);
+impl_from_data_struct_for_logical_plan!(Repartition);
+impl_from_data_struct_for_logical_plan!(Distinct);
+impl_from_data_struct_for_logical_plan!(Aggregate);
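The partition_spec rules above read: Sort produces a range spec over its sort keys while keeping the upstream partition count; Repartition reports exactly the spec it was asked for; Distinct, like Filter and Limit, passes the input spec through. A small Python model of the Sort rule, with a hypothetical dataclass standing in for the Rust struct:

    from dataclasses import dataclass

    @dataclass
    class PartitionSpecModel:  # hypothetical stand-in for PartitionSpec
        scheme: str  # "range" | "hash" | "random" | "unknown"
        num_partitions: int
        by: list[str] | None = None

    def sort_partition_spec(input_spec: PartitionSpecModel, sort_by: list[str]) -> PartitionSpecModel:
        # Mirrors LogicalPlan::partition_spec() for Sort: range-partitioned
        # on the sort keys, keeping the upstream partition count.
        return PartitionSpecModel("range", input_spec.num_partitions, sort_by)

    print(sort_partition_spec(PartitionSpecModel("hash", 4, ["id"]), ["ts"]))
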
diff --git a/src/daft-plan/src/ops/distinct.rs b/src/daft-plan/src/ops/distinct.rs
new file mode 100644
index 0000000000..133e5a9082
--- /dev/null
+++ b/src/daft-plan/src/ops/distinct.rs
@@ -0,0 +1,15 @@
+use std::sync::Arc;
+
+use crate::LogicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Distinct {
+    // Upstream node.
+    pub input: Arc<LogicalPlan>,
+}
+
+impl Distinct {
+    pub(crate) fn new(input: Arc<LogicalPlan>) -> Self {
+        Self { input }
+    }
+}
diff --git a/src/daft-plan/src/ops/mod.rs b/src/daft-plan/src/ops/mod.rs
index 7bb7bcc896..4874c8a4bf 100644
--- a/src/daft-plan/src/ops/mod.rs
+++ b/src/daft-plan/src/ops/mod.rs
@@ -1,9 +1,15 @@
 mod agg;
+mod distinct;
 mod filter;
 mod limit;
+mod repartition;
+mod sort;
 mod source;
 
 pub use agg::Aggregate;
+pub use distinct::Distinct;
 pub use filter::Filter;
 pub use limit::Limit;
+pub use repartition::Repartition;
+pub use sort::Sort;
 pub use source::Source;
diff --git a/src/daft-plan/src/ops/repartition.rs b/src/daft-plan/src/ops/repartition.rs
new file mode 100644
index 0000000000..ae49688171
--- /dev/null
+++ b/src/daft-plan/src/ops/repartition.rs
@@ -0,0 +1,42 @@
+use std::sync::Arc;
+
+use daft_dsl::Expr;
+
+use crate::{LogicalPlan, PartitionScheme};
+
+#[derive(Clone, Debug)]
+pub struct Repartition {
+    pub num_partitions: usize,
+    pub partition_by: Vec<Expr>,
+    pub scheme: PartitionScheme,
+    // Upstream node.
+    pub input: Arc<LogicalPlan>,
+}
+
+impl Repartition {
+    pub(crate) fn new(
+        num_partitions: usize,
+        partition_by: Vec<Expr>,
+        scheme: PartitionScheme,
+        input: Arc<LogicalPlan>,
+    ) -> Self {
+        Self {
+            num_partitions,
+            partition_by,
+            scheme,
+            input,
+        }
+    }
+
+    pub fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push(format!(
+            "Repartition ({:?}): n={}",
+            self.scheme, self.num_partitions
+        ));
+        if !self.partition_by.is_empty() {
+            res.push(format!("  Partition by: {:?}", self.partition_by));
+        }
+        res
+    }
+}
diff --git a/src/daft-plan/src/ops/sort.rs b/src/daft-plan/src/ops/sort.rs
new file mode 100644
index 0000000000..72b5f0e1d9
--- /dev/null
+++ b/src/daft-plan/src/ops/sort.rs
@@ -0,0 +1,44 @@
+use std::sync::Arc;
+
+use daft_dsl::Expr;
+
+use crate::LogicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Sort {
+    pub sort_by: Vec<Expr>,
+    pub descending: Vec<bool>,
+    // Upstream node.
+    pub input: Arc<LogicalPlan>,
+}
+
+impl Sort {
+    pub(crate) fn new(sort_by: Vec<Expr>, descending: Vec<bool>, input: Arc<LogicalPlan>) -> Self {
+        Self {
+            sort_by,
+            descending,
+            input,
+        }
+    }
+
+    pub fn multiline_display(&self) -> Vec<String> {
+        let mut res = vec![];
+        res.push("Sort:".to_string());
+        if !self.sort_by.is_empty() {
+            let pairs: Vec<String> = self
+                .sort_by
+                .iter()
+                .zip(self.descending.iter())
+                .map(|(sb, d)| {
+                    format!(
+                        "({:?}, {})",
+                        sb,
+                        if *d { "descending" } else { "ascending" },
+                    )
+                })
+                .collect();
+            res.push(format!("  Sort by: {:?}", pairs));
+        }
+        res
+    }
+}
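Sort::multiline_display pairs each key with its direction for explain-style output. A small Python model of the same formatting rule (plain strings approximate the Rust Debug rendering of expressions):

    def sort_display(sort_by: list[str], descending: list[bool]) -> list[str]:
        # Mirrors Sort::multiline_display above.
        lines = ["Sort:"]
        if sort_by:
            pairs = [f"({sb}, {'descending' if d else 'ascending'})" for sb, d in zip(sort_by, descending)]
            lines.append(f"  Sort by: {pairs}")
        return lines

    print(sort_display(["col(A)"], [True]))  # ['Sort:', "  Sort by: ['(col(A), descending)']"]
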
diff --git a/src/daft-plan/src/partitioning.rs b/src/daft-plan/src/partitioning.rs
index 14cb14188c..d52a449653 100644
--- a/src/daft-plan/src/partitioning.rs
+++ b/src/daft-plan/src/partitioning.rs
@@ -49,17 +49,31 @@ pub struct PartitionSpec {
     pub by: Option<Vec<Expr>>,
 }
 
+impl PartitionSpec {
+    pub fn new_internal(
+        scheme: PartitionScheme,
+        num_partitions: usize,
+        by: Option<Vec<Expr>>,
+    ) -> Self {
+        Self {
+            scheme,
+            num_partitions,
+            by,
+        }
+    }
+}
+
 #[cfg(feature = "python")]
 #[pymethods]
 impl PartitionSpec {
     #[new]
     #[pyo3(signature = (scheme=PartitionScheme::Unknown, num_partitions=0usize, by=None))]
     pub fn new(scheme: PartitionScheme, num_partitions: usize, by: Option<Vec<PyExpr>>) -> Self {
-        Self {
+        Self::new_internal(
             scheme,
             num_partitions,
-            by: by.map(|v| v.iter().map(|e| e.clone().into()).collect()),
-        }
+            by.map(|v| v.iter().map(|e| e.clone().into()).collect()),
+        )
     }
 
     #[getter]
diff --git a/src/daft-plan/src/physical_ops/fanout.rs b/src/daft-plan/src/physical_ops/fanout.rs
new file mode 100644
index 0000000000..5f0e43f587
--- /dev/null
+++ b/src/daft-plan/src/physical_ops/fanout.rs
@@ -0,0 +1,62 @@
+use std::sync::Arc;
+
+use daft_dsl::Expr;
+
+use crate::physical_plan::PhysicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct FanoutRandom {
+    pub num_partitions: usize,
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl FanoutRandom {
+    pub(crate) fn new(num_partitions: usize, input: Arc<PhysicalPlan>) -> Self {
+        Self {
+            num_partitions,
+            input,
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FanoutByHash {
+    pub num_partitions: usize,
+    pub partition_by: Vec<Expr>,
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl FanoutByHash {
+    pub(crate) fn new(
+        num_partitions: usize,
+        partition_by: Vec<Expr>,
+        input: Arc<PhysicalPlan>,
+    ) -> Self {
+        Self {
+            num_partitions,
+            partition_by,
+            input,
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct FanoutByRange {
+    pub num_partitions: usize,
+    pub sort_by: Vec<Expr>,
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl FanoutByRange {
+    #[allow(dead_code)]
+    pub(crate) fn new(num_partitions: usize, sort_by: Vec<Expr>, input: Arc<PhysicalPlan>) -> Self {
+        Self {
+            num_partitions,
+            sort_by,
+            input,
+        }
+    }
+}
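FanoutByHash is the shuffle-producing half of a hash repartition: every input partition is split into num_partitions buckets by key hash, and ReduceMerge later concatenates bucket i from every input into output partition i. A toy model over plain lists, with dicts standing in for rows:

    rows = [{"id": 1}, {"id": 2}, {"id": 3}, {"id": 4}]
    num_partitions = 2

    # Fanout: bucket rows by hash of the partition key.
    buckets: list[list[dict]] = [[] for _ in range(num_partitions)]
    for row in rows:
        buckets[hash(row["id"]) % num_partitions].append(row)

    # ReduceMerge: in the real plan, bucket i from *every* upstream
    # partition is concatenated into output partition i.
    print(buckets)
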
diff --git a/src/daft-plan/src/physical_ops/mod.rs b/src/daft-plan/src/physical_ops/mod.rs
index 872f08f9c3..ae6b2e29d4 100644
--- a/src/daft-plan/src/physical_ops/mod.rs
+++ b/src/daft-plan/src/physical_ops/mod.rs
@@ -1,17 +1,25 @@
 mod agg;
 mod csv;
+mod fanout;
 mod filter;
 #[cfg(feature = "python")]
 mod in_memory;
 mod json;
 mod limit;
 mod parquet;
+mod reduce;
+mod sort;
+mod split;
 
 pub use agg::Aggregate;
 pub use csv::TabularScanCsv;
+pub use fanout::{FanoutByHash, FanoutByRange, FanoutRandom};
 pub use filter::Filter;
 #[cfg(feature = "python")]
 pub use in_memory::InMemoryScan;
 pub use json::TabularScanJson;
 pub use limit::Limit;
 pub use parquet::TabularScanParquet;
+pub use reduce::ReduceMerge;
+pub use sort::Sort;
+pub use split::Split;
diff --git a/src/daft-plan/src/physical_ops/reduce.rs b/src/daft-plan/src/physical_ops/reduce.rs
new file mode 100644
index 0000000000..61746a023b
--- /dev/null
+++ b/src/daft-plan/src/physical_ops/reduce.rs
@@ -0,0 +1,15 @@
+use std::sync::Arc;
+
+use crate::physical_plan::PhysicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct ReduceMerge {
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl ReduceMerge {
+    pub(crate) fn new(input: Arc<PhysicalPlan>) -> Self {
+        Self { input }
+    }
+}
diff --git a/src/daft-plan/src/physical_ops/sort.rs b/src/daft-plan/src/physical_ops/sort.rs
new file mode 100644
index 0000000000..bc6a02fc41
--- /dev/null
+++ b/src/daft-plan/src/physical_ops/sort.rs
@@ -0,0 +1,30 @@
+use std::sync::Arc;
+
+use daft_dsl::Expr;
+
+use crate::physical_plan::PhysicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Sort {
+    pub sort_by: Vec<Expr>,
+    pub descending: Vec<bool>,
+    pub num_partitions: usize,
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl Sort {
+    pub(crate) fn new(
+        sort_by: Vec<Expr>,
+        descending: Vec<bool>,
+        num_partitions: usize,
+        input: Arc<PhysicalPlan>,
+    ) -> Self {
+        Self {
+            sort_by,
+            descending,
+            num_partitions,
+            input,
+        }
+    }
+}
diff --git a/src/daft-plan/src/physical_ops/split.rs b/src/daft-plan/src/physical_ops/split.rs
new file mode 100644
index 0000000000..f8dc3ab6bf
--- /dev/null
+++ b/src/daft-plan/src/physical_ops/split.rs
@@ -0,0 +1,25 @@
+use std::sync::Arc;
+
+use crate::physical_plan::PhysicalPlan;
+
+#[derive(Clone, Debug)]
+pub struct Split {
+    pub input_num_partitions: usize,
+    pub output_num_partitions: usize,
+    // Upstream node.
+    pub input: Arc<PhysicalPlan>,
+}
+
+impl Split {
+    pub(crate) fn new(
+        input_num_partitions: usize,
+        output_num_partitions: usize,
+        input: Arc<PhysicalPlan>,
+    ) -> Self {
+        Self {
+            input_num_partitions,
+            output_num_partitions,
+            input,
+        }
+    }
+}
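Split changes the partition count without a shuffle: each of input_num_partitions is chopped into enough pieces to reach output_num_partitions, so no rows move between upstream outputs. A toy single-partition model, assuming the output count is the larger of the two:

    def split_partition(rows: list, pieces: int) -> list[list]:
        # Chop one partition into `pieces` roughly equal chunks; a toy
        # stand-in for what the Split physical op does per partition.
        k, r = divmod(len(rows), pieces)
        out, i = [], 0
        for p in range(pieces):
            n = k + (1 if p < r else 0)
            out.append(rows[i : i + n])
            i += n
        return out

    print(split_partition([1, 2, 3, 4, 5], 2))  # [[1, 2, 3], [4, 5]]
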
diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs
index 73dded2286..b27024216a 100644
--- a/src/daft-plan/src/physical_plan.rs
+++ b/src/daft-plan/src/physical_plan.rs
@@ -22,6 +22,13 @@ pub enum PhysicalPlan {
     TabularScanJson(TabularScanJson),
     Filter(Filter),
     Limit(Limit),
+    Sort(Sort),
+    Split(Split),
+    FanoutRandom(FanoutRandom),
+    FanoutByHash(FanoutByHash),
+    #[allow(dead_code)]
+    FanoutByRange(FanoutByRange),
+    ReduceMerge(ReduceMerge),
     Aggregate(Aggregate),
 }
 
@@ -162,6 +169,78 @@ impl PhysicalPlan {
                     .call1((local_limit_iter, *limit, *num_partitions as i64))?;
                 Ok(global_limit_iter.into())
             }
+            PhysicalPlan::Sort(Sort {
+                input,
+                sort_by,
+                descending,
+                num_partitions,
+            }) => {
+                let upstream_iter = input.to_partition_tasks(py, psets)?;
+                let sort_by_pyexprs: Vec<PyExpr> = sort_by
+                    .iter()
+                    .map(|expr| PyExpr::from(expr.clone()))
+                    .collect();
+                let py_iter = py
+                    .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+                    .getattr(pyo3::intern!(py, "sort"))?
+                    .call1((
+                        upstream_iter,
+                        sort_by_pyexprs,
+                        descending.clone(),
+                        *num_partitions,
+                    ))?;
+                Ok(py_iter.into())
+            }
+            PhysicalPlan::Split(Split {
+                input,
+                input_num_partitions,
+                output_num_partitions,
+            }) => {
+                let upstream_iter = input.to_partition_tasks(py, psets)?;
+                let py_iter = py
+                    .import(pyo3::intern!(py, "daft.execution.physical_plan"))?
+                    .getattr(pyo3::intern!(py, "split"))?
+                    .call1((upstream_iter, *input_num_partitions, *output_num_partitions))?;
+                Ok(py_iter.into())
+            }
+            PhysicalPlan::FanoutRandom(FanoutRandom {
+                input,
+                num_partitions,
+            }) => {
+                let upstream_iter = input.to_partition_tasks(py, psets)?;
+                let py_iter = py
+                    .import(pyo3::intern!(py, "daft.execution.physical_plan"))?
+                    .getattr(pyo3::intern!(py, "fanout_random"))?
+                    .call1((upstream_iter, *num_partitions))?;
+                Ok(py_iter.into())
+            }
+            PhysicalPlan::FanoutByHash(FanoutByHash {
+                input,
+                num_partitions,
+                partition_by,
+            }) => {
+                let upstream_iter = input.to_partition_tasks(py, psets)?;
+                let partition_by_pyexprs: Vec<PyExpr> = partition_by
+                    .iter()
+                    .map(|expr| PyExpr::from(expr.clone()))
+                    .collect();
+                let py_iter = py
+                    .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+                    .getattr(pyo3::intern!(py, "split_by_hash"))?
+                    .call1((upstream_iter, *num_partitions, partition_by_pyexprs))?;
+                Ok(py_iter.into())
+            }
+            PhysicalPlan::FanoutByRange(_) => unimplemented!(
+                "FanoutByRange not implemented, since only use case (sorting) doesn't need it yet."
+            ),
+            PhysicalPlan::ReduceMerge(ReduceMerge { input }) => {
+                let upstream_iter = input.to_partition_tasks(py, psets)?;
+                let py_iter = py
+                    .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
+                    .getattr(pyo3::intern!(py, "reduce_merge"))?
+                    .call1((upstream_iter,))?;
+                Ok(py_iter.into())
+            }
             PhysicalPlan::Aggregate(Aggregate {
                 aggregations,
                 group_by,
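Each to_partition_tasks arm drives the Python executor through pyo3: import a module, fetch the generator function, and call it with the upstream task iterator plus the op's parameters. The ReduceMerge arm, for example, is roughly equivalent to this Python, where upstream_iter is assumed to be an in-progress task iterator:

    import importlib

    def dispatch_reduce_merge(upstream_iter):
        # What py.import(...), .getattr("reduce_merge"), and .call1(...) amount to:
        shim = importlib.import_module("daft.execution.rust_physical_plan_shim")
        return shim.reduce_merge(upstream_iter)
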
diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index 7744cf6e8f..d310ad41a7 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -1,16 +1,20 @@
 use std::sync::Arc;
 
 use common_error::DaftResult;
+use daft_dsl::Expr;
 
 use crate::logical_plan::LogicalPlan;
 use crate::ops::{
-    Aggregate as LogicalAggregate, Filter as LogicalFilter, Limit as LogicalLimit, Source,
+    Aggregate as LogicalAggregate, Distinct as LogicalDistinct, Filter as LogicalFilter,
+    Limit as LogicalLimit, Repartition as LogicalRepartition, Sort as LogicalSort, Source,
 };
 use crate::physical_ops::{
-    Aggregate, Filter, Limit, TabularScanCsv, TabularScanJson, TabularScanParquet,
+    Aggregate, FanoutByHash, FanoutRandom, Filter, Limit, ReduceMerge, Sort, Split, TabularScanCsv,
+    TabularScanJson, TabularScanParquet,
 };
 use crate::physical_plan::PhysicalPlan;
 use crate::source_info::{ExternalInfo, FileFormatConfig, SourceInfo};
+use crate::PartitionScheme;
 
 #[cfg(feature = "python")]
 use crate::physical_ops::InMemoryScan;
@@ -75,6 +79,83 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
                 Arc::new(input_physical),
             )))
         }
+        LogicalPlan::Sort(LogicalSort {
+            input,
+            sort_by,
+            descending,
+        }) => {
+            let input_physical = plan(input)?;
+            let num_partitions = logical_plan.partition_spec().num_partitions;
+            Ok(PhysicalPlan::Sort(Sort::new(
+                sort_by.clone(),
+                descending.clone(),
+                num_partitions,
+                input_physical.into(),
+            )))
+        }
+        LogicalPlan::Repartition(LogicalRepartition {
+            input,
+            num_partitions,
+            partition_by,
+            scheme,
+        }) => {
+            let input_physical = Arc::new(plan(input)?);
+            match scheme {
+                PartitionScheme::Unknown => Ok(PhysicalPlan::Split(Split::new(
+                    input.partition_spec().num_partitions,
+                    *num_partitions,
+                    input_physical,
+                ))),
+                PartitionScheme::Random => {
+                    let split_op = PhysicalPlan::FanoutRandom(FanoutRandom::new(
+                        *num_partitions,
+                        input_physical,
+                    ));
+                    Ok(PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())))
+                }
+                PartitionScheme::Hash => {
+                    let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
+                        *num_partitions,
+                        partition_by.clone(),
+                        input_physical,
+                    ));
+                    Ok(PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into())))
+                }
+                PartitionScheme::Range => unreachable!("Repartitioning by range is not supported"),
+            }
+        }
+        LogicalPlan::Distinct(LogicalDistinct { input }) => {
+            let input_physical = plan(input)?;
+            let col_exprs = input
+                .schema()
+                .names()
+                .iter()
+                .map(|name| Expr::Column(name.clone().into()))
+                .collect::<Vec<Expr>>();
+            let agg_op = PhysicalPlan::Aggregate(Aggregate::new(
+                input_physical.into(),
+                vec![],
+                col_exprs.clone(),
+                input.schema(),
+            ));
+            let num_partitions = logical_plan.partition_spec().num_partitions;
+            if num_partitions > 1 {
+                let split_op = PhysicalPlan::FanoutByHash(FanoutByHash::new(
+                    num_partitions,
+                    col_exprs.clone(),
+                    agg_op.into(),
+                ));
+                let reduce_op = PhysicalPlan::ReduceMerge(ReduceMerge::new(split_op.into()));
+                Ok(PhysicalPlan::Aggregate(Aggregate::new(
+                    reduce_op.into(),
+                    vec![],
+                    col_exprs,
+                    input.schema(),
+                )))
+            } else {
+                Ok(agg_op)
+            }
+        }
         LogicalPlan::Aggregate(LogicalAggregate {
             schema,
             aggregations,
diff --git a/tests/cookbook/test_sorting.py b/tests/cookbook/test_sorting.py
index 71a098e8d0..7960364ff2 100644
--- a/tests/cookbook/test_sorting.py
+++ b/tests/cookbook/test_sorting.py
@@ -7,7 +7,7 @@
 from tests.conftest import assert_df_equals
 
 
-def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts):
+def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner):
     """Sort by a column that undergoes an expression"""
     daft_df = daft_df.repartition(repartition_nparts)
     daft_sorted_df = daft_df.sort(((col("Unique Key") % 2) == 0).if_else(col("Unique Key"), col("Unique Key") * -1))
@@ -36,7 +36,7 @@ def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts)
         pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"),
     ],
 )
-def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys):
+def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, use_new_planner):
     """Sort by a column"""
     daft_df = daft_df.repartition(repartition_nparts)
     daft_sorted_df = daft_df.sort([col(k) for k in sort_keys], desc=True)
@@ -55,7 +55,7 @@ def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sor
         pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"),
     ],
 )
-def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys):
+def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, use_new_planner):
     """Sort by a column"""
     daft_df = daft_df.repartition(repartition_nparts)
     daft_sorted_df = daft_df.sort([col(k) for k in sort_keys], desc=True).limit(100)
@@ -74,7 +74,9 @@ def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_npart
         pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"),
     ],
 )
-def test_get_sorted_top_n_flipped_desc(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys):
+def test_get_sorted_top_n_flipped_desc(
+    daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, use_new_planner
+):
     """Sort by a column"""
     daft_df = daft_df.repartition(repartition_nparts)
     desc_list = [True]
@@ -103,7 +105,9 @@ def test_get_sorted_top_n_flipped_desc(daft_df, service_requests_csv_pd_df, repa
         ),
     ],
 )
-def test_get_sorted_top_n_projected(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts):
+def test_get_sorted_top_n_projected(
+    daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, use_new_planner
+):
     """Sort by a column and retrieve specific columns from the top N results"""
     daft_df = daft_df.repartition(repartition_nparts)
     expected = service_requests_csv_pd_df.sort_values(by="Unique Key", ascending=False)[
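The Distinct lowering above is a two-phase dedup: a local aggregate keyed on all columns removes duplicates within each partition, a hash fanout plus reduce-merge co-locates surviving rows by key, and a final aggregate removes duplicates that spanned partitions. A toy model over Python sets, with integers standing in for rows:

    partitions = [[1, 2, 2], [2, 3]]
    num_partitions = 2

    # Phase 1: local Aggregate(group_by=all columns) dedups within partitions.
    local = [set(p) for p in partitions]

    # Shuffle: FanoutByHash + ReduceMerge co-locate equal rows.
    shuffled: dict[int, set] = {}
    for part in local:
        for row in part:
            shuffled.setdefault(hash(row) % num_partitions, set()).add(row)

    # Phase 2: the final Aggregate removes cross-partition duplicates
    # (modeled here by the sets themselves).
    print([sorted(s) for s in shuffled.values()])
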
diff --git a/tests/dataframe/test_distinct.py b/tests/dataframe/test_distinct.py
index d3d2b6adc5..b154b22961 100644
--- a/tests/dataframe/test_distinct.py
+++ b/tests/dataframe/test_distinct.py
@@ -9,7 +9,7 @@
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 5])
-def test_distinct_with_nulls(repartition_nparts):
+def test_distinct_with_nulls(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id": [1, None, None, None],
@@ -47,7 +47,7 @@ def test_distinct_with_all_nulls(repartition_nparts):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2])
-def test_distinct_with_empty(repartition_nparts):
+def test_distinct_with_empty(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id": [1],
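Every updated test now takes a use_new_planner fixture, which is how the suite runs both planners without duplicating test bodies. The fixture itself lives in tests/conftest.py and is not part of this diff; a hypothetical sketch of the shape such a fixture could take (set_planner is invented here for illustration and is not daft API):

    import pytest

    def set_planner(use_rust: bool) -> None:
        # Hypothetical stand-in; the real toggle lives in tests/conftest.py
        # and daft's context, neither of which is shown in this diff.
        ...

    @pytest.fixture(params=[False, True], ids=["old_planner", "new_planner"])
    def use_new_planner(request):
        set_planner(use_rust=request.param)
        yield
        set_planner(use_rust=False)
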
diff --git a/tests/dataframe/test_sort.py b/tests/dataframe/test_sort.py
index b21d3f45f1..e686da7171 100644
--- a/tests/dataframe/test_sort.py
+++ b/tests/dataframe/test_sort.py
@@ -42,7 +42,7 @@ def test_disallowed_sort_bytes():
 
 @pytest.mark.parametrize("desc", [True, False])
 @pytest.mark.parametrize("n_partitions", [1, 3])
-def test_single_float_col_sort(desc: bool, n_partitions: int):
+def test_single_float_col_sort(desc: bool, n_partitions: int, use_new_planner):
     df = daft.from_pydict({"A": [1.0, None, 3.0, float("nan"), 2.0]})
     df = df.repartition(n_partitions)
     df = df.sort("A", desc=desc)
@@ -60,7 +60,7 @@ def _replace_nan_with_string(l):
 
 @pytest.mark.skip(reason="Issue: https://github.com/Eventual-Inc/Daft/issues/546")
 @pytest.mark.parametrize("n_partitions", [1, 3])
-def test_multi_float_col_sort(n_partitions: int):
+def test_multi_float_col_sort(n_partitions: int, use_new_planner):
     df = daft.from_pydict(
         {
             "A": [1.0, 1.0, None, None, float("nan"), float("nan"), float("nan")],
@@ -107,7 +107,7 @@ def _replace_nan_with_string(l):
 
 @pytest.mark.parametrize("desc", [True, False])
 @pytest.mark.parametrize("n_partitions", [1, 3])
-def test_single_string_col_sort(desc: bool, n_partitions: int):
+def test_single_string_col_sort(desc: bool, n_partitions: int, use_new_planner):
     df = daft.from_pydict({"A": ["0", None, "1", "", "01"]})
     df = df.repartition(n_partitions)
     df = df.sort("A", desc=desc)
@@ -126,7 +126,7 @@ def test_single_string_col_sort(desc: bool, n_partitions: int):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_int_sort_with_nulls(repartition_nparts):
+def test_int_sort_with_nulls(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id": [2, None, 1],
@@ -147,7 +147,7 @@ def test_int_sort_with_nulls(repartition_nparts):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2, 4])
-def test_str_sort_with_nulls(repartition_nparts):
+def test_str_sort_with_nulls(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id": [1, None, 2],
@@ -167,7 +167,7 @@ def test_str_sort_with_nulls(repartition_nparts):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 4, 6])
-def test_sort_with_nulls_multikey(repartition_nparts):
+def test_sort_with_nulls_multikey(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id1": [2, None, 2, None, 1],
@@ -205,7 +205,7 @@ def test_sort_with_all_nulls(repartition_nparts):
 
 
 @pytest.mark.parametrize("repartition_nparts", [1, 2])
-def test_sort_with_empty(repartition_nparts):
+def test_sort_with_empty(repartition_nparts, use_new_planner):
     daft_df = daft.from_pydict(
         {
             "id": [1],
@@ -220,7 +220,7 @@
     assert len(resultset["values"]) == 0
 
 
-def test_sort_with_all_null_type_column():
+def test_sort_with_all_null_type_column(use_new_planner):
    daft_df = daft.from_pydict(
        {
            "id": [None, None, None],