Skip to content

Commit

Permalink
feat: performance testing harness and perf tests for scan file plan
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Aug 2, 2024
1 parent bd9eea1 commit db2c8eb
Show file tree
Hide file tree
Showing 12 changed files with 642 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@
.vscode
**/.DS_Store
dist/*
crates/iceberg/testdata/performance/raw_data/*

crates/iceberg/testdata/performance/warehouse
7 changes: 7 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,15 @@ url = { workspace = true }
uuid = { workspace = true }

[dev-dependencies]
criterion = { version = "0.3", features = ["async_tokio", "async_futures"] }
ctor = { workspace = true }
futures-util = "0.3"
iceberg-catalog-rest = { path = "../catalog/rest" }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }

[[bench]]
name = "table_scan_plan_files"
harness = false
152 changes: 152 additions & 0 deletions crates/iceberg/benches/table_scan_plan_files.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use criterion::*;
use futures_util::StreamExt;
use tokio::runtime::Runtime;

mod utils;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::table::Table;
use iceberg::Catalog;
use utils::build_catalog;

/// Connects to the performance-test catalog and loads the table that the
/// benchmarks in this file scan.
///
/// The table chosen is the first table of the first namespace that the
/// catalog reports.
///
/// # Panics
///
/// Panics if any catalog request fails, or if the catalog contains no
/// namespace or the first namespace contains no table.
async fn setup_async() -> Table {
    let catalog = build_catalog().await;

    let namespaces = catalog
        .list_namespaces(None)
        .await
        .expect("failed to list namespaces in the performance catalog");
    // `.first()` instead of `[0]`: an empty catalog should fail with a message
    // that points at the missing fixture rather than an index-out-of-bounds.
    let namespace = namespaces
        .first()
        .expect("performance catalog has no namespaces; has the test data been loaded?");

    let table_idents = catalog
        .list_tables(namespace)
        .await
        .expect("failed to list tables in the performance catalog");
    let table_ident = table_idents
        .first()
        .expect("first namespace has no tables; has the test data been loaded?");

    catalog
        .load_table(table_ident)
        .await
        .expect("failed to load the performance test table")
}

/// Blocking wrapper around [`setup_async`]: drives it to completion on
/// `runtime` and returns the loaded table.
fn setup(runtime: &Runtime) -> Table {
    let loading_table = setup_async();
    runtime.block_on(loading_table)
}

/// Plans an unfiltered scan of `table` and drains the resulting stream of
/// planned items.
///
/// Each item is unwrapped and passed through `black_box` so that the work of
/// producing it cannot be optimised away.
async fn all_files_all_rows(table: &Table) {
    let table_scan = table.scan().build().unwrap();
    let mut planned = table_scan.plan_files().await.unwrap();

    while let Some(task) = planned.next().await {
        black_box(task.unwrap());
    }
}

/// Plans a scan of `table` restricted to rows whose `tpep_pickup_datetime`
/// lies in `[2024-02-01T00:00 UTC, 2024-02-02T00:00 UTC)` and drains the
/// resulting stream of planned items.
///
/// NOTE(review): the name suggests this range is expected to select a single
/// data file; that depends on how the test dataset is laid out — confirm.
async fn one_file_all_rows(table: &Table) {
    let window_start = Datum::timestamptz_from_str("2024-02-01T00:00:00.000 UTC").unwrap();
    let window_end = Datum::timestamptz_from_str("2024-02-02T00:00:00.000 UTC").unwrap();

    let picked_up_from_start =
        Reference::new("tpep_pickup_datetime").greater_than_or_equal_to(window_start);
    let picked_up_before_end = Reference::new("tpep_pickup_datetime").less_than(window_end);

    let table_scan = table
        .scan()
        .with_filter(picked_up_from_start.and(picked_up_before_end))
        .build()
        .unwrap();
    let mut planned = table_scan.plan_files().await.unwrap();

    while let Some(task) = planned.next().await {
        black_box(task.unwrap());
    }
}

/// Plans a scan of `table` restricted to rows where `passenger_count`
/// equals `1.0` and drains the resulting stream of planned items.
async fn all_files_some_rows(table: &Table) {
    let single_passenger = Reference::new("passenger_count").equal_to(Datum::double(1.0));

    let table_scan = table.scan().with_filter(single_passenger).build().unwrap();
    let mut planned = table_scan.plan_files().await.unwrap();

    while let Some(task) = planned.next().await {
        black_box(task.unwrap());
    }
}

/// Plans a scan of `table` that combines both filters used by the other
/// benchmarks: `tpep_pickup_datetime` in
/// `[2024-02-01T00:00 UTC, 2024-02-02T00:00 UTC)` and `passenger_count`
/// equal to `1.0`. The resulting stream of planned items is drained.
///
/// NOTE(review): as with `one_file_all_rows`, the "one file" in the name is
/// an expectation about the dataset layout — confirm.
async fn one_file_some_rows(table: &Table) {
    let window_start = Datum::timestamptz_from_str("2024-02-01T00:00:00.000 UTC").unwrap();
    let window_end = Datum::timestamptz_from_str("2024-02-02T00:00:00.000 UTC").unwrap();

    let picked_up_from_start =
        Reference::new("tpep_pickup_datetime").greater_than_or_equal_to(window_start);
    let picked_up_before_end = Reference::new("tpep_pickup_datetime").less_than(window_end);
    let single_passenger = Reference::new("passenger_count").equal_to(Datum::double(1.0));

    let filter = picked_up_from_start
        .and(picked_up_before_end)
        .and(single_passenger);

    let table_scan = table.scan().with_filter(filter).build().unwrap();
    let mut planned = table_scan.plan_files().await.unwrap();

    while let Some(task) = planned.next().await {
        black_box(task.unwrap());
    }
}

/// Criterion target `all_files_all_rows`: measures planning of an unfiltered
/// scan.
///
/// The Tokio runtime and the table are created once, outside the measured
/// closure; only the call to `all_files_all_rows` is timed.
pub fn bench_all_files_all_rows(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let table = setup(&rt);
    println!("setup complete");

    c.bench_function("all_files_all_rows", |bencher| {
        let routine = || all_files_all_rows(&table);
        bencher.to_async(&rt).iter(routine)
    });
}

/// Criterion target `one_file_all_rows`: measures planning of a scan
/// filtered on the pickup-time window.
///
/// The Tokio runtime and the table are created once, outside the measured
/// closure; only the call to `one_file_all_rows` is timed.
pub fn bench_one_file_all_rows(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let table = setup(&rt);
    println!("setup complete");

    c.bench_function("one_file_all_rows", |bencher| {
        let routine = || one_file_all_rows(&table);
        bencher.to_async(&rt).iter(routine)
    });
}

/// Criterion target `all_files_some_rows`: measures planning of a scan
/// filtered on `passenger_count`.
///
/// The Tokio runtime and the table are created once, outside the measured
/// closure; only the call to `all_files_some_rows` is timed.
pub fn bench_all_files_some_rows(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let table = setup(&rt);
    println!("setup complete");

    c.bench_function("all_files_some_rows", |bencher| {
        let routine = || all_files_some_rows(&table);
        bencher.to_async(&rt).iter(routine)
    });
}

/// Criterion target `one_file_some_rows`: measures planning of a scan
/// filtered on both the pickup-time window and `passenger_count`.
///
/// The Tokio runtime and the table are created once, outside the measured
/// closure; only the call to `one_file_some_rows` is timed.
pub fn bench_one_file_some_rows(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let table = setup(&rt);
    println!("setup complete");

    c.bench_function("one_file_some_rows", |bencher| {
        let routine = || one_file_some_rows(&table);
        bencher.to_async(&rt).iter(routine)
    });
}

// Registers the four scan-planning benchmarks as the `benches` group.
// Each benchmark is configured to collect 10 samples.
criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = bench_all_files_all_rows, bench_all_files_some_rows, bench_one_file_all_rows, bench_one_file_some_rows
}

// Entry point for this bench target (Cargo.toml sets `harness = false` for it).
criterion_main!(benches);
60 changes: 60 additions & 0 deletions crates/iceberg/benches/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_test_utils::docker::DockerCompose;

/// Returns a handle to the `iceberg-rust-performance` Docker Compose project
/// whose definition lives in this crate's `testdata/performance` directory.
///
/// The handle is created with persistence enabled via `with_persistence(true)`.
/// NOTE(review): presumably this keeps the containers running after the
/// handle is dropped — confirm against `iceberg_test_utils`.
pub fn get_docker_compose() -> DockerCompose {
    // `CARGO_MANIFEST_DIR` is already this crate's root, so resolve the
    // fixture directory directly from it instead of leaving the crate and
    // re-entering it through its directory name (`../iceberg/...`).
    let compose_dir = format!("{}/testdata/performance", env!("CARGO_MANIFEST_DIR"));

    DockerCompose::new("iceberg-rust-performance", compose_dir).with_persistence(true)
}

/// Builds a [`RestCatalog`] client for the performance-test environment.
///
/// The catalog URI points at the `rest` container on port 8181. The S3
/// endpoint points at the `haproxy` container on port 9080 rather than at
/// the object store directly, so storage traffic passes through the proxy.
/// Static credentials `admin` / `password` and region `us-east-1` are used.
///
/// This function only looks up container IPs; it does not start the Docker
/// Compose project itself.
/// NOTE(review): presumably the containers must already be running —
/// confirm how `get_container_ip` behaves when they are not.
pub async fn build_catalog() -> RestCatalog {
    let docker_compose = get_docker_compose();

    let rest_api_container_ip = docker_compose.get_container_ip("rest");
    let haproxy_container_ip = docker_compose.get_container_ip("haproxy");

    let catalog_uri = format!("http://{}:8181", rest_api_container_ip);
    let haproxy_uri = format!("http://{}:9080", haproxy_container_ip);

    // Build the map straight from an array: no intermediate `Vec` allocation
    // and no redundant `into_iter()` call.
    let user_props = HashMap::from([
        (S3_ENDPOINT.to_string(), haproxy_uri),
        (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
        (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
        (S3_REGION.to_string(), "us-east-1".to_string()),
    ]);

    let config = RestCatalogConfig::builder()
        .uri(catalog_uri)
        .props(user_props)
        .build();

    RestCatalog::new(config)
}
112 changes: 112 additions & 0 deletions crates/iceberg/testdata/performance/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

version: "3"

services:
# Spark service, built from the local spark/ directory. Starts after `rest`
# and `minio`, and mounts the warehouse, notebooks, raw data and Spark
# scripts directories from the host.
spark-iceberg:
image: tabulario/spark-iceberg
build: spark/
networks:
iceberg_perf:
depends_on:
- rest
- minio
volumes:
- ./warehouse:/home/iceberg/warehouse
- ./notebooks:/home/iceberg/notebooks/notebooks
- ./raw_data:/home/iceberg/raw_data
- ./spark_scripts:/home/iceberg/spark_scripts
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
ports:
- 8888:8888
- 8080:8080
- 10000:10000
- 10001:10001

# Iceberg REST catalog, published on port 8181. Catalog state is kept in a
# SQLite file under the mounted /warehouse directory; table data is addressed
# as s3://warehouse/ and accessed through the `minio` service on port 9000.
rest:
image: tabulario/iceberg-rest
networks:
iceberg_perf:
ports:
- 8181:8181
volumes:
- ./warehouse:/warehouse
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
- CATALOG_WAREHOUSE=s3://warehouse/
- CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
- CATALOG_S3_ENDPOINT=http://minio:9000
- CATALOG_URI=jdbc:sqlite:file:/warehouse/catalog.sqlite

# MinIO server storing its data in the mounted /warehouse directory.
# The API is published on port 9000 and the console on port 9001.
# NOTE(review): the `warehouse.minio` network alias together with
# MINIO_DOMAIN=minio presumably enables virtual-host-style access to the
# `warehouse` bucket — confirm.
minio:
image: minio/minio
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password
- MINIO_DOMAIN=minio
networks:
iceberg_perf:
aliases:
- warehouse.minio
volumes:
- ./warehouse:/warehouse
ports:
- 9001:9001
- 9000:9000
command: ["server", "/warehouse", "--console-address", ":9001"]

# One-shot MinIO client used to initialise the object store. Its entrypoint
# retries once per second until it can register the `minio` host, then
# creates the `warehouse` bucket, makes it public, and finally tails
# /dev/null so the container keeps running.
# NOTE(review): the image tag is not pinned; `mc config host add` and
# `mc policy set` may be unavailable in newer `minio/mc` releases
# (superseded by `mc alias set` / `mc anonymous set`) — verify against the
# image version actually pulled.
mc:
depends_on:
- minio
image: minio/mc
container_name: mc
networks:
iceberg_perf:
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc mb minio/warehouse;
/usr/bin/mc policy set public minio/warehouse;
tail -f /dev/null
"
# HAProxy instance placed in front of `minio`; its configuration is
# bind-mounted read-only from ./haproxy.cfg. Container port 9080 is exposed
# without a fixed host port; benches/utils.rs reaches it through the
# container IP on port 9080.
haproxy:
image: haproxy:lts-bookworm
networks:
iceberg_perf:
ports:
- 9080
volumes:
- type: bind
source: ./haproxy.cfg
target: /usr/local/etc/haproxy/haproxy.cfg
read_only: true
depends_on:
- minio

networks:
iceberg_perf:
39 changes: 39 additions & 0 deletions crates/iceberg/testdata/performance/haproxy.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Process-wide settings: debug-level logging to stdout in raw format.
global
log stdout format raw local0 debug
fd-hard-limit 50000

# Defaults inherited by the proxies below: HTTP mode with 5s connect and
# 10s client/server/request timeouts.
defaults
mode http
timeout client 10s
timeout connect 5s
timeout server 10s
timeout http-request 10s
log global

# Deliberately slow entry point for object-store traffic, listening on :9080.
# `inspect-delay 100ms` with `accept if WAIT_END` holds each new connection
# for the full inspection delay before accepting it, adding latency.
# The `bwlim-out` filter declares an outbound limit named `mylimit` of 625000
# per 1s period (presumably bytes, i.e. about 5 Mbit/s — confirm units).
# NOTE(review): HAProxy bandwidth-limit filters may need an explicit
# `set-bandwidth-limit mylimit` rule before they take effect — verify that
# this limit is actually applied.
frontend slowio
bind :9080
default_backend minio
tcp-request inspect-delay 100ms
tcp-request content accept if WAIT_END
filter bwlim-out mylimit default-limit 625000 default-period 1s

# Forwards to the MinIO API on minio:9000 with health checks enabled and at
# most 4 concurrent connections to the server.
backend minio
# server s1 minio.iceberg-performance-tests.orb.local:9000 check maxconn 4
server s1 minio:9000 check maxconn 4
Loading

0 comments on commit db2c8eb

Please sign in to comment.