diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py
index 4cf0130132..0820af6987 100644
--- a/daft/runners/pyrunner.py
+++ b/daft/runners/pyrunner.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import logging
+import threading
 from concurrent import futures
 from dataclasses import dataclass
 from typing import Iterator
@@ -121,8 +122,7 @@ def __init__(self, use_thread_pool: bool | None) -> None:
         self._thread_pool = futures.ThreadPoolExecutor()
 
         # Global accounting of tasks and resources
-        self._inflight_tasks_resources: dict[str, ResourceRequest] = dict()
-        self._inflight_tasks: dict[str, PartitionTask] = dict()
+        self._inflight_futures: dict[str, futures.Future] = {}
 
         system_info = SystemInfo()
         num_cpus = system_info.cpu_count()
@@ -134,7 +134,13 @@ def __init__(self, use_thread_pool: bool | None) -> None:
 
         self.num_cpus = num_cpus
         self.num_gpus = cuda_device_count()
-        self.bytes_memory = system_info.total_memory()
+        self.total_bytes_memory = system_info.total_memory()
+
+        # Resource accounting:
+        self._resource_accounting_lock = threading.Lock()
+        self._available_bytes_memory = self.total_bytes_memory
+        self._available_cpus = float(self.num_cpus)
+        self._available_gpus = float(self.num_gpus)
 
     def runner_io(self) -> PyRunnerIO:
         return PyRunnerIO()
@@ -213,8 +219,7 @@ def run_iter_tables(
     def _physical_plan_to_partitions(
         self, plan: physical_plan.MaterializedPhysicalPlan[MicroPartition]
     ) -> Iterator[PyMaterializedResult]:
-        future_to_task: dict[futures.Future, str] = dict()
-
+        local_futures_to_task: dict[futures.Future, PartitionTask] = {}
         pbar = ProgressBar(use_ray_tqdm=False)
 
         try:
@@ -238,15 +243,16 @@ def _physical_plan_to_partitions(
                         next_step = next(plan)
                         continue
 
-                    elif not self._can_admit_task(
-                        next_step.resource_request,
-                    ):
-                        # Insufficient resources; await some tasks.
-                        logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
-                        break
-
                     else:
                         # next_task is a task to run.
+                        task_admitted = self._attempt_admit_task(
+                            next_step.resource_request,
+                        )
+
+                        if not task_admitted:
+                            # Insufficient resources; await some tasks.
+                            logger.debug("Skipping to wait on dispatched tasks: insufficient resources")
+                            break
 
                         # Run the task in the main thread, instead of the thread pool, in certain conditions:
                         # - Threading is disabled in runner config.
@@ -269,6 +275,7 @@ def _physical_plan_to_partitions(
                                 next_step.instructions,
                                 next_step.inputs,
                                 next_step.partial_metadatas,
+                                next_step.resource_request,
                             )
                             next_step.set_result(materialized_results)
 
@@ -284,36 +291,38 @@ def _physical_plan_to_partitions(
                                 next_step.instructions,
                                 next_step.inputs,
                                 next_step.partial_metadatas,
+                                next_step.resource_request,
                             )
 
-                            # Register the inflight task and resources used.
-                            future_to_task[future] = next_step.id()
+                            # Register the inflight task
                             assert (
-                                next_step.id() not in self._inflight_tasks_resources
+                                next_step.id() not in local_futures_to_task
                             ), "Step IDs should be unique - this indicates an internal error, please file an issue!"
-                            self._inflight_tasks[next_step.id()] = next_step
-                            self._inflight_tasks_resources[next_step.id()] = next_step.resource_request
+                            self._inflight_futures[next_step.id()] = future
+                            local_futures_to_task[future] = next_step
 
                     next_step = next(plan)
 
-                # Await at least one task and process the results.
-                if not len(future_to_task) > 0:
+                if next_step is None and not len(local_futures_to_task) > 0:
                     raise RuntimeError(
                         f"Scheduler deadlocked! This should never happen. Please file an issue. Current step: {type(next_step)}"
                     )
 
-                done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
+                # Await at least one task in the global futures to finish before proceeding
+                _ = futures.wait(list(self._inflight_futures.values()), return_when=futures.FIRST_COMPLETED)
+
+                # Now await a task in the local futures to finish, so as to progress the local execution
+                done_set, _ = futures.wait(list(local_futures_to_task), return_when=futures.FIRST_COMPLETED)
 
                 for done_future in done_set:
-                    done_id = future_to_task.pop(done_future)
-                    del self._inflight_tasks_resources[done_id]
-                    done_task = self._inflight_tasks.pop(done_id)
+                    done_task = local_futures_to_task.pop(done_future)
                     materialized_results = done_future.result()
 
                     pbar.mark_task_done(done_task)
+                    del self._inflight_futures[done_task.id()]
 
                     logger.debug(
                         "Task completed: %s -> <%s partitions>",
-                        done_id,
+                        done_task.id(),
                         len(materialized_results),
                     )
@@ -333,39 +342,53 @@ def _check_resource_requests(self, resource_request: ResourceRequest) -> None:
             raise RuntimeError(f"Requested {resource_request.num_cpus} CPUs but found only {self.num_cpus} available")
         if resource_request.num_gpus is not None and resource_request.num_gpus > self.num_gpus:
             raise RuntimeError(f"Requested {resource_request.num_gpus} GPUs but found only {self.num_gpus} available")
-        if resource_request.memory_bytes is not None and resource_request.memory_bytes > self.bytes_memory:
+        if resource_request.memory_bytes is not None and resource_request.memory_bytes > self.total_bytes_memory:
             raise RuntimeError(
-                f"Requested {resource_request.memory_bytes} bytes of memory but found only {self.bytes_memory} available"
+                f"Requested {resource_request.memory_bytes} bytes of memory but found only {self.total_bytes_memory} available"
             )
 
-    def _can_admit_task(
+    def _attempt_admit_task(
         self,
         resource_request: ResourceRequest,
     ) -> bool:
         self._check_resource_requests(resource_request)
 
-        inflight_resources = self._inflight_tasks_resources.values()
-        total_inflight_resources: ResourceRequest = sum(inflight_resources, ResourceRequest())
-        cpus_okay = (total_inflight_resources.num_cpus or 0) + (resource_request.num_cpus or 0) <= self.num_cpus
-        gpus_okay = (total_inflight_resources.num_gpus or 0) + (resource_request.num_gpus or 0) <= self.num_gpus
-        memory_okay = (total_inflight_resources.memory_bytes or 0) + (
-            resource_request.memory_bytes or 0
-        ) <= self.bytes_memory
+        with self._resource_accounting_lock:
+            memory_okay = (resource_request.memory_bytes or 0) <= self._available_bytes_memory
+            cpus_okay = (resource_request.num_cpus or 0) <= self._available_cpus
+            gpus_okay = (resource_request.num_gpus or 0) <= self._available_gpus
+            all_okay = all((cpus_okay, gpus_okay, memory_okay))
+
+            # Update resource accounting if we have the resources (this is considered as the task being "admitted")
+            if all_okay:
+                self._available_bytes_memory -= resource_request.memory_bytes or 0
+                self._available_cpus -= resource_request.num_cpus or 0.0
+                self._available_gpus -= resource_request.num_gpus or 0.0
 
-        return all((cpus_okay, gpus_okay, memory_okay))
+            return all_okay
 
-    @staticmethod
     def build_partitions(
+        self,
         instruction_stack: list[Instruction],
         partitions: list[MicroPartition],
         final_metadata: list[PartialPartitionMetadata],
+        resource_request: ResourceRequest,
     ) -> list[MaterializedResult[MicroPartition]]:
-        for instruction in instruction_stack:
-            partitions = instruction.run(partitions)
-        return [
-            PyMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
-            for part, partial in zip(partitions, final_metadata)
-        ]
+        try:
+            for instruction in instruction_stack:
+                partitions = instruction.run(partitions)
+
+            results: list[MaterializedResult[MicroPartition]] = [
+                PyMaterializedResult(part, PartitionMetadata.from_table(part).merge_with_partial(partial))
+                for part, partial in zip(partitions, final_metadata)
+            ]
+            return results
+        finally:
+            # Release CPU, GPU and memory resources
+            with self._resource_accounting_lock:
+                self._available_bytes_memory += resource_request.memory_bytes or 0
+                self._available_cpus += resource_request.num_cpus or 0.0
+                self._available_gpus += resource_request.num_gpus or 0.0
 
 
 @dataclass
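For reviewers, the sketch below is a minimal, self-contained illustration of the admit/release accounting pattern this diff introduces; it is not part of the change and does not use the real pyrunner classes. `FakeScheduler`, `FakeResourceRequest`, and `submit` are hypothetical stand-ins: availability counters are checked and reserved under a single lock acquisition on admission, and always handed back in a `finally` block when the task finishes, mirroring `_attempt_admit_task` and the release step in `build_partitions`.

```python
from __future__ import annotations

import threading
from concurrent import futures
from dataclasses import dataclass
from typing import Callable


@dataclass
class FakeResourceRequest:
    # Hypothetical stand-in for a task's resource request.
    num_cpus: float = 0.0
    memory_bytes: int = 0


class FakeScheduler:
    def __init__(self, total_cpus: float, total_memory_bytes: int) -> None:
        self._resource_accounting_lock = threading.Lock()
        self._available_cpus = total_cpus
        self._available_bytes_memory = total_memory_bytes
        self._thread_pool = futures.ThreadPoolExecutor()

    def _attempt_admit_task(self, req: FakeResourceRequest) -> bool:
        # Check and reserve under one lock acquisition so two dispatching threads
        # cannot both observe "enough resources" and oversubscribe the machine.
        with self._resource_accounting_lock:
            okay = req.num_cpus <= self._available_cpus and req.memory_bytes <= self._available_bytes_memory
            if okay:
                self._available_cpus -= req.num_cpus
                self._available_bytes_memory -= req.memory_bytes
            return okay

    def _run_and_release(self, fn: Callable[[], object], req: FakeResourceRequest) -> object:
        try:
            return fn()
        finally:
            # Resources are always returned, even if fn() raised.
            with self._resource_accounting_lock:
                self._available_cpus += req.num_cpus
                self._available_bytes_memory += req.memory_bytes

    def submit(self, fn: Callable[[], object], req: FakeResourceRequest) -> futures.Future | None:
        # Returns None when the task cannot be admitted; the caller would then wait
        # on already-inflight futures and retry, as in the dispatch loop above.
        if not self._attempt_admit_task(req):
            return None
        return self._thread_pool.submit(self._run_and_release, fn, req)


if __name__ == "__main__":
    scheduler = FakeScheduler(total_cpus=8.0, total_memory_bytes=16 * 2**30)
    fut = scheduler.submit(lambda: sum(range(1_000_000)), FakeResourceRequest(num_cpus=1.0, memory_bytes=2**20))
    print(fut.result() if fut is not None else "not admitted; wait and retry")
```

The point of the pattern, as the diff appears to intend, is that admission both checks and reserves atomically and that release is tied to task completion rather than to per-plan bookkeeping, which keeps the shared counters consistent when multiple plans dispatch work through the same runner concurrently.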