[feature] Enable support for Serverless Compute (#165)
* remove Cluster dependency for Notebook / default Brickflow task
black

* add warnings for notebook tasks

* environments

* temporary disable tests in pre-commit

* drop "baseEnvironment" key

* bump to latest CLI version

* update model and bundles to use 0.228.0 version of Databricks CLI

* handle absence of git_conf in the local deployments

* fix model generator

* fix existing tests
TODO: new bundle tests for serverless
TODO: deploy classic and serverless workflows
TODO: validate backwards compatibility

* update params

* example workflow
tests

* too-many-lines

* rename project

* update tests

* update tests

* finally fixed tests

* --extra-index-url and tests

* exclude nones during bundle generation

* serverless faq

---------

Co-authored-by: Maxim Mityutko <[email protected]>
maxim-mityutko and Maxim Mityutko authored Nov 1, 2024
1 parent 039b74c commit d5715f1
Showing 25 changed files with 1,640 additions and 615 deletions.
1,499 changes: 912 additions & 587 deletions brickflow/bundles/model.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion brickflow/cli/bundles.py
@@ -225,7 +225,7 @@ def bundle_synth(**kwargs: Any) -> None:


def get_bundle_cli_version() -> str:
-return config(BrickflowEnvVars.BRICKFLOW_BUNDLE_CLI_VERSION.value, "0.210.2")
+return config(BrickflowEnvVars.BRICKFLOW_BUNDLE_CLI_VERSION.value, "0.228.0")


def bundle_cli_setup() -> None:
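Brickflow resolves the bundle CLI version at runtime, so the new 0.228.0 default can still be overridden without a code change. A minimal sketch, assuming the enum value of `BrickflowEnvVars.BRICKFLOW_BUNDLE_CLI_VERSION` is the literal variable name `BRICKFLOW_BUNDLE_CLI_VERSION` (the pinned version below is illustrative):

```python
import os

# Pin the Databricks CLI version Brickflow downloads for bundle operations;
# when the variable is unset, get_bundle_cli_version() falls back to "0.228.0".
os.environ["BRICKFLOW_BUNDLE_CLI_VERSION"] = "0.228.0"
```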
53 changes: 40 additions & 13 deletions brickflow/codegen/databricks_bundle.py
@@ -540,17 +540,26 @@ def _build_native_notebook_task(
f"Make sure {task_name} returns a NotebookTask object."
) from e

-return JobsTasks(
+jt = JobsTasks(
**task_settings.to_tf_dict(),
notebook_task=notebook_task,
-libraries=task_libraries,
depends_on=depends_on,
task_key=task_name,
# unpack dictionary provided by cluster object, will either be key or
-# existing cluster id
-**task.cluster.job_task_field_dict,
+# existing cluster id, if cluster object is empty, Databricks will use serverless compute
+**(task.cluster.job_task_field_dict if task.cluster else {}),
)

+# Do not configure Notebook dependencies for Serverless clusters
+if task.cluster:
+jt.libraries = task_libraries
+else:
+_ilog.warning(
+"Library definitions are not compatible with Serverless executions. "
+"Use '%pip install' directly in the notebook instead."
+)
+return jt

def _build_native_spark_jar_task(
self,
task_name: str,
@@ -614,17 +623,25 @@ def _build_native_spark_python_task(
spark_python_task.parameters.append(k)
spark_python_task.parameters.append(v)

-return JobsTasks(
+jt = JobsTasks(
**task_settings.to_tf_dict(),
spark_python_task=spark_python_task,
-libraries=task_libraries,
depends_on=depends_on,
task_key=task_name,
# unpack dictionary provided by cluster object, will either be key or
-# existing cluster id
-**task.cluster.job_task_field_dict,
+# existing cluster id, if cluster object is empty, Databricks will use serverless compute
+**(task.cluster.job_task_field_dict if task.cluster else {}),
)

+if task.cluster:
+jt.libraries = task_libraries
+else:
+jt.environment_key = (
+"Default"  # TODO: make configurable from task definition
+)

+return jt

def _build_native_run_job_task(
self,
task_name: str,
@@ -744,13 +761,21 @@ def _build_brickflow_entrypoint_task(
task.databricks_task_type_str: self.task_to_task_obj(task),
**task_settings.to_tf_dict(),
}, # type: ignore
-libraries=task_libraries,
depends_on=depends_on,
task_key=task_name,
# unpack dictionary provided by cluster object, will either be key or
-# existing cluster id
-**task.cluster.job_task_field_dict,
+# existing cluster id, if cluster object is empty, Databricks will use serverless compute
+**(task.cluster.job_task_field_dict if task.cluster else {}),
)

+# Do not configure Notebook dependencies for Serverless clusters
+if task.cluster:
+task_obj.libraries = task_libraries
+else:
+_ilog.warning(
+"Library definitions are not compatible with Serverless executions. "
+"Use '%pip install' directly in the 'entrypoint.py' instead."
+)
return task_obj

def workflow_obj_to_tasks(
@@ -889,7 +914,6 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
job = Jobs(
name=workflow_name,
tasks=tasks,
-git_source=git_conf,
tags=workflow.tags,
health=workflow.health,
job_clusters=[JobsJobClusters(**c) for c in workflow_clusters],
@@ -906,7 +930,10 @@ def proj_to_bundle(self) -> DatabricksAssetBundles:
trigger=workflow.trigger,
continuous=workflow.schedule_continuous,
parameters=workflow.parameters,
+environments=workflow.environments,
+git_source=git_conf,
)

jobs[workflow_name] = job

pipelines.update(self.workflow_obj_to_pipelines(workflow))
@@ -981,4 +1008,4 @@ def quoted_presenter(dumper, data): # type: ignore[no-untyped-def]
yaml.add_representer(str, quoted_presenter)

with open("bundle.yml", "w", encoding="utf-8") as f:
-f.write(yaml.dump(bundle.dict(exclude_unset=True)))
+f.write(yaml.dump(bundle.dict(exclude_unset=True, exclude_none=True)))
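The added `exclude_none=True` matters for serverless workflows: fields that are explicitly set to `None` (for example `git_source` on local deployments, per the change above) would otherwise serialize as `null` into `bundle.yml`. A minimal sketch with a hypothetical stand-in model, showing the difference in pydantic v1:

```python
from typing import List, Optional

from pydantic import BaseModel


class TaskSketch(BaseModel):  # hypothetical stand-in for the generated bundle models
    task_key: str
    libraries: Optional[List[str]] = None


task = TaskSketch(task_key="notebook_task", libraries=None)
# The explicitly-set None survives exclude_unset and would render as 'null' in YAML:
print(task.dict(exclude_unset=True))                     # {'task_key': 'notebook_task', 'libraries': None}
# exclude_none drops it, keeping the generated bundle.yml clean:
print(task.dict(exclude_unset=True, exclude_none=True))  # {'task_key': 'notebook_task'}
```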
2 changes: 1 addition & 1 deletion brickflow/engine/task.py
@@ -796,7 +796,7 @@ class Task:
task_id: str
task_func: Callable
workflow: Workflow # noqa
-cluster: Cluster
+cluster: Optional[Cluster] = None
description: Optional[str] = None
libraries: List[TaskLibrary] = field(default_factory=lambda: [])
depends_on: List[Union[Callable, str]] = field(default_factory=lambda: [])
58 changes: 52 additions & 6 deletions brickflow/engine/workflow.py
@@ -15,6 +15,7 @@
JobsParameters,
JobsTrigger,
JobsWebhookNotifications,
+JobsEnvironments,
)
from brickflow.context import BrickflowInternalVariables
from brickflow.engine import ROOT_NODE
@@ -29,6 +30,8 @@
TaskNotFoundError,
TaskSettings,
TaskType,
+PypiTaskLibrary,
+WheelTaskLibrary,
)
from brickflow.engine.utils import wraps_keyerror

@@ -144,14 +147,18 @@ class Workflow:
max_tasks_in_workflow: int = 100
enable_plugins: Optional[bool] = None
parameters: Optional[List[JobsParameters]] = None
+# environments should be defined for serverless workloads
+environments: Optional[List[JobsEnvironments]] = None

def __post_init__(self) -> None:
self.graph.add_node(ROOT_NODE)
if self.default_cluster is None and self.clusters == []:
-raise NoWorkflowComputeError(
-f"Please configure default_cluster or "
-f"clusters field for workflow: {self.name}"
+logging.info(
+"Default cluster details are not provided, switching to serverless compute."
)
+self.environments = self.convert_libraries_to_environments
+logging.debug(self.environments)

if self.prefix is None:
self.prefix = env_chain(
BrickflowEnvVars.BRICKFLOW_WORKFLOW_PREFIX.value,
@@ -164,7 +171,7 @@ def __post_init__(self) -> None:
BrickflowInternalVariables.workflow_suffix.value,
"",
)
-if self.default_cluster is None:
+if self.default_cluster is None and self.clusters:
# the default cluster is set to the first cluster if it is not configured
self.default_cluster = self.clusters[0]

@@ -254,6 +261,45 @@ def validate_schedule_configs(self) -> None:
"Please configure either PAUSED or UNPAUSED for schedule_continuous.pause_status"
)

+@property
+def convert_libraries_to_environments(self) -> List[Dict[Any, Any]]:
+logging.info(
+"Serverless workload detected, library dependencies will be converted to 'environments'!"
+)
+environments, dependencies = [], []
+for lib in self.libraries:
+if isinstance(lib, PypiTaskLibrary):
+# pylint: disable=no-else-raise
+if lib.repo:
+# TODO: update to new Databricks CLI and remove WorkflowConfigError (see below)
+dependencies.append(
+f"--extra-index-url {lib.repo.strip()} {lib.package}"
+)
+raise WorkflowConfigError(
+"Custom repositories are not supported for serverless workloads, due to Databricks CLI "
+"limitations. Refer to https://github.com/databricks/cli/pull/1842. "
+"This will be fixed in future releases; use a wheel instead."
+)
+else:
+dependencies.append(lib.package)
+elif isinstance(lib, WheelTaskLibrary):
+dependencies.append(lib.whl)
+else:
+logging.info(
+"Serverless workload type is only compatible with PyPi and Whl dependencies, skipping %s",
+lib,
+)
+environments.append(
+{
+"environment_key": "Default",
+"spec": {
+"client": "1",
+"dependencies": dependencies,
+},
+}
+)
+return environments

@property
def bfs_layers(self) -> List[str]:
return list(nx.bfs_layers(self.graph, ROOT_NODE))[1:]
@@ -339,8 +385,8 @@ def _add_task(
)

if self.default_cluster is None:
-raise RuntimeError(
-"Some how default cluster wasnt set please raise a github issue."
+logging.info(
+"Default cluster details are not provided, switching to serverless compute."
)

if self.log_timeout_warning(task_settings): # type: ignore
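Taken together, these changes make the absence of compute configuration the serverless signal: a workflow with neither `default_cluster` nor `clusters` now logs the switch instead of raising `NoWorkflowComputeError`, and its PyPI/wheel libraries are folded into a single `Default` environment. A hedged sketch of the resulting behavior (import paths assumed from the file layout above; the package pin and wheel path are illustrative):

```python
from brickflow.engine.task import PypiTaskLibrary, WheelTaskLibrary
from brickflow.engine.workflow import Workflow

# No default_cluster and no clusters: __post_init__ switches to serverless
# and builds self.environments via convert_libraries_to_environments.
wf = Workflow(
    "serverless-demo",
    libraries=[
        PypiTaskLibrary(package="requests==2.32.0"),      # illustrative pin
        WheelTaskLibrary(whl="dbfs:/wheels/my_pkg.whl"),  # illustrative path
    ],
)

# Expected shape of wf.environments, per the property above:
# [{"environment_key": "Default",
#   "spec": {"client": "1",
#            "dependencies": ["requests==2.32.0", "dbfs:/wheels/my_pkg.whl"]}}]
```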
42 changes: 41 additions & 1 deletion docs/faq/faq.md
@@ -193,4 +193,44 @@ def copy_from_volumnes_to_box(*args):
operation="upload",
)
volumnes_to_box_copy.execute()
-```
+```

+## How do I use serverless compute for my tasks?
+Serverless compute is supported for the Brickflow entrypoint task, Notebook tasks, and Python tasks.
+1. Remove the `default_cluster` configuration from the workflow and tasks.
+2. Configure dependencies:
+- For the Brickflow entrypoint task, use MAGIC commands by adding the following to the top of the notebook:

+```python
+# Databricks notebook source
+# The `brickflows` dependency is mandatory!
+# It should always point to a `brickflows` version with serverless support, or to a wheel file providing the same.
+# MAGIC %pip install brickflows==x.x.x
+# MAGIC %pip install my-dependency==x.x.x
+# MAGIC %restart_python

+# COMMAND ----------
+```

+- For Notebook tasks, use the same MAGIC commands; the `brickflows` dependency is not required:
+```python
+# Databricks notebook source
+# MAGIC %pip install my-dependency==x.x.x
+# MAGIC %restart_python

+# COMMAND ----------
+```

+- For Python tasks, set the dependencies as usual at the workflow level:
+```python
+wf = Workflow(
+    "brickflow-serverless-demo",
+    schedule_quartz_expression="0 0/20 0 ? * * *",
+    libraries=[
+        PypiTaskLibrary(package="my-package==x.x.x"),
+        WheelTaskLibrary(whl="/path/to/wheel.whl")
+    ],
+)
+```

+Refer to the full workflow example in the `/examples/brickflow_serverless_examples` folder.
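For a quick end-to-end picture, a serverless Python task is just a regular task on a cluster-less workflow. A hedged sketch (the `@wf.task` decorator and the top-level import path follow the usual Brickflow examples and are assumed here):

```python
from brickflow import Workflow, PypiTaskLibrary  # import path assumed

wf = Workflow(
    "brickflow-serverless-demo",
    libraries=[PypiTaskLibrary(package="my-package==x.x.x")],  # becomes the "Default" environment
)


@wf.task
def my_serverless_task() -> str:
    # Runs on serverless compute: neither the workflow nor the task defines
    # a cluster, so dependencies come from the workflow-level environment spec.
    return "hello from serverless"
```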
10 changes: 10 additions & 0 deletions examples/brickflow_serverless_examples/.brickflow-project-root.yml
@@ -0,0 +1,10 @@
+# DO NOT MODIFY THIS FILE - IT IS AUTO GENERATED BY BRICKFLOW AND RESERVED FOR FUTURE USAGE
+projects:
+  brickflow-serverless-demo:
+    brickflow_version: auto
+    deployment_mode: bundle
+    enable_plugins: true
+    name: brickflow-serverless-demo
+    path_from_repo_root_to_project_root: .
+    path_project_root_to_workflows_dir: workflows
+    version: v1
