[FEAT] Deprecates usage of resource_request on df.with_column API (#2654)

Fully deprecates the use of `df.with_column(..., resource_request=...)`
in favor of using resources on UDFs instead.
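
For reference, a minimal sketch of the migration path (assumes Daft >= 0.3.0; the column and UDF names are illustrative, and the resource keywords are the ones named in the new error message below):

```python
import daft
from daft import col, udf
from daft.datatype import DataType

# Before (now raises ValueError instead of a deprecation warning):
#   df = df.with_column("doubled", my_udf(col("x")), resource_request=ResourceRequest(num_cpus=2))

# After: declare resources on the UDF itself via the @udf decorator.
@udf(return_dtype=DataType.int64(), num_cpus=2)
def my_udf(x):
    # `x` arrives as a daft Series; return one value per input row.
    return [v * 2 for v in x.to_pylist()]

df = daft.from_pydict({"x": [1, 2, 3]})
df = df.with_column("doubled", my_udf(col("x")))

# Or override the resource request on an existing UDF at call time.
df = df.with_column("doubled", my_udf.override_options(num_cpus=4)(col("x")))
```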

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Aug 15, 2024
1 parent b88f87e commit 9b8c2b2
Showing 6 changed files with 78 additions and 241 deletions.
2 changes: 1 addition & 1 deletion daft/daft.pyi
@@ -1619,7 +1619,7 @@ class LogicalPlanBuilder:
@staticmethod
def table_scan(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...
def select(self, to_select: list[PyExpr]) -> LogicalPlanBuilder: ...
def with_columns(self, columns: list[PyExpr], resource_request: ResourceRequest | None) -> LogicalPlanBuilder: ...
def with_columns(self, columns: list[PyExpr]) -> LogicalPlanBuilder: ...
def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: ...
def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ...
def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
27 changes: 14 additions & 13 deletions daft/dataframe/dataframe.py
@@ -31,7 +31,7 @@
from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
from daft.convert import InputListType
from daft.daft import FileFormat, IOConfig, JoinStrategy, JoinType, ResourceRequest, resolve_expr
from daft.daft import FileFormat, IOConfig, JoinStrategy, JoinType, resolve_expr
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
@@ -51,6 +51,7 @@
import ray
import torch

from daft.daft import ResourceRequest
from daft.io import DataCatalogTable

from daft.logical.schema import Schema
@@ -1218,7 +1219,7 @@ def with_column(
self,
column_name: str,
expr: Expression,
resource_request: Optional[ResourceRequest] = None,
resource_request: Optional["ResourceRequest"] = None,
) -> "DataFrame":
"""Adds a column to the current DataFrame with an Expression, equivalent to a ``select``
with all current columns and the new one
@@ -1245,26 +1246,26 @@ def with_column(
Args:
column_name (str): name of new column
expr (Expression): expression of the new column.
resource_request (ResourceRequest): a custom resource request for the execution of this operation (NOTE: this will be deprecated
in Daft version 0.3.0. Please use resource requests on your UDFs instead.)
Returns:
DataFrame: DataFrame with new column.
"""
if resource_request is not None:
warnings.warn(
"Specifying resource_request through `with_column` will be deprecated from Daft version >= 0.3.0! "
raise ValueError(
"Specifying resource_request through `with_column` is deprecated from Daft version >= 0.3.0! "
"Instead, please use the APIs on UDFs directly for controlling the resource requests of your UDFs. "
"You can define resource requests directly on the `@udf(num_gpus=N, num_cpus=M, ...)` decorator. "
"Alternatively, you can override resource requests on UDFs like so: `my_udf.override_options(num_gpus=N)`. "
"Check the Daft documentation for more details."
)

return self.with_columns({column_name: expr}, resource_request)
return self.with_columns({column_name: expr})

@DataframePublicAPI
def with_columns(
self,
columns: Dict[str, Expression],
resource_request: Optional[ResourceRequest] = None,
resource_request: Optional["ResourceRequest"] = None,
) -> "DataFrame":
"""Adds columns to the current DataFrame with Expressions, equivalent to a ``select``
with all current columns and the new ones
@@ -1290,22 +1291,22 @@ def with_columns(
Args:
columns (Dict[str, Expression]): Dictionary of new columns in the format { name: expression }
resource_request (ResourceRequest): a custom resource request for the execution of this operation (NOTE: this will be deprecated
in Daft version 0.3.0. Please use resource requests on your UDFs instead.)
Returns:
DataFrame: DataFrame with new columns.
"""
if resource_request is not None:
warnings.warn(
"Specifying resource_request through `with_columns` will be deprecated from Daft version >= 0.3.0! "
raise ValueError(
"Specifying resource_request through `with_columns` is deprecated from Daft version >= 0.3.0! "
"Instead, please use the APIs on UDFs directly for controlling the resource requests of your UDFs. "
"You can define resource requests directly on the `@udf(num_gpus=N, num_cpus=M, ...)` decorator. "
"Alternatively, you can override resource requests on UDFs like so: `my_udf.override_options(num_gpus=N)`. "
"Check the Daft documentation for more details."
)

new_columns = [col.alias(name) for name, col in columns.items()]

builder = self._builder.with_columns(new_columns, resource_request)
builder = self._builder.with_columns(new_columns)
return DataFrame(builder)

@DataframePublicAPI
7 changes: 2 additions & 5 deletions daft/logical/builder.py
@@ -10,7 +10,6 @@
JoinStrategy,
JoinType,
PyDaftExecutionConfig,
ResourceRequest,
ScanOperatorHandle,
)
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
@@ -123,11 +122,9 @@ def select(
builder = self._builder.select(to_select_pyexprs)
return LogicalPlanBuilder(builder)

def with_columns(
self, columns: list[Expression], custom_resource_request: ResourceRequest | None
) -> LogicalPlanBuilder:
def with_columns(self, columns: list[Expression]) -> LogicalPlanBuilder:
column_pyexprs = [expr._expr for expr in columns]
builder = self._builder.with_columns(column_pyexprs, custom_resource_request)
builder = self._builder.with_columns(column_pyexprs)
return LogicalPlanBuilder(builder)

def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder:
60 changes: 1 addition & 59 deletions src/daft-dsl/src/functions/python/mod.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;

use common_error::DaftResult;
use common_resource_request::ResourceRequest;
use common_treenode::{Transformed, TreeNode, TreeNodeRecursion};
use common_treenode::{TreeNode, TreeNodeRecursion};
use daft_core::datatypes::DataType;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
@@ -141,64 +141,6 @@ pub fn stateful_udf(
})
}

/// Replaces resource_requests on UDF expressions in the provided expression tree
pub fn replace_udf_resource_request(
expr: ExprRef,
override_resource_request: &ResourceRequest,
) -> ExprRef {
expr.transform(|e| match e.as_ref() {
Expr::Function {
func:
FunctionExpr::Python(PythonUDF::Stateful(
original @ StatefulPythonUDF {
resource_request, ..
},
)),
inputs,
} => {
if let Some(existing_rr) = resource_request
&& existing_rr == override_resource_request
{
return Ok(Transformed::no(e));
}
let new_expr = Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateful(StatefulPythonUDF {
resource_request: Some(override_resource_request.clone()),
..original.clone()
})),
inputs: inputs.clone(),
};
Ok(Transformed::yes(new_expr.arced()))
}
Expr::Function {
func:
FunctionExpr::Python(PythonUDF::Stateless(
original @ StatelessPythonUDF {
resource_request, ..
},
)),
inputs,
} => {
if let Some(existing_rr) = resource_request
&& existing_rr == override_resource_request
{
return Ok(Transformed::no(e));
}
let new_expr = Expr::Function {
func: FunctionExpr::Python(PythonUDF::Stateless(StatelessPythonUDF {
resource_request: Some(override_resource_request.clone()),
..original.clone()
})),
inputs: inputs.clone(),
};
Ok(Transformed::yes(new_expr.arced()))
}
_ => Ok(Transformed::no(e)),
})
.unwrap()
.data
}

/// Generates a ResourceRequest by inspecting an iterator of expressions.
/// Looks for ResourceRequests on UDFs in each expression presented, and merges ResourceRequests across all expressions.
pub fn get_resource_request(exprs: &[ExprRef]) -> Option<ResourceRequest> {
33 changes: 4 additions & 29 deletions src/daft-plan/src/builder.rs
@@ -17,12 +17,11 @@ use crate::{
use common_display::DisplayFormat;
use common_error::DaftResult;
use common_io_config::IOConfig;
use common_resource_request::ResourceRequest;
use daft_core::{
join::{JoinStrategy, JoinType},
schema::{Schema, SchemaRef},
};
use daft_dsl::{col, functions::python::replace_udf_resource_request, ExprRef};
use daft_dsl::{col, ExprRef};
use daft_scan::{file_format::FileFormat, PhysicalScanInfo, Pushdowns, ScanOperatorRef};

#[cfg(feature = "python")]
@@ -137,24 +136,7 @@ impl LogicalPlanBuilder {
Ok(logical_plan.into())
}

pub fn with_columns(
&self,
columns: Vec<ExprRef>,
resource_request: Option<ResourceRequest>,
) -> DaftResult<Self> {
// TODO: This should be deprecated in Daft >= v0.3
//
// Here we use resource_request to parametrize any UDFs in the new expression columns
// In the future, the ability to pass ResourceRequests into with_column(s) will be deprecated. Users will parametrize their UDFs directly instead.
let columns = if let Some(rr) = resource_request {
columns
.into_iter()
.map(|expr| replace_udf_resource_request(expr, &rr))
.collect()
} else {
columns
};

pub fn with_columns(&self, columns: Vec<ExprRef>) -> DaftResult<Self> {
let fields = &self.schema().fields;
let current_col_names = fields
.iter()
@@ -561,15 +543,8 @@ impl PyLogicalPlanBuilder {
Ok(self.builder.select(pyexprs_to_exprs(to_select))?.into())
}

pub fn with_columns(
&self,
columns: Vec<PyExpr>,
resource_request: Option<ResourceRequest>,
) -> PyResult<Self> {
Ok(self
.builder
.with_columns(pyexprs_to_exprs(columns), resource_request)?
.into())
pub fn with_columns(&self, columns: Vec<PyExpr>) -> PyResult<Self> {
Ok(self.builder.with_columns(pyexprs_to_exprs(columns))?.into())
}

pub fn exclude(&self, to_exclude: Vec<String>) -> PyResult<Self> {