From 1c0087a9964e1cbd13444ae4d866bae6664e71d4 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Fri, 8 Sep 2023 10:20:11 -0700 Subject: [PATCH] [CHORE] [New Query Planner] Add simple `df.explain()` option; change to fixed-point policy for rule batch (#1354) This PR: - adds a `simple: bool` option to `df.explain()`, which prints out a node-name-only logical plan tree for quick visualizations; - changes the optimizer to use a fixed-point policy for its single rule batch, which adds back some dropped optimizations in certain cases (e.g. `PushDownProjection` producing a `Filter` node which never has a chance to push through other upstream nodes). --- daft/dataframe/dataframe.py | 10 ++++---- daft/logical/builder.py | 2 +- daft/logical/logical_plan.py | 13 ++++++----- daft/logical/rust_logical_plan.py | 9 ++++--- src/daft-plan/src/builder.rs | 8 +++---- src/daft-plan/src/display.rs | 26 +++++++++++++++------ src/daft-plan/src/logical_plan.rs | 22 +++++++++++++++-- src/daft-plan/src/optimization/optimizer.rs | 24 +++++++++++++++++-- 8 files changed, 85 insertions(+), 29 deletions(-) diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 02f3ae5e9a..26e2e253d8 100644 --- a/daft/dataframe/dataframe.py +++ b/daft/dataframe/dataframe.py @@ -105,24 +105,26 @@ def _result(self) -> Optional[PartitionSet]: return self._result_cache.value @DataframePublicAPI - def explain(self, show_optimized: bool = False) -> None: - """Prints the LogicalPlan that will be executed to produce this DataFrame. + def explain(self, show_optimized: bool = False, simple=False) -> None: + """Prints the logical plan that will be executed to produce this DataFrame. Defaults to showing the unoptimized plan. Use `show_optimized` to show the optimized one. Args: show_optimized (bool): shows the optimized QueryPlan instead of the unoptimized one. + simple (bool): Whether to only show the type of logical op for each node in the logical plan, + rather than showing details of how each logical op is configured. """ if self._result_cache is not None: print("Result is cached and will skip computation\n") - print(self._builder.pretty_print()) + print(self._builder.pretty_print(simple)) print("However here is the logical plan used to produce this result:\n") builder = self.__builder if show_optimized: builder = builder.optimize() - print(builder.pretty_print()) + print(builder.pretty_print(simple)) def num_partitions(self) -> int: return self.__builder.num_partitions() diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 11b2740936..7182587af4 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -56,7 +56,7 @@ def num_partitions(self) -> int: return self.partition_spec().num_partitions @abstractmethod - def pretty_print(self) -> str: + def pretty_print(self, simple: bool = False) -> str: """ Pretty prints the current underlying logical plan. """ diff --git a/daft/logical/logical_plan.py b/daft/logical/logical_plan.py index 1e5c94c19e..2aa3bb19ae 100644 --- a/daft/logical/logical_plan.py +++ b/daft/logical/logical_plan.py @@ -60,8 +60,8 @@ def schema(self) -> Schema: def partition_spec(self) -> PartitionSpec: return self._plan.partition_spec() - def pretty_print(self) -> str: - return self._plan.pretty_print() + def pretty_print(self, simple: bool = False) -> str: + return self._plan.pretty_print(simple) def optimize(self) -> PyLogicalPlanBuilder: from daft.internal.rule_runner import ( @@ -338,14 +338,15 @@ def rebuild(self) -> LogicalPlan: def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan: raise NotImplementedError() - def pretty_print( - self, - ) -> str: + def pretty_print(self, simple: bool = False) -> str: builder: list[str] = [] def helper(node: LogicalPlan, depth: int = 0, index: int = 0, prefix: str = "", header: str = ""): children: list[LogicalPlan] = node._children() - obj_repr_lines = repr(node).splitlines() + if simple: + obj_repr_lines = [node.__class__.__name__] + else: + obj_repr_lines = repr(node).splitlines() builder.append(f"{header}{obj_repr_lines[0]}\n") if len(children) > 0: diff --git a/daft/logical/rust_logical_plan.py b/daft/logical/rust_logical_plan.py index 1193dc2f9f..2a4cdc5462 100644 --- a/daft/logical/rust_logical_plan.py +++ b/daft/logical/rust_logical_plan.py @@ -37,11 +37,14 @@ def partition_spec(self) -> PartitionSpec: # TODO(Clark): Push PartitionSpec into planner. return self._builder.partition_spec() - def pretty_print(self) -> str: - return repr(self) + def pretty_print(self, simple: bool = False) -> str: + if simple: + return self._builder.repr_ascii(simple=True) + else: + return repr(self) def __repr__(self) -> str: - return self._builder.repr_ascii() + return self._builder.repr_ascii(simple=False) def optimize(self) -> RustLogicalPlanBuilder: builder = self._builder.optimize() diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 591abb2a19..3d39490ef2 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -221,8 +221,8 @@ impl LogicalPlanBuilder { self.plan.partition_spec().as_ref().clone() } - pub fn repr_ascii(&self) -> String { - self.plan.repr_ascii() + pub fn repr_ascii(&self, simple: bool) -> String { + self.plan.repr_ascii(simple) } } @@ -410,8 +410,8 @@ impl PyLogicalPlanBuilder { Ok(Arc::new(physical_plan).into()) } - pub fn repr_ascii(&self) -> PyResult { - Ok(self.builder.repr_ascii()) + pub fn repr_ascii(&self, simple: bool) -> PyResult { + Ok(self.builder.repr_ascii(simple)) } } diff --git a/src/daft-plan/src/display.rs b/src/daft-plan/src/display.rs index d57b9abbfb..065a574bb3 100644 --- a/src/daft-plan/src/display.rs +++ b/src/daft-plan/src/display.rs @@ -7,12 +7,15 @@ pub(crate) trait TreeDisplay { // Required method: Get a list of lines representing this node. No trailing newlines. fn get_multiline_representation(&self) -> Vec; + // Required method: Get the human-readable name of this node. + fn get_name(&self) -> String; + // Required method: Get the children of the self node. fn get_children(&self) -> Vec<&Arc>; // Print the whole tree represented by this node. - fn fmt_tree(&self, s: &mut String) -> fmt::Result { - self.fmt_tree_gitstyle(0, s) + fn fmt_tree(&self, s: &mut String, simple: bool) -> fmt::Result { + self.fmt_tree_gitstyle(0, s, simple) } // Print the tree recursively, and illustrate the tree structure with a single line per node + indentation. @@ -46,11 +49,16 @@ pub(crate) trait TreeDisplay { // Print the tree recursively, and illustrate the tree structure in the same style as `git log --graph`. // `depth` is the number of forks in this node's ancestors. - fn fmt_tree_gitstyle(&self, depth: usize, s: &mut String) -> fmt::Result { + fn fmt_tree_gitstyle(&self, depth: usize, s: &mut String, simple: bool) -> fmt::Result { // Print the current node. // e.g. | | * // | | | - for (i, val) in self.get_multiline_representation().iter().enumerate() { + let lines = if simple { + vec![self.get_name()] + } else { + self.get_multiline_representation() + }; + for (i, val) in lines.iter().enumerate() { self.fmt_depth(depth, s)?; match i { 0 => write!(s, "* ")?, @@ -71,7 +79,7 @@ pub(crate) trait TreeDisplay { writeln!(s, "|")?; // Child tree. - child.fmt_tree_gitstyle(depth, s) + child.fmt_tree_gitstyle(depth, s, simple) } // Two children - print legs, print right child indented, print left child. [left, right] => { @@ -80,14 +88,14 @@ pub(crate) trait TreeDisplay { writeln!(s, "|\\")?; // Right child tree, indented. - right.fmt_tree_gitstyle(depth + 1, s)?; + right.fmt_tree_gitstyle(depth + 1, s, simple)?; // Legs, e.g. | | | self.fmt_depth(depth, s)?; writeln!(s, "|")?; // Left child tree. - left.fmt_tree_gitstyle(depth, s) + left.fmt_tree_gitstyle(depth, s, simple) } _ => unreachable!("Max two child nodes expected, got {}", children.len()), } @@ -108,6 +116,10 @@ impl TreeDisplay for crate::LogicalPlan { self.multiline_display() } + fn get_name(&self) -> String { + self.name() + } + fn get_children(&self) -> Vec<&Arc> { self.children() } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index 5c10f8ec8f..44d153d64e 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -261,6 +261,24 @@ impl LogicalPlan { ), } } + pub fn name(&self) -> String { + let name = match self { + Self::Source(..) => "Source", + Self::Project(..) => "Project", + Self::Filter(..) => "Filter", + Self::Limit(..) => "Limit", + Self::Explode(..) => "Explode", + Self::Sort(..) => "Sort", + Self::Repartition(..) => "Repartition", + Self::Coalesce(..) => "Coalesce", + Self::Distinct(..) => "Distinct", + Self::Aggregate(..) => "Aggregate", + Self::Concat(..) => "Concat", + Self::Join(..) => "Join", + Self::Sink(..) => "Sink", + }; + name.to_string() + } pub fn multiline_display(&self) -> Vec { match self { @@ -291,9 +309,9 @@ impl LogicalPlan { } } - pub fn repr_ascii(&self) -> String { + pub fn repr_ascii(&self, simple: bool) -> String { let mut s = String::new(); - self.fmt_tree(&mut s).unwrap(); + self.fmt_tree(&mut s, simple).unwrap(); s } diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index 8b6687b789..e10ba8a644 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -71,6 +71,22 @@ impl RuleBatch { } } + #[allow(dead_code)] + pub fn with_order( + rules: Vec>, + strategy: RuleExecutionStrategy, + order: Option, + ) -> Self { + debug_assert!(order.clone().map_or(true, |order| rules + .iter() + .all(|rule| rule.apply_order() == order))); + Self { + rules, + strategy, + order, + } + } + /// Get the maximum number of passes the optimizer should make over this rule batch. fn max_passes(&self, config: &OptimizerConfig) -> usize { use RuleExecutionStrategy::*; @@ -85,13 +101,13 @@ impl RuleBatch { /// The execution strategy for a batch of rules. pub enum RuleExecutionStrategy { // Apply the batch of rules only once. + #[allow(dead_code)] Once, // Apply the batch of rules multiple times, to a fixed-point or until the max // passes is hit. // If parametrized by Some(n), the batch of rules will be run a maximum // of n passes; if None, the number of passes is capped by the default max // passes given in the OptimizerConfig. - #[allow(dead_code)] FixedPoint(Option), } @@ -113,7 +129,11 @@ impl Optimizer { Box::new(PushDownProjection::new()), Box::new(PushDownLimit::new()), ], - RuleExecutionStrategy::Once, + // Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node + // at the current node, which would require another batch application in order to have a chance to push + // that Filter node through upstream nodes. + // TODO(Clark): Refine this fixed-point policy. + RuleExecutionStrategy::FixedPoint(Some(3)), )]; Self::with_rule_batches(rule_batches, config) }