From 512c81d4888e0f4902fb2e1e08bada26f88a15ed Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Tue, 18 Jul 2023 13:10:12 -0700 Subject: [PATCH] [CHORE] More Parquet benchmarking (#1160) 1. Adds a naive "whole file downloader" read method 2. Adds stub for a native Daft read_parquet Table method 3. Adds more configurations of benchmarks (one column vs multi-column vs multi-sparse-columns vs all columns) --------- Co-authored-by: Jay Chia --- benchmarking/parquet/README.md | 16 +++++ .../parquet/benchmark-requirements.txt | 5 ++ benchmarking/parquet/conftest.py | 47 +++++++++++-- benchmarking/parquet/test_num_rowgroups.py | 70 ++++++++++++++----- 4 files changed, 115 insertions(+), 23 deletions(-) create mode 100644 benchmarking/parquet/benchmark-requirements.txt diff --git a/benchmarking/parquet/README.md b/benchmarking/parquet/README.md index 4fc68eef6b..be6d89fcdd 100644 --- a/benchmarking/parquet/README.md +++ b/benchmarking/parquet/README.md @@ -5,6 +5,22 @@ Goals: 1. Find Parquet features that Daft underperforms on 2. Compare Daft against other frameworks +## Setup + +Create a new virtual environment and install the dependencies + +```bash +python -m venv venv +source venv/bin/activate +pip install -r benchmark-requirements.txt +``` + +Now, install the version of Daft you wish to use for benchmarking (either a released wheel, or if you want, a local build) + +```bash +pip install getdaft +``` + ## Running the benchmarks: ### Goal 1: Find Parquet features that Daft underperforms on diff --git a/benchmarking/parquet/benchmark-requirements.txt b/benchmarking/parquet/benchmark-requirements.txt new file mode 100644 index 0000000000..3e8f9ae1b9 --- /dev/null +++ b/benchmarking/parquet/benchmark-requirements.txt @@ -0,0 +1,5 @@ +pytest==7.4.0 +pytest-benchmark==4.0.0 +pytest-memray==1.4.1 +pyarrow==12.0.0 +boto3==1.28.3 diff --git a/benchmarking/parquet/conftest.py b/benchmarking/parquet/conftest.py index 3915f4fb33..f8062bf5d5 100644 --- a/benchmarking/parquet/conftest.py +++ b/benchmarking/parquet/conftest.py @@ -1,5 +1,8 @@ from __future__ import annotations +import io + +import boto3 import pyarrow as pa import pyarrow.fs as pafs import pyarrow.parquet as papq @@ -7,12 +10,11 @@ import daft - -def daft_read(path: str, columns: list[str] | None = None) -> pa.Table: - df = daft.read_parquet(path) - if columns is not None: - df = df.select(*columns) - return df.to_arrow() +# def daft_legacy_read(path: str, columns: list[str] | None = None) -> pa.Table: +# df = daft.read_parquet(path) +# if columns is not None: +# df = df.select(*columns) +# return df.to_arrow() def pyarrow_read(path: str, columns: list[str] | None = None) -> pa.Table: @@ -23,7 +25,38 @@ def pyarrow_read(path: str, columns: list[str] | None = None) -> pa.Table: return papq.read_table(path, columns=columns, filesystem=fs) -@pytest.fixture(params=[daft_read, pyarrow_read], ids=["daft", "pyarrow"]) +def boto3_get_object_read(path: str, columns: list[str] | None = None) -> pa.Table: + if path.startswith("s3://"): + client = boto3.client("s3", region_name="us-west-2") + split_path = path.replace("s3://", "").split("/") + bucket, key = split_path[0], path.replace(f"s3://{split_path[0]}/", "") + resp = client.get_object(Bucket=bucket, Key=key) + data = io.BytesIO(resp["Body"].read()) + tbl = papq.read_table(data, columns=columns) + return tbl + + with open(path, "rb") as f: + data = io.BytesIO(f.read()) + return papq.read_table(data, columns=columns) + + +def daft_native_read(path: str, columns: list[str] | None = None) -> pa.Table: + tbl = daft.table.Table.read_parquet(path, columns=columns) + return tbl.to_arrow() + + +@pytest.fixture( + params=[ + daft_native_read, + pyarrow_read, + boto3_get_object_read, + ], + ids=[ + "daft_native_read", + "pyarrow", + "boto3_get_object", + ], +) def read_fn(request): """Fixture which returns the function to read a PyArrow table from a path""" return request.param diff --git a/benchmarking/parquet/test_num_rowgroups.py b/benchmarking/parquet/test_num_rowgroups.py index f419781744..906d5d3e4f 100644 --- a/benchmarking/parquet/test_num_rowgroups.py +++ b/benchmarking/parquet/test_num_rowgroups.py @@ -2,28 +2,66 @@ import pytest +PATHS = [ + "s3://eventual-dev-benchmarking-fixtures/parquet-benchmarking/tpch/1RG/daft_tpch_100g_32part_1RG.parquet", + "s3://eventual-dev-benchmarking-fixtures/parquet-benchmarking/tpch/8RG/daft_tpch_100g_32part_8RG.parquet", + "s3://eventual-dev-benchmarking-fixtures/parquet-benchmarking/tpch/64RG/daft_tpch_100g_32part_64RG.parquet", +] -@pytest.mark.benchmark(group="num_rowgroups") +IDS = ["1RG", "8RG", "64RG"] + + +@pytest.mark.benchmark(group="num_rowgroups_single_column") @pytest.mark.parametrize( "path", - [ - "s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part_1RG.parquet", - "s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part.parquet", - # Disabled: too slow! - # "s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part_18kRG.parquet" - # "s3://daft-public-data/test_fixtures/parquet-dev/daft_tpch_100g_32part_180kRG.parquet", - ], - ids=[ - "1", - "2k", - # Disabled: too slow! - # "18k", - # "180k", - ], + PATHS, + ids=IDS, ) -def test_read_parquet_num_rowgroups(path, read_fn, benchmark): +def test_read_parquet_num_rowgroups_single_column(path, read_fn, benchmark): data = benchmark(read_fn, path, columns=["L_ORDERKEY"]) # Make sure the data is correct assert data.column_names == ["L_ORDERKEY"] assert len(data) == 18751674 + + +@pytest.mark.benchmark(group="num_rowgroups_multi_contiguous_columns") +@pytest.mark.parametrize( + "path", + PATHS, + ids=IDS, +) +def test_read_parquet_num_rowgroups_multi_contiguous_columns(path, read_fn, benchmark): + data = benchmark(read_fn, path, columns=["L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY"]) + + # Make sure the data is correct + assert data.column_names == ["L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY"] + assert len(data) == 18751674 + + +@pytest.mark.benchmark(group="num_rowgroups_multi_sparse_columns") +@pytest.mark.parametrize( + "path", + PATHS, + ids=IDS, +) +def test_read_parquet_num_rowgroups_multi_sparse_columns(path, read_fn, benchmark): + data = benchmark(read_fn, path, columns=["L_ORDERKEY", "L_TAX"]) + + # Make sure the data is correct + assert data.column_names == ["L_ORDERKEY", "L_TAX"] + assert len(data) == 18751674 + + +@pytest.mark.benchmark(group="num_rowgroups_all_columns") +@pytest.mark.parametrize( + "path", + PATHS, + ids=IDS, +) +def test_read_parquet_num_rowgroups_all_columns(path, read_fn, benchmark): + data = benchmark(read_fn, path) + + # Make sure the data is correct + assert len(data.column_names) == 16 + assert len(data) == 18751674