Skip to content

Commit

Permalink
[FEAT] Fix resource accounting in PyRunner (#2567)
Browse files Browse the repository at this point in the history
Together with #2566, closes #2561

This PR changes the way the PyRunner performs resource accounting.
Instead of updating the number of CPUs, GPUs and memory used only when
futures are retrieved, we do this just before each task completes. These
variables are protected with a lock to allow for concurrent access from
across worker threads.

Additionally, this PR now tracks the inflight `Futures` across all
executions globally in the PyRunner singleton. This is because there
will be instances where a single execution might not be able to make
forward progress (e.g. there are only 8 CPUs available, and there are 8
other currently-executing partitions). In this case, we need to wait for
**some** execution globally to complete before attempting to make
forward progress on the current execution.

---------

Co-authored-by: Jay Chia <[email protected]>
  • Loading branch information
jaychia and Jay Chia committed Jul 30, 2024
1 parent 8544b76 commit b36b5ec
Showing 1 changed file with 65 additions and 42 deletions.
107 changes: 65 additions & 42 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import threading
from concurrent import futures
from dataclasses import dataclass
from typing import Iterator
Expand Down Expand Up @@ -121,8 +122,7 @@ def __init__(self, use_thread_pool: bool | None) -> None:
self._thread_pool = futures.ThreadPoolExecutor()

# Global accounting of tasks and resources
self._inflight_tasks_resources: dict[str, ResourceRequest] = dict()
self._inflight_tasks: dict[str, PartitionTask] = dict()
self._inflight_futures: dict[str, futures.Future] = {}

system_info = SystemInfo()
num_cpus = system_info.cpu_count()
Expand All @@ -134,7 +134,13 @@ def __init__(self, use_thread_pool: bool | None) -> None:
self.num_cpus = num_cpus

self.num_gpus = cuda_device_count()
self.bytes_memory = system_info.total_memory()
self.total_bytes_memory = system_info.total_memory()

# Resource accounting:
self._resource_accounting_lock = threading.Lock()
self._available_bytes_memory = self.total_bytes_memory
self._available_cpus = float(self.num_cpus)
self._available_gpus = float(self.num_gpus)

def runner_io(self) -> PyRunnerIO:
return PyRunnerIO()
Expand Down Expand Up @@ -213,8 +219,7 @@ def run_iter_tables(
def _physical_plan_to_partitions(
self, plan: physical_plan.MaterializedPhysicalPlan[MicroPartition]
) -> Iterator[PyMaterializedResult]:
future_to_task: dict[futures.Future, str] = dict()

local_futures_to_task: dict[futures.Future, PartitionTask] = {}
pbar = ProgressBar(use_ray_tqdm=False)

try:
Expand All @@ -238,15 +243,16 @@ def _physical_plan_to_partitions(
next_step = next(plan)
continue

elif not self._can_admit_task(
next_step.resource_request,
):
# Insufficient resources; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
break

else:
# next_task is a task to run.
task_admitted = self._attempt_admit_task(
next_step.resource_request,
)

if not task_admitted:
# Insufficient resources; await some tasks.
logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
break

# Run the task in the main thread, instead of the thread pool, in certain conditions:
# - Threading is disabled in runner config.
Expand All @@ -269,6 +275,7 @@ def _physical_plan_to_partitions(
next_step.instructions,
next_step.inputs,
next_step.partial_metadatas,
next_step.resource_request,
)
next_step.set_result(materialized_results)

Expand All @@ -284,36 +291,38 @@ def _physical_plan_to_partitions(
next_step.instructions,
next_step.inputs,
next_step.partial_metadatas,
next_step.resource_request,
)
# Register the inflight task and resources used.
future_to_task[future] = next_step.id()

# Register the inflight task
assert (
next_step.id() not in self._inflight_tasks_resources
next_step.id() not in local_futures_to_task
), "Step IDs should be unique - this indicates an internal error, please file an issue!"
self._inflight_tasks[next_step.id()] = next_step
self._inflight_tasks_resources[next_step.id()] = next_step.resource_request
self._inflight_futures[next_step.id()] = future
local_futures_to_task[future] = next_step

next_step = next(plan)

# Await at least one task and process the results.
if not len(future_to_task) > 0:
if next_step is None and not len(local_futures_to_task) > 0:
raise RuntimeError(
f"Scheduler deadlocked! This should never happen. Please file an issue. Current step: {type(next_step)}"
)

done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
# Await at least one task in the global futures to finish before proceeding
_ = futures.wait(list(self._inflight_futures.values()), return_when=futures.FIRST_COMPLETED)

# Now await at a task in the local futures to finish, so as to progress the local execution
done_set, _ = futures.wait(list(local_futures_to_task), return_when=futures.FIRST_COMPLETED)
for done_future in done_set:
done_id = future_to_task.pop(done_future)
del self._inflight_tasks_resources[done_id]
done_task = self._inflight_tasks.pop(done_id)
done_task = local_futures_to_task.pop(done_future)
materialized_results = done_future.result()

pbar.mark_task_done(done_task)
del self._inflight_futures[done_task.id()]

logger.debug(
"Task completed: %s -> <%s partitions>",
done_id,
done_task.id(),
len(materialized_results),
)

Expand All @@ -333,39 +342,53 @@ def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
raise RuntimeError(f"Requested {resource_request.num_cpus} CPUs but found only {self.num_cpus} available")
if resource_request.num_gpus is not None and resource_request.num_gpus > self.num_gpus:
raise RuntimeError(f"Requested {resource_request.num_gpus} GPUs but found only {self.num_gpus} available")
if resource_request.memory_bytes is not None and resource_request.memory_bytes > self.bytes_memory:
if resource_request.memory_bytes is not None and resource_request.memory_bytes > self.total_bytes_memory:
raise RuntimeError(
f"Requested {resource_request.memory_bytes} bytes of memory but found only {self.bytes_memory} available"
f"Requested {resource_request.memory_bytes} bytes of memory but found only {self.total_bytes_memory} available"
)

def _attempt_admit_task(
    self,
    resource_request: ResourceRequest,
) -> bool:
    """Attempt to admit a task for execution, reserving its resources if possible.

    Args:
        resource_request: the CPUs, GPUs and memory the task needs.

    Returns:
        True if sufficient resources were available (in which case they are
        atomically deducted from the available pool, i.e. the task is "admitted"),
        False otherwise.

    Raises:
        RuntimeError: (via ``_check_resource_requests``) if the request exceeds
            the machine's total resources and could never be satisfied.
    """
    # Fail fast if the request can never be satisfied on this machine.
    self._check_resource_requests(resource_request)

    # Check-and-reserve must happen under the lock so concurrent worker
    # threads cannot both admit tasks against the same resources.
    with self._resource_accounting_lock:
        memory_okay = (resource_request.memory_bytes or 0) <= self._available_bytes_memory
        cpus_okay = (resource_request.num_cpus or 0) <= self._available_cpus
        gpus_okay = (resource_request.num_gpus or 0) <= self._available_gpus
        all_okay = all((cpus_okay, gpus_okay, memory_okay))

        # Update resource accounting if we have the resources (this is considered as the task being "admitted")
        if all_okay:
            self._available_bytes_memory -= resource_request.memory_bytes or 0
            self._available_cpus -= resource_request.num_cpus or 0.0
            self._available_gpus -= resource_request.num_gpus or 0.0

    return all_okay

def build_partitions(
    self,
    instruction_stack: list[Instruction],
    partitions: list[MicroPartition],
    final_metadata: list[PartialPartitionMetadata],
    resource_request: ResourceRequest,
) -> list[MaterializedResult[MicroPartition]]:
    """Run a stack of instructions over input partitions and materialize the results.

    Args:
        instruction_stack: instructions to apply, in order; each consumes the
            previous step's output partitions.
        partitions: the input partitions for the first instruction.
        final_metadata: partial metadata to merge into each output partition's
            computed metadata, zipped positionally with the outputs.
        resource_request: the resources that were reserved for this task when it
            was admitted; released here on completion.

    Returns:
        One ``PyMaterializedResult`` per output partition.
    """
    try:
        for instruction in instruction_stack:
            partitions = instruction.run(partitions)

        results: list[MaterializedResult[MicroPartition]] = [
            PyMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
            for part, partial in zip(partitions, final_metadata)
        ]
        return results
    finally:
        # Release CPU, GPU and memory resources unconditionally — even if an
        # instruction raised — so a failed task cannot leak reserved capacity.
        with self._resource_accounting_lock:
            self._available_bytes_memory += resource_request.memory_bytes or 0
            self._available_cpus += resource_request.num_cpus or 0.0
            self._available_gpus += resource_request.num_gpus or 0.0


@dataclass
Expand Down

0 comments on commit b36b5ec

Please sign in to comment.