Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] Begin integrating Rust Logical Plan with Dataframe API #1207

Merged
merged 6 commits into from
Aug 2, 2023

Conversation

xcharleslin
Copy link
Contributor

@xcharleslin xcharleslin commented Aug 1, 2023

daft.read_parquet now creates a Rust logical plan (when DAFT_DEVELOPER_RUST_QUERY_PLANNER=1 is set).

This involves:

  • Separate method for constructing old logical plan scan nodes vs new logical plan scan nodes
  • Temporary wrapper class in Python for LogicalPlanBuilder, so DataFrame API can use it as if it was an old LogicalPlan

@github-actions github-actions bot added the chore label Aug 1, 2023
@xcharleslin xcharleslin marked this pull request as ready for review August 1, 2023 21:29
@codecov
Copy link

codecov bot commented Aug 1, 2023

Codecov Report

Merging #1207 (790fa24) into main (e681536) will decrease coverage by 0.32%.
Report is 2 commits behind head on main.
The diff coverage is 57.57%.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1207      +/-   ##
==========================================
- Coverage   88.41%   88.10%   -0.32%     
==========================================
  Files          55       56       +1     
  Lines        5620     5656      +36     
==========================================
+ Hits         4969     4983      +14     
- Misses        651      673      +22     
Files Changed Coverage Δ
daft/io/common.py 64.51% <23.07%> (-30.23%) ⬇️
daft/logical/rust_logical_plan.py 66.66% <66.66%> (ø)
daft/io/parquet.py 90.00% <87.50%> (-3.34%) ⬇️
daft/context.py 80.55% <100.00%> (ø)
daft/dataframe/dataframe.py 88.84% <100.00%> (ø)

... and 3 files with indirect coverage changes

@xcharleslin
Copy link
Contributor Author

@clarkzinzow I ended up moving the file type (parquet) directly into the SourceInfo enum. This avoids creating a new enum and should be cleaner, I hope

daft/context.py Show resolved Hide resolved
@@ -65,7 +65,7 @@ def __init__(self, plan: logical_plan.LogicalPlan) -> None:
Args:
plan: LogicalPlan describing the steps required to arrive at this DataFrame
"""
if not isinstance(plan, logical_plan.LogicalPlan):
if not isinstance(plan, (logical_plan.LogicalPlan, rust_logical_plan.RustLogicalPlanBuilder)):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change the type annotation of the plan arg and the _plan property accessor to be a union of LogicalPlan and RustLogicalPlanBuilder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline; LogicalPlan has too many methods so Union will not work out of the box

context = get_context()

if context.use_rust_planner:
plan = cast(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this type casting, we could make the DataFrame constructor take a Union[LogicalPlan, RustLogicalPlanBuilder].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Discussed offline; same as above)

)
context = get_context()

if context.use_rust_planner:
Copy link
Contributor

@clarkzinzow clarkzinzow Aug 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some quick questions about the current "switching between all-Python logical plan vs. Rust-based logical plan builder" setup.

We previously talked about exposing a LogicalPlanBuilder interface that would have two implementations, the all-Python query planner (PyLogicalPlanBuilder) and the new Rust query planner (RustLogicalPlanBuilder), where the DataFrame API layer would be rewritten to use that LogicalPlanBuilder interface and we could limit the number of places we'd need to switch on context.use_rust_planner. E.g. the DataFrame constructor would take a LogicalPlanBuilder instance instead of a union of the all-Python LogicalPlan and the RustLogicalPlanBuilder:

    def __init__(self, builder: LogicalPlanBuilder) -> None:
        if not isinstance(builder, LogicalPlanBuilder):
            if isinstance(builder, dict):
                raise ValueError(
                    f"DataFrames should be constructed with a dictionary of columns using `daft.from_pydict`"
                )
            if isinstance(builder, list):
                raise ValueError(
                    f"DataFrames should be constructed with a list of dictionaries using `daft.from_pylist`"
                )
            raise ValueError(f"Expected DataFrame to be constructed with a LogicalPlanBuilder, received: {builder}")

        self._builder = builder
        self._result_cache: Optional[PartitionCacheEntry] = None
        self._preview = DataFramePreview(preview_partition=None, dataframe_num_rows=None)

And read_parquet() would look something like this:

def read_parquet(
    path: Union[str, List[str]],
    schema_hints: Optional[Dict[str, DataType]] = None,
    fs: Optional[fsspec.AbstractFileSystem] = None,
    io_config: Optional["IOConfig"] = None,
    use_native_downloader: bool = False,
) -> DataFrame:
    if isinstance(path, list) and len(path) == 0:
        raise ValueError(f"Cannot read DataFrame from from empty list of Parquet filepaths")

    context = get_context()
    # This could eventually be reduced to context.new_logical_plan_builder().
    builder = RustLogicalPlanBuilder() if context.use_rust_planner else PyLogicalPlanBuilder()
    new_builder = builder.scan(
        path,
        schema_hints,
        ParquetSourceInfo(
            io_config=io_config,
            use_native_downloader=use_native_downloader,
        ),
        fs,
    )
    return DataFrame(new_builder)

This should end up being a good bit cleaner than imperatively switching between implementations within each DataFrame API function/method, and we should be able to line up the builder interfaces (such as builder.scan() or builder.filter()) since they're adding the same logical op to each underlying logical plan implementation. This does, however, require more upfront changes to the DataFrame implementation, since each method would need to be ported to the PyLogicalPlanBuilder interface (although this should be straightforward).

Do you agree that this is a better long-term approach, and if so, are you thinking that deferring this refactor is the best choice for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline --

Do you agree that this is a better long-term approach, and if so, are you thinking that deferring this refactor is the best choice for now?

yes and yes, to get rust plan creation from dataframes in our hands ASAP

filepaths,
schema.schema.clone(),
));
pub fn read_parquet(filepaths: Vec<String>, schema: &PySchema) -> PyResult<LogicalPlanBuilder> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Instead of having a LogicalPlanBuilder method per read API (e.g. for reading Parquets, CSVs, JSONs, etc.), we could still have a source method that also takes a FileFormat enum variant, which would then be incorporated into the SourceInfo.

On second thought, making the LogicalPlanBuilder methods 1:1 with the DataFrame APIs makes more sense to me than 1:1 with the logical operators, since the former leaks less representational details to the Python side and makes the builder abstraction more useful so keeping format-specific read_* methods seems like the best choice to me! E.g. it would also allow us to hide a FileFormat enum from the Python side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the former leaks less representational details to the Python side and makes the builder abstraction more useful

yeah, this was what I was hoping for as well!

@@ -1,24 +1,24 @@
use daft_core::schema::SchemaRef;

pub enum SourceInfo {
FilesInfo(FilesInfo),
ParquetFilesInfo(ParquetFilesInfo),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until there's deviation in what data we're holding in the SourceInfo structs across Parquet, CSV, JSON, etc. file types, I think that we should keep it as a FilesInfo enum variant containing a FilesInfo struct that contains an additional FileFormat enum:

pub enum SourceInfo {
    FilesInfo(FilesInfo),
}

impl SourceInfo {
    pub fn schema(&self) -> SchemaRef {
        use SourceInfo::*;
        match self {
            FilesInfo(files_info) => files_info.schema.clone(),
        }
    }
}

pub enum FileFormat {
    Parquet,
    Csv,
    Json,
}

pub struct FilesInfo {
    pub filepaths: Vec<String>, // TODO: pull in some sort of URL crate for this
    pub file_format: FileFormat,
    pub schema: SchemaRef,
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Copy link
Contributor

@clarkzinzow clarkzinzow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@xcharleslin xcharleslin enabled auto-merge (squash) August 2, 2023 22:33
@xcharleslin xcharleslin merged commit 701fb88 into main Aug 2, 2023
17 of 18 checks passed
@xcharleslin xcharleslin deleted the charles/readplan branch August 2, 2023 22:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants