Commit 1a39bfb: Make reading in parallel with parquet

AlSchlo committed Apr 30, 2024
1 parent 358ccc4 commit 1a39bfb
Showing 5 changed files with 31 additions and 29 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions optd-perftest/Cargo.toml

@@ -46,6 +46,7 @@ serde_json = "1.0"
 itertools = "0.12.1"
 test-case = "3.3"
 rayon = "1.10"
+parquet = "47.0.0"
 
 [dev-dependencies]
 assert_cmd = "2.0"
46 changes: 23 additions & 23 deletions optd-perftest/src/datafusion_dbms.rs
@@ -18,9 +16,7 @@ use crate::{
 use async_trait::async_trait;
 use datafusion::{
     arrow::{
-        array::{RecordBatch, RecordBatchIterator},
-        csv::ReaderBuilder,
-        datatypes::SchemaRef,
+        array::RecordBatch,
         error::ArrowError,
         util::display::{ArrayFormatter, FormatOptions},
     },
@@ -31,6 +29,7 @@ use datafusion::{
     },
     sql::{parser::DFParser, sqlparser::dialect::GenericDialect},
 };
+
 use datafusion_optd_cli::helper::unescape_input;
 use futures::executor::block_on;
 use lazy_static::lazy_static;
@@ -39,6 +38,7 @@ use optd_datafusion_repr::{
     cost::{DataFusionBaseTableStats, DataFusionPerTableStats},
     DatafusionOptimizer,
 };
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 use rayon::prelude::*;
 use regex::Regex;

@@ -359,7 +359,6 @@ impl DatafusionDBMS
     fn gen_base_stats(
         tbl_paths: Vec<PathBuf>,
         ctx: SessionContext,
-        delim: u8,
     ) -> anyhow::Result<DataFusionBaseTableStats> {
         let base_table_stats = Mutex::new(DataFusionBaseTableStats::default());
         let now = Instant::now();
@@ -383,8 +382,8 @@
             let single_cols = (0..nb_cols).map(|v| vec![v]).collect::<Vec<_>>();
 
             let stats_result = DataFusionPerTableStats::from_record_batches(
-                Self::create_batch_channel(tbl_fpath.clone(), schema.clone(), delim),
-                Self::create_batch_channel(tbl_fpath.clone(), schema.clone(), delim),
+                Self::create_batch_channel(tbl_fpath.clone()),
+                Self::create_batch_channel(tbl_fpath.clone()),
                 single_cols,
                 schema,
             );
@@ -408,26 +407,27 @@ impl DatafusionDBMS
 
     fn create_batch_channel(
         tbl_fpath: PathBuf,
-        schema: SchemaRef,
-        delim: u8,
     ) -> impl FnOnce() -> (JoinHandle<()>, Receiver<Result<RecordBatch, ArrowError>>) {
         move || {
             let (sender, receiver) = mpsc::channel();
 
             let handle = thread::spawn(move || {
+                // Get the number of row groups.
                 let tbl_file = File::open(tbl_fpath).expect("Failed to open file");
-                let csv_reader = ReaderBuilder::new(schema.clone())
-                    .has_header(false)
-                    .with_delimiter(delim)
-                    .with_escape(b'\\')
-                    .with_batch_size(1024)
-                    .build(tbl_file)
-                    .expect("Failed to build CSV reader");
-
-                let batch_iter = RecordBatchIterator::new(csv_reader, schema);
-                for batch in batch_iter {
-                    sender.send(batch).expect("Failed to send batch");
-                }
+                let builder =
+                    ParquetRecordBatchReaderBuilder::try_new(tbl_file.try_clone().unwrap())
+                        .unwrap();
+                let num_row_groups = builder.metadata().num_row_groups();
+
+                // Read row groups in parallel.
+                (0..num_row_groups).into_par_iter().for_each(|i| {
+                    ParquetRecordBatchReaderBuilder::try_new(tbl_file.try_clone().unwrap())
+                        .unwrap()
+                        .with_row_groups(vec![i])
+                        .build()
+                        .unwrap()
+                        .for_each(|batch| sender.send(batch).expect("Failed to send batch"))
+                });
             });
 
             (handle, receiver)
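
The hunk above decodes each parquet row group on its own rayon worker and funnels every decoded batch through a single mpsc channel; gen_base_stats builds two channels presumably because from_record_batches takes two independent batch sources. Below is a standalone sketch of the same pattern, not code from this commit: the file name is illustrative, error handling is collapsed into unwrap/expect, and sharing the Sender across rayon workers assumes a toolchain where std::sync::mpsc::Sender is Sync (Rust 1.72+).

use std::{fs::File, sync::mpsc, thread};

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use rayon::prelude::*;

fn main() {
    // Illustrative input; any parquet file on disk works.
    let file = File::open("lineitem.parquet").expect("Failed to open file");

    // One up-front metadata read to learn how many row groups exist.
    let num_row_groups = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap())
        .unwrap()
        .metadata()
        .num_row_groups();

    let (sender, receiver) = mpsc::channel();
    let handle = thread::spawn(move || {
        // Each rayon task builds a reader restricted to a single row group,
        // so groups are decoded in parallel (arrival order is not preserved).
        (0..num_row_groups).into_par_iter().for_each(|i| {
            ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap())
                .unwrap()
                .with_row_groups(vec![i])
                .build()
                .unwrap()
                .for_each(|batch| sender.send(batch).expect("Failed to send batch"));
        });
        // Dropping `sender` here terminates the receive loop below.
    });

    // Consume batches as the workers produce them.
    for batch in receiver {
        let batch = batch.expect("Arrow error while decoding a row group");
        println!("decoded a batch with {} rows", batch.num_rows());
    }
    handle.join().expect("reader thread panicked");
}

Note that the sketch mirrors the commit in rebuilding the reader, and thus re-parsing the footer metadata, once per row group; that is a cheap trade for parallel decoding of large files.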
@@ -456,7 +456,7 @@ impl DatafusionDBMS
 
         // Compute base statistics.
         let tbl_paths = tpch_kit.get_tbl_fpath_vec(tpch_kit_config)?;
-        Self::gen_base_stats(tbl_paths, ctx, b'|')
+        Self::gen_base_stats(tbl_paths, ctx)
     }
 
     async fn get_job_stats(
@@ -480,8 +480,8 @@
         }
 
         // Compute base statistics.
-        let tbl_paths = job_kit.get_tbl_fpath_vec().unwrap();
-        Self::gen_base_stats(tbl_paths, ctx, b',')
+        let tbl_paths = job_kit.get_tbl_fpath_vec("parquet").unwrap();
+        Self::gen_base_stats(tbl_paths, ctx)
     }
 }

4 changes: 2 additions & 2 deletions optd-perftest/src/job.rs
@@ -138,7 +138,7 @@ impl JobKit {
     }
 
     /// Get a vector of all generated table files with the given extension in a given directory path
-    pub fn get_tbl_fpath_vec(&self) -> io::Result<Vec<PathBuf>> {
+    pub fn get_tbl_fpath_vec(&self, target_ext: &str) -> io::Result<Vec<PathBuf>> {
         let dirent_iter = fs::read_dir(&self.downloaded_tables_dpath)?;
 
         let entries: Vec<_> = dirent_iter.collect::<Result<Vec<_>, io::Error>>()?;
@@ -150,7 +150,7 @@
             if path
                 .extension()
                 .and_then(|ext| ext.to_str())
-                .map(|ext| ext == "csv")
+                .map(|ext| ext == target_ext)
                 .unwrap_or(false)
             {
                 Some(path)
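
Since get_tbl_fpath_vec now filters on a caller-supplied extension, both backends can share one download directory. A minimal usage sketch, assuming a JobKit value named job_kit (mirroring this commit's call sites):

// DataFusion statistics read the generated parquet files, while the
// Postgres loader keeps copying the CSV dumps.
let parquet_tables = job_kit.get_tbl_fpath_vec("parquet")?;
let csv_tables = job_kit.get_tbl_fpath_vec("csv")?;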
2 changes: 1 addition & 1 deletion optd-perftest/src/postgres_dbms.rs
@@ -193,7 +193,7 @@ impl PostgresDBMS
 
         // load the tables
         job_kit.download_tables(job_kit_config)?;
-        for tbl_fpath in job_kit.get_tbl_fpath_vec()? {
+        for tbl_fpath in job_kit.get_tbl_fpath_vec("csv")? {
             Self::copy_from_stdin(client, tbl_fpath, ",", "\\").await?;
         }

