Skip to content

Commit

Permalink
feat: automatically converting csv to parquet (#181)
Browse files Browse the repository at this point in the history
**Summary**: Automatically convert CSV files to Parquet before generating
stats, so that statistics are computed on the Parquet files.

**Demo**:
![Screenshot 2024-05-01 at 18 58 27](https://github.com/cmu-db/optd/assets/20631215/e87df42d-ab79-420f-bfef-9318ab51c03b)

**Details**:
* For robustness, we don't use schema inference. We build a temporary
DataFusion context, create the tables with the DDL statements, and then
get the schema from DataFusion.
* I forked csv2parquet
[here](https://github.com/wangpatrick57/arrow-tools). One notable change
is that it is now a library instead of a binary. Additionally, we convert
empty strings in nullable Utf8 columns into nulls in memory, because
arrow's CSV reader doesn't appear to do this for Utf8 types. This has a
huge effect on q-error for the JOB benchmark.
  • Loading branch information
wangpatrick57 committed May 28, 2024
1 parent 182b052 commit 5255d8d
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 49 deletions.
58 changes: 44 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions dev_scripts/which_queries_work.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ fi
successful_ids=()
IFS=','
for id in $all_ids; do
# make sure to execute with --adaptive so that we actually run the query in datafusion
cargo run --bin optd-perftest cardtest $benchmark_name --query-ids $id --adaptive &>/dev/null
cargo run --release --bin optd-perftest cardtest $benchmark_name --query-ids $id &>/dev/null

if [ $? -eq 0 ]; then
echo >&2 $id succeeded
Expand Down
5 changes: 4 additions & 1 deletion optd-perftest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ tokio = { version = "1.24", features = [
shlex = "1.3"
tokio-postgres = "0.7"
regex = "1.10"
clap = { version = "4.5", features = [
clap = { version = "4.5.4", features = [
"derive",
] }
log = "0.4"
Expand All @@ -47,6 +47,9 @@ itertools = "0.12.1"
test-case = "3.3"
rayon = "1.10"
parquet = "47.0.0"
csv2parquet = { git = "https://github.com/wangpatrick57/arrow-tools.git", branch = "main" }
arrow-schema = { version = "47.0.0", features = ["serde"] }


[dev_dependencies]
assert_cmd = "2.0"
82 changes: 60 additions & 22 deletions optd-perftest/src/datafusion_dbms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl DatafusionDBMS {
match benchmark {
Benchmark::Tpch(_) => {
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
self.create_tpch_tables(&tpch_kit).await?;
Self::create_tpch_tables(self.get_ctx(), &tpch_kit).await?;
}
Benchmark::Job(_) | Benchmark::Joblight(_) => {
let job_kit = JobKit::build(&self.workspace_dpath)?;
Expand All @@ -332,15 +332,15 @@ impl DatafusionDBMS {
Ok(())
}

async fn create_tpch_tables(&mut self, tpch_kit: &TpchKit) -> anyhow::Result<()> {
async fn create_tpch_tables(ctx: &SessionContext, tpch_kit: &TpchKit) -> anyhow::Result<()> {
let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
let ddls = ddls
.split(';')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect::<Vec<_>>();
for ddl in ddls {
Self::execute(self.get_ctx(), ddl).await?;
Self::execute(ctx, ddl).await?;
}
Ok(())
}
Expand All @@ -362,12 +362,12 @@ impl DatafusionDBMS {
&mut self,
tpch_kit_config: &TpchKitConfig,
) -> anyhow::Result<()> {
// Generate the tables.
// Generate the tables and convert them to Parquet.
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
tpch_kit.gen_tables(tpch_kit_config)?;

// Create the tables.
self.create_tpch_tables(&tpch_kit).await?;
Self::create_tpch_tables(self.get_ctx(), &tpch_kit).await?;

// Load the data by creating an external table first and copying the data to real tables.
let tbl_fpath_iter = tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "tbl").unwrap();
Expand All @@ -394,6 +394,10 @@ impl DatafusionDBMS {
.await
.unwrap()
.schema();

// DEBUG(phw2)
println!("schema={}", serde_json::to_string_pretty(&schema).unwrap());

let projection_list = (1..=schema.fields().len())
.map(|i| format!("column_{}", i))
.collect::<Vec<_>>()
Expand Down Expand Up @@ -477,7 +481,35 @@ impl DatafusionDBMS {

println!("Total execution time {:?}...", now.elapsed());

Ok(base_table_stats.into_inner()?)
let stats = base_table_stats.into_inner();
let l = stats.unwrap();
// Useful for debugging stats so I kept it
// l.iter().for_each(|(table_name, stats)| {
// println!("Table: {} (num_rows: {})", table_name, stats.row_cnt);
// stats
// .column_comb_stats
// .iter()
// .sorted_by_key(|x| x.0[0])
// .for_each(|x| {
// let sum_freq: f64 = x.1.mcvs.frequencies().values().copied().sum();
// println!(
// "Col: {} (n_distinct: {}) (n_frac: {}) (mcvs: {} {}) (tdigests: {:?} {:?} {:?} {:?} {:?})",
// x.0[0],
// x.1.ndistinct,
// x.1.null_frac,
// x.1.mcvs.frequencies().len(),
// sum_freq,
// x.1.distr.as_ref().map(|d| d.quantile(0.01)),
// x.1.distr.as_ref().map(|d| d.quantile(0.25)),
// x.1.distr.as_ref().map(|d| d.quantile(0.50)),
// x.1.distr.as_ref().map(|d| d.quantile(0.75)),
// x.1.distr.as_ref().map(|d| d.quantile(0.99)),
// );
// });
// });
// println!("{:#?}", stats);

Ok(l)
}

// Load job data from a .csv file.
Expand All @@ -487,14 +519,14 @@ impl DatafusionDBMS {
) -> anyhow::Result<()> {
let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;

// Download the tables.
// Download the tables and convert them to Parquet.
let job_kit = JobKit::build(&self.workspace_dpath)?;
job_kit.download_tables(job_kit_config)?;

// Create the tables.
Self::create_job_tables(&ctx, &job_kit).await?;

// Load each table using register_csv()
// Load each table using register_csv().
let tbl_fpath_iter = job_kit.get_tbl_fpath_vec("csv").unwrap();
for tbl_fpath in tbl_fpath_iter {
let tbl_name = tbl_fpath.file_stem().unwrap().to_str().unwrap();
Expand Down Expand Up @@ -525,34 +557,39 @@ impl DatafusionDBMS {
&mut self,
tpch_kit_config: &TpchKitConfig,
) -> anyhow::Result<DataFusionBaseTableStats> {
// Generate the tables
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
tpch_kit.gen_tables(tpch_kit_config)?;

// To get the schema of each table.
// Create tables in a temporary context to get the schema provider.
let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_TPCH).await?;
let ddls = fs::read_to_string(&tpch_kit.schema_fpath)?;
let ddls = ddls
.split(';')
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.collect::<Vec<_>>();
for ddl in ddls {
Self::execute(&ctx, ddl).await?;
}
let tpch_kit = TpchKit::build(&self.workspace_dpath)?;
Self::create_tpch_tables(&ctx, &tpch_kit).await?;
let schema_provider = ctx.catalog("datafusion").unwrap().schema("public").unwrap();

// Generate the tables
tpch_kit.gen_tables(tpch_kit_config)?;
tpch_kit
.make_parquet_files(tpch_kit_config, schema_provider)
.await?;
// Compute base statistics on Parquet.
let tbl_paths = tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "parquet")?;
assert!(tbl_paths.len() == tpch_kit.get_tbl_fpath_vec(tpch_kit_config, "tbl")?.len());
Self::gen_base_stats(tbl_paths)
}

async fn get_job_stats(
&mut self,
job_kit_config: &JobKitConfig,
) -> anyhow::Result<DataFusionBaseTableStats> {
// Create tables in a temporary context to get the schema provider.
let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
let job_kit = JobKit::build(&self.workspace_dpath)?;
Self::create_job_tables(&ctx, &job_kit).await?;
let schema_provider = ctx.catalog("datafusion").unwrap().schema("public").unwrap();

// Generate the tables.
let job_kit = JobKit::build(&self.workspace_dpath)?;
job_kit.download_tables(job_kit_config)?;
job_kit
.make_parquet_files(job_kit_config, schema_provider)
.await?;

// To get the schema of each table.
let ctx = Self::new_session_ctx(None, self.adaptive, WITH_LOGICAL_FOR_JOB).await?;
Expand All @@ -568,6 +605,7 @@ impl DatafusionDBMS {

// Compute base statistics on Parquet.
let tbl_paths = job_kit.get_tbl_fpath_vec("parquet").unwrap();
assert!(tbl_paths.len() == job_kit.get_tbl_fpath_vec("csv")?.len());
Self::gen_base_stats(tbl_paths)
}
}
Expand Down
Loading

0 comments on commit 5255d8d

Please sign in to comment.