
[PERF] Native Parquet Bulk Reader (#1233)
* Adds a native Parquet bulk reader that parallelizes IO and compute across files.
* Yields up to a 5.3x speedup over the non-bulk Daft reader (benchmark below).
![Daft Bulk Reader Relative Speedup (Single Column)](https://github.com/Eventual-Inc/Daft/assets/2550285/be310944-2348-4f85-9441-9695f5b676d9)
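The new entry point is `Table.read_parquet_bulk`, which takes a list of paths and returns one `Table` per path. A minimal usage sketch — the bucket paths are placeholders, and it assumes `IOConfig`/`S3Config` are importable from `daft.io`, as the integration test below uses them:

```python
from daft.io import IOConfig, S3Config  # assumed import location
from daft.table import Table

# Placeholder URIs for illustration; any list of Parquet files works.
paths = ["s3://my-bucket/part-0.parquet", "s3://my-bucket/part-1.parquet"]

# All files are read concurrently (IO and decode overlap across files);
# results come back in the same order as `paths`.
tables = Table.read_parquet_bulk(
    paths,
    io_config=IOConfig(s3=S3Config(anonymous=True)),
)
assert len(tables) == len(paths)
```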
samster25 committed Aug 5, 2023
1 parent 839d18c commit 71605cf
Showing 6 changed files with 176 additions and 10 deletions.
37 changes: 37 additions & 0 deletions benchmarking/parquet/conftest.py
@@ -60,3 +60,40 @@ def daft_native_read(path: str, columns: list[str] | None = None) -> pa.Table:
def read_fn(request):
"""Fixture which returns the function to read a PyArrow table from a path"""
return request.param


def bulk_read_adapter(func):
def fn(files: list[str]) -> list[pa.Table]:
return [func(f) for f in files]

return fn


def daft_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
tables = daft.table.Table.read_parquet_bulk(paths, columns=columns)
return [t.to_arrow() for t in tables]


def pyarrow_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
return [pyarrow_read(f, columns=columns) for f in paths]


def boto_bulk_read(paths: list[str], columns: list[str] | None = None) -> list[pa.Table]:
return [boto3_get_object_read(f, columns=columns) for f in paths]


@pytest.fixture(
params=[
daft_bulk_read,
pyarrow_bulk_read,
boto_bulk_read,
],
ids=[
"daft_bulk_read",
"pyarrow_bulk_read",
"boto3_bulk_read",
],
)
def bulk_read_fn(request):
"""Fixture which returns the function to read a PyArrow table from a path"""
return request.param
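`bulk_read_adapter` above is defined but not used in this hunk; it lifts a single-path reader into the list-of-paths signature by reading files one at a time. A hedged sketch of how it could be wired up, assuming the single-file `pyarrow_read` helper defined earlier in this conftest:

```python
# Hypothetical wiring, not part of this commit: adapt the single-file
# pyarrow reader to the bulk (list-of-paths) interface.
pyarrow_serial_bulk_read = bulk_read_adapter(pyarrow_read)

tables = pyarrow_serial_bulk_read(["a.parquet", "b.parquet"])  # reads sequentially
```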
36 changes: 36 additions & 0 deletions benchmarking/parquet/test_bulk_reads.py
@@ -0,0 +1,36 @@
from __future__ import annotations

import pytest

PATH = (
"s3://eventual-dev-benchmarking-fixtures/parquet-benchmarking/tpch/200MB-2RG/daft_200MB_lineitem_chunk.RG-2.parquet"
)


@pytest.mark.benchmark(group="num_files_single_column")
@pytest.mark.parametrize(
"num_files",
[1, 2, 4, 8],
)
def test_read_parquet_num_files_single_column(num_files, bulk_read_fn, benchmark):
data = benchmark(bulk_read_fn, [PATH] * num_files, columns=["L_ORDERKEY"])
assert len(data) == num_files
# Make sure the data is correct
for i in range(num_files):
assert data[i].column_names == ["L_ORDERKEY"]
assert len(data[i]) == 5515199


@pytest.mark.benchmark(group="num_rowgroups_all_columns")
@pytest.mark.parametrize(
"num_files",
[1, 2, 4],
)
def test_read_parquet_num_files_all_columns(num_files, bulk_read_fn, benchmark):
data = benchmark(bulk_read_fn, [PATH] * num_files)
assert len(data) == num_files

# Make sure the data is correct
for i in range(num_files):
assert len(data[i].column_names) == 16
assert len(data[i]) == 5515199
17 changes: 16 additions & 1 deletion daft/table/table.py
@@ -7,7 +7,8 @@

from daft.arrow_utils import ensure_table
from daft.daft import PyTable as _PyTable
from daft.daft import _read_parquet
from daft.daft import read_parquet as _read_parquet
from daft.daft import read_parquet_bulk as _read_parquet_bulk
from daft.daft import read_parquet_statistics as _read_parquet_statistics
from daft.datatype import DataType
from daft.expressions import Expression, ExpressionsProjection
@@ -357,6 +358,20 @@ def read_parquet(
_read_parquet(uri=path, columns=columns, start_offset=start_offset, num_rows=num_rows, io_config=io_config)
)

@classmethod
def read_parquet_bulk(
cls,
paths: list[str],
columns: list[str] | None = None,
start_offset: int | None = None,
num_rows: int | None = None,
io_config: IOConfig | None = None,
) -> list[Table]:
pytables = _read_parquet_bulk(
uris=paths, columns=columns, start_offset=start_offset, num_rows=num_rows, io_config=io_config
)
return [Table._from_pytable(t) for t in pytables]

@classmethod
def read_parquet_statistics(
cls,
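`Table.read_parquet_bulk` mirrors the single-file `Table.read_parquet` but fans the work out across paths; the returned list is positionally aligned with the input. A small sketch of that correspondence, assuming `path` points at any readable Parquet file with an `L_ORDERKEY` column and `Table` is imported as above:

```python
# Sketch: each bulk result should match a single-file read of the same path.
tables = Table.read_parquet_bulk([path, path], columns=["L_ORDERKEY"])
single = Table.read_parquet(path, columns=["L_ORDERKEY"])

assert len(tables) == 2
assert all(t.to_arrow().equals(single.to_arrow()) for t in tables)
```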
29 changes: 27 additions & 2 deletions src/daft-parquet/src/python.rs
@@ -9,7 +9,7 @@ pub mod pylib {
use pyo3::{pyfunction, PyResult, Python};

#[pyfunction]
pub fn _read_parquet(
pub fn read_parquet(
py: Python,
uri: &str,
columns: Option<Vec<&str>>,
@@ -30,6 +30,30 @@
})
}

#[pyfunction]
pub fn read_parquet_bulk(
py: Python,
uris: Vec<&str>,
columns: Option<Vec<&str>>,
start_offset: Option<usize>,
num_rows: Option<usize>,
io_config: Option<IOConfig>,
) -> PyResult<Vec<PyTable>> {
py.allow_threads(|| {
let io_client = get_io_client(io_config.unwrap_or_default().config.into())?;
Ok(crate::read::read_parquet_bulk(
uris.as_ref(),
columns.as_deref(),
start_offset,
num_rows,
io_client,
)?
.into_iter()
.map(|v| v.into())
.collect())
})
}

#[pyfunction]
pub fn read_parquet_schema(
py: Python,
@@ -55,7 +79,8 @@
}
}
pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
parent.add_wrapped(wrap_pyfunction!(pylib::_read_parquet))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_bulk))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_schema))?;
parent.add_wrapped(wrap_pyfunction!(pylib::read_parquet_statistics))?;
Ok(())
56 changes: 49 additions & 7 deletions src/daft-parquet/src/read.rs
@@ -9,22 +9,19 @@ use daft_core::{
};
use daft_io::{get_runtime, IOClient};
use daft_table::Table;
use futures::future::join_all;
use futures::future::{join_all, try_join_all};
use snafu::ResultExt;

use crate::{file::ParquetReaderBuilder, JoinSnafu};

pub fn read_parquet(
async fn read_parquet_single(
uri: &str,
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
io_client: Arc<IOClient>,
) -> DaftResult<Table> {
let runtime_handle = get_runtime(true)?;
let _rt_guard = runtime_handle.enter();
let builder = runtime_handle
.block_on(async { ParquetReaderBuilder::from_uri(uri, io_client.clone()).await })?;
let builder = ParquetReaderBuilder::from_uri(uri, io_client.clone()).await?;

let builder = if let Some(columns) = columns {
builder.prune_columns(columns)?
@@ -38,7 +35,7 @@

let parquet_reader = builder.build()?;
let ranges = parquet_reader.prebuffer_ranges(io_client)?;
let table = runtime_handle.block_on(async { parquet_reader.read_from_ranges(ranges).await })?;
let table = parquet_reader.read_from_ranges(ranges).await?;

match (start_offset, num_rows) {
(None, None) if metadata_num_rows != table.len() => {
@@ -81,6 +78,51 @@
Ok(table)
}

pub fn read_parquet(
uri: &str,
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
io_client: Arc<IOClient>,
) -> DaftResult<Table> {
let runtime_handle = get_runtime(true)?;
let _rt_guard = runtime_handle.enter();
runtime_handle.block_on(async {
read_parquet_single(uri, columns, start_offset, num_rows, io_client).await
})
}

pub fn read_parquet_bulk(
uris: &[&str],
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
io_client: Arc<IOClient>,
) -> DaftResult<Vec<Table>> {
let runtime_handle = get_runtime(true)?;
let _rt_guard = runtime_handle.enter();
let owned_columns = columns.map(|s| s.iter().map(|v| String::from(*v)).collect::<Vec<_>>());

let tables = runtime_handle
.block_on(async move {
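// Fan out: spawn one tokio task per URI so object-store IO and Parquet decode
// overlap across files; try_join_all awaits the spawned tasks and preserves input order.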
try_join_all(uris.iter().map(|uri| {
let uri = uri.to_string();
let owned_columns = owned_columns.clone();
let io_client = io_client.clone();
tokio::task::spawn(async move {
let columns = owned_columns
.as_ref()
.map(|s| s.iter().map(AsRef::as_ref).collect::<Vec<_>>());
read_parquet_single(&uri, columns.as_deref(), start_offset, num_rows, io_client)
.await
})
}))
.await
})
.context(JoinSnafu { path: "UNKNOWN" })?;
tables.into_iter().collect::<DaftResult<Vec<_>>>()
}

pub fn read_parquet_schema(uri: &str, io_client: Arc<IOClient>) -> DaftResult<Schema> {
let runtime_handle = get_runtime(true)?;
let _rt_guard = runtime_handle.enter();
11 changes: 11 additions & 0 deletions tests/integration/io/parquet/test_reads_public_data.py
@@ -186,6 +186,17 @@ def test_parquet_read_table(parquet_file):
pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas())


@pytest.mark.integration()
def test_parquet_read_table_bulk(parquet_file):
_, url = parquet_file
daft_native_reads = Table.read_parquet_bulk([url] * 2, io_config=IOConfig(s3=S3Config(anonymous=True)))
pa_read = Table.from_arrow(read_parquet_with_pyarrow(url))

for daft_native_read in daft_native_reads:
assert daft_native_read.schema() == pa_read.schema()
pd.testing.assert_frame_equal(daft_native_read.to_pandas(), pa_read.to_pandas())


@pytest.mark.integration()
def test_parquet_read_df(parquet_file):
_, url = parquet_file
