[CHORE] More Parquet benchmarking (#1160)
1. Adds a naive "whole file downloader" read method
2. Adds a stub for a native Daft read_parquet Table method
3. Adds more benchmark configurations (single column vs. multiple contiguous columns vs. multiple sparse columns vs. all columns)

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia committed Jul 18, 2023
1 parent 18f21a6 commit 512c81d
Showing 4 changed files with 115 additions and 23 deletions.
16 changes: 16 additions & 0 deletions benchmarking/parquet/README.md
@@ -5,6 +5,22 @@ Goals:
1. Find Parquet features that Daft underperforms on
2. Compare Daft against other frameworks

## Setup

Create a new virtual environment and install the benchmarking dependencies:

```bash
python -m venv venv
source venv/bin/activate
pip install -r benchmark-requirements.txt
```

Now install the version of Daft you wish to benchmark (either a released wheel or a local build):

```bash
pip install getdaft
```
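
For a local build, pointing pip at the wheel you built works the same way (the path below is illustrative):

```bash
pip install /path/to/dist/getdaft-*.whl
```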

## Running the benchmarks:

### Goal 1: Find Parquet features that Daft underperforms on
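The remainder of the README (collapsed in this diff) documents the exact commands for each goal. As a rough sketch, a suite like this is typically driven through pytest-benchmark, grouping results by the `group` set in each `@pytest.mark.benchmark` marker; the flags below are standard pytest-benchmark and pytest-memray options, not necessarily the ones the README prescribes.

```bash
# Run all Parquet benchmarks, grouping the results table by benchmark group
pytest benchmarking/parquet/ --benchmark-group-by=group --benchmark-sort=mean

# Optionally, track per-test memory usage with pytest-memray
pytest benchmarking/parquet/ --memray
```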
5 changes: 5 additions & 0 deletions benchmarking/parquet/benchmark-requirements.txt
@@ -0,0 +1,5 @@
pytest==7.4.0
pytest-benchmark==4.0.0
pytest-memray==1.4.1
pyarrow==12.0.0
boto3==1.28.3
47 changes: 40 additions & 7 deletions benchmarking/parquet/conftest.py
@@ -1,18 +1,20 @@
from __future__ import annotations

import io

import boto3
import pyarrow as pa
import pyarrow.fs as pafs
import pyarrow.parquet as papq
import pytest

import daft


def daft_read(path: str, columns: list[str] | None = None) -> pa.Table:
df = daft.read_parquet(path)
if columns is not None:
df = df.select(*columns)
return df.to_arrow()
# def daft_legacy_read(path: str, columns: list[str] | None = None) -> pa.Table:
# df = daft.read_parquet(path)
# if columns is not None:
# df = df.select(*columns)
# return df.to_arrow()


def pyarrow_read(path: str, columns: list[str] | None = None) -> pa.Table:
@@ -23,7 +25,38 @@ def pyarrow_read(path: str, columns: list[str] | None = None) -> pa.Table:
return papq.read_table(path, columns=columns, filesystem=fs)


@pytest.fixture(params=[daft_read, pyarrow_read], ids=["daft", "pyarrow"])
def boto3_get_object_read(path: str, columns: list[str] | None = None) -> pa.Table:
if path.startswith("s3://"):
client = boto3.client("s3", region_name="us-west-2")
split_path = path.replace("s3://", "").split("/")
bucket, key = split_path[0], path.replace(f"s3://{split_path[0]}/", "")
resp = client.get_object(Bucket=bucket, Key=key)
data = io.BytesIO(resp["Body"].read())
tbl = papq.read_table(data, columns=columns)
return tbl

with open(path, "rb") as f:
data = io.BytesIO(f.read())
return papq.read_table(data, columns=columns)


def daft_native_read(path: str, columns: list[str] | None = None) -> pa.Table:
tbl = daft.table.Table.read_parquet(path, columns=columns)
return tbl.to_arrow()


@pytest.fixture(
params=[
daft_native_read,
pyarrow_read,
boto3_get_object_read,
],
ids=[
"daft_native_read",
"pyarrow",
"boto3_get_object",
],
)
def read_fn(request):
"""Fixture which returns the function to read a PyArrow table from a path"""
return request.param
70 changes: 54 additions & 16 deletions benchmarking/parquet/test_num_rowgroups.py
@@ -2,28 +2,66 @@

import pytest

PATHS = [
"s3://eventual-dev-benchmarking-fixtures/parquet-benchmarking/tpch/1RG/daft_tpch_100g_32part_1RG.parquet",
"s3://eventual-dev-benchmarking-fixtures/parquet-benchmarking/tpch/8RG/daft_tpch_100g_32part_8RG.parquet",
"s3://eventual-dev-benchmarking-fixtures/parquet-benchmarking/tpch/64RG/daft_tpch_100g_32part_64RG.parquet",
]

@pytest.mark.benchmark(group="num_rowgroups")
IDS = ["1RG", "8RG", "64RG"]


@pytest.mark.benchmark(group="num_rowgroups_single_column")
@pytest.mark.parametrize(
"path",
[
"s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part_1RG.parquet",
"s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part.parquet",
# Disabled: too slow!
# "s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part_18kRG.parquet"
# "s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part_180kRG.parquet",
],
ids=[
"1",
"2k",
# Disabled: too slow!
# "18k",
# "180k",
],
PATHS,
ids=IDS,
)
def test_read_parquet_num_rowgroups(path, read_fn, benchmark):
def test_read_parquet_num_rowgroups_single_column(path, read_fn, benchmark):
data = benchmark(read_fn, path, columns=["L_ORDERKEY"])

# Make sure the data is correct
assert data.column_names == ["L_ORDERKEY"]
assert len(data) == 18751674


@pytest.mark.benchmark(group="num_rowgroups_multi_contiguous_columns")
@pytest.mark.parametrize(
"path",
PATHS,
ids=IDS,
)
def test_read_parquet_num_rowgroups_multi_contiguous_columns(path, read_fn, benchmark):
data = benchmark(read_fn, path, columns=["L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY"])

# Make sure the data is correct
assert data.column_names == ["L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY"]
assert len(data) == 18751674


@pytest.mark.benchmark(group="num_rowgroups_multi_sparse_columns")
@pytest.mark.parametrize(
"path",
PATHS,
ids=IDS,
)
def test_read_parquet_num_rowgroups_multi_sparse_columns(path, read_fn, benchmark):
data = benchmark(read_fn, path, columns=["L_ORDERKEY", "L_TAX"])

# Make sure the data is correct
assert data.column_names == ["L_ORDERKEY", "L_TAX"]
assert len(data) == 18751674


@pytest.mark.benchmark(group="num_rowgroups_all_columns")
@pytest.mark.parametrize(
"path",
PATHS,
ids=IDS,
)
def test_read_parquet_num_rowgroups_all_columns(path, read_fn, benchmark):
data = benchmark(read_fn, path)

# Make sure the data is correct
assert len(data.column_names) == 16
assert len(data) == 18751674
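
Because the suite exists to compare readers and Daft builds against each other, pytest-benchmark's save/compare workflow is a natural companion. A sketch, assuming the default `.benchmarks` storage location:

```bash
# Record a baseline with the currently installed Daft
pytest benchmarking/parquet/ --benchmark-autosave

# Install the build you want to evaluate (released wheel or local build), then compare
pip install -U getdaft
pytest benchmarking/parquet/ --benchmark-compare
```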
