From e0a1ac896d2a51fea88062ab0bc918f1b5f56c3e Mon Sep 17 00:00:00 2001
From: Scott Donnelly
Date: Mon, 23 Sep 2024 08:11:50 +0100
Subject: [PATCH] refactor: rename to RecordBatchTransformer. Improve passthrough handling

---
 crates/iceberg/src/arrow/mod.rs               |   2 +-
 crates/iceberg/src/arrow/reader.rs            |  13 +-
 ...ocessor.rs => record_batch_transformer.rs} | 191 +++++++++---------
 3 files changed, 98 insertions(+), 108 deletions(-)
 rename crates/iceberg/src/arrow/{record_batch_evolution_processor.rs => record_batch_transformer.rs} (79%)

diff --git a/crates/iceberg/src/arrow/mod.rs b/crates/iceberg/src/arrow/mod.rs
index b0e0805fe..31a892fa8 100644
--- a/crates/iceberg/src/arrow/mod.rs
+++ b/crates/iceberg/src/arrow/mod.rs
@@ -20,6 +20,6 @@ mod schema;
 pub use schema::*;
 
 mod reader;
-pub(crate) mod record_batch_evolution_processor;
+pub(crate) mod record_batch_transformer;
 
 pub use reader::*;

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 35d51c935..c04653181 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -38,7 +38,7 @@ use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
-use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor;
+use crate::arrow::record_batch_transformer::RecordBatchTransformer;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::error::Result;
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
@@ -195,10 +195,11 @@ impl ArrowReader {
         )?;
         record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);
 
-        // create a RecordBatchEvolutionProcessor if our task schema contains columns
-        // not present in the parquet file or whose types have been promoted
-        let mut record_batch_evolution_processor =
-            RecordBatchEvolutionProcessor::build(task.schema_ref(), task.project_field_ids());
+        // RecordBatchTransformer performs any required transformations on the RecordBatches
+        // that come back from the file, such as type promotion, default column insertion
+        // and column re-ordering
+        let mut record_batch_transformer =
+            RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
 
         if let Some(batch_size) = batch_size {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -241,7 +242,7 @@ impl ArrowReader {
         let mut record_batch_stream = record_batch_stream_builder.build()?;
 
         while let Some(batch) = record_batch_stream.try_next().await? {
-            tx.send(record_batch_evolution_processor.process_record_batch(batch))
+            tx.send(record_batch_transformer.process_record_batch(batch))
                 .await?
         }

diff --git a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
similarity index 79%
rename from crates/iceberg/src/arrow/record_batch_evolution_processor.rs
rename to crates/iceberg/src/arrow/record_batch_transformer.rs
index 6a58cc68f..dc31a9c10 100644
--- a/crates/iceberg/src/arrow/record_batch_evolution_processor.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -32,33 +32,32 @@ use crate::arrow::schema_to_arrow_schema;
 use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
 use crate::{Error, ErrorKind, Result};
 
-/// Represents an operation that needs to be performed
-/// to transform a RecordBatch coming from a Parquet file record
-/// batch stream so that it conforms to an Iceberg schema that has
-/// evolved from the one that was used when the file was written.
+/// Indicates how a particular column in a processed RecordBatch should
+/// be sourced.
 #[derive(Debug)]
-pub(crate) enum EvolutionAction {
+pub(crate) enum ColumnSource {
     // signifies that a column should be passed through unmodified
+    // from the file's RecordBatch
     PassThrough {
         source_index: usize,
     },
 
-    // signifies particular column has undergone type promotion, and so
-    // the source column with the given index needs to be promoted to the
-    // specified type
+    // signifies that a column from the file's RecordBatch has undergone
+    // type promotion so the source column with the given index needs
+    // to be promoted to the specified type
     Promote {
         target_type: DataType,
         source_index: usize,
     },
 
-    // Signifies that a new column has been inserted before the row
+    // Signifies that a new column has been inserted before the column
     // with index `index`. (we choose "before" rather than "after" so
     // that we can use usize; if we insert after, then we need to
-    // be able to store -1 here when we want to indicate that the new
-    // column is to be added at the front of the list).
+    // be able to store -1 here to signify that a new
+    // column is to be added at the front of the column list).
     // If multiple columns need to be inserted at a given
     // location, they should all be given the same index, as the index
-    // here refers to the original record batch, not the interim state after
+    // here refers to the original RecordBatch, not the interim state after
     // a preceding operation.
     Add {
         target_type: DataType,
@@ -67,7 +66,7 @@ pub(crate) enum EvolutionAction {
     // The iceberg spec refers to other permissible schema evolution actions
     // (see https://iceberg.apache.org/spec/#schema-evolution):
     // renaming fields, deleting fields and reordering fields.
-    // Renames only affect the RecordBatch schema rather than the
+    // Renames only affect the schema of the RecordBatch rather than the
     // columns themselves, so a single updated cached schema can
     // be re-used and no per-column actions are required.
     // Deletion and Reorder can be achieved without needing this
@@ -75,35 +74,45 @@
 #[derive(Debug)]
-struct SchemaAndOps {
-    // Every transformed RecordBatch will have the same schema. We create the
-    // target just once and cache it here. Helpfully, Arc is needed in
-    // the constructor for RecordBatch, so we don't need an expensive copy
-    // each time.
-    pub target_schema: Arc<ArrowSchema>,
-
-    // Indicates how each column in the target schema is derived.
-    pub operations: Vec<EvolutionAction>,
+enum BatchTransform {
+    // Indicates that no changes need to be performed to the RecordBatches
+    // coming in from the stream and that they can be passed through
+    // unmodified
+    PassThrough,
+
+    Modify {
+        // Every transformed RecordBatch will have the same schema. We create the
+        // target just once and cache it here. Helpfully, Arc is needed in
+        // the constructor for RecordBatch, so we don't need an expensive copy
+        // each time we build a new RecordBatch
+        target_schema: Arc<ArrowSchema>,
+
+        // Indicates how each column in the target schema is derived.
+        operations: Vec<ColumnSource>,
+    },
 }
 
 #[derive(Debug)]
-pub(crate) struct RecordBatchEvolutionProcessor {
+pub(crate) struct RecordBatchTransformer {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
-    schema_and_ops: Option<SchemaAndOps>,
+
+    // BatchTransform gets lazily constructed based on the schema of
+    // the first RecordBatch we receive from the file
+    batch_transform: Option<BatchTransform>,
 }
 
-impl RecordBatchEvolutionProcessor {
-    /// Fallibly try to build a RecordBatchEvolutionProcessor for a given parquet file schema
-    /// and Iceberg snapshot schema. Returns Ok(None) if the processor would not be required
-    /// due to the file schema already matching the snapshot schema
+impl RecordBatchTransformer {
+    /// Build a RecordBatchTransformer for a given
+    /// Iceberg snapshot schema and list of projected field ids.
     pub(crate) fn build(
-        // source_schema: &ArrowSchemaRef,
         snapshot_schema: Arc<IcebergSchema>,
         projected_iceberg_field_ids: &[i32],
     ) -> Self {
         let projected_iceberg_field_ids = if projected_iceberg_field_ids.is_empty() {
-            // project all fields in table schema order
+            // If the list of field ids is empty, this indicates that we
+            // need to select all fields.
+            // Project all fields in table schema order
             snapshot_schema
                 .as_struct()
                 .fields()
@@ -117,61 +126,47 @@ impl RecordBatchEvolutionProcessor {
         Self {
             snapshot_schema,
             projected_iceberg_field_ids,
-            schema_and_ops: None,
+            batch_transform: None,
         }
-
-        // let (operations, target_schema) = Self::generate_operations_and_schema(
-        //     source_schema,
-        //     snapshot_schema,
-        //     projected_iceberg_field_ids,
-        // )?;
-        //
-        // Ok(if target_schema.as_ref() == source_schema.as_ref() {
-        //     None
-        // } else {
-        //     Some(Self {
-        //         operations,
-        //         target_schema,
-        //     })
-        // })
     }
 
     pub(crate) fn process_record_batch(
         &mut self,
         record_batch: RecordBatch,
     ) -> Result<RecordBatch> {
-        if self.schema_and_ops.is_none() {
-            self.schema_and_ops = Some(Self::generate_operations_and_schema(
-                record_batch.schema_ref(),
-                self.snapshot_schema.as_ref(),
-                &self.projected_iceberg_field_ids,
-            )?);
-        }
-
-        let Some(SchemaAndOps {
-            ref target_schema, ..
-        }) = self.schema_and_ops
-        else {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                "schema_and_ops always created at this point",
-            ));
-        };
-
-        Ok(RecordBatch::try_new(
-            target_schema.clone(),
-            self.transform_columns(record_batch.columns())?,
-        )?)
+        Ok(match self.batch_transform {
+            Some(BatchTransform::PassThrough) => record_batch,
+            Some(BatchTransform::Modify {
+                ref target_schema,
+                ref operations,
+            }) => RecordBatch::try_new(
+                target_schema.clone(),
+                self.transform_columns(record_batch.columns(), operations)?,
+            )?,
+            None => {
+                self.batch_transform = Some(Self::generate_batch_transform(
+                    record_batch.schema_ref(),
+                    self.snapshot_schema.as_ref(),
+                    &self.projected_iceberg_field_ids,
+                )?);
+
+                self.process_record_batch(record_batch)?
+            }
+        })
     }
 
-    // create the (possibly empty) list of `EvolutionOp`s that we need
-    // to apply to the arrays in a record batch with `source_schema` so
-    // that it matches the `snapshot_schema`
-    fn generate_operations_and_schema(
+    // Compare the schema of the incoming RecordBatches to the schema of
+    // the Iceberg snapshot to determine what, if any, transformation
+    // needs to be applied. If the schemas match, we return BatchTransform::PassThrough
+    // to indicate that no changes need to be made. Otherwise, we return a
+    // BatchTransform::Modify containing the target RecordBatch schema and
+    // the list of `ColumnSource`s that indicate how to source each column in
+    // the resulting RecordBatches.
+    fn generate_batch_transform(
         source_schema: &ArrowSchemaRef,
         snapshot_schema: &IcebergSchema,
         projected_iceberg_field_ids: &[i32],
-    ) -> Result<SchemaAndOps> {
+    ) -> Result<BatchTransform> {
         let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
         let field_id_to_source_schema_map =
             Self::build_field_id_to_arrow_schema_map(source_schema)?;
@@ -190,7 +185,11 @@ impl RecordBatchEvolutionProcessor {
                     .clone())
             })
             .collect();
+        let target_schema = ArrowSchema::new(fields?);
+        if target_schema == *source_schema.as_ref() {
+            return Ok(BatchTransform::PassThrough);
+        }
 
         let operations: Result<Vec<_>> = projected_iceberg_field_ids.iter().map(|field_id|{
             let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
@@ -203,12 +202,12 @@ impl RecordBatchEvolutionProcessor {
 
                 if source_field.data_type().equals_datatype(target_type) {
                     // no promotion required
-                    EvolutionAction::PassThrough {
+                    ColumnSource::PassThrough {
                         source_index: *source_index
                     }
                 } else {
                     // promotion required
-                    EvolutionAction::Promote {
+                    ColumnSource::Promote {
                         target_type: target_type.clone(),
                         source_index: *source_index,
                     }
@@ -222,25 +221,25 @@ impl RecordBatchEvolutionProcessor {
                 let default_value = if let Some(ref iceberg_default_value) =
                     &iceberg_field.initial_default
                 {
-                    let Literal::Primitive(prim_value) = iceberg_default_value else {
+                    let Literal::Primitive(primitive_literal) = iceberg_default_value else {
                         return Err(Error::new(
                             ErrorKind::Unexpected,
                             format!("Default value for column must be primitive type, but encountered {:?}", iceberg_default_value)
                         ));
                     };
-                    Some(prim_value.clone())
+                    Some(primitive_literal.clone())
                 } else {
                     None
                 };
 
-                EvolutionAction::Add {
+                ColumnSource::Add {
                     value: default_value,
                     target_type: target_type.clone(),
                 }
             })
         }).collect();
 
-        Ok(SchemaAndOps {
+        Ok(BatchTransform::Modify {
             operations: operations?,
             target_schema: Arc::new(target_schema),
         })
@@ -278,37 +277,30 @@ impl RecordBatchEvolutionProcessor {
     fn transform_columns(
         &self,
         columns: &[Arc<dyn ArrowArray>],
+        operations: &[ColumnSource],
     ) -> Result<Vec<Arc<dyn ArrowArray>>> {
         if columns.is_empty() {
             return Ok(columns.to_vec());
         }
         let num_rows = columns[0].len();
 
-        let Some(ref schema_and_ops) = self.schema_and_ops else {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                "schema_and_ops was None, but should be present",
-            ));
-        };
-
-        let result: Result<Vec<_>> = schema_and_ops
-            .operations
+        operations
             .iter()
             .map(|op| {
                 Ok(match op {
-                    EvolutionAction::PassThrough { source_index } => columns[*source_index].clone(),
-                    EvolutionAction::Promote {
+                    ColumnSource::PassThrough { source_index } => columns[*source_index].clone(),
+
+                    ColumnSource::Promote {
                         target_type,
                         source_index,
                     } => cast(&*columns[*source_index], target_type)?,
-                    EvolutionAction::Add { target_type, value } => {
+
+                    ColumnSource::Add { target_type, value } => {
                         Self::create_column(target_type, value,
                             num_rows)?
                     }
                 })
             })
-            .collect();
-
-        result
+            .collect()
     }
 
     fn create_column(
         target_type: &DataType,
@@ -388,7 +380,7 @@ mod test {
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
-    use crate::arrow::record_batch_evolution_processor::RecordBatchEvolutionProcessor;
+    use crate::arrow::record_batch_transformer::RecordBatchTransformer;
     use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type};
 
     #[test]
@@ -396,8 +388,7 @@ mod test {
         let arrow_schema = arrow_schema_already_same_as_target();
 
         let result =
-            RecordBatchEvolutionProcessor::build_field_id_to_arrow_schema_map(&arrow_schema)
-                .unwrap();
+            RecordBatchTransformer::build_field_id_to_arrow_schema_map(&arrow_schema).unwrap();
 
         let expected = HashMap::from_iter([
            (10, (arrow_schema.fields()[0].clone(), 0)),
            (11, (arrow_schema.fields()[1].clone(), 1)),
@@ -415,8 +406,7 @@ mod test {
         let snapshot_schema = Arc::new(iceberg_table_schema());
         let projected_iceberg_field_ids = [13, 14];
 
-        let mut inst =
-            RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids);
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
 
         let result = inst
             .process_record_batch(source_record_batch_no_migration_required())
             .unwrap();
@@ -432,8 +422,7 @@ mod test {
         let snapshot_schema = Arc::new(iceberg_table_schema());
         let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f
 
-        let mut inst =
-            RecordBatchEvolutionProcessor::build(snapshot_schema, &projected_iceberg_field_ids);
+        let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids);
 
         let result = inst.process_record_batch(source_record_batch()).unwrap();
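
// ---------------------------------------------------------------------------
// Illustrative sketch only (not part of the patch above): one way code inside
// the crate could drive the renamed RecordBatchTransformer, mirroring the
// reader.rs call sites changed in this commit. The helper name
// `transform_batches`, and collecting results into a Vec rather than streaming
// each batch over a channel as ArrowReader does, are assumptions made purely
// for this example.

use std::sync::Arc;

use arrow_array::RecordBatch;

use crate::arrow::record_batch_transformer::RecordBatchTransformer;
use crate::spec::Schema as IcebergSchema;
use crate::Result;

fn transform_batches(
    snapshot_schema: Arc<IcebergSchema>,
    projected_field_ids: &[i32],
    batches: impl IntoIterator<Item = RecordBatch>,
) -> Result<Vec<RecordBatch>> {
    // Built once per file scan task. Whether the transform is a pass-through
    // or a modification is decided lazily from the first batch's schema.
    let mut transformer = RecordBatchTransformer::build(snapshot_schema, projected_field_ids);

    batches
        .into_iter()
        .map(|batch| transformer.process_record_batch(batch))
        .collect()
}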