
Commit

feat: concurrent data file fetches, parallel RecordBatch processing
sdd committed Aug 9, 2024
1 parent 8a2fdc2 commit c2e4e64
Showing 3 changed files with 138 additions and 65 deletions.
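Before the diff itself, here is a minimal sketch of how a caller could opt into the new behaviour once this commit is in. Only with_data_file_concurrency_limit, build, and to_arrow come from this diff; the Table handle, module paths, and the limit of 8 are illustrative assumptions rather than confirmed API details.

use futures::TryStreamExt;
use iceberg::table::Table; // assumed path for the table handle
use iceberg::Result;

// Hypothetical helper: in real code `table` would come from a catalog.
async fn read_with_limit(table: &Table) -> Result<()> {
    let batch_stream = table
        .scan()
        // New in this commit: cap how many data files are fetched concurrently.
        .with_data_file_concurrency_limit(8)
        .build()?
        .to_arrow()
        .await?;

    // The stream yields Result<RecordBatch> items as files are processed in parallel.
    let batches = batch_stream.try_collect::<Vec<_>>().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}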
1 change: 0 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -50,7 +50,6 @@ arrow-schema = { workspace = true }
arrow-select = { workspace = true }
arrow-string = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-stream = { workspace = true }
async-trait = { workspace = true }
bimap = { workspace = true }
bitvec = { workspace = true }
173 changes: 116 additions & 57 deletions crates/iceberg/src/arrow/reader.rs
@@ -27,12 +27,11 @@ use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
use arrow_string::like::starts_with;
use async_stream::try_stream;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::channel::mpsc::{channel, Sender};
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::{try_join, TryFutureExt};
use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -44,14 +43,31 @@ use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::runtime::spawn;
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, Schema};
use crate::{Error, ErrorKind};

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
config: ArrowReaderConfig,
}

#[derive(Clone)]
struct ArrowReaderConfig {
/// The maximum number of data files that can be fetched at the same time
concurrency_limit_data_files: usize,
}

impl Default for ArrowReaderConfig {
fn default() -> Self {
let num_cores = num_cpus::get();
Self {
concurrency_limit_data_files: num_cores,
}
}
}

impl ArrowReaderBuilder {
@@ -60,9 +76,19 @@ impl ArrowReaderBuilder {
ArrowReaderBuilder {
batch_size: None,
file_io,
config: ArrowReaderConfig::default(),
}
}

/// Sets the maximum number of data files that can be fetched concurrently (i.e. in flight at once)
pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
self.config = ArrowReaderConfig {
concurrency_limit_data_files: val,
};

self
}

/// Sets the desired size of batches in the response
/// to something other than the default
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
@@ -75,6 +101,7 @@ impl ArrowReaderBuilder {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
config: self.config,
}
}
}
@@ -84,73 +111,106 @@ impl ArrowReaderBuilder {
pub struct ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
config: ArrowReaderConfig,
}

impl ArrowReader {
/// Takes a stream of FileScanTasks and reads all the files.
/// Returns a stream of Arrow RecordBatches containing the data from the files
pub fn read(self, mut tasks: FileScanTaskStream) -> crate::Result<ArrowRecordBatchStream> {
pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
let file_io = self.file_io.clone();

Ok(try_stream! {
while let Some(task_result) = tasks.next().await {
match task_result {
Ok(task) => {
// Collect Parquet column indices from field ids
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
if let Some(predicates) = task.predicate() {
visit(&mut collector, predicates)?;
}

let parquet_file = file_io
.new_input(task.data_file_path())?;

let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;

let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();
let projection_mask = self.get_arrow_projection_mask(task.project_field_ids(),task.schema(),parquet_schema, arrow_schema)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

let parquet_schema = batch_stream_builder.parquet_schema();
let row_filter = self.get_row_filter(task.predicate(),parquet_schema, &collector)?;

if let Some(row_filter) = row_filter {
batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
}

if let Some(batch_size) = self.batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
}

let mut batch_stream = batch_stream_builder.build()?;

while let Some(batch) = batch_stream.next().await {
yield batch?;
}
}
Err(e) => {
Err(e)?
}
}
let batch_size = self.batch_size;
let max_concurrent_fetching_datafiles = self.config.concurrency_limit_data_files;

let (tx, rx) = channel(0);

spawn(async move {
tasks
.map(|task| Ok((task, file_io.clone(), tx.clone())))
.try_for_each_concurrent(
max_concurrent_fetching_datafiles,
|(file_scan_task, file_io, tx)| async move {
spawn(async move {
Self::process_file_scan_task(file_scan_task, batch_size, file_io, tx)
.await
})
.await
},
)
.await
});

return Ok(rx.boxed());
}

async fn process_file_scan_task(
task_res: Result<FileScanTask>,
batch_size: Option<usize>,
file_io: FileIO,
mut tx: Sender<Result<RecordBatch>>,
) -> Result<()> {
let task = match task_res {
Ok(task) => task,
Err(err) => {
tx.send(Err(err)).await?;
return Ok(());
}
};

// Collect Parquet column indices from field ids
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};

if let Some(predicates) = task.predicate() {
visit(&mut collector, predicates)?;
}

let parquet_file = file_io.new_input(task.data_file_path())?;

let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;

let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();
let projection_mask = Self::get_arrow_projection_mask(
task.project_field_ids(),
task.schema(),
parquet_schema,
arrow_schema,
)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

let parquet_schema = batch_stream_builder.parquet_schema();
let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;

if let Some(row_filter) = row_filter {
batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
}

if let Some(batch_size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
}

let mut batch_stream = batch_stream_builder.build()?;

while let Some(batch) = batch_stream.try_next().await? {
tx.send(Ok(batch)).await?
}

Ok(())
}

fn get_arrow_projection_mask(
&self,
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
) -> crate::Result<ProjectionMask> {
) -> Result<ProjectionMask> {
if field_ids.is_empty() {
Ok(ProjectionMask::all())
} else {
@@ -216,7 +276,6 @@ impl ArrowReader {
}

fn get_row_filter(
&self,
predicates: Option<&BoundPredicate>,
parquet_schema: &SchemaDescriptor,
collector: &CollectFieldIdVisitor,
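The rewritten read above fans the task stream out with try_for_each_concurrent, spawns each file onto the runtime, and forwards every RecordBatch (or error) through a zero-capacity futures mpsc channel, so the stream handed back to the caller applies backpressure to the workers. The self-contained sketch below illustrates that channel-plus-try_for_each_concurrent pattern in isolation, using only the futures crate; it omits the runtime spawn step, and the item types and worker body are placeholders rather than the crate's actual processing code.

use futures::channel::mpsc::channel;
use futures::{stream, SinkExt, StreamExt, TryStreamExt};

fn main() {
    futures::executor::block_on(async {
        // Zero-capacity channel: senders wait for the consumer to pull a value,
        // which is what gives the returned stream backpressure over the workers.
        let (tx, mut rx) = channel::<Result<u64, String>>(0);

        let producer = async move {
            stream::iter(1..=6u64)
                // Pair every task with its own sender clone, mirroring `read`.
                .map(|task| Ok::<_, String>((task, tx.clone())))
                .try_for_each_concurrent(2usize, |(task, mut tx)| async move {
                    // Placeholder for "fetch one data file and decode its batches".
                    let produced = task * 10;
                    tx.send(Ok(produced)).await.map_err(|e| e.to_string())
                })
                .await
            // `tx` and its clones are dropped when this block finishes, closing
            // the channel so the consumer loop below can terminate.
        };

        let consumer = async {
            while let Some(item) = rx.next().await {
                println!("got {item:?}");
            }
        };

        let (sent, ()) = futures::join!(producer, consumer);
        sent.expect("all workers completed");
    });
}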
29 changes: 22 additions & 7 deletions crates/iceberg/src/scan.rs
@@ -56,8 +56,9 @@ pub struct TableScanBuilder<'a> {
batch_size: Option<usize>,
case_sensitive: bool,
filter: Option<Predicate>,
concurrency_limit_manifest_files: usize,
concurrency_limit_data_files: usize,
concurrency_limit_manifest_entries: usize,
concurrency_limit_manifest_files: usize,
}

impl<'a> TableScanBuilder<'a> {
@@ -73,8 +74,9 @@ impl<'a> TableScanBuilder<'a> {
batch_size: None,
case_sensitive: true,
filter: None,
concurrency_limit_manifest_files: num_cpus,
concurrency_limit_data_files: num_cpus,
concurrency_limit_manifest_entries: num_cpus,
concurrency_limit_manifest_files: num_cpus,
}
}

@@ -125,12 +127,13 @@ impl<'a> TableScanBuilder<'a> {
pub fn with_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_manifest_files = limit;
self.concurrency_limit_manifest_entries = limit;
self.concurrency_limit_data_files = limit;
self
}

/// Sets the manifest file concurrency limit for this scan
pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_manifest_files = limit;
/// Sets the data file concurrency limit for this scan
pub fn with_data_file_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_data_files = limit;
self
}

@@ -140,6 +143,12 @@ impl<'a> TableScanBuilder<'a> {
self
}

/// Sets the manifest file concurrency limit for this scan
pub fn with_manifest_file_concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit_manifest_files = limit;
self
}

/// Build the table scan.
pub fn build(self) -> Result<TableScan> {
let snapshot = match self.snapshot_id {
@@ -245,10 +254,11 @@ impl<'a> TableScanBuilder<'a> {
Ok(TableScan {
batch_size: self.batch_size,
column_names: self.column_names,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
file_io: self.table.file_io().clone(),
plan_context,
concurrency_limit_data_files: self.concurrency_limit_data_files,
concurrency_limit_manifest_entries: self.concurrency_limit_manifest_entries,
concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
})
}
}
@@ -267,6 +277,10 @@ pub struct TableScan {
/// The maximum number of [`ManifestEntry`]s that will
/// be processed in parallel
concurrency_limit_manifest_entries: usize,

/// The maximum number of data files that will
/// be processed in parallel
concurrency_limit_data_files: usize,
}

/// PlanContext wraps a [`SnapshotRef`] alongside all the other
@@ -339,7 +353,8 @@ impl TableScan {

/// Returns an [`ArrowRecordBatchStream`].
pub async fn to_arrow(&self) -> Result<ArrowRecordBatchStream> {
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone());
let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
.with_data_file_concurrency_limit(self.concurrency_limit_data_files);

if let Some(batch_size) = self.batch_size {
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
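For completeness, the reader-level builder gains the same knob that to_arrow now forwards from the scan. Below is a hedged sketch of using ArrowReaderBuilder directly, assuming a FileIO handle and a FileScanTaskStream (for example from TableScan::plan_files) are already in hand; the module paths, the public reachability of these types, and the limit values are assumptions, not verified exports.

use iceberg::arrow::ArrowReaderBuilder;
use iceberg::io::FileIO;
use iceberg::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use iceberg::Result;

// Hypothetical wrapper: both arguments would come from an existing scan plan.
fn build_record_batch_stream(
    file_io: FileIO,
    tasks: FileScanTaskStream,
) -> Result<ArrowRecordBatchStream> {
    let reader = ArrowReaderBuilder::new(file_io)
        .with_data_file_concurrency_limit(4) // added by this commit
        .with_batch_size(1024)
        .build();

    // Fans the tasks out across up to 4 concurrent data file fetches.
    reader.read(tasks)
}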
