refactor: rename to RecordBatchTransformer. Improve passthrough handling
sdd committed Sep 23, 2024
1 parent 415f57e commit e0a1ac8
Showing 3 changed files with 98 additions and 108 deletions.
2 changes: 1 addition & 1 deletion crates/iceberg/src/arrow/mod.rs
@@ -20,6 +20,6 @@
mod schema;
pub use schema::*;
mod reader;
pub(crate) mod record_batch_evolution_processor;
pub(crate) mod record_batch_transformer;

pub use reader::*;
13 changes: 7 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -38,7 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FI
use parquet::file::metadata::ParquetMetaData;
use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};

use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
@@ -195,10 +195,11 @@ impl ArrowReader {
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// create a RecordBatchEvolutionProcessor if our task schema contains columns
// not present in the parquet file or whose types have been promoted
let mut record_batch_evolution_processor =
RecordBatchEvolutionProcessor::build(task.schema_ref(), task.project_field_ids());
// RecordBatchTransformer performs any required transformations on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -241,7 +242,7 @@ impl ArrowReader {
let mut record_batch_stream = record_batch_stream_builder.build()?;

while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(record_batch_evolution_processor.process_record_batch(batch))
tx.send(record_batch_transformer.process_record_batch(batch))
.await?
}

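Condensed, the new call pattern in reader.rs reduces to the sketch below (an editor's illustration, not part of the diff; it assumes the task, record_batch_stream_builder and tx values set up in the surrounding reader code shown above):

let mut record_batch_transformer =
    RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

let mut record_batch_stream = record_batch_stream_builder.build()?;

while let Some(batch) = record_batch_stream.try_next().await? {
    // The first batch decides whether any transformation is required at all;
    // later batches reuse that cached decision.
    tx.send(record_batch_transformer.process_record_batch(batch))
        .await?;
}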
191 changes: 90 additions & 101 deletions crates/iceberg/src/arrow/{record_batch_evolution_processor.rs → record_batch_transformer.rs}
@@ -32,33 +32,32 @@ use crate::arrow::schema_to_arrow_schema;
use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
use crate::{Error, ErrorKind, Result};

/// Represents an operation that needs to be performed
/// to transform a RecordBatch coming from a Parquet file record
/// batch stream so that it conforms to an Iceberg schema that has
/// evolved from the one that was used when the file was written.
/// Indicates how a particular column in a processed RecordBatch should
/// be sourced.
#[derive(Debug)]
pub(crate) enum EvolutionAction {
pub(crate) enum ColumnSource {
// signifies that a column should be passed through unmodified
// from the file's RecordBatch
PassThrough {
source_index: usize,
},

// signifies particular column has undergone type promotion, and so
// the source column with the given index needs to be promoted to the
// specified type
// signifies that a column from the file's RecordBatch has undergone
// type promotion so the source column with the given index needs
// to be promoted to the specified type
Promote {
target_type: DataType,
source_index: usize,
},

// Signifies that a new column has been inserted before the row
// Signifies that a new column has been inserted before the column
// with index `index`. (we choose "before" rather than "after" so
// that we can use usize; if we insert after, then we need to
// be able to store -1 here when we want to indicate that the new
// column is to be added at the front of the list).
// be able to store -1 here to signify that a new
// column is to be added at the front of the column list).
// If multiple columns need to be inserted at a given
// location, they should all be given the same index, as the index
// here refers to the original record batch, not the interim state after
// here refers to the original RecordBatch, not the interim state after
// a preceding operation.
Add {
target_type: DataType,
@@ -67,43 +66,53 @@
// The iceberg spec refers to other permissible schema evolution actions
// (see https://iceberg.apache.org/spec/#schema-evolution):
// renaming fields, deleting fields and reordering fields.
// Renames only affect the RecordBatch schema rather than the
// Renames only affect the schema of the RecordBatch rather than the
// columns themselves, so a single updated cached schema can
// be re-used and no per-column actions are required.
// Deletion and Reorder can be achieved without needing this
// post-processing step by using the projection mask.
}

#[derive(Debug)]
struct SchemaAndOps {
// Every transformed RecordBatch will have the same schema. We create the
// target just once and cache it here. Helpfully, Arc<Schema> is needed in
// the constructor for RecordBatch, so we don't need an expensive copy
// each time.
pub target_schema: Arc<ArrowSchema>,

// Indicates how each column in the target schema is derived.
pub operations: Vec<EvolutionAction>,
enum BatchTransform {
// Indicates that no changes need to be performed to the RecordBatches
// coming in from the stream and that they can be passed through
// unmodified
PassThrough,

Modify {
// Every transformed RecordBatch will have the same schema. We create the
// target just once and cache it here. Helpfully, Arc<Schema> is needed in
// the constructor for RecordBatch, so we don't need an expensive copy
// each time we build a new RecordBatch
target_schema: Arc<ArrowSchema>,

// Indicates how each column in the target schema is derived.
operations: Vec<ColumnSource>,
},
}

#[derive(Debug)]
pub(crate) struct RecordBatchEvolutionProcessor {
pub(crate) struct RecordBatchTransformer {
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: Vec<i32>,
schema_and_ops: Option<SchemaAndOps>,

// BatchTransform gets lazily constructed based on the schema of
// the first RecordBatch we receive from the file
batch_transform: Option<BatchTransform>,
}

impl RecordBatchEvolutionProcessor {
/// Fallibly try to build a RecordBatchEvolutionProcessor for a given parquet file schema
/// and Iceberg snapshot schema. Returns Ok(None) if the processor would not be required
/// due to the file schema already matching the snapshot schema
impl RecordBatchTransformer {
/// Build a RecordBatchTransformer for a given
/// Iceberg snapshot schema and list of projected field ids.
pub(crate) fn build(
// source_schema: &ArrowSchemaRef,
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: &[i32],
) -> Self {
let projected_iceberg_field_ids = if projected_iceberg_field_ids.is_empty() {
// project all fields in table schema order
// If the list of field ids is empty, this indicates that we
// need to select all fields.
// Project all fields in table schema order
snapshot_schema
.as_struct()
.fields()
@@ -117,61 +126,47 @@ impl RecordBatchEvolutionProcessor {
Self {
snapshot_schema,
projected_iceberg_field_ids,
schema_and_ops: None,
batch_transform: None,
}

// let (operations, target_schema) = Self::generate_operations_and_schema(
// source_schema,
// snapshot_schema,
// projected_iceberg_field_ids,
// )?;
//
// Ok(if target_schema.as_ref() == source_schema.as_ref() {
// None
// } else {
// Some(Self {
// operations,
// target_schema,
// })
// })
}
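
// Editor's sketch (not part of this diff): as the comment inside build() notes,
// an empty projection selects every field of the snapshot schema in table-schema
// order, so for a schema whose fields have ids 10, 11 and 12 (hypothetical here)
// these two calls build equivalent transformers.
fn example_empty_projection(snapshot_schema: Arc<IcebergSchema>) {
    let _all_fields = RecordBatchTransformer::build(snapshot_schema.clone(), &[]);
    let _explicit = RecordBatchTransformer::build(snapshot_schema, &[10, 11, 12]);
}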

pub(crate) fn process_record_batch(
&mut self,
record_batch: RecordBatch,
) -> Result<RecordBatch> {
if self.schema_and_ops.is_none() {
self.schema_and_ops = Some(Self::generate_operations_and_schema(
record_batch.schema_ref(),
self.snapshot_schema.as_ref(),
&self.projected_iceberg_field_ids,
)?);
}

let Some(SchemaAndOps {
ref target_schema, ..
}) = self.schema_and_ops
else {
return Err(Error::new(
ErrorKind::Unexpected,
"schema_and_ops always created at this point",
));
};

Ok(RecordBatch::try_new(
target_schema.clone(),
self.transform_columns(record_batch.columns())?,
)?)
Ok(match self.batch_transform {
Some(BatchTransform::PassThrough) => record_batch,
Some(BatchTransform::Modify {
ref target_schema,
ref operations,
}) => RecordBatch::try_new(
target_schema.clone(),
self.transform_columns(record_batch.columns(), operations)?,
)?,
None => {
self.batch_transform = Some(Self::generate_batch_transform(
record_batch.schema_ref(),
self.snapshot_schema.as_ref(),
&self.projected_iceberg_field_ids,
)?);

self.process_record_batch(record_batch)?
}
})
}
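
// Editor's sketch (not part of this diff), mirroring the unit tests further down:
// the first call to process_record_batch builds and caches the BatchTransform;
// every later call applies the cached PassThrough/Modify decision directly.
// `first_batch` and `second_batch` are hypothetical RecordBatches.
fn example_usage(
    snapshot_schema: Arc<IcebergSchema>,
    projected_field_ids: &[i32],
    first_batch: RecordBatch,
    second_batch: RecordBatch,
) -> Result<()> {
    let mut transformer = RecordBatchTransformer::build(snapshot_schema, projected_field_ids);
    let _first = transformer.process_record_batch(first_batch)?; // builds the transform
    let _second = transformer.process_record_batch(second_batch)?; // reuses the cached transform
    Ok(())
}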

// create the (possibly empty) list of `EvolutionOp`s that we need
// to apply to the arrays in a record batch with `source_schema` so
// that it matches the `snapshot_schema`
fn generate_operations_and_schema(
// Compare the schema of the incoming RecordBatches to the schema of
// the Iceberg snapshot to determine what, if any, transformation
// needs to be applied. If the schemas match, we return BatchTransform::PassThrough
// to indicate that no changes need to be made. Otherwise, we return a
// BatchTransform::Modify containing the target RecordBatch schema and
// the list of `ColumnSource`s that indicate how to source each column in
// the resulting RecordBatches.
fn generate_batch_transform(
source_schema: &ArrowSchemaRef,
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
) -> Result<SchemaAndOps> {
) -> Result<BatchTransform> {
let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -190,7 +185,11 @@ impl RecordBatchEvolutionProcessor {
.clone())
})
.collect();

let target_schema = ArrowSchema::new(fields?);
if target_schema == *source_schema.as_ref() {
return Ok(BatchTransform::PassThrough);
}

let operations: Result<Vec<_>> = projected_iceberg_field_ids.iter().map(|field_id|{
let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
@@ -203,12 +202,12 @@

if source_field.data_type().equals_datatype(target_type) {
// no promotion required
EvolutionAction::PassThrough {
ColumnSource::PassThrough {
source_index: *source_index
}
} else {
// promotion required
EvolutionAction::Promote {
ColumnSource::Promote {
target_type: target_type.clone(),
source_index: *source_index,
}
@@ -222,25 +221,25 @@
let default_value = if let Some(ref iceberg_default_value) =
&iceberg_field.initial_default
{
let Literal::Primitive(prim_value) = iceberg_default_value else {
let Literal::Primitive(primitive_literal) = iceberg_default_value else {
return Err(Error::new(
ErrorKind::Unexpected,
format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value)
));
};
Some(prim_value.clone())
Some(primitive_literal.clone())
} else {
None
};

EvolutionAction::Add {
ColumnSource::Add {
value: default_value,
target_type: target_type.clone(),
}
})
}).collect();

Ok(SchemaAndOps {
Ok(BatchTransform::Modify {
operations: operations?,
target_schema: Arc::new(target_schema),
})
@@ -278,37 +277,30 @@ impl RecordBatchEvolutionProcessor {
fn transform_columns(
&self,
columns: &[Arc<dyn ArrowArray>],
operations: &[ColumnSource],
) -> Result<Vec<Arc<dyn ArrowArray>>> {
if columns.is_empty() {
return Ok(columns.to_vec());
}
let num_rows = columns[0].len();

let Some(ref schema_and_ops) = self.schema_and_ops else {
return Err(Error::new(
ErrorKind::Unexpected,
"schema_and_ops was None, but should be present",
));
};

let result: Result<Vec<_>> = schema_and_ops
.operations
operations
.iter()
.map(|op| {
Ok(match op {
EvolutionAction::PassThrough { source_index } => columns[*source_index].clone(),
EvolutionAction::Promote {
ColumnSource::PassThrough { source_index } => columns[*source_index].clone(),

ColumnSource::Promote {
target_type,
source_index,
} => cast(&*columns[*source_index], target_type)?,
EvolutionAction::Add { target_type, value } => {

ColumnSource::Add { target_type, value } => {
Self::create_column(target_type, value, num_rows)?
}
})
})
.collect();

result
.collect()
}
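
// Editor's sketch (assumed, not taken from this diff): the body of create_column
// below is elided by the hunk. To materialise a ColumnSource::Add it needs logic
// of roughly this shape: a Some(default) becomes a constant column and a None
// default becomes an all-null column of the target type (i32 shown for illustration).
fn example_add_int32_column(default: Option<i32>, num_rows: usize) -> arrow_array::ArrayRef {
    match default {
        Some(v) => Arc::new(arrow_array::Int32Array::from(vec![v; num_rows])),
        None => arrow_array::new_null_array(&DataType::Int32, num_rows),
    }
}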

fn create_column(
@@ -388,16 +380,15 @@ mod test {
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor;
use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};

#[test]
fn build_field_id_to_source_schema_map_works() {
let arrow_schema = arrow_schema_already_same_as_target();

let result =
RecordBatchEvolutionProcessor::build_field_id_to_arrow_schema_map(&arrow_schema)
.unwrap();
RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap();

let expected = HashMap::from_iter([
(10, (arrow_schema.fields()[0].clone(), 0)),
@@ -415,8 +406,7 @@
let snapshot_schema = Arc::new(iceberg_table_schema());
let projected_iceberg_field_ids = [13, 14];

let mut inst =
RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids);
let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);

let result = inst
.process_record_batch(source_record_batch_no_migration_required())
@@ -432,8 +422,7 @@
let snapshot_schema = Arc::new(iceberg_table_schema());
let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f

let mut inst =
RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids);
let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);

let result = inst.process_record_batch(source_record_batch()).unwrap();

