Skip to content

Commit

Permalink
feat: RecordBatchEvolutionProcessor handles skipped fields in projection
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Sep 9, 2024
1 parent 6d8ad87 commit bf6d2ef
Show file tree
Hide file tree
Showing 3 changed files with 274 additions and 198 deletions.
19 changes: 5 additions & 14 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,8 @@ impl ArrowReader {

// create a RecordBatchEvolutionProcessor if our task schema contains columns
// not present in the parquet file or whose types have been promoted
let record_batch_evolution_processor = RecordBatchEvolutionProcessor::build(
record_batch_stream_builder.schema(),
task.schema(),
task.project_field_ids(),
)?;
let mut record_batch_evolution_processor =
RecordBatchEvolutionProcessor::build(task.schema_ref(), task.project_field_ids());

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
Expand Down Expand Up @@ -243,15 +240,9 @@ impl ArrowReader {
// to the requester.
let mut record_batch_stream = record_batch_stream_builder.build()?;

if let Some(record_batch_evolution_processor) = record_batch_evolution_processor {
while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(record_batch_evolution_processor.process_record_batch(batch))
.await?
}
} else {
while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(Ok(batch)).await?
}
while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(record_batch_evolution_processor.process_record_batch(batch))
.await?
}

Ok(())
Expand Down
Loading

0 comments on commit bf6d2ef

Please sign in to comment.