diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 0881d961f..64b2afc7e 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -50,7 +50,6 @@ arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 arrow-string = { workspace = true }
 async-std = { workspace = true, optional = true, features = ["attributes"] }
-async-stream = { workspace = true }
 async-trait = { workspace = true }
 bimap = { workspace = true }
 bitvec = { workspace = true }
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 58440bfdf..6ed84d342 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
 use arrow_string::like::starts_with;
-use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
+use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
-use futures::stream::StreamExt;
-use futures::{try_join, TryFutureExt};
+use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,7 +43,8 @@ use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
+use crate::runtime::spawn;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
 use crate::spec::{Datum, Schema};
 use crate::{Error, ErrorKind};
 
@@ -52,6 +52,22 @@ use crate::{Error, ErrorKind};
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
+}
+
+#[derive(Clone)]
+struct ArrowReaderConfig {
+    /// The maximum number of data files that can be fetched at the same time
+    concurrency_limit_data_files: usize,
+}
+
+impl Default for ArrowReaderConfig {
+    fn default() -> Self {
+        let num_cores = num_cpus::get();
+        Self {
+            concurrency_limit_data_files: num_cores,
+        }
+    }
 }
 
 impl ArrowReaderBuilder {
@@ -60,9 +76,19 @@ impl ArrowReaderBuilder {
         ArrowReaderBuilder {
             batch_size: None,
             file_io,
+            config: ArrowReaderConfig::default(),
         }
     }
 
+    /// Sets the maximum number of in-flight data files that are being fetched
+    pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
+        self.config = ArrowReaderConfig {
+            concurrency_limit_data_files: val,
+        };
+
+        self
+    }
+
     /// Sets the desired size of batches in the response
     /// to something other than the default
     pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +101,7 @@ impl ArrowReaderBuilder {
         ArrowReader {
             batch_size: self.batch_size,
             file_io: self.file_io,
+            config: self.config,
         }
     }
 }
@@ -84,73 +111,106 @@ impl ArrowReaderBuilder {
 pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    config: ArrowReaderConfig,
 }
 
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
+    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
-
-        Ok(try_stream! {
-            while let Some(task_result) = tasks.next().await {
-                match task_result {
-                    Ok(task) => {
-                        // Collect Parquet column indices from field ids
-                        let mut collector = CollectFieldIdVisitor {
-                            field_ids: HashSet::default(),
-                        };
-                        if let Some(predicates) = task.predicate() {
-                            visit(&mut collector, predicates)?;
-                        }
-
-                        let parquet_file = file_io
-                            .new_input(task.data_file_path())?;
-
-                        let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
-                        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-                        let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
-                            .await?;
-
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let arrow_schema = batch_stream_builder.schema();
-                        let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
-                        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
-
-                        let parquet_schema = batch_stream_builder.parquet_schema();
-                        let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;
-
-                        if let Some(row_filter) = row_filter {
-                            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-                        }
-
-                        if let Some(batch_size) = self.batch_size {
-                            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-                        }
-
-                        let mut batch_stream = batch_stream_builder.build()?;
-
-                        while let Some(batch) = batch_stream.next().await {
-                            yield batch?;
-                        }
-                    }
-                    Err(e) => {
-                        Err(e)?
-                    }
-                }
+        let batch_size = self.batch_size;
+        let max_concurrent_fetching_datafiles = self.config.concurrency_limit_data_files;
+
+        let (tx, rx) = channel(0);
+
+        spawn(async move {
+            tasks
+                .map(|task| Ok((task, file_io.clone(), tx.clone())))
+                .try_for_each_concurrent(
+                    max_concurrent_fetching_datafiles,
+                    |(file_scan_task, file_io, tx)| async move {
+                        spawn(async move {
+                            Self::process_file_scan_task(file_scan_task, batch_size, file_io, tx)
+                                .await
+                        })
+                        .await
+                    },
+                )
+                .await
+        });
+
+        return Ok(rx.boxed());
+    }
+
+    async fn process_file_scan_task(
+        task_res: Result<FileScanTask>,
+        batch_size: Option<usize>,
+        file_io: FileIO,
+        mut tx: Sender<Result<RecordBatch>>,
+    ) -> Result<()> {
+        let task = match task_res {
+            Ok(task) => task,
+            Err(err) => {
+                tx.send(Err(err)).await?;
+                return Ok(());
             }
+        };
+
+        // Collect Parquet column indices from field ids
+        let mut collector = CollectFieldIdVisitor {
+            field_ids: HashSet::default(),
+        };
+
+        if let Some(predicates) = task.predicate() {
+            visit(&mut collector, predicates)?;
         }
-        .boxed())
+
+        let parquet_file = file_io.new_input(task.data_file_path())?;
+
+        let (parquet_metadata, parquet_reader) =
+            try_join!(parquet_file.metadata(), parquet_file.reader())?;
+        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
+
+        let mut batch_stream_builder =
+            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let arrow_schema = batch_stream_builder.schema();
+        let projection_mask = Self::get_arrow_projection_mask(
+            task.project_field_ids(),
+            task.schema(),
+            parquet_schema,
+            arrow_schema,
+        )?;
+        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+
+        let parquet_schema = batch_stream_builder.parquet_schema();
+        let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;
+
+        if let Some(row_filter) = row_filter {
+            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+        }
+
+        if let Some(batch_size) = batch_size {
+            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+        }
+
+        let mut batch_stream = batch_stream_builder.build()?;
+
+        while let Some(batch) = batch_stream.try_next().await? {
+            tx.send(Ok(batch)).await?
+        }
+
+        Ok(())
+    }
 
     fn get_arrow_projection_mask(
-        &self,
         field_ids: &[i32],
         iceberg_schema_of_task: &Schema,
         parquet_schema: &SchemaDescriptor,
         arrow_schema: &ArrowSchemaRef,
-    ) -> crate::Result<ProjectionMask> {
+    ) -> Result<ProjectionMask> {
         if field_ids.is_empty() {
             Ok(ProjectionMask::all())
         } else {
@@ -216,7 +276,6 @@ impl ArrowReader {
     }
 
     fn get_row_filter(
-        &self,
         predicates: Option<&BoundPredicate>,
         parquet_schema: &SchemaDescriptor,
         collector: &CollectFieldIdVisitor,
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 9b83c9fdb..10abe92d5 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -56,8 +56,9 @@
     batch_size: Option<usize>,
     case_sensitive: bool,
     filter: Option<Predicate>,
-    concurrency_limit_manifest_files: usize,
+    concurrency_limit_data_files: usize,
     concurrency_limit_manifest_entries: usize,
+    concurrency_limit_manifest_files: usize,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -73,8 +74,9 @@
             batch_size: None,
             case_sensitive: true,
             filter: None,
-            concurrency_limit_manifest_files: num_cpus,
+            concurrency_limit_data_files: num_cpus,
             concurrency_limit_manifest_entries: num_cpus,
+            concurrency_limit_manifest_files: num_cpus,
         }
     }
 
@@ -125,12 +127,13 @@
     pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
         self.concurrency_limit_manifest_files = limit;
         self.concurrency_limit_manifest_entries = limit;
+        self.concurrency_limit_data_files = limit;
         self
     }
 
-    /// Sets the manifest file concurrency limit for this scan
-    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
-        self.concurrency_limit_manifest_files = limit;
+    /// Sets the data file concurrency limit for this scan
+    pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_data_files = limit;
         self
     }
 
@@ -140,6 +143,12 @@
         self
     }
 
+    /// Sets the manifest file concurrency limit for this scan
+    pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
+        self.concurrency_limit_manifest_files = limit;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -245,10 +254,11 @@ impl<'a> TableScanBuilder<'a> {
         Ok(TableScan {
             batch_size: self.batch_size,
             column_names: self.column_names,
-            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             file_io: self.table.file_io().clone(),
             plan_context,
+            concurrency_limit_data_files: self.concurrency_limit_data_files,
             concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
+            concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
         })
     }
 }
@@ -267,6 +277,10 @@ pub struct TableScan {
     /// The maximum number of [`ManifestEntry`]s that will
     /// be processed in parallel
     concurrency_limit_manifest_entries: usize,
+
+    /// The maximum number of data files that will be
+    /// fetched and processed in parallel
+    concurrency_limit_data_files: usize,
 }
 
 /// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -339,7 +353,8 @@ impl TableScan {
     /// Returns an [`ArrowRecordBatchStream`].
     pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
-        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone());
+        let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
+            .with_data_file_concurrency_limit(self.concurrency_limit_data_files);
 
         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
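
Usage sketch (an assumption-laden example, not part of the diff): it presumes an already-loaded `iceberg::table::Table` and the pre-existing `Table::scan()`, `build()`, and `to_arrow()` API; only `with_data_file_concurrency_limit` is introduced by this change, and the helper name and limit value are made up.

```rust
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

// Hypothetical helper: read a table while capping how many data files
// the ArrowReader fetches and decodes at the same time.
async fn read_table_with_limit(table: &Table) -> Result<()> {
    let scan = table
        .scan()
        // New in this change: forwarded to ArrowReaderBuilder by `to_arrow`.
        .with_data_file_concurrency_limit(4)
        .build()?;

    // ArrowRecordBatchStream yields `Result<RecordBatch>` items.
    let stream = scan.to_arrow().await?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}
```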
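For reference, the fan-out shape used in the new `ArrowReader::read` can be reproduced standalone with only the `futures` crate. The sketch below is illustrative, not iceberg code (task values, names, and the `String` error type are invented): `try_for_each_concurrent` caps how many producers run at once, and the bounded mpsc channel gives the downstream consumer backpressure.

```rust
use futures::channel::mpsc::channel;
use futures::{stream, SinkExt, StreamExt, TryStreamExt};

fn main() {
    futures::executor::block_on(async {
        let concurrency_limit = 4usize;
        // channel(0) still buffers one item per live sender clone, so slow
        // consumers throttle the producers instead of growing a queue.
        let (tx, rx) = channel::<Result<u64, String>>(0);

        let producer = async move {
            stream::iter(0u64..16)
                .map(|task| Ok::<_, String>((task, tx.clone())))
                .try_for_each_concurrent(concurrency_limit, |(task, mut tx)| async move {
                    // Stand-in for fetching and decoding one data file.
                    tx.send(Ok(task * 10)).await.map_err(|e| e.to_string())
                })
                .await
            // The original `tx` is dropped here, which lets `rx` terminate.
        };

        // Drive producer and consumer together; output order is nondeterministic.
        let (send_result, received) = futures::join!(producer, rx.try_collect::<Vec<_>>());
        send_result.unwrap();
        println!("{:?}", received.unwrap());
    });
}
```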