Skip to content

Commit

Permalink
reorder record batch
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 committed Sep 11, 2024
1 parent dfccd0e commit 3333aa6
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl ArrowReader {

// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = Self::get_arrow_projection_mask(
let (projection_mask, reorder) = Self::get_arrow_projection_mask(
&task.project_field_ids,
&task.schema,
record_batch_stream_builder.parquet_schema(),
Expand Down Expand Up @@ -234,6 +234,11 @@ impl ArrowReader {
// to the requester.
let mut record_batch_stream = record_batch_stream_builder.build()?;
while let Some(batch) = record_batch_stream.try_next().await? {
let batch = if let Some(reorder) = reorder.as_ref() {
batch.project(&reorder).expect("must be able to reorder")
} else {
batch
};
tx.send(Ok(batch)).await?
}

Expand Down Expand Up @@ -261,9 +266,9 @@ impl ArrowReader {
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
) -> Result<ProjectionMask> {
) -> Result<(ProjectionMask, Option<Vec<usize>>)> {
if field_ids.is_empty() {
Ok(ProjectionMask::all())
Ok((ProjectionMask::all(), None))
} else {
// Build the map between field id and column index in Parquet schema.
let mut column_map = HashMap::new();
Expand Down Expand Up @@ -322,7 +327,12 @@ impl ArrowReader {
));
}
}
Ok(ProjectionMask::leaves(parquet_schema, indices))

let mut indexed_pairs: Vec<(usize, usize)> = indices.iter().cloned().enumerate().collect();
indexed_pairs.sort_by_key(|&(_, index)| index);
let reorder_vec: Vec<usize> = indexed_pairs.iter().map(|&(pos, _)| pos).collect();

Ok((ProjectionMask::leaves(parquet_schema, indices), Some(reorder_vec)))
}
}

Expand Down

0 comments on commit 3333aa6

Please sign in to comment.