Skip to content

Commit

Permalink
[CHORE] [New Query Planner] Add simple df.explain() option; change …
Browse files Browse the repository at this point in the history
…to fixed-point policy for rule batch (#1354)

This PR:
- adds a `simple: bool` option to `df.explain()`, which prints out a
node-name-only logical plan tree for quick visualizations;
- changes the optimizer to use a fixed-point policy for its single rule
batch, which restores optimizations that were previously dropped in certain
cases (e.g. `PushDownProjection` producing a `Filter` node that then never
got a chance to be pushed through other upstream nodes).
  • Loading branch information
clarkzinzow committed Sep 8, 2023
1 parent 6f6ca88 commit 1c0087a
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 29 deletions.
10 changes: 6 additions & 4 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,26 @@ def _result(self) -> Optional[PartitionSet]:
return self._result_cache.value

@DataframePublicAPI
def explain(self, show_optimized: bool = False) -> None:
"""Prints the LogicalPlan that will be executed to produce this DataFrame.
def explain(self, show_optimized: bool = False, simple=False) -> None:
"""Prints the logical plan that will be executed to produce this DataFrame.
Defaults to showing the unoptimized plan. Use `show_optimized` to show the optimized one.
Args:
show_optimized (bool): shows the optimized QueryPlan instead of the unoptimized one.
simple (bool): Whether to only show the type of logical op for each node in the logical plan,
rather than showing details of how each logical op is configured.
"""

if self._result_cache is not None:
print("Result is cached and will skip computation\n")
print(self._builder.pretty_print())
print(self._builder.pretty_print(simple))

print("However here is the logical plan used to produce this result:\n")

builder = self.__builder
if show_optimized:
builder = builder.optimize()
print(builder.pretty_print())
print(builder.pretty_print(simple))

def num_partitions(self) -> int:
return self.__builder.num_partitions()
Expand Down
2 changes: 1 addition & 1 deletion daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def num_partitions(self) -> int:
return self.partition_spec().num_partitions

@abstractmethod
def pretty_print(self) -> str:
def pretty_print(self, simple: bool = False) -> str:
"""
Pretty prints the current underlying logical plan.
"""
Expand Down
13 changes: 7 additions & 6 deletions daft/logical/logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ def schema(self) -> Schema:
def partition_spec(self) -> PartitionSpec:
return self._plan.partition_spec()

def pretty_print(self) -> str:
return self._plan.pretty_print()
def pretty_print(self, simple: bool = False) -> str:
return self._plan.pretty_print(simple)

def optimize(self) -> PyLogicalPlanBuilder:
from daft.internal.rule_runner import (
Expand Down Expand Up @@ -338,14 +338,15 @@ def rebuild(self) -> LogicalPlan:
def copy_with_new_children(self, new_children: list[LogicalPlan]) -> LogicalPlan:
raise NotImplementedError()

def pretty_print(
self,
) -> str:
def pretty_print(self, simple: bool = False) -> str:
builder: list[str] = []

def helper(node: LogicalPlan, depth: int = 0, index: int = 0, prefix: str = "", header: str = ""):
children: list[LogicalPlan] = node._children()
obj_repr_lines = repr(node).splitlines()
if simple:
obj_repr_lines = [node.__class__.__name__]
else:
obj_repr_lines = repr(node).splitlines()
builder.append(f"{header}{obj_repr_lines[0]}\n")

if len(children) > 0:
Expand Down
9 changes: 6 additions & 3 deletions daft/logical/rust_logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ def partition_spec(self) -> PartitionSpec:
# TODO(Clark): Push PartitionSpec into planner.
return self._builder.partition_spec()

def pretty_print(self) -> str:
return repr(self)
def pretty_print(self, simple: bool = False) -> str:
if simple:
return self._builder.repr_ascii(simple=True)
else:
return repr(self)

def __repr__(self) -> str:
return self._builder.repr_ascii()
return self._builder.repr_ascii(simple=False)

def optimize(self) -> RustLogicalPlanBuilder:
builder = self._builder.optimize()
Expand Down
8 changes: 4 additions & 4 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ impl LogicalPlanBuilder {
self.plan.partition_spec().as_ref().clone()
}

pub fn repr_ascii(&self) -> String {
self.plan.repr_ascii()
pub fn repr_ascii(&self, simple: bool) -> String {
self.plan.repr_ascii(simple)
}
}

Expand Down Expand Up @@ -410,8 +410,8 @@ impl PyLogicalPlanBuilder {
Ok(Arc::new(physical_plan).into())
}

pub fn repr_ascii(&self) -> PyResult<String> {
Ok(self.builder.repr_ascii())
pub fn repr_ascii(&self, simple: bool) -> PyResult<String> {
Ok(self.builder.repr_ascii(simple))
}
}

Expand Down
26 changes: 19 additions & 7 deletions src/daft-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ pub(crate) trait TreeDisplay {
// Required method: Get a list of lines representing this node. No trailing newlines.
fn get_multiline_representation(&self) -> Vec<String>;

// Required method: Get the human-readable name of this node.
fn get_name(&self) -> String;

// Required method: Get the children of the self node.
fn get_children(&self) -> Vec<&Arc<Self>>;

// Print the whole tree represented by this node.
fn fmt_tree(&self, s: &mut String) -> fmt::Result {
self.fmt_tree_gitstyle(0, s)
fn fmt_tree(&self, s: &mut String, simple: bool) -> fmt::Result {
self.fmt_tree_gitstyle(0, s, simple)
}

// Print the tree recursively, and illustrate the tree structure with a single line per node + indentation.
Expand Down Expand Up @@ -46,11 +49,16 @@ pub(crate) trait TreeDisplay {

// Print the tree recursively, and illustrate the tree structure in the same style as `git log --graph`.
// `depth` is the number of forks in this node's ancestors.
fn fmt_tree_gitstyle(&self, depth: usize, s: &mut String) -> fmt::Result {
fn fmt_tree_gitstyle(&self, depth: usize, s: &mut String, simple: bool) -> fmt::Result {
// Print the current node.
// e.g. | | * <node contents line 1>
// | | | <node contents line 2>
for (i, val) in self.get_multiline_representation().iter().enumerate() {
let lines = if simple {
vec![self.get_name()]
} else {
self.get_multiline_representation()
};
for (i, val) in lines.iter().enumerate() {
self.fmt_depth(depth, s)?;
match i {
0 => write!(s, "* ")?,
Expand All @@ -71,7 +79,7 @@ pub(crate) trait TreeDisplay {
writeln!(s, "|")?;

// Child tree.
child.fmt_tree_gitstyle(depth, s)
child.fmt_tree_gitstyle(depth, s, simple)
}
// Two children - print legs, print right child indented, print left child.
[left, right] => {
Expand All @@ -80,14 +88,14 @@ pub(crate) trait TreeDisplay {
writeln!(s, "|\\")?;

// Right child tree, indented.
right.fmt_tree_gitstyle(depth + 1, s)?;
right.fmt_tree_gitstyle(depth + 1, s, simple)?;

// Legs, e.g. | | |
self.fmt_depth(depth, s)?;
writeln!(s, "|")?;

// Left child tree.
left.fmt_tree_gitstyle(depth, s)
left.fmt_tree_gitstyle(depth, s, simple)
}
_ => unreachable!("Max two child nodes expected, got {}", children.len()),
}
Expand All @@ -108,6 +116,10 @@ impl TreeDisplay for crate::LogicalPlan {
self.multiline_display()
}

// Name used for this node when the tree is rendered in `simple` mode; delegates to `self.name()`.
fn get_name(&self) -> String {
self.name()
}

// Child nodes that the tree printer recurses into; delegates to `self.children()`.
fn get_children(&self) -> Vec<&Arc<Self>> {
self.children()
}
Expand Down
22 changes: 20 additions & 2 deletions src/daft-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,24 @@ impl LogicalPlan {
),
}
}
/// Returns the name of this logical plan node's variant (e.g. `"Filter"`) as an owned `String`.
///
/// The match is intentionally exhaustive with no catch-all arm, so adding a new
/// variant forces a name to be chosen here.
pub fn name(&self) -> String {
    String::from(match self {
        Self::Source(..) => "Source",
        Self::Project(..) => "Project",
        Self::Filter(..) => "Filter",
        Self::Limit(..) => "Limit",
        Self::Explode(..) => "Explode",
        Self::Sort(..) => "Sort",
        Self::Repartition(..) => "Repartition",
        Self::Coalesce(..) => "Coalesce",
        Self::Distinct(..) => "Distinct",
        Self::Aggregate(..) => "Aggregate",
        Self::Concat(..) => "Concat",
        Self::Join(..) => "Join",
        Self::Sink(..) => "Sink",
    })
}

pub fn multiline_display(&self) -> Vec<String> {
match self {
Expand Down Expand Up @@ -291,9 +309,9 @@ impl LogicalPlan {
}
}

pub fn repr_ascii(&self) -> String {
pub fn repr_ascii(&self, simple: bool) -> String {
let mut s = String::new();
self.fmt_tree(&mut s).unwrap();
self.fmt_tree(&mut s, simple).unwrap();
s
}

Expand Down
24 changes: 22 additions & 2 deletions src/daft-plan/src/optimization/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ impl RuleBatch {
}
}

/// Creates a rule batch from `rules`, an execution `strategy`, and an explicitly
/// supplied application `order`.
///
/// In debug builds this asserts that, when `order` is `Some`, every rule in
/// `rules` reports that same order from `apply_order()`. A `None` order skips
/// the check.
// NOTE(review): presumably this constructor has no callers yet, hence the lint
// allowance below — confirm before removing it.
#[allow(dead_code)]
pub fn with_order(
    rules: Vec<Box<dyn OptimizerRule>>,
    strategy: RuleExecutionStrategy,
    order: Option<ApplyOrder>,
) -> Self {
    // Borrow `order` for the consistency check instead of cloning it; the value
    // itself is moved into the batch below.
    debug_assert!(
        order.as_ref().map_or(true, |expected| rules
            .iter()
            .all(|rule| rule.apply_order() == *expected)),
        "every rule in a batch with an explicit order must share that apply order"
    );
    Self {
        rules,
        strategy,
        order,
    }
}

/// Get the maximum number of passes the optimizer should make over this rule batch.
fn max_passes(&self, config: &OptimizerConfig) -> usize {
use RuleExecutionStrategy::*;
Expand All @@ -85,13 +101,13 @@ impl RuleBatch {
/// The execution strategy for a batch of rules.
pub enum RuleExecutionStrategy {
// Apply the batch of rules only once.
#[allow(dead_code)]
Once,
// Apply the batch of rules multiple times, to a fixed-point or until the max
// passes is hit.
// If parametrized by Some(n), the batch of rules will be run a maximum
// of n passes; if None, the number of passes is capped by the default max
// passes given in the OptimizerConfig.
#[allow(dead_code)]
FixedPoint(Option<usize>),
}

Expand All @@ -113,7 +129,11 @@ impl Optimizer {
Box::new(PushDownProjection::new()),
Box::new(PushDownLimit::new()),
],
RuleExecutionStrategy::Once,
// Use a fixed-point policy for the pushdown rules: PushDownProjection can produce a Filter node
// at the current node, which would require another batch application in order to have a chance to push
// that Filter node through upstream nodes.
// TODO(Clark): Refine this fixed-point policy.
RuleExecutionStrategy::FixedPoint(Some(3)),
)];
Self::with_rule_batches(rule_batches, config)
}
Expand Down

0 comments on commit 1c0087a

Please sign in to comment.