Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RecordBatchTransformer: Handle schema migration and column re-ordering in table scans #602

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ apache-avro = "0.17"
array-init = "2"
arrow-arith = { version = "53" }
arrow-array = { version = "53" }
arrow-cast = { version = "53" }
arrow-ord = { version = "53" }
arrow-schema = { version = "53" }
arrow-select = { version = "53" }
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ apache-avro = { workspace = true }
array-init = { workspace = true }
arrow-arith = { workspace = true }
arrow-array = { workspace = true }
arrow-cast = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions crates/iceberg/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@
mod schema;
pub use schema::*;
mod reader;
pub(crate) mod record_batch_transformer;

pub use reader::*;
11 changes: 10 additions & 1 deletion crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
Expand Down Expand Up @@ -209,6 +210,12 @@ impl ArrowReader {
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any required transformations on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}
Expand Down Expand Up @@ -261,8 +268,10 @@ impl ArrowReader {
// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let mut record_batch_stream = record_batch_stream_builder.build()?;

while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(Ok(batch)).await?
tx.send(record_batch_transformer.process_record_batch(batch))
.await?
}

Ok(())
Expand Down
Loading
Loading