Skip to content

Commit

Permalink
[CHORE] LogicalPlan: Add display improvements, and Filter (#1221)
Browse files Browse the repository at this point in the history
- Add `LogicalPlan::Filter` and integrate it all the way to the Python
DataFrame API.
- Implement Display (single-node display) and TreeDisplay (entire plan
display) for LogicalPlan.

```
>>> import daft
>>> df = daft.read_parquet("out.pq")
2023-08-02 17:47:32.456 | INFO     | daft.context:runner:86 - Using PyRunner
>>> df = df.where(daft.col("a") > 0)
>>> df._plan
* Filter: col(a) > lit(0)
|
* Source: Parquet
|   /Users/charles/daft/out.pq/da78add6-a17a-47e6-bd39-2de437a64969-0.parquet
|   File schema: a (Int64)
|   Output schema: a (Int64)

```

---------

Co-authored-by: Xiayue Charles Lin <[email protected]>
  • Loading branch information
xcharleslin and Xiayue Charles Lin authored Aug 3, 2023
1 parent bc65aaf commit 839d18c
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 30 deletions.
16 changes: 15 additions & 1 deletion daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Tuple,
TypeVar,
Union,
cast,
)

from daft.api_annotations import DataframePublicAPI
Expand Down Expand Up @@ -530,7 +531,20 @@ def where(self, predicate: Expression) -> "DataFrame":
Returns:
DataFrame: Filtered DataFrame.
"""
plan = logical_plan.Filter(self._plan, ExpressionsProjection([predicate]))
use_rust_planner = get_context().use_rust_planner

if use_rust_planner:
new_builder = cast(
rust_logical_plan.RustLogicalPlanBuilder,
self._plan,
).builder.filter(predicate._expr)

plan = cast(
logical_plan.LogicalPlan,
rust_logical_plan.RustLogicalPlanBuilder(new_builder),
)
else:
plan = logical_plan.Filter(self._plan, ExpressionsProjection([predicate]))
return DataFrame(plan)

@DataframePublicAPI
Expand Down
3 changes: 3 additions & 0 deletions daft/logical/rust_logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@ def __init__(self, builder: LogicalPlanBuilder) -> None:
def schema(self) -> Schema:
pyschema = self.builder.schema()
return Schema._from_pyschema(pyschema)

def __repr__(self) -> str:
return self.builder.repr_ascii()
10 changes: 9 additions & 1 deletion src/daft-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,18 @@ impl Schema {

res
}

pub fn short_string(&self) -> String {
self.fields
.iter()
.map(|(name, field)| format!("{} ({:?})", name, field.dtype))
.collect::<Vec<String>>()
.join(", ")
}
}

impl Display for Schema {
// `f` is a buffer, and this method must write the formatted string into it
// Produces an ASCII table.
fn fmt(&self, f: &mut Formatter) -> Result {
let mut table = prettytable::Table::new();

Expand Down
22 changes: 19 additions & 3 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ use crate::ops;
use crate::source_info::{FileFormat, FilesInfo, SourceInfo};

#[cfg(feature = "python")]
use daft_core::python::schema::PySchema;
#[cfg(feature = "python")]
use pyo3::prelude::*;
use {daft_core::python::schema::PySchema, daft_dsl::python::PyExpr, pyo3::prelude::*};

#[cfg_attr(feature = "python", pyclass)]
#[derive(Debug)]
pub struct LogicalPlanBuilder {
plan: Arc<LogicalPlan>,
}
Expand All @@ -21,6 +20,11 @@ impl LogicalPlanBuilder {
plan: LogicalPlan::Source(source).into(),
}
}
pub fn from_filter(filter: ops::Filter) -> Self {
Self {
plan: LogicalPlan::Filter(filter).into(),
}
}
}

#[cfg(feature = "python")]
Expand All @@ -40,7 +44,19 @@ impl LogicalPlanBuilder {
Ok(logical_plan_builder)
}

pub fn filter(&self, predicate: &PyExpr) -> PyResult<LogicalPlanBuilder> {
let logical_plan_builder = LogicalPlanBuilder::from_filter(ops::Filter::new(
predicate.expr.clone().into(),
self.plan.clone(),
));
Ok(logical_plan_builder)
}

pub fn schema(&self) -> PyResult<PySchema> {
Ok(self.plan.schema().into())
}

pub fn repr_ascii(&self) -> PyResult<String> {
Ok(self.plan.repr_ascii())
}
}
71 changes: 48 additions & 23 deletions src/daft-plan/src/display.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,31 @@
use std::fmt;
use std::fmt::{self, Display, Write};

trait TreeDisplay {
// Required method: Print just the self node in a single line. No trailing newline.
fn fmt_oneline(&self, f: &mut fmt::Formatter) -> fmt::Result;
pub(crate) trait TreeDisplay {
// Required method: Get a list of lines representing this node. No trailing newlines.
fn get_multiline_representation(&self) -> Vec<String>;

// Required method: Get the children of the self node.
fn get_children(&self) -> Vec<&Self>;

// Print the whole tree represented by this node.
fn fmt_tree(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.fmt_tree_gitstyle(0, f)
fn fmt_tree(&self, s: &mut String) -> fmt::Result {
self.fmt_tree_gitstyle(0, s)
}

// Print the tree recursively, and illustrate the tree structure in the same style as `git log --graph`.
// `depth` is the number of forks in this node's ancestors.
fn fmt_tree_gitstyle(&self, depth: usize, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt_tree_gitstyle(&self, depth: usize, s: &mut String) -> fmt::Result {
// Print the current node.
// e.g. | | * <node contents>
self.fmt_depth(depth, f)?;
write!(f, "* ")?;
self.fmt_oneline(f)?;
writeln!(f)?;
// e.g. | | * <node contents line 1>
// | | | <node contents line 2>
for (i, val) in self.get_multiline_representation().iter().enumerate() {
self.fmt_depth(depth, s)?;
match i {
0 => write!(s, "* ")?,
_ => write!(s, "| ")?,
}
writeln!(s, "{val}")?;
}

// Recursively handle children.
let children = self.get_children();
Expand All @@ -30,37 +35,57 @@ trait TreeDisplay {
// One child - print leg, then print the child tree.
[child] => {
// Leg: e.g. | | |
self.fmt_depth(depth, f)?;
writeln!(f, "|")?;
self.fmt_depth(depth, s)?;
writeln!(s, "|")?;

// Child tree.
child.fmt_tree_gitstyle(depth, f)
child.fmt_tree_gitstyle(depth, s)
}
// Two children - print legs, print right child indented, print left child.
[left, right] => {
// Legs: e.g. | | |\
self.fmt_depth(depth, f)?;
writeln!(f, "|\\")?;
self.fmt_depth(depth, s)?;
writeln!(s, "|\\")?;

// Right child tree, indented.
right.fmt_tree_gitstyle(depth + 1, f)?;
right.fmt_tree_gitstyle(depth + 1, s)?;

// Legs, e.g. | | |
self.fmt_depth(depth, f)?;
writeln!(f, "|")?;
self.fmt_depth(depth, s)?;
writeln!(s, "|")?;

// Left child tree.
left.fmt_tree_gitstyle(depth, f)
left.fmt_tree_gitstyle(depth, s)
}
_ => unreachable!("Max two child nodes expected, got {}", children.len()),
}
}

fn fmt_depth(&self, depth: usize, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt_depth(&self, depth: usize, s: &mut String) -> fmt::Result {
// Print leading pipes for forks in ancestors that will be printed later.
// e.g. "| | "
for _ in 0..depth {
write!(f, "| ")?;
write!(s, "| ")?;
}
Ok(())
}
}

impl TreeDisplay for crate::LogicalPlan {
fn get_multiline_representation(&self) -> Vec<String> {
self.multiline_display()
}

fn get_children(&self) -> Vec<&Self> {
self.children()
}
}

// Single node display.
impl Display for crate::LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for line in self.multiline_display() {
writeln!(f, "{line}")?;
}
Ok(())
}
Expand Down
24 changes: 23 additions & 1 deletion src/daft-plan/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,37 @@ use daft_core::schema::SchemaRef;

use crate::ops::*;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum LogicalPlan {
Source(Source),
Filter(Filter),
}

impl LogicalPlan {
pub fn schema(&self) -> SchemaRef {
match self {
Self::Source(Source { schema, .. }) => schema.clone(),
Self::Filter(Filter { input, .. }) => input.schema(),
}
}

pub fn children(&self) -> Vec<&Self> {
match self {
Self::Source(..) => vec![],
Self::Filter(filter) => vec![&filter.input],
}
}

pub fn multiline_display(&self) -> Vec<String> {
match self {
Self::Source(source) => source.multiline_display(),
Self::Filter(Filter { predicate, .. }) => vec![format!("Filter: {predicate}")],
}
}

pub fn repr_ascii(&self) -> String {
let mut s = String::new();
crate::display::TreeDisplay::fmt_tree(self, &mut s).unwrap();
s
}
}
19 changes: 19 additions & 0 deletions src/daft-plan/src/ops/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::sync::Arc;

use daft_dsl::ExprRef;

use crate::LogicalPlan;

#[derive(Clone, Debug)]
pub struct Filter {
// The Boolean expression to filter on.
pub predicate: ExprRef,
// Upstream node.
pub input: Arc<LogicalPlan>,
}

impl Filter {
pub(crate) fn new(predicate: ExprRef, input: Arc<LogicalPlan>) -> Self {
Self { predicate, input }
}
}
2 changes: 2 additions & 0 deletions src/daft-plan/src/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod filter;
mod source;

pub use filter::Filter;
pub use source::Source;
28 changes: 27 additions & 1 deletion src/daft-plan/src/ops/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use daft_dsl::ExprRef;

use crate::source_info::SourceInfo;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Source {
/// The schema of the output of this node (the source data schema).
/// May be a subset of the source data schema; executors should push down this projection if possible.
Expand All @@ -29,4 +29,30 @@ impl Source {
limit: None, // Will be populated by plan optimizer.
}
}

pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];

use SourceInfo::*;
match &*self.source_info {
FilesInfo(files_info) => {
res.push(format!("Source: {:?}", files_info.file_format));
for fp in files_info.filepaths.iter() {
res.push(format!(" {}", fp));
}
res.push(format!(
" File schema: {}",
files_info.schema.short_string()
));
}
}
res.push(format!(" Output schema: {}", self.schema.short_string()));
if self.filters.is_empty() {
res.push(format!(" Filters: {:?}", self.filters));
}
if let Some(limit) = self.limit {
res.push(format!(" Limit: {}", limit));
}
res
}
}
3 changes: 3 additions & 0 deletions src/daft-plan/src/source_info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use daft_core::schema::SchemaRef;

#[derive(Debug)]
pub enum SourceInfo {
FilesInfo(FilesInfo),
}
Expand All @@ -13,12 +14,14 @@ impl SourceInfo {
}
}

#[derive(Debug)]
pub enum FileFormat {
Parquet,
Csv,
Json,
}

#[derive(Debug)]
pub struct FilesInfo {
pub file_format: FileFormat,
pub filepaths: Vec<String>, // TODO: pull in some sort of URL crate for this
Expand Down

0 comments on commit 839d18c

Please sign in to comment.