[CHORE] Begin integrating Rust Logical Plan with Dataframe API #1207

Merged: 6 commits, Aug 2, 2023
2 changes: 1 addition & 1 deletion daft/context.py

@@ -56,7 +56,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:

 def _get_planner_from_env() -> bool:
     """Returns whether or not to use the new query planner."""
-    return bool(int(os.getenv("DAFT_DEVELOPER_RUST_QUERY_PLANNER", default="1")))
+    return bool(int(os.getenv("DAFT_DEVELOPER_RUST_QUERY_PLANNER", default="0")))


 @dataclasses.dataclass(frozen=True)
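This flips the Rust query planner from opt-out to opt-in. A minimal sketch of opting back in (assumes, per _get_planner_from_env above, that the flag is read from the environment when the daft context is first created):

    import os

    # Set before the daft context is created so _get_planner_from_env() sees it.
    os.environ["DAFT_DEVELOPER_RUST_QUERY_PLANNER"] = "1"

    import daft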
4 changes: 2 additions & 2 deletions daft/dataframe/dataframe.py

@@ -29,7 +29,7 @@

 from daft.datatype import DataType
 from daft.errors import ExpressionTypeError
 from daft.expressions import Expression, ExpressionsProjection, col, lit
-from daft.logical import logical_plan
+from daft.logical import logical_plan, rust_logical_plan
 from daft.logical.aggregation_plan_builder import AggregationPlanBuilder
 from daft.resource_request import ResourceRequest
 from daft.runners.partitioning import PartitionCacheEntry, PartitionSet

@@ -65,7 +65,7 @@ def __init__(self, plan: logical_plan.LogicalPlan) -> None:

         Args:
             plan: LogicalPlan describing the steps required to arrive at this DataFrame
         """
-        if not isinstance(plan, logical_plan.LogicalPlan):
+        if not isinstance(plan, (logical_plan.LogicalPlan, rust_logical_plan.RustLogicalPlanBuilder)):
             if isinstance(plan, dict):
                 raise ValueError(
                     f"DataFrames should be constructed with a dictionary of columns using `daft.from_pydict`"

Contributor — on the new isinstance check: Should we change the type annotation of the plan arg and the _plan property accessor to be a union of LogicalPlan and RustLogicalPlanBuilder?

Author: Discussed offline; LogicalPlan has too many methods, so a Union will not work out of the box.
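The "too many methods" objection: a type checker only permits attribute access on a Union when every member of the union supports it, so the annotation cannot be tightened until RustLogicalPlanBuilder mirrors LogicalPlan's full method surface. A minimal sketch of the failure mode (the class bodies are illustrative stubs, not the real APIs):

    from typing import Union

    class LogicalPlan:
        def num_partitions(self) -> int: ...  # one of many methods

    class RustLogicalPlanBuilder:
        def schema(self) -> None: ...  # only a small surface so far

    def run(plan: Union[LogicalPlan, RustLogicalPlanBuilder]) -> int:
        # mypy: error: Item "RustLogicalPlanBuilder" of the union
        # has no attribute "num_partitions"
        return plan.num_partitions()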
35 changes: 34 additions & 1 deletion daft/io/common.py

@@ -3,9 +3,10 @@

 import fsspec

 from daft.context import get_context
+from daft.daft import LogicalPlanBuilder
 from daft.datasources import SourceInfo
 from daft.datatype import DataType
-from daft.logical import logical_plan
+from daft.logical import logical_plan, rust_logical_plan
 from daft.logical.schema import Schema

@@ -57,3 +58,35 @@

         # one partition per filepath. This will change in the future and our logic here should change accordingly.
         num_partitions=len(listing_details_partition_set),
     )
+
+
+def _get_files_scan_rustplan(
+    path: str | list[str],
+    schema_hints: dict[str, DataType] | None,
+    source_info: SourceInfo,
+    fs: fsspec.AbstractFileSystem | None,
+) -> rust_logical_plan.RustLogicalPlanBuilder:
+    """Returns a LogicalPlanBuilder with the file scan."""
+    # Glob the path using the Runner
+    runner_io = get_context().runner().runner_io()
+    paths = path if isinstance(path, list) else [str(path)]
+    listing_details_partition_set = runner_io.glob_paths_details(paths, source_info, fs)
+
+    # Infer schema if no hints provided
+    inferred_or_provided_schema = (
+        _get_schema_from_hints(schema_hints)
+        if schema_hints is not None
+        else runner_io.get_schema_from_first_filepath(listing_details_partition_set, source_info, fs)
+    )
+
+    # Construct plan
+    paths_details = listing_details_partition_set.to_pydict()
+    filepaths = paths_details[runner_io.FS_LISTING_PATH_COLUMN_NAME]
+    rs_schema = inferred_or_provided_schema._schema
+    builder = LogicalPlanBuilder.read_parquet(filepaths, rs_schema)
+    pybuilder = rust_logical_plan.RustLogicalPlanBuilder(builder)
+    return pybuilder

(Codecov: the added lines in _get_files_scan_rustplan are not covered by tests.)
42 changes: 31 additions & 11 deletions daft/io/parquet.py

@@ -1,14 +1,16 @@

 # isort: dont-add-import: from __future__ import annotations

-from typing import TYPE_CHECKING, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Dict, List, Optional, Union, cast

 import fsspec

 from daft.api_annotations import PublicAPI
 from daft.context import get_context
 from daft.dataframe import DataFrame
 from daft.datasources import ParquetSourceInfo
 from daft.datatype import DataType
-from daft.io.common import _get_tabular_files_scan
+from daft.io.common import _get_files_scan_rustplan, _get_tabular_files_scan
+from daft.logical.logical_plan import LogicalPlan

 if TYPE_CHECKING:
     from daft.io import IOConfig

@@ -43,16 +45,34 @@

     returns:
         DataFrame: parsed DataFrame
     """

     if isinstance(path, list) and len(path) == 0:
         raise ValueError(f"Cannot read DataFrame from an empty list of Parquet filepaths")

-    plan = _get_tabular_files_scan(
-        path,
-        schema_hints,
-        ParquetSourceInfo(
-            io_config=io_config,
-            use_native_downloader=use_native_downloader,
-        ),
-        fs,
-    )
+    context = get_context()
+
+    if context.use_rust_planner:
+        plan = cast(
+            LogicalPlan,
+            _get_files_scan_rustplan(
+                path,
+                schema_hints,
+                ParquetSourceInfo(
+                    io_config=io_config,
+                    use_native_downloader=use_native_downloader,
+                ),
+                fs,
+            ),
+        )  # Cast for temporary type checking.
+    else:
+        plan = _get_tabular_files_scan(
+            path,
+            schema_hints,
+            ParquetSourceInfo(
+                io_config=io_config,
+                use_native_downloader=use_native_downloader,
+            ),
+            fs,
+        )

     return DataFrame(plan)

(Codecov: added line 55 in daft/io/parquet.py was not covered by tests.)

clarkzinzow (Contributor), Aug 2, 2023 — on the `if context.use_rust_planner:` line:

Some quick questions about the current "switching between all-Python logical plan vs. Rust-based logical plan builder" setup.

We previously talked about exposing a LogicalPlanBuilder interface that would have two implementations, the all-Python query planner (PyLogicalPlanBuilder) and the new Rust query planner (RustLogicalPlanBuilder), where the DataFrame API layer would be rewritten to use that LogicalPlanBuilder interface and we could limit the number of places we'd need to switch on context.use_rust_planner. E.g. the DataFrame constructor would take a LogicalPlanBuilder instance instead of a union of the all-Python LogicalPlan and the RustLogicalPlanBuilder:

    def __init__(self, builder: LogicalPlanBuilder) -> None:
        if not isinstance(builder, LogicalPlanBuilder):
            if isinstance(builder, dict):
                raise ValueError(
                    f"DataFrames should be constructed with a dictionary of columns using `daft.from_pydict`"
                )
            if isinstance(builder, list):
                raise ValueError(
                    f"DataFrames should be constructed with a list of dictionaries using `daft.from_pylist`"
                )
            raise ValueError(f"Expected DataFrame to be constructed with a LogicalPlanBuilder, received: {builder}")

        self._builder = builder
        self._result_cache: Optional[PartitionCacheEntry] = None
        self._preview = DataFramePreview(preview_partition=None, dataframe_num_rows=None)

And read_parquet() would look something like this:

    def read_parquet(
        path: Union[str, List[str]],
        schema_hints: Optional[Dict[str, DataType]] = None,
        fs: Optional[fsspec.AbstractFileSystem] = None,
        io_config: Optional["IOConfig"] = None,
        use_native_downloader: bool = False,
    ) -> DataFrame:
        if isinstance(path, list) and len(path) == 0:
            raise ValueError(f"Cannot read DataFrame from an empty list of Parquet filepaths")

        context = get_context()
        # This could eventually be reduced to context.new_logical_plan_builder().
        builder = RustLogicalPlanBuilder() if context.use_rust_planner else PyLogicalPlanBuilder()
        new_builder = builder.scan(
            path,
            schema_hints,
            ParquetSourceInfo(
                io_config=io_config,
                use_native_downloader=use_native_downloader,
            ),
            fs,
        )
        return DataFrame(new_builder)

This should end up being a good bit cleaner than imperatively switching between implementations within each DataFrame API function/method, and we should be able to line up the builder interfaces (such as builder.scan() or builder.filter()) since they're adding the same logical op to each underlying logical plan implementation. This does, however, require more upfront changes to the DataFrame implementation, since each method would need to be ported to the PyLogicalPlanBuilder interface (although this should be straightforward).

Do you agree that this is a better long-term approach, and if so, are you thinking that deferring this refactor is the best choice for now?

Author: Discussed offline --

> Do you agree that this is a better long-term approach, and if so, are you thinking that deferring this refactor is the best choice for now?

Yes and yes, to get Rust plan creation from DataFrames in our hands ASAP.

Contributor — on the `cast(...)` call: Instead of this type casting, we could make the DataFrame constructor take a Union[LogicalPlan, RustLogicalPlanBuilder].

Author: (Discussed offline; same as above.)
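The interface sketched in the thread above might look like the following. This is a hypothetical outline (neither the abstract base class nor PyLogicalPlanBuilder exists in this PR), under the assumption that both planners can expose the same builder methods:

    from abc import ABC, abstractmethod

    class LogicalPlanBuilder(ABC):
        """Hypothetical common interface over the Python and Rust planners."""

        @abstractmethod
        def scan(self, path, schema_hints, source_info, fs) -> "LogicalPlanBuilder":
            """Append a file-scan source op and return the new builder."""

        @abstractmethod
        def schema(self) -> "Schema":
            """Schema of the plan built so far."""

    class PyLogicalPlanBuilder(LogicalPlanBuilder): ...  # would wrap the all-Python LogicalPlan
    class RustLogicalPlanBuilder(LogicalPlanBuilder): ...  # would wrap the pyo3 LogicalPlanBuilder

DataFrame would then hold a single self._builder and each DataFrame method would dispatch through it, eliminating the per-method use_rust_planner switches.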
15 changes: 15 additions & 0 deletions daft/logical/rust_logical_plan.py

@@ -0,0 +1,15 @@

+from __future__ import annotations
+
+from daft.daft import LogicalPlanBuilder
+from daft.logical.schema import Schema
+
+
+class RustLogicalPlanBuilder:
+    """Wrapper class for the new LogicalPlanBuilder in Rust."""
+
+    def __init__(self, builder: LogicalPlanBuilder) -> None:
+        self.builder = builder
+
+    def schema(self) -> Schema:
+        pyschema = self.builder.schema()
+        return Schema._from_pyschema(pyschema)

(Codecov: the added lines in this new file are not covered by tests.)
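A minimal usage sketch of the wrapper (hypothetical filepath; assumes schema is an existing daft.logical.schema.Schema, whose underlying PySchema is reached via ._schema exactly as _get_files_scan_rustplan does above):

    from daft.daft import LogicalPlanBuilder
    from daft.logical.rust_logical_plan import RustLogicalPlanBuilder

    # `schema` is assumed to already exist; read_parquet is the pyo3 binding
    # added in src/daft-plan/src/builder.rs below.
    rs_builder = LogicalPlanBuilder.read_parquet(["s3://bucket/data.parquet"], schema._schema)
    py_builder = RustLogicalPlanBuilder(rs_builder)
    print(py_builder.schema())  # round-trips the schema back out of Rust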
16 changes: 11 additions & 5 deletions src/daft-plan/src/builder.rs

@@ -1,7 +1,8 @@

 use std::sync::Arc;

 use crate::logical_plan::LogicalPlan;
-use crate::{ops, source_info};
+use crate::ops;
+use crate::source_info::{FileFormat, FilesInfo, SourceInfo};

 #[cfg(feature = "python")]
 use daft_core::python::schema::PySchema;

@@ -10,14 +11,14 @@ use pyo3::prelude::*;

 #[cfg_attr(feature = "python", pyclass)]
 pub struct LogicalPlanBuilder {
-    _plan: Arc<LogicalPlan>,
+    plan: Arc<LogicalPlan>,
 }

 impl LogicalPlanBuilder {
     // Create a new LogicalPlanBuilder for a Source node.
     pub fn from_source(source: ops::Source) -> Self {
         Self {
-            _plan: LogicalPlan::Source(source).into(),
+            plan: LogicalPlan::Source(source).into(),
         }
     }
 }

@@ -26,8 +27,9 @@ impl LogicalPlanBuilder {

 #[pymethods]
 impl LogicalPlanBuilder {
     #[staticmethod]
-    pub fn source(filepaths: Vec<String>, schema: &PySchema) -> PyResult<LogicalPlanBuilder> {
-        let source_info = source_info::SourceInfo::FilesInfo(source_info::FilesInfo::new(
+    pub fn read_parquet(filepaths: Vec<String>, schema: &PySchema) -> PyResult<LogicalPlanBuilder> {
+        let source_info = SourceInfo::FilesInfo(FilesInfo::new(
+            FileFormat::Parquet,
             filepaths,
             schema.schema.clone(),
         ));

@@ -37,4 +39,8 @@ impl LogicalPlanBuilder {

         ));
         Ok(logical_plan_builder)
     }
+
+    pub fn schema(&self) -> PyResult<PySchema> {
+        Ok(self.plan.schema().into())
+    }
 }

Contributor — on read_parquet: Nit: Instead of having a LogicalPlanBuilder method per read API (e.g. for reading Parquets, CSVs, JSONs, etc.), we could still have a source method that also takes a FileFormat enum variant, which would then be incorporated into the SourceInfo.

On second thought, making the LogicalPlanBuilder methods 1:1 with the DataFrame APIs makes more sense to me than 1:1 with the logical operators, since the former leaks fewer representational details to the Python side and makes the builder abstraction more useful. So keeping format-specific read_* methods seems like the best choice to me! E.g. it would also allow us to hide the FileFormat enum from the Python side.

Author:

> since the former leaks fewer representational details to the Python side and makes the builder abstraction more useful

Yeah, this was what I was hoping for as well!
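To make that trade-off concrete from the Python side, a hypothetical comparison (only read_parquet exists in this PR; the source/FileFormat variant is the rejected alternative and is not exposed anywhere). Here filepaths would be a list of path strings and pyschema the underlying PySchema, as in _get_files_scan_rustplan:

    # Rejected shape, 1:1 with the logical Source operator: Python callers
    # would have to know about file formats. (Hypothetical -- not in this PR.)
    builder = LogicalPlanBuilder.source(filepaths, pyschema, FileFormat.Parquet)

    # Shape kept in this PR, 1:1 with the DataFrame read APIs: FileFormat
    # stays an implementation detail inside Rust.
    builder = LogicalPlanBuilder.read_parquet(filepaths, pyschema)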
10 changes: 10 additions & 0 deletions src/daft-plan/src/logical_plan.rs

@@ -1,6 +1,16 @@

+use daft_core::schema::SchemaRef;
+
 use crate::ops::*;

 #[derive(Clone)]
 pub enum LogicalPlan {
     Source(Source),
 }
+
+impl LogicalPlan {
+    pub fn schema(&self) -> SchemaRef {
+        match self {
+            Self::Source(Source { schema, .. }) => schema.clone(),
+        }
+    }
+}
15 changes: 13 additions & 2 deletions src/daft-plan/src/source_info.rs

@@ -13,13 +13,24 @@ impl SourceInfo {
     }
 }

+pub enum FileFormat {
+    Parquet,
+    Csv,
+    Json,
+}
+
 pub struct FilesInfo {
+    pub file_format: FileFormat,
     pub filepaths: Vec<String>, // TODO: pull in some sort of URL crate for this
     pub schema: SchemaRef,
 }

 impl FilesInfo {
-    pub(crate) fn new(filepaths: Vec<String>, schema: SchemaRef) -> Self {
-        Self { filepaths, schema }
+    pub(crate) fn new(file_format: FileFormat, filepaths: Vec<String>, schema: SchemaRef) -> Self {
+        Self {
+            file_format,
+            filepaths,
+            schema,
+        }
     }
 }