[BUG] Use estimated in-memory size for scan task merging and resource requests (#2448)

Unblocks #2383. Users can now set the `parquet_inflation_factor` execution config to prevent large scan tasks from being merged, and to stop tasks from being scheduled when there is not enough memory for them.

In the future, we may want to read a few files or their metadata to obtain a better estimate.
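
For example, a user hitting memory pressure on Parquet scans could raise the factor before reading. A usage sketch (the factor value and path are illustrative, assuming the field is exposed through daft.set_execution_config like other execution configs):

    import daft

    # Scale the assumed inflation from on-disk Parquet bytes to decoded
    # in-memory bytes; a larger factor makes the planner more cautious.
    daft.set_execution_config(parquet_inflation_factor=6.0)

    df = daft.read_parquet("s3://my-bucket/data/*.parquet")  # illustrative path

A larger factor makes the planner assume more in-memory inflation per on-disk byte, so fewer scan tasks merge and each task requests more memory.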
kevinzwang committed Jun 29, 2024
1 parent 8a0aefa commit 86ded60
Showing 4 changed files with 34 additions and 31 deletions.
9 changes: 6 additions & 3 deletions daft/execution/rust_physical_plan_shim.py
@@ -2,6 +2,7 @@
 
 from typing import TYPE_CHECKING
 
+from daft.context import get_context
 from daft.daft import (
     FileFormat,
     IOConfig,
@@ -32,15 +33,17 @@ def scan_with_tasks(
     """
     # TODO(Clark): Currently hardcoded to have 1 file per instruction
     # We can instead right-size and bundle the ScanTask into single-instruction bulk reads.
+
+    cfg = get_context().daft_execution_config
+
     for scan_task in scan_tasks:
         scan_step = execution_step.PartitionTaskBuilder[PartitionT](
             inputs=[],
             partial_metadatas=None,
         ).add_instruction(
             instruction=execution_step.ScanWithTask(scan_task),
-            # Set the filesize as the memory request.
-            # (Note: this is very conservative; file readers empirically use much more peak memory than 1x file size.)
-            resource_request=ResourceRequest(memory_bytes=scan_task.size_bytes()),
+            # Set the estimated in-memory size as the memory request.
+            resource_request=ResourceRequest(memory_bytes=scan_task.estimate_in_memory_size_bytes(cfg)),
         )
         yield scan_step

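The memory request above is now based on an estimate of the materialized size rather than the raw file size. A rough sketch of the idea behind such an estimate (not Daft's actual implementation): scale the on-disk size by a format-dependent inflation factor, and propagate unknown sizes:

    def estimate_in_memory_size_bytes(on_disk_bytes, inflation_factor=3.0):
        # Illustrative only: approximate the decoded in-memory footprint by
        # scaling the compressed on-disk size (e.g. by parquet_inflation_factor).
        # An unknown file size yields an unknown estimate.
        if on_disk_bytes is None:
            return None
        return int(on_disk_bytes * inflation_factor)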
6 changes: 1 addition & 5 deletions src/daft-plan/src/physical_planner/translate.rs
@@ -57,11 +57,7 @@ pub(super) fn translate_single_logical_node(
         );
 
         // Apply transformations on the ScanTasks to optimize
-        let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes(
-            scan_tasks,
-            cfg.scan_tasks_min_size_bytes,
-            cfg.scan_tasks_max_size_bytes,
-        );
+        let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes(scan_tasks, cfg);
         let scan_tasks = scan_tasks.collect::<DaftResult<Vec<_>>>()?;
         if scan_tasks.is_empty() {
             let clustering_spec =
44 changes: 24 additions & 20 deletions src/daft-scan/src/scan_task_iters.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use common_daft_config::DaftExecutionConfig;
 use common_error::DaftResult;
 use daft_io::IOStatsContext;
 use daft_parquet::read::read_parquet_metadata;
@@ -10,7 +11,7 @@ use crate::{
     ChunkSpec, DataFileSource, ScanTask, ScanTaskRef,
 };
 
-type BoxScanTaskIter = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>;
+type BoxScanTaskIter<'a> = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'a>;
 
 /// Coalesces ScanTasks by their [`ScanTask::size_bytes()`]
 ///
@@ -24,33 +25,30 @@ type BoxScanTaskIter = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>>>;
 /// * `scan_tasks`: A Boxed Iterator of ScanTaskRefs to perform merging on
 /// * `min_size_bytes`: Minimum size in bytes of a ScanTask, after which no more merging will be performed
 /// * `max_size_bytes`: Maximum size in bytes of a ScanTask, capping the maximum size of a merged ScanTask
-pub fn merge_by_sizes(
-    scan_tasks: BoxScanTaskIter,
-    min_size_bytes: usize,
-    max_size_bytes: usize,
-) -> BoxScanTaskIter {
+pub fn merge_by_sizes<'a>(
+    scan_tasks: BoxScanTaskIter<'a>,
+    cfg: &'a DaftExecutionConfig,
+) -> BoxScanTaskIter<'a> {
     Box::new(MergeByFileSize {
         iter: scan_tasks,
-        min_size_bytes,
-        max_size_bytes,
+        cfg,
         accumulator: None,
     })
 }
 
-struct MergeByFileSize {
-    iter: BoxScanTaskIter,
-    min_size_bytes: usize,
-    max_size_bytes: usize,
+struct MergeByFileSize<'a> {
+    iter: BoxScanTaskIter<'a>,
+    cfg: &'a DaftExecutionConfig,
 
     // Current element being accumulated on
     accumulator: Option<ScanTaskRef>,
 }
 
-impl MergeByFileSize {
+impl<'a> MergeByFileSize<'a> {
     fn accumulator_ready(&self) -> bool {
         if let Some(acc) = &self.accumulator
-            && let Some(acc_bytes) = acc.size_bytes()
-            && acc_bytes >= self.min_size_bytes
+            && let Some(acc_bytes) = acc.estimate_in_memory_size_bytes(Some(self.cfg))
+            && acc_bytes >= self.cfg.scan_tasks_min_size_bytes
         {
             true
         } else {
@@ -69,10 +67,12 @@ impl MergeByFileSize {
             && other.storage_config == accumulator.storage_config
             && other.pushdowns == accumulator.pushdowns;
 
-        let sum_smaller_than_max_size_bytes = if let Some(child_bytes) = other.size_bytes()
-            && let Some(accumulator_bytes) = accumulator.size_bytes()
+        let sum_smaller_than_max_size_bytes = if let Some(child_bytes) =
+            other.estimate_in_memory_size_bytes(Some(self.cfg))
+            && let Some(accumulator_bytes) =
+                accumulator.estimate_in_memory_size_bytes(Some(self.cfg))
         {
-            child_bytes + accumulator_bytes <= self.max_size_bytes
+            child_bytes + accumulator_bytes <= self.cfg.scan_tasks_max_size_bytes
         } else {
             false
         };
@@ -81,7 +81,7 @@
     }
 }
 
-impl Iterator for MergeByFileSize {
+impl<'a> Iterator for MergeByFileSize<'a> {
     type Item = DaftResult<ScanTaskRef>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -104,7 +104,11 @@ impl Iterator for MergeByFileSize {
                 None => return self.accumulator.take().map(Ok),
             };
 
-            if next_item.size_bytes().is_none() || !self.can_merge(&next_item) {
+            if next_item
+                .estimate_in_memory_size_bytes(Some(self.cfg))
+                .is_none()
+                || !self.can_merge(&next_item)
+            {
                 return self.accumulator.replace(next_item).map(Ok);
             }

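Stripped of the Rust plumbing, MergeByFileSize is a greedy accumulator: it folds successive compatible tasks into the current accumulator while the combined estimate stays at or below scan_tasks_max_size_bytes, and emits the accumulator once it reaches scan_tasks_min_size_bytes. A simplified Python sketch of that loop, using (name, estimated_bytes) pairs in place of real ScanTasks:

    def merge_by_sizes(tasks, min_size_bytes, max_size_bytes):
        # Simplified sketch of the greedy merge performed by MergeByFileSize.
        # `tasks` yields (name, estimated_bytes) pairs; merging concatenates
        # names and sums estimates. Tasks with an unknown estimate (None) are
        # passed through unmerged, mirroring the is_none() early-out above.
        acc = None
        for name, est in tasks:
            if est is None:
                if acc is not None:
                    yield acc          # flush whatever has accumulated so far
                    acc = None
                yield (name, est)      # emit the unestimatable task as-is
                continue
            if acc is None:
                acc = (name, est)
            elif acc[1] + est <= max_size_bytes:
                acc = (acc[0] + "+" + name, acc[1] + est)  # merge into accumulator
            else:
                yield acc              # would exceed the max: emit and restart
                acc = (name, est)
            if acc[1] >= min_size_bytes:
                yield acc              # large enough: stop merging this one
                acc = None
        if acc is not None:
            yield acc                  # trailing partial accumulator

    # Example: with min=9 and max=20, three 5-byte tasks coalesce into two.
    print(list(merge_by_sizes([("a", 5), ("b", 5), ("c", 5)], 9, 20)))
    # [('a+b', 10), ('c', 5)]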
6 changes: 3 additions & 3 deletions tests/io/test_merge_scan_tasks.py
@@ -41,23 +41,23 @@ def test_merge_scan_task_exceed_max(csv_files):
 
 
 def test_merge_scan_task_below_max(csv_files):
-    with override_merge_scan_tasks_configs(21, 22):
+    with override_merge_scan_tasks_configs(11, 12):
         df = daft.read_csv(str(csv_files))
         assert (
             df.num_partitions() == 2
         ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the second merge is too large (>22 bytes)"
 
 
 def test_merge_scan_task_above_min(csv_files):
-    with override_merge_scan_tasks_configs(19, 40):
+    with override_merge_scan_tasks_configs(9, 20):
         df = daft.read_csv(str(csv_files))
         assert (
             df.num_partitions() == 2
         ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the first merge is above the minimum (>19 bytes)"
 
 
 def test_merge_scan_task_below_min(csv_files):
-    with override_merge_scan_tasks_configs(35, 40):
+    with override_merge_scan_tasks_configs(17, 20):
         df = daft.read_csv(str(csv_files))
         assert (
             df.num_partitions() == 1
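The override_merge_scan_tasks_configs helper is defined elsewhere in this test module. A minimal sketch of what such a context manager could look like (hypothetical; assumes daft.set_execution_config accepts these fields plus an optional base config, and that the old config can be restored afterwards):

    import contextlib

    import daft

    @contextlib.contextmanager
    def override_merge_scan_tasks_configs(min_size_bytes, max_size_bytes):
        # Hypothetical sketch: temporarily override the scan-task merge
        # thresholds, then restore the previous execution config on exit.
        old_config = daft.context.get_context().daft_execution_config
        try:
            daft.set_execution_config(
                config=old_config,
                scan_tasks_min_size_bytes=min_size_bytes,
                scan_tasks_max_size_bytes=max_size_bytes,
            )
            yield
        finally:
            daft.set_execution_config(config=old_config)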
