[FEAT] DataFrame.__iter__() and .iter_partitions() (#1062)
Implements iteration, via streaming execution, over DataFrames:

- DataFrame.__iter__() returns an iterator of rows. Each row is a pydict
of the form `{"colname": value }`.
- DataFrame.iter_partitions() returns an iterator of partitions. Each
partition is a `daft.Table` object.
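
For example, a minimal usage sketch (the `daft.from_pydict` entry point and
the toy data are illustrative here, not part of this diff):

```python
import daft

df = daft.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# Row iteration: each row arrives as a pydict, e.g. {"a": 1, "b": "x"}.
for row in df:
    print(row)

# Partition iteration: each item is a daft.Table on the PyRunner,
# or a ray.ObjectRef to a Table on the RayRunner.
for partition in df.iter_partitions():
    print(partition)
```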

Execution semantics:

- Results are returned as soon as they become available.
- Current behaviours (not technical restrictions, we can change these if
we want):
  - PyRunner: Execution pauses between calls to `iterator.next()`. 
  - RayRunner: Execution continues in the background.
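
Continuing the sketch above, the difference is observable when a consumer
stops iterating early (these are, as noted, current behaviours rather than
guarantees):

```python
# Continuing with `df` from the sketch above.
for row in df:
    break  # stop after the first row

# PyRunner: no further partitions are computed once iteration stops.
# RayRunner: remaining partitions keep computing in the background.
```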

Implementation details:

- Adds new interfaces to Runner, alongside the existing
`Runner.run() -> PartitionCacheEntry`:
  - `run_iter() -> Iterator[PartitionT]`
  - `run_iter_tables() -> Iterator[Table]`

  This isn't super clean - ideally we would go through a single point of
  abstraction (PartitionCache) for translating between PartitionT and
  Table. But we may rewrite a lot of this soon anyway, and for now it is
  a bit dangerous to shoehorn single-partition behaviour into a
  PartitionSet.
- `run_iter()` is the new narrow waist: all execution, even
`df.collect()`, now happens through streaming execution.
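
On the RayRunner, background execution hands results from a scheduler thread
to the consumer through a per-dataframe `Queue`, with a `StopIteration`
instance as the end-of-results sentinel (see the ray_runner.py diff below).
A simplified, standalone sketch of that hand-off pattern - the names here
are illustrative, not the scheduler's actual API:

```python
import threading
from queue import Queue
from typing import Any, Iterator


def run_in_background(gen: Iterator[Any]) -> Queue:
    """Drain `gen` on a worker thread; a StopIteration instance marks the end."""
    results: Queue = Queue()

    def worker() -> None:
        for item in gen:
            results.put(item)
        results.put(StopIteration())  # sentinel: no more results

    threading.Thread(target=worker, daemon=True).start()
    return results


def iter_results(results: Queue) -> Iterator[Any]:
    """Consumer side: block on the queue until the sentinel arrives."""
    while True:
        item = results.get()
        if isinstance(item, StopIteration):
            return
        yield item


# The producer keeps running even if the consumer is slow or stops early.
for x in iter_results(run_in_background(iter(range(5)))):
    print(x)
```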

---------

Co-authored-by: Xiayue Charles Lin <[email protected]>
xcharleslin and Xiayue Charles Lin authored Jun 22, 2023
1 parent 1c41ad8 commit eb7e2c8
Showing 5 changed files with 251 additions and 22 deletions.
49 changes: 49 additions & 0 deletions daft/dataframe/dataframe.py
@@ -12,6 +12,7 @@
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
@@ -38,6 +39,7 @@

if TYPE_CHECKING:
from ray.data.dataset import Dataset as RayDataset
from ray import ObjectRef as RayObjectRef
import pandas as pd
import pyarrow as pa
import dask
@@ -176,6 +178,53 @@ def show(self, n: int = 8) -> "DataFrameDisplay":

return DataFrameDisplay(preview, self.schema(), num_rows=n)

@DataframePublicAPI
def __iter__(self) -> Iterator[Dict[str, Any]]:
"""Return an iterator of rows for this dataframe.
Each row will be a pydict of the form { "key" : value }.
"""

if self._result is not None:
# If the dataframe has already finished executing,
# use the precomputed results.
pydict = self.to_pydict()
for i in range(len(self)):
row = {key: value[i] for (key, value) in pydict.items()}
yield row

else:
# Execute the dataframe in a streaming fashion.
context = get_context()
partitions_iter = context.runner().run_iter_tables(self._plan)

# Iterate through partitions.
for partition in partitions_iter:
pydict = partition.to_pydict()

# Yield individual rows from the partition.
for i in range(len(partition)):
row = {key: value[i] for (key, value) in pydict.items()}
yield row

@DataframePublicAPI
def iter_partitions(self) -> Iterator[Union[Table, "RayObjectRef"]]:
"""Begin executing this dataframe and return an iterator over the partitions.
Each partition will be returned as a daft.Table object (if using Python runner backend)
or a ray ObjectRef (if using Ray runner backend).
"""
if self._result is not None:
# If the dataframe has already finished executing,
# use the precomputed results.
yield from self._result.values()

else:
# Execute the dataframe in a streaming fashion.
context = get_context()
partitions_iter = context.runner().run_iter(self._plan)
yield from partitions_iter

@DataframePublicAPI
def __repr__(self) -> str:
display = DataFrameDisplay(self._preview, self.schema())
23 changes: 15 additions & 8 deletions daft/runners/pyrunner.py
@@ -140,7 +140,7 @@ class LocalLogicalPartitionOpRunner(LogicalPartitionOpRunner):
...


-class PyRunner(Runner):
+class PyRunner(Runner[Table]):
def __init__(self, use_thread_pool: bool | None) -> None:
super().__init__()
self._use_thread_pool: bool = use_thread_pool if use_thread_pool is not None else True
@@ -178,6 +178,16 @@ def runner_io(self) -> PyRunnerIO:
return PyRunnerIO()

def run(self, logplan: logical_plan.LogicalPlan) -> PartitionCacheEntry:
partitions = list(self.run_iter(logplan))

result_pset = LocalPartitionSet({})
for i, partition in enumerate(partitions):
result_pset.set_partition(i, partition)

pset_entry = self.put_partition_set_into_cache(result_pset)
return pset_entry

def run_iter(self, logplan: logical_plan.LogicalPlan) -> Iterator[Table]:
logplan = self.optimize(logplan)
psets = {
key: entry.value.values()
@@ -187,14 +197,11 @@ def run(self, logplan: logical_plan.LogicalPlan) -> PartitionCacheEntry:
plan = physical_plan_factory.get_materializing_physical_plan(logplan, psets)

with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"):
-            partitions = list(self._physical_plan_to_partitions(plan))
-
-            result_pset = LocalPartitionSet({})
-            for i, partition in enumerate(partitions):
-                result_pset.set_partition(i, partition)
+            partitions_gen = self._physical_plan_to_partitions(plan)
+            yield from partitions_gen

-            pset_entry = self.put_partition_set_into_cache(result_pset)
-            return pset_entry
+    def run_iter_tables(self, plan: logical_plan.LogicalPlan) -> Iterator[Table]:
+        return self.run_iter(plan)

def _physical_plan_to_partitions(self, plan: physical_plan.MaterializedPhysicalPlan) -> Iterator[Table]:
inflight_tasks: dict[str, PartitionTask] = dict()
84 changes: 71 additions & 13 deletions daft/runners/ray_runner.py
@@ -1,7 +1,11 @@
from __future__ import annotations

import threading
import uuid
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from queue import Queue
from typing import TYPE_CHECKING, Any, Iterable, Iterator

import fsspec
@@ -392,18 +396,51 @@ def __init__(self, max_task_backlog: int | None) -> None:

self.reserved_cores = 0

self.threads_by_df: dict[str, threading.Thread] = dict()
self.results_by_df: dict[str, Queue] = defaultdict(Queue)

def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration:

# Case: thread is terminated and no longer exists.
# Should only be hit for repeated calls to next() after StopIteration.
if result_uuid not in self.threads_by_df:
return StopIteration()

# Common case: get the next result from the thread.
result = self.results_by_df[result_uuid].get()

# If there are no more results, delete the thread.
if isinstance(result, StopIteration):
self.threads_by_df[result_uuid].join()
del self.threads_by_df[result_uuid]

return result

def run_plan(
self,
plan: logical_plan.LogicalPlan,
psets: dict[str, ray.ObjectRef],
-    ) -> list[ray.ObjectRef]:
-        return list(self._run_plan(plan, psets))
+        result_uuid: str,
+    ) -> None:

t = threading.Thread(
target=self._run_plan,
name=result_uuid,
kwargs={
"plan": plan,
"psets": psets,
"result_uuid": result_uuid,
},
)
t.start()
self.threads_by_df[result_uuid] = t

def _run_plan(
self,
plan: logical_plan.LogicalPlan,
psets: dict[str, ray.ObjectRef],
-    ) -> Iterator[ray.ObjectRef]:
+        result_uuid: str,
+    ) -> None:
from loguru import logger

phys_plan = physical_plan_factory.get_materializing_physical_plan(plan, psets)
@@ -444,7 +481,7 @@ def _run_plan(

elif isinstance(next_step, ray.ObjectRef):
# A final result.
-                    yield next_step
+                    self.results_by_df[result_uuid].put(next_step)
next_step = next(phys_plan)

# next_step is a task.
@@ -523,8 +560,8 @@ def _run_plan(
if next_step is None:
next_step = next(phys_plan)

-        except StopIteration:
-            return
+        except StopIteration as e:
+            self.results_by_df[result_uuid].put(e)


@ray.remote(num_cpus=1)
@@ -561,7 +598,7 @@ def _build_partitions(task: PartitionTask[ray.ObjectRef]) -> list[ray.ObjectRef]
return partitions


-class RayRunner(Runner):
+class RayRunner(Runner[ray.ObjectRef]):
def __init__(
self,
address: str | None,
@@ -602,30 +639,51 @@ def __init__(
max_task_backlog=max_task_backlog,
)

-    def run(self, plan: logical_plan.LogicalPlan) -> PartitionCacheEntry:
-        result_pset = RayPartitionSet({})
-
+    def run_iter(self, plan: logical_plan.LogicalPlan) -> Iterator[ray.ObjectRef]:
plan = self.optimize(plan)

psets = {
key: entry.value.values()
for key, entry in self._part_set_cache._uuid_to_partition_set.items()
if entry.value is not None
}
result_uuid = str(uuid.uuid4())
if isinstance(self.ray_context, ray.client_builder.ClientContext):
-            partitions = ray.get(
+            ray.get(
self.scheduler_actor.run_plan.remote(
plan=plan,
psets=psets,
result_uuid=result_uuid,
)
)

else:
-            partitions = self.scheduler.run_plan(
+            self.scheduler.run_plan(
plan=plan,
psets=psets,
result_uuid=result_uuid,
)

-        for i, partition in enumerate(partitions):
+        while True:
if isinstance(self.ray_context, ray.client_builder.ClientContext):
result = ray.get(self.scheduler_actor.next(result_uuid))
else:
result = self.scheduler.next(result_uuid)

if isinstance(result, StopIteration):
return
yield result

def run_iter_tables(self, plan: logical_plan.LogicalPlan) -> Iterator[Table]:
for ref in self.run_iter(plan):
yield ray.get(ref)

def run(self, plan: logical_plan.LogicalPlan) -> PartitionCacheEntry:
result_pset = RayPartitionSet({})

partitions_iter = self.run_iter(plan)

for i, partition in enumerate(partitions_iter):
result_pset.set_partition(i, partition)

pset_entry = self._part_set_cache.put_partition_set(result_pset)
16 changes: 15 additions & 1 deletion daft/runners/runner.py
@@ -1,6 +1,7 @@
from __future__ import annotations

from abc import abstractmethod
from typing import Generic, Iterator, TypeVar

from daft.logical.logical_plan import LogicalPlan
from daft.runners.partitioning import (
@@ -9,9 +10,12 @@
PartitionSetCache,
)
from daft.runners.runner_io import RunnerIO
from daft.table import Table

PartitionT = TypeVar("PartitionT")

-class Runner:
+class Runner(Generic[PartitionT]):
def __init__(self) -> None:
self._part_set_cache = PartitionSetCache()

@@ -29,5 +33,15 @@ def runner_io(self) -> RunnerIO:
def run(self, plan: LogicalPlan) -> PartitionCacheEntry:
...

@abstractmethod
def run_iter(self, plan: LogicalPlan) -> Iterator[PartitionT]:
"""Similar to run(), but yield the individual partitions as they are completed."""
...

@abstractmethod
def run_iter_tables(self, plan: LogicalPlan) -> Iterator[Table]:
"""Similar to run_iter(), but always dereference and yield Table objects."""
...

def optimize(self, plan: LogicalPlan) -> LogicalPlan:
return plan
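
To make the new generic interface concrete, here is a toy, self-contained
illustration of the `Runner[PartitionT]` pattern and of layering `run()` on
top of `run_iter()` (a sketch only; `ToyRunner` is not part of this commit):

```python
from typing import Generic, Iterator, List, TypeVar

PartitionT = TypeVar("PartitionT")


class ToyRunner(Generic[PartitionT]):
    """Toy stand-in for Runner: generic over the backend's partition type."""

    def run(self, parts: List[PartitionT]) -> List[PartitionT]:
        # Materialized execution is just the streaming path, collected.
        return list(self.run_iter(parts))

    def run_iter(self, parts: List[PartitionT]) -> Iterator[PartitionT]:
        yield from parts  # stand-in for streaming execution


runner: ToyRunner[str] = ToyRunner()
assert runner.run(["p0", "p1"]) == ["p0", "p1"]
```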