diff --git a/Cargo.lock b/Cargo.lock
index 3f92018a..125aee21 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -362,6 +362,9 @@ name = "arrow-schema"
 version = "47.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5d1d179c117b158853e0101bfbed5615e86fe97ee356b4af901f1c5001e1ce4b"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "arrow-select"
@@ -393,6 +396,11 @@ dependencies = [
  "regex-syntax 0.7.5",
 ]
 
+[[package]]
+name = "arrow-tools"
+version = "0.18.0"
+source = "git+https://github.com/wangpatrick57/arrow-tools.git?branch=main#c04460346d808268e7811b212212c3442428330c"
+
 [[package]]
 name = "assert_approx_eq"
 version = "1.1.0"
@@ -1012,12 +1020,12 @@ dependencies = [
 
 [[package]]
 name = "clap"
-version = "4.5.2"
+version = "4.5.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651"
+checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0"
 dependencies = [
  "clap_builder",
- "clap_derive 4.5.0",
+ "clap_derive 4.5.4",
 ]
 
 [[package]]
@@ -1038,7 +1046,7 @@ version = "3.2.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008"
 dependencies = [
- "heck",
+ "heck 0.4.1",
  "proc-macro-error",
  "proc-macro2 1.0.78",
  "quote 1.0.35",
@@ -1047,11 +1055,11 @@ dependencies = [
 
 [[package]]
 name = "clap_derive"
-version = "4.5.0"
+version = "4.5.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47"
+checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64"
 dependencies = [
- "heck",
+ "heck 0.5.0",
  "proc-macro2 1.0.78",
  "quote 1.0.35",
  "syn 2.0.48",
@@ -1284,6 +1292,20 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "csv2parquet"
+version = "0.18.0"
+source = "git+https://github.com/wangpatrick57/arrow-tools.git?branch=main#c04460346d808268e7811b212212c3442428330c"
+dependencies = [
+ "arrow",
+ "arrow-schema",
+ "arrow-tools",
+ "clap 4.5.4",
+ "parquet",
+ "regex",
+ "serde_json",
+]
+
 [[package]]
 name = "ctor"
 version = "0.2.6"
@@ -2046,6 +2068,12 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
 
+[[package]]
+name = "heck"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+
 [[package]]
 name = "hermit-abi"
 version = "0.1.19"
@@ -2507,9 +2535,9 @@ dependencies = [
 
 [[package]]
 name = "memchr"
-version = "2.6.4"
+version = "2.7.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
+checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d"
 
 [[package]]
 name = "mimalloc"
@@ -2883,9 +2911,11 @@ name = "optd-perftest"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "arrow-schema",
  "assert_cmd",
  "async-trait",
- "clap 4.5.2",
+ "clap 4.5.4",
+ "csv2parquet",
  "datafusion",
  "datafusion-optd-cli",
  "env_logger 0.11.2",
@@ -3857,9 +3887,9 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.108"
+version = "1.0.116"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b"
+checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813"
 dependencies = [
  "itoa",
  "ryu",
@@ -3990,7 +4020,7 @@ version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf"
 dependencies = [
- "heck",
+ "heck 0.4.1",
  "proc-macro2 1.0.78",
  "quote 1.0.35",
  "syn 1.0.109",
@@ -4122,7 +4152,7 @@ version = "0.25.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0"
 dependencies = [
- "heck",
+ "heck 0.4.1",
  "proc-macro2 1.0.78",
  "quote 1.0.35",
  "rustversion",
diff --git a/dev_scripts/which_queries_work.sh b/dev_scripts/which_queries_work.sh
index 5a6796d9..98ff623b 100755
--- a/dev_scripts/which_queries_work.sh
+++ b/dev_scripts/which_queries_work.sh
@@ -24,8 +24,7 @@ fi
 successful_ids=()
 IFS=','
 for id in $all_ids; do
-    # make sure to execute with --adaptive so that we actually run the query in datafusion
-    cargo run --bin optd-perftest cardtest $benchmark_name --query-ids $id --adaptive &>/dev/null
+    cargo run --release --bin optd-perftest cardtest $benchmark_name --query-ids $id &>/dev/null
 
     if [ $? -eq 0 ]; then
         echo >&2 $id succeeded
diff --git a/optd-perftest/Cargo.toml b/optd-perftest/Cargo.toml
index fe08820e..58137a34 100644
--- a/optd-perftest/Cargo.toml
+++ b/optd-perftest/Cargo.toml
@@ -31,7 +31,7 @@ tokio = { version = "1.24", features = [
 shlex = "1.3"
 tokio-postgres = "0.7"
 regex = "1.10"
-clap = { version = "4.5", features = [
+clap = { version = "4.5.4", features = [
     "derive",
 ] }
 log = "0.4"
@@ -47,6 +47,9 @@ itertools = "0.12.1"
 test-case = "3.3"
 rayon = "1.10"
 parquet = "47.0.0"
+csv2parquet = { git = "https://github.com/wangpatrick57/arrow-tools.git", branch = "main" }
+arrow-schema = { version = "47.0.0", features = ["serde"] }
+
 
 [dev_dependencies]
 assert_cmd = "2.0"
diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs
index ee06e462..37c4ba61 100644
--- a/optd-perftest/src/datafusion_dbms.rs
+++ b/optd-perftest/src/datafusion_dbms.rs
@@ -322,7 +322,7 @@ impl DatafusionDBMS {
         match benchmark {
             Benchmark::Tpch(_) => {
                 let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
-                self.create_tpch_tables(&tpch_kit).await?;
+                Self::create_tpch_tables(self.get_ctx(), &tpch_kit).await?;
             }
             Benchmark::Job(_) | Benchmark::Joblight(_) => {
                 let job_kit = JobKit::build(&self.workspace_dpath)?;
@@ -332,7 +332,7 @@ impl DatafusionDBMS {
         Ok(())
     }
 
-    async fn create_tpch_tables(&mut self, tpch_kit: &TpchKit) -> anyhow::Result<()> {
+    async fn create_tpch_tables(ctx: &SessionContext, tpch_kit: &TpchKit) -> anyhow::Result<()> {
         let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
         let ddls = ddls
             .split(';')
             .map(|s| s.trim())
             .filter(|s| !s.is_empty())
             .collect::<Vec<_>>();
         for ddl in ddls {
-            Self::execute(self.get_ctx(), ddl).await?;
+            Self::execute(ctx, ddl).await?;
         }
         Ok(())
     }
@@ -362,12 +362,12 @@ impl DatafusionDBMS {
         &mut self,
         tpch_kit_config: &TpchKitConfig,
     ) -> anyhow::Result<()> {
-        // Generate the tables.
+        // Generate the tables and convert them to Parquet.
         let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
         tpch_kit.gen_tables(tpch_kit_config)?;
 
         // Create the tables.
-        self.create_tpch_tables(&tpch_kit).await?;
+        Self::create_tpch_tables(self.get_ctx(), &tpch_kit).await?;
 
         // Load the data by creating an external table first and copying the data to real tables.
         let tbl_fpath_iter = tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "tbl").unwrap();
@@ -394,6 +394,10 @@ impl DatafusionDBMS {
                 .await
                 .unwrap()
                 .schema();
+
+            // DEBUG(phw2)
+            println!("schema={}", serde_json::to_string_pretty(&schema).unwrap());
+
             let projection_list = (1..=schema.fields().len())
                 .map(|i| format!("column_{}", i))
                 .collect::<Vec<_>>()
@@ -477,7 +481,35 @@ impl DatafusionDBMS {
 
         println!("Total execution time {:?}...", now.elapsed());
 
-        Ok(base_table_stats.into_inner()?)
+        let stats = base_table_stats.into_inner();
+        let l = stats.unwrap();
+        // Commented out, but kept because it is useful for debugging stats.
+        // l.iter().for_each(|(table_name, stats)| {
+        //     println!("Table: {} (num_rows: {})", table_name, stats.row_cnt);
+        //     stats
+        //         .column_comb_stats
+        //         .iter()
+        //         .sorted_by_key(|x| x.0[0])
+        //         .for_each(|x| {
+        //             let sum_freq: f64 = x.1.mcvs.frequencies().values().copied().sum();
+        //             println!(
+        //                 "Col: {} (n_distinct: {}) (n_frac: {}) (mcvs: {} {}) (tdigests: {:?} {:?} {:?} {:?} {:?})",
+        //                 x.0[0],
+        //                 x.1.ndistinct,
+        //                 x.1.null_frac,
+        //                 x.1.mcvs.frequencies().len(),
+        //                 sum_freq,
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.01)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.25)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.50)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.75)),
+        //                 x.1.distr.as_ref().map(|d| d.quantile(0.99)),
+        //             );
+        //         });
+        // });
+        // println!("{:#?}", stats);
+
+        Ok(l)
     }
 
     // Load job data from a .csv file.
@@ -487,14 +519,14 @@
     ) -> anyhow::Result<()> {
         let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
 
-        // Download the tables.
+        // Download the tables and convert them to Parquet.
         let job_kit = JobKit::build(&self.workspace_dpath)?;
         job_kit.download_tables(job_kit_config)?;
 
         // Create the tables.
         Self::create_job_tables(&ctx, &job_kit).await?;
 
-        // Load each table using register_csv()
+        // Load each table using register_csv().
         let tbl_fpath_iter = job_kit.get_tbl_fpath_vec("csv").unwrap();
         for tbl_fpath in tbl_fpath_iter {
             let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap();
@@ -525,24 +557,20 @@
         &mut self,
         tpch_kit_config: &TpchKitConfig,
     ) -> anyhow::Result<DataFusionBaseTableStats> {
-        // Generate the tables
-        let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
-        tpch_kit.gen_tables(tpch_kit_config)?;
-
-        // To get the schema of each table.
+        // Create tables in a temporary context to get the schema provider.
         let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_TPCH).await?;
-        let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
-        let ddls = ddls
-            .split(';')
-            .map(|s| s.trim())
-            .filter(|s| !s.is_empty())
-            .collect::<Vec<_>>();
-        for ddl in ddls {
-            Self::execute(&ctx, ddl).await?;
-        }
+        let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
+        Self::create_tpch_tables(&ctx, &tpch_kit).await?;
+        let schema_provider = ctx.catalog("datafusion").unwrap().schema("public").unwrap();
+
+        // Generate the tables.
+        tpch_kit.gen_tables(tpch_kit_config)?;
+        tpch_kit
+            .make_parquet_files(tpch_kit_config, schema_provider)
+            .await?;
 
         // Compute base statistics on Parquet.
         let tbl_paths = tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "parquet")?;
+        assert!(tbl_paths.len() == tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "tbl")?.len());
         Self::gen_base_stats(tbl_paths)
     }
 
@@ -550,9 +578,18 @@
         &mut self,
         job_kit_config: &JobKitConfig,
     ) -> anyhow::Result<DataFusionBaseTableStats> {
+        // Create tables in a temporary context to get the schema provider.
+        let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
+        let job_kit = JobKit::build(&self.workspace_dpath)?;
+        Self::create_job_tables(&ctx, &job_kit).await?;
+        let schema_provider = ctx.catalog("datafusion").unwrap().schema("public").unwrap();
+
         // Generate the tables.
         let job_kit = JobKit::build(&self.workspace_dpath)?;
         job_kit.download_tables(job_kit_config)?;
+        job_kit
+            .make_parquet_files(job_kit_config, schema_provider)
+            .await?;
 
         // To get the schema of each table.
         let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
@@ -568,6 +605,7 @@
 
         // Compute base statistics on Parquet.
         let tbl_paths = job_kit.get_tbl_fpath_vec("parquet").unwrap();
+        assert!(tbl_paths.len() == job_kit.get_tbl_fpath_vec("csv")?.len());
         Self::gen_base_stats(tbl_paths)
     }
 }
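Every stats path above now follows the same pattern: build the benchmark tables in a scratch `SessionContext`, pull the catalog's schema provider out of it, and hand that provider to `make_parquet_files` so the CSV-to-Parquet conversion reuses the DDL-derived schemas instead of re-inferring them. A sketch of that lookup under the diff's assumptions (DataFusion's default catalog/schema names, which the diff hard-codes); `public_schema_provider`, `ddl_schema`, and the table name are illustrative helpers, not functions in the PR:

```rust
use std::sync::Arc;

use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;

// Mirrors the diff's `ctx.catalog("datafusion").unwrap().schema("public").unwrap()`:
// DataFusion's default catalog and schema are named "datafusion" and "public",
// and we panic if either is missing, just like the diff's unwrap()s.
fn public_schema_provider(ctx: &SessionContext) -> Arc<dyn SchemaProvider> {
    ctx.catalog("datafusion")
        .expect("default catalog should exist")
        .schema("public")
        .expect("default schema should exist")
}

// The provider is then all `make_parquet_files` needs in order to recover each
// table's DDL-derived Arrow schema (`tbl_name` is a hypothetical table that a
// prior CREATE TABLE statement registered in the context).
async fn ddl_schema(ctx: &SessionContext, tbl_name: &str) -> arrow_schema::SchemaRef {
    public_schema_provider(ctx)
        .table(tbl_name)
        .await
        .expect("table should have been created by the DDL")
        .schema()
}
```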
diff --git a/optd-perftest/src/job.rs b/optd-perftest/src/job.rs
index b6eb4f14..da898cfe 100644
--- a/optd-perftest/src/job.rs
+++ b/optd-perftest/src/job.rs
@@ -1,3 +1,5 @@
+use csv2parquet::Opts;
+use datafusion::catalog::schema::SchemaProvider;
 /// A wrapper around job-kit
 use serde::{Deserialize, Serialize};
 
@@ -7,18 +9,18 @@ use std::fs;
 use std::fs::File;
 use std::io;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 
 const JOB_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/job-kit.git";
 const JOB_TABLES_URL: &str = "https://homepages.cwi.nl/~boncz/job/imdb.tgz";
 
 pub const WORKING_JOB_QUERY_IDS: &[&str] = &[
-    "1a", "1b", "1c", "1d", "2a", "2b", "2c", "2d", "3a", "3b", "3c", "4a", "4b", "4c", "5a", "5b",
-    "5c", "6a", "6b", "6c", "6d", "6e", "6f", "7b", "8a", "8b", "8c", "8d", "9b", "9c", "9d",
-    "10a", "10b", "10c", "12a", "12b", "12c", "13a", "13b", "13c", "13d", "14a", "14b", "14c",
-    "15a", "15b", "15c", "15d", "16a", "16b", "16c", "16d", "17a", "17b", "17c", "17d", "17e",
-    "17f", "18a", "18c", "19b", "19c", "19d", "20a", "20b", "20c", "22a", "22b", "22c", "22d",
-    "23a", "23b", "23c", "24a", "24b", "25a", "25b", "25c", "26a", "26b", "26c", "28a", "28b",
-    "28c", "29a", "29b", "29c", "30a", "30b", "30c", "31a", "31b", "31c", "32a", "32b", "33a",
-    "33b", "33c",
+    "1a", "1b", "1c", "1d", "2a", "2b", "2d", "3a", "3b", "3c", "4a", "4b", "4c", "5c", "6a", "6b",
+    "6c", "6d", "6e", "6f", "7b", "8a", "8b", "8c", "8d", "9b", "9c", "9d", "10a", "10c", "12a",
+    "12b", "12c", "13a", "13b", "13c", "13d", "14a", "14b", "14c", "15a", "15b", "15c", "15d",
+    "16a", "16b", "16c", "16d", "17a", "17b", "17c", "17d", "17e", "17f", "18a", "18c", "19b",
+    "19c", "19d", "20a", "20b", "20c", "22a", "22b", "22c", "22d", "23a", "23b", "23c", "24a",
+    "24b", "25a", "25b", "25c", "26a", "26b", "26c", "28a", "28b", "28c", "29a", "29b", "29c",
+    "30a", "30b", "30c", "31a", "31b", "31c", "32b", "33a", "33b", "33c",
 ];
 pub const WORKING_JOBLIGHT_QUERY_IDS: &[&str] = &[
     "1a", "1b", "1c", "1d", "2a", "3a", "3b", "4a", "4b", "4c", "5a", "5b", "5c", "6a", "6b", "6c",
@@ -126,6 +128,32 @@ impl JobKit {
         Ok(())
     }
 
+    pub async fn make_parquet_files(
+        &self,
+        job_kit_config: &JobKitConfig,
+        schema_provider: Arc<dyn SchemaProvider>,
+    ) -> io::Result<()> {
+        let done_fpath = self.downloaded_tables_dpath.join("make_parquet_done");
+        if !done_fpath.exists() {
+            log::debug!("[start] making parquet for {}", job_kit_config);
+            for csv_tbl_fpath in self.get_tbl_fpath_vec("csv").unwrap() {
+                let tbl_name = Self::get_tbl_name_from_tbl_fpath(&csv_tbl_fpath);
+                let schema = schema_provider.table(&tbl_name).await.unwrap().schema();
+                let mut parquet_tbl_fpath = csv_tbl_fpath.clone();
+                parquet_tbl_fpath.set_extension("parquet");
+                let mut opts = Opts::new(csv_tbl_fpath, parquet_tbl_fpath.clone());
+                opts.delimiter = ',';
+                opts.schema = Some(schema.as_ref().clone());
+                csv2parquet::convert(opts).unwrap();
+            }
+            File::create(done_fpath)?;
+            log::debug!("[end] making parquet for {}", job_kit_config);
+        } else {
+            log::debug!("[skip] making parquet for {}", job_kit_config);
+        }
+        Ok(())
+    }
+
     /// Convert a tbl_fpath into the table name
     pub fn get_tbl_name_from_tbl_fpath<P: AsRef<Path>>(tbl_fpath: P) -> String {
         tbl_fpath
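`JobKit::make_parquet_files` above and the TPC-H version that follows share one shape: if the `make_parquet_done` marker file exists the whole step is skipped, otherwise each delimited file is converted next to its source. A standalone sketch of the per-file step, restricted to the `Opts` surface of the csv2parquet fork that the diff actually exercises (`convert_one` and its arguments are hypothetical):

```rust
use std::path::PathBuf;

use arrow_schema::Schema;
use csv2parquet::Opts;

// Convert one delimited file (foo.tbl or foo.csv) into foo.parquet, passing
// an explicit DDL-derived schema so csv2parquet does not have to infer one.
fn convert_one(src_fpath: PathBuf, delimiter: char, schema: Schema) {
    // Write the output next to the source file, swapping only the extension.
    let mut parquet_fpath = src_fpath.clone();
    parquet_fpath.set_extension("parquet");

    let mut opts = Opts::new(src_fpath, parquet_fpath);
    opts.delimiter = delimiter; // '|' for TPC-H .tbl files, ',' for JOB .csv files
    opts.schema = Some(schema);
    csv2parquet::convert(opts).unwrap();
}
```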
diff --git a/optd-perftest/src/tpch.rs b/optd-perftest/src/tpch.rs
index 6c8e53d4..58569200 100644
--- a/optd-perftest/src/tpch.rs
+++ b/optd-perftest/src/tpch.rs
@@ -1,4 +1,6 @@
 /// A wrapper around tpch-kit
+use csv2parquet::Opts;
+use datafusion::catalog::schema::SchemaProvider;
 use serde::{Deserialize, Serialize};
 
 use crate::shell;
@@ -9,12 +11,14 @@ use std::fs;
 use std::fs::File;
 use std::io;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 
 const TPCH_KIT_REPO_URL: &str = "https://github.com/wangpatrick57/tpch-kit.git";
 pub const TPCH_KIT_POSTGRES: &str = "POSTGRESQL";
 const NUM_TPCH_QUERIES: usize = 22;
-pub const WORKING_QUERY_IDS: &[&str] =
-    &["2", "3", "5", "7", "8", "9", "10", "12", "13", "14", "17"];
+pub const WORKING_QUERY_IDS: &[&str] = &[
+    "2", "3", "5", "6", "7", "8", "9", "10", "12", "13", "14", "17", "19",
+];
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct TpchKitConfig {
@@ -146,6 +150,35 @@ impl TpchKit {
         Ok(())
     }
 
+    pub async fn make_parquet_files(
+        &self,
+        tpch_kit_config: &TpchKitConfig,
+        schema_provider: Arc<dyn SchemaProvider>,
+    ) -> io::Result<()> {
+        let this_genned_tables_dpath = self.get_this_genned_tables_dpath(tpch_kit_config);
+        let done_fpath = this_genned_tables_dpath.join("make_parquet_done");
+
+        if !done_fpath.exists() {
+            log::debug!("[start] making parquet for {}", tpch_kit_config);
+            for csv_tbl_fpath in self.get_tbl_fpath_vec(tpch_kit_config, "tbl").unwrap() {
+                let tbl_name = Self::get_tbl_name_from_tbl_fpath(&csv_tbl_fpath);
+                let schema = schema_provider.table(&tbl_name).await.unwrap().schema();
+                let mut parquet_tbl_fpath = csv_tbl_fpath.clone();
+                parquet_tbl_fpath.set_extension("parquet");
+                let mut opts = Opts::new(csv_tbl_fpath, parquet_tbl_fpath.clone());
+                opts.delimiter = '|';
+                opts.schema = Some(schema.as_ref().clone());
+                csv2parquet::convert(opts).unwrap();
+            }
+            File::create(done_fpath)?;
+            log::debug!("[end] making parquet for {}", tpch_kit_config);
+        } else {
+            log::debug!("[skip] making parquet for {}", tpch_kit_config);
+        }
+
+        Ok(())
+    }
+
     /// Generates the .sql files for all queries of TPC-H, with one .sql file per query
     pub fn gen_queries(&self, tpch_kit_config: &TpchKitConfig) -> io::Result<()> {
         let this_genned_queries_dpath = self.get_this_genned_queries_dpath(tpch_kit_config);
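The `serde` feature that the Cargo.toml change enables on `arrow-schema` is what makes these DDL-derived schemas serializable at all — it is what the `schema={}` debug print added in `datafusion_dbms.rs` relies on, and why the lockfile now shows `serde` under `arrow-schema`. A minimal round-trip sketch, not part of the diff (the column names are illustrative, not taken from the benchmark DDL):

```rust
use arrow_schema::{DataType, Field, Schema};

// With arrow-schema's `serde` feature enabled, `Schema` implements
// Serialize/Deserialize and survives a JSON round-trip.
fn main() -> serde_json::Result<()> {
    let schema = Schema::new(vec![
        Field::new("l_orderkey", DataType::Int64, false),
        Field::new("l_comment", DataType::Utf8, true),
    ]);
    let json = serde_json::to_string_pretty(&schema)?;
    println!("schema={}", json);
    let roundtrip: Schema = serde_json::from_str(&json)?;
    assert_eq!(schema, roundtrip);
    Ok(())
}
```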