From 358ccc4656115ac09c5c66f7b30718aa7b0841a2 Mon Sep 17 00:00:00 2001
From: Alexis Schlomer
Date: Mon, 29 Apr 2024 15:30:21 -0400
Subject: [PATCH] Make 25% faster

---
 .../src/cost/base_cost/stats.rs       |  92 ++++-----
 optd-gungnir/src/stats/hyperloglog.rs |  32 +--
 optd-gungnir/src/stats/misragries.rs  |  21 +-
 optd-perftest/src/datafusion_dbms.rs  | 190 +++++++++---------
 optd-perftest/src/postgres_dbms.rs    |   2 +-
 optd-perftest/src/tpch.rs             |  26 ++-
 6 files changed, 186 insertions(+), 177 deletions(-)

diff --git a/optd-datafusion-repr/src/cost/base_cost/stats.rs b/optd-datafusion-repr/src/cost/base_cost/stats.rs
index b63e6de8..dd677df4 100644
--- a/optd-datafusion-repr/src/cost/base_cost/stats.rs
+++ b/optd-datafusion-repr/src/cost/base_cost/stats.rs
@@ -1,9 +1,13 @@
-use std::{collections::HashMap, sync::Arc};
+use std::{
+    collections::HashMap,
+    sync::{mpsc::Receiver, Arc},
+    thread::JoinHandle,
+};
 
-use arrow_schema::{ArrowError, DataType, SchemaRef};
+use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
 use datafusion::arrow::array::{
     Array, BooleanArray, Date32Array, Float32Array, Int16Array, Int32Array, Int8Array, RecordBatch,
-    RecordBatchIterator, RecordBatchReader, StringArray, UInt16Array, UInt32Array, UInt8Array,
+    StringArray, UInt16Array, UInt32Array, UInt8Array,
 };
 use itertools::Itertools;
 use optd_core::rel_node::{SerializableOrderedF64, Value};
@@ -330,16 +334,17 @@ impl TableStats<MisraGries<Value>, TDigest> {
             .zip(hlls)
             .zip(null_counts)
             .for_each(|(((column_comb, mg), hll), count)| {
-                let filtered_nulls: Vec<ColumnCombValue> = column_comb
-                    .iter()
-                    .filter(|row| row.iter().any(|val| val.is_some()))
-                    .cloned()
-                    .collect();
-                let nb_rows = column_comb.len() as i32;
+                let filtered_nulls = column_comb
+                    .into_iter()
+                    .filter(|row| row.iter().any(|val| val.is_some()));
+
+                *count += column_comb.len() as i32;
 
-                *count += nb_rows - filtered_nulls.len() as i32;
-                mg.aggregate(&filtered_nulls);
-                hll.aggregate(&filtered_nulls);
+                filtered_nulls.for_each(|e| {
+                    mg.insert_element(e, 1);
+                    hll.process(e);
+                    *count -= 1;
+                });
             });
     }
 
@@ -373,46 +378,38 @@ impl TableStats<MisraGries<Value>, TDigest> {
         });
     }
 
-    pub fn from_record_batches<I: IntoIterator<Item = Result<RecordBatch, ArrowError>>>(
-        batch_iter_builder: impl Fn() -> anyhow::Result<RecordBatchIterator<I>>,
+    pub fn from_record_batches(
+        first_batch_channel: impl FnOnce()
+            -> (JoinHandle<()>, Receiver<Result<RecordBatch, ArrowError>>),
+        second_batch_channel: impl FnOnce()
+            -> (JoinHandle<()>, Receiver<Result<RecordBatch, ArrowError>>),
         combinations: Vec<ColumnsIdx>,
+        schema: Arc<Schema>,
     ) -> anyhow::Result<Self> {
-        let batch_iter = batch_iter_builder()?;
-        let comb_stat_types = Self::get_stats_types(&combinations, &batch_iter.schema());
+        let comb_stat_types = Self::get_stats_types(&combinations, &schema);
        let nb_stats = comb_stat_types.len();
 
-        // 0. Just count row numbers if no combinations can give stats.
-        if nb_stats == 0 {
-            let mut row_cnt = 0;
-            for batch in batch_iter {
-                row_cnt += batch?.num_rows();
-            }
-
-            return Ok(Self {
-                row_cnt,
-                column_comb_stats: HashMap::new(),
-            });
-        }
-
-        // TODO(Alexis): This materialization is OK as JOB only takes 1GB, but should be made in parallel...
-        // Unfortunately, par_bridge doesn't work as the BatchIterator doesn't implement Send.
-        let materialized: Vec<_> = batch_iter.collect();
-
-        // 1. FIRST PASS: hlls + mgs + null_cnts.
+        // 1. FIRST PASS: hlls + mgs + null_cnts.
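+        // Batches now arrive over an mpsc channel fed by a reader thread
+        // (see `create_batch_channel` in optd-perftest/src/datafusion_dbms.rs):
+        // `par_bridge` fans the receiver out across rayon workers, so partial
+        // stats are folded in parallel without materializing the whole table.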
         let now = std::time::Instant::now();
-        let (hlls, mgs, null_cnts) = materialized
-            .par_iter()
+        let (handle, receiver) = first_batch_channel();
+
+        let (hlls, mgs, null_cnts) = receiver
+            .into_iter()
+            .par_bridge()
             .fold(Self::first_pass_stats_id(nb_stats), |local_stats, batch| {
                 let mut local_stats = local_stats?;
 
                 match batch {
                     Ok(batch) => {
                         let (hlls, mgs, null_cnts) = &mut local_stats;
-                        let comb = Self::get_column_combs(batch, &comb_stat_types);
+                        let comb = Self::get_column_combs(&batch, &comb_stat_types);
                         Self::generate_partial_stats(&comb, mgs, hlls, null_cnts);
                         Ok(local_stats)
                     }
-                    Err(_) => todo!(), // TODO(Alexis): Could not satisfy the type checker otherwise, but never happens!
+                    Err(e) => {
+                        println!("Err: {:?}, {:?}", e, comb_stat_types.len());
+                        Err(e.into())
+                    }
                 }
             })
             .reduce(
@@ -433,12 +430,17 @@ impl TableStats<MisraGries<Value>, TDigest> {
                 Ok(final_stats)
             },
         )?;
+
+        let _ = handle.join();
         let first = now.elapsed();
 
         // 2. SECOND PASS: mcv + tdigest + row_cnts.
         let now = std::time::Instant::now();
-        let (distrs, cnts, row_cnts) = materialized
-            .par_iter()
+        let (handle, receiver) = second_batch_channel();
+
+        let (distrs, cnts, row_cnts) = receiver
+            .into_iter()
+            .par_bridge()
             .fold(
                 Self::second_pass_stats_id(&comb_stat_types, &mgs, nb_stats),
                 |local_stats, batch| {
                     let mut local_stats = local_stats?;
 
                     match batch {
                         Ok(batch) => {
                             let (distrs, cnts, row_cnts) = &mut local_stats;
-                            let comb = Self::get_column_combs(batch, &comb_stat_types);
+                            let comb = Self::get_column_combs(&batch, &comb_stat_types);
                             Self::generate_full_stats(&comb, cnts, distrs, row_cnts);
                             Ok(local_stats)
                         }
-                        Err(_) => todo!(), // TODO(Alexis): Could not satisfy the type checker otherwise, but never happens!
+                        Err(e) => Err(e.into()),
                     }
                 },
             )
@@ -479,10 +481,11 @@ impl TableStats<MisraGries<Value>, TDigest> {
                 Ok(final_stats)
             },
         )?;
-        let second = now.elapsed();
+
+        let _ = handle.join();
+        println!("First: {:?}, Second: {:?}", first, now.elapsed());
 
         // 3. ASSEMBLE STATS.
-        let now = std::time::Instant::now();
         let row_cnt = row_cnts[0];
 
         let mut column_comb_stats = HashMap::new();
@@ -506,7 +509,6 @@ impl TableStats<MisraGries<Value>, TDigest> {
             );
             column_comb_stats.insert(comb, column_stats);
         }
-        println!("First: {:?}, Second: {:?}, Third: {:?}", first, second, now.elapsed());
 
         Ok(Self {
             row_cnt: row_cnt as usize,
diff --git a/optd-gungnir/src/stats/hyperloglog.rs b/optd-gungnir/src/stats/hyperloglog.rs
index d49f6f38..396052c4 100644
--- a/optd-gungnir/src/stats/hyperloglog.rs
+++ b/optd-gungnir/src/stats/hyperloglog.rs
@@ -88,9 +88,9 @@ impl_byte_serializable_for_numeric!(usize, isize);
 impl_byte_serializable_for_numeric!(f64, f32);
 
 // Self-contained implementation of the HyperLogLog data structure.
-impl<T> HyperLogLog<T>
+impl<'a, T> HyperLogLog<T>
 where
-    T: ByteSerializable,
+    T: ByteSerializable + 'a,
 {
     /// Creates and initializes a new empty HyperLogLog.
     pub fn new(precision: u8) -> Self {
@@ -109,17 +109,23 @@ where
         }
     }
 
+    pub fn process(&mut self, element: &T)
+    where
+        T: ByteSerializable,
+    {
+        let hash = murmur_hash(&element.to_bytes(), 0); // TODO: We ignore DoS attacks (seed).
+        let mask = (1 << (self.precision)) - 1;
+        let idx = (hash & mask) as usize; // LSB is bucket discriminator; MSB is zero streak.
+        self.registers[idx] = max(self.registers[idx], self.zeros(hash) + 1);
+    }
+
     /// Digests an array of ByteSerializable data into the HLL.
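+    /// Now generic over any iterator of references, so callers can stream
+    /// elements into the sketch without first collecting them into a slice.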
-    pub fn aggregate(&mut self, data: &[T])
+    pub fn aggregate<I>(&mut self, data: I)
     where
+        I: Iterator<Item = &'a T>,
         T: ByteSerializable,
     {
-        for d in data {
-            let hash = murmur_hash(&d.to_bytes(), 0); // TODO: We ignore DoS attacks (seed).
-            let mask = (1 << (self.precision)) - 1;
-            let idx = (hash & mask) as usize; // LSB is bucket discriminator; MSB is zero streak.
-            self.registers[idx] = max(self.registers[idx], self.zeros(hash) + 1);
-        }
+        data.for_each(|e| self.process(e));
     }
 
     /// Merges two HLLs together and returns a new one.
@@ -192,7 +198,7 @@ mod tests {
         let mut hll = HyperLogLog::new(12);
 
         let data = vec!["a".to_string(), "b".to_string()];
-        hll.aggregate(&data);
+        hll.aggregate(data.iter());
 
         assert_eq!(hll.n_distinct(), data.len() as u64);
     }
@@ -201,7 +207,7 @@ mod tests {
         let mut hll = HyperLogLog::new(12);
 
         let data = vec![1, 2];
-        hll.aggregate(&data);
+        hll.aggregate(data.iter());
 
         assert_eq!(hll.n_distinct(), data.len() as u64);
     }
@@ -239,7 +245,7 @@ mod tests {
         let relative_error = 0.05; // We allow a 5% relative error rate.
 
         let strings = generate_random_strings(n_distinct, 100, 0);
-        hll.aggregate(&strings);
+        hll.aggregate(strings.iter());
 
         assert!(is_close(
             hll.n_distinct() as f64,
@@ -264,7 +270,7 @@ mod tests {
                 let curr_job_id = job_id.fetch_add(1, Ordering::SeqCst);
 
                 let strings = generate_random_strings(n_distinct, 100, curr_job_id);
-                local_hll.aggregate(&strings);
+                local_hll.aggregate(strings.iter());
 
                 assert!(is_close(
                     local_hll.n_distinct() as f64,
diff --git a/optd-gungnir/src/stats/misragries.rs b/optd-gungnir/src/stats/misragries.rs
index 692a8b64..05aa7201 100644
--- a/optd-gungnir/src/stats/misragries.rs
+++ b/optd-gungnir/src/stats/misragries.rs
@@ -23,9 +23,9 @@ pub struct MisraGries<T> {
 }
 
 // Self-contained implementation of the Misra-Gries data structure.
-impl<T> MisraGries<T>
+impl<'a, T> MisraGries<T>
 where
-    T: PartialEq + Eq + Hash + Clone,
+    T: PartialEq + Eq + Hash + Clone + 'a,
 {
     /// Creates and initializes a new empty Misra-Gries.
     pub fn new(k: u16) -> Self {
@@ -48,7 +48,7 @@ where
     }
 
     // Inserts an element occ times into the `self` Misra-Gries structure.
-    fn insert_element(&mut self, elem: &T, occ: i32) {
+    pub fn insert_element(&mut self, elem: &T, occ: i32) {
         match self.frequencies.get_mut(elem) {
             Some(freq) => {
                 *freq += occ; // Hit.
@@ -93,8 +93,11 @@ where
     }
 
     /// Digests an array of data into the Misra-Gries structure.
-    pub fn aggregate(&mut self, data: &[T]) {
-        data.iter().for_each(|key| self.insert_element(key, 1));
+    pub fn aggregate<I>(&mut self, data: I)
+    where
+        I: Iterator<Item = &'a T>,
+    {
+        data.for_each(|key| self.insert_element(key, 1));
     }
 
     /// Merges another MisraGries into the current one.
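+    /// (Counterpart to `aggregate` for parallel use: sketches built on
+    /// separate threads can be folded back into one summary.)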
@@ -131,7 +134,7 @@ mod tests {
         let data = vec![0, 1, 2, 3];
         let mut misra_gries = MisraGries::<i32>::new(data.len() as u16);
 
-        misra_gries.aggregate(&data);
+        misra_gries.aggregate(data.iter());
 
         for key in misra_gries.most_frequent_keys() {
             assert!(data.contains(key));
@@ -145,7 +148,7 @@ mod tests {
 
         let mut misra_gries = MisraGries::<i32>::new(data.len() as u16);
 
-        misra_gries.aggregate(&data_dup);
+        misra_gries.aggregate(data_dup.iter());
 
         for key in misra_gries.most_frequent_keys() {
             assert!(data.contains(key));
@@ -189,7 +192,7 @@ mod tests {
         let data = create_zipfian(n_distinct, 0);
         let mut misra_gries = MisraGries::<i32>::new(k as u16);
 
-        misra_gries.aggregate(&data);
+        misra_gries.aggregate(data.iter());
 
         check_zipfian(&misra_gries, n_distinct);
     }
@@ -209,7 +212,7 @@ mod tests {
                 let curr_job_id = job_id.fetch_add(1, Ordering::SeqCst);
 
                 let data = create_zipfian(n_distinct, curr_job_id as u64);
-                local_misra_gries.aggregate(&data);
+                local_misra_gries.aggregate(data.iter());
 
                 check_zipfian(&local_misra_gries, n_distinct);
 
diff --git a/optd-perftest/src/datafusion_dbms.rs b/optd-perftest/src/datafusion_dbms.rs
index 29a620bd..3e819cd0 100644
--- a/optd-perftest/src/datafusion_dbms.rs
+++ b/optd-perftest/src/datafusion_dbms.rs
@@ -1,7 +1,11 @@
 use std::{
     fs::{self, File},
     path::{Path, PathBuf},
-    sync::{Arc, Mutex},
+    sync::{
+        mpsc::{self, Receiver},
+        Arc, Mutex,
+    },
+    thread::{self, JoinHandle},
     time::Instant,
 };
 
@@ -14,8 +18,10 @@ use crate::{
 use async_trait::async_trait;
 use datafusion::{
     arrow::{
-        array::RecordBatchIterator,
+        array::{RecordBatch, RecordBatchIterator},
         csv::ReaderBuilder,
+        datatypes::SchemaRef,
+        error::ArrowError,
         util::display::{ArrayFormatter, FormatOptions},
     },
     execution::{
@@ -309,7 +315,7 @@ impl DatafusionDBMS {
         self.create_tpch_tables(&tpch_kit).await?;
 
         // Load the data by creating an external table first and copying the data to real tables.
-        let tbl_fpath_iter = tpch_kit.get_tbl_fpath_iter(tpch_kit_config).unwrap();
+        let tbl_fpath_iter = tpch_kit.get_tbl_fpath_vec(tpch_kit_config).unwrap();
         for tbl_fpath in tbl_fpath_iter {
             let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap();
             Self::execute(
@@ -350,6 +356,84 @@ impl DatafusionDBMS {
         Ok(())
     }
 
+    fn gen_base_stats(
+        tbl_paths: Vec<PathBuf>,
+        ctx: SessionContext,
+        delim: u8,
+    ) -> anyhow::Result<DataFusionBaseTableStats> {
+        let base_table_stats = Mutex::new(DataFusionBaseTableStats::default());
+        let now = Instant::now();
+
+        tbl_paths.par_iter().for_each(|tbl_fpath| {
+            let tbl_name = TpchKit::get_tbl_name_from_tbl_fpath(tbl_fpath);
+            let start = Instant::now();
+
+            let schema = block_on(async {
+                ctx.catalog("datafusion")
+                    .unwrap()
+                    .schema("public")
+                    .unwrap()
+                    .table(&tbl_name)
+                    .await
+                    .unwrap()
+                    .schema()
+            });
+
+            let nb_cols = schema.fields().len();
+            let single_cols = (0..nb_cols).map(|v| vec![v]).collect::<Vec<_>>();
+
+            let stats_result = DataFusionPerTableStats::from_record_batches(
+                Self::create_batch_channel(tbl_fpath.clone(), schema.clone(), delim),
+                Self::create_batch_channel(tbl_fpath.clone(), schema.clone(), delim),
+                single_cols,
+                schema,
+            );
+
+            if let Ok(per_table_stats) = stats_result {
+                let mut stats = base_table_stats.lock().unwrap();
+                stats.insert(tbl_name.to_string(), per_table_stats);
+            }
+
+            println!(
+                "Table {:?} took in total {:?}...",
+                tbl_name,
+                start.elapsed()
+            );
+        });
+
+        println!("Total execution time {:?}...", now.elapsed());
+
+        Ok(base_table_stats.into_inner()?)
+    }
+
+    fn create_batch_channel(
+        tbl_fpath: PathBuf,
+        schema: SchemaRef,
+        delim: u8,
+    ) -> impl FnOnce() -> (JoinHandle<()>, Receiver<Result<RecordBatch, ArrowError>>) {
+        move || {
+            let (sender, receiver) = mpsc::channel();
+
+            let handle = thread::spawn(move || {
+                let tbl_file = File::open(tbl_fpath).expect("Failed to open file");
+                let csv_reader = ReaderBuilder::new(schema.clone())
+                    .has_header(false)
+                    .with_delimiter(delim)
+                    .with_escape(b'\\')
+                    .with_batch_size(1024)
+                    .build(tbl_file)
+                    .expect("Failed to build CSV reader");
+
+                let batch_iter = RecordBatchIterator::new(csv_reader, schema);
+                for batch in batch_iter {
+                    sender.send(batch).expect("Failed to send batch");
+                }
+            });
+
+            (handle, receiver)
+        }
+    }
+
     async fn get_tpch_stats(
         &mut self,
         tpch_kit_config: &TpchKitConfig,
@@ -370,44 +454,9 @@ impl DatafusionDBMS {
             Self::execute(&ctx, ddl).await?;
         }
 
-        // Build the DataFusionBaseTableStats object.
-        let mut base_table_stats = DataFusionBaseTableStats::default();
-        for tbl_fpath in tpch_kit.get_tbl_fpath_iter(tpch_kit_config).unwrap() {
-            let tbl_name = TpchKit::get_tbl_name_from_tbl_fpath(&tbl_fpath);
-            let schema = ctx
-                .catalog("datafusion")
-                .unwrap()
-                .schema("public")
-                .unwrap()
-                .table(&tbl_name)
-                .await
-                .unwrap()
-                .schema();
-
-            let nb_cols = schema.fields().len();
-            let single_cols = (0..nb_cols).map(|v| vec![v]);
-            /*let pairwise_cols = iproduct!(0..nb_cols, 0..nb_cols)
-                .filter(|(i, j)| i != j)
-                .map(|(i, j)| vec![i, j]);*/
-
-            base_table_stats.insert(
-                tbl_name.to_string(),
-                DataFusionPerTableStats::from_record_batches(
-                    || {
-                        let tbl_file = fs::File::open(&tbl_fpath)?;
-                        let csv_reader1 = ReaderBuilder::new(schema.clone())
-                            .has_header(false)
-                            .with_delimiter(b'|')
-                            .build(tbl_file)
-                            .unwrap();
-                        Ok(RecordBatchIterator::new(csv_reader1, schema.clone()))
-                    },
-                    single_cols.collect(),
-                )?,
-            );
-        }
-
-        Ok(base_table_stats)
+        // Compute base statistics.
+        let tbl_paths = tpch_kit.get_tbl_fpath_vec(tpch_kit_config)?;
+        Self::gen_base_stats(tbl_paths, ctx, b'|')
     }
 
     async fn get_job_stats(
@@ -430,64 +479,9 @@ impl DatafusionDBMS {
             Self::execute(&ctx, ddl).await?;
         }
 
-        let now = Instant::now();
-
-        // Build the DataFusionBaseTableStats object.
-        let base_table_stats = Mutex::new(DataFusionBaseTableStats::default());
+        // Compute base statistics.
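+        // Shared with the TPC-H path above; only the column delimiter differs.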
         let tbl_paths = job_kit.get_tbl_fpath_vec().unwrap();
-
-        tbl_paths.par_iter().for_each(|tbl_fpath| {
-            let tbl_name = JobKit::get_tbl_name_from_tbl_fpath(tbl_fpath);
-            let start = Instant::now();
-
-            let schema = block_on(async {
-                ctx.catalog("datafusion")
-                    .unwrap()
-                    .schema("public")
-                    .unwrap()
-                    .table(&tbl_name)
-                    .await
-                    .unwrap()
-                    .schema()
-            });
-            println!(
-                "Table {:?} starting after {:?}...",
-                tbl_name,
-                start.elapsed()
-            );
-
-            let nb_cols = schema.fields().len();
-            let single_cols = (0..nb_cols).map(|v| vec![v]).collect::<Vec<_>>();
-
-            let stats_result = DataFusionPerTableStats::from_record_batches(
-                || {
-                    let tbl_file = fs::File::open(tbl_fpath)?;
-                    let csv_reader1 = ReaderBuilder::new(schema.clone())
-                        .has_header(false)
-                        .with_delimiter(b',')
-                        .with_escape(b'\\')
-                        .with_batch_size(1024)
-                        .build(tbl_file)
-                        .unwrap();
-                    Ok(RecordBatchIterator::new(csv_reader1, schema.clone()))
-                },
-                single_cols,
-            );
-
-            if let Ok(per_table_stats) = stats_result {
-                let mut stats = base_table_stats.lock().unwrap();
-                stats.insert(tbl_name.to_string(), per_table_stats);
-            }
-
-            println!(
-                "Table {:?} took in total {:?}...",
-                tbl_name,
-                start.elapsed()
-            );
-        });
-
-        println!("Total execution time {:?}...", now.elapsed());
-
-        Ok(base_table_stats.into_inner().unwrap())
+        Self::gen_base_stats(tbl_paths, ctx, b',')
     }
 }
 
diff --git a/optd-perftest/src/postgres_dbms.rs b/optd-perftest/src/postgres_dbms.rs
index 13a82805..f870ae9d 100644
--- a/optd-perftest/src/postgres_dbms.rs
+++ b/optd-perftest/src/postgres_dbms.rs
@@ -158,7 +158,7 @@ impl PostgresDBMS {
 
         // load the tables
         tpch_kit.gen_tables(tpch_kit_config)?;
-        for tbl_fpath in tpch_kit.get_tbl_fpath_iter(tpch_kit_config)? {
+        for tbl_fpath in tpch_kit.get_tbl_fpath_vec(tpch_kit_config)? {
             Self::copy_from_stdin(client, tbl_fpath, "|", "\\").await?;
         }
 
diff --git a/optd-perftest/src/tpch.rs b/optd-perftest/src/tpch.rs
index 720fab3f..00b98942 100644
--- a/optd-perftest/src/tpch.rs
+++ b/optd-perftest/src/tpch.rs
@@ -208,19 +208,23 @@ impl TpchKit {
             .to_string()
     }
 
-    /// Get an iterator through all generated .tbl files of a given config
-    pub fn get_tbl_fpath_iter(
-        &self,
-        tpch_kit_config: &TpchKitConfig,
-    ) -> io::Result<impl Iterator<Item = PathBuf>> {
+    /// Get a vector of all generated .tbl files of a given config
+    pub fn get_tbl_fpath_vec(&self, tpch_kit_config: &TpchKitConfig) -> io::Result<Vec<PathBuf>> {
         let this_genned_tables_dpath = self.get_this_genned_tables_dpath(tpch_kit_config);
         let dirent_iter = fs::read_dir(this_genned_tables_dpath)?;
-        // all results/options are fine to be unwrapped except for path.extension() because that could
-        // return None in various cases
-        let path_iter = dirent_iter.map(|dirent| dirent.unwrap().path());
-        let tbl_fpath_iter = path_iter
-            .filter(|path| path.extension().map(|ext| ext.to_str().unwrap()) == Some("tbl"));
-        Ok(tbl_fpath_iter)
+
+        let tbl_fpath_vec: Vec<PathBuf> = dirent_iter
+            .filter_map(|dirent| dirent.ok())
+            .map(|dirent| dirent.path())
+            .filter(|path| {
+                path.extension()
+                    .and_then(|ext| ext.to_str())
+                    .map(|ext| ext == "tbl")
+                    .unwrap_or(false)
+            })
+            .collect();
+
+        Ok(tbl_fpath_vec)
     }
 
     /// Get an iterator through all generated .sql files _in order_ of a given config