feat: complete work to support positional deletes in table scans
sdd committed Oct 9, 2024
1 parent 655780f commit 8d73668
Showing 3 changed files with 687 additions and 86 deletions.
crates/iceberg/src/arrow/reader.rs — 180 changes: 113 additions & 67 deletions
@@ -194,53 +194,68 @@ impl ArrowReader {
         };
 
         let (tx, rx) = channel(concurrency_limit_data_files);
+        let mut channel_for_error = tx.clone();
 
-        #[allow(clippy::redundant_closure)] // clippy's recommendation fails to compile
-        futures::stream::iter(delete_file_entries.iter().map(|df| crate::Result::Ok(df)))
-            .try_for_each_concurrent(concurrency_limit_data_files, |entry| {
-                let file_io = file_io.clone();
-                let mut tx = tx.clone();
-                async move {
-                    let FileScanTaskDeleteFile {
-                        ref file_path,
-                        file_type,
-                    } = entry;
-
-                    let record_batch_stream =
-                        Self::create_parquet_record_batch_stream_builder(
-                            file_path,
-                            file_io.clone(),
-                            false,
-                        )
-                        .await?
-                        .build()?
-                        .map(|item| match item {
-                            Ok(val) => Ok(val),
-                            Err(err) => Err(Error::new(ErrorKind::DataInvalid, err.to_string())
-                                .with_source(err)),
-                        })
-                        .boxed();
-
-                    let result = match file_type {
-                        DataContentType::PositionDeletes => {
-                            parse_positional_delete_file(record_batch_stream).await
-                        }
-                        DataContentType::EqualityDeletes => {
-                            parse_equality_delete_file(record_batch_stream).await
-                        }
-                        _ => Err(Error::new(
-                            ErrorKind::Unexpected,
-                            "Expected equality or positional delete",
-                        )),
-                    }?;
-
-                    tx.send(result).await?;
-                    Ok(())
-                }
-            })
-            .await?;
+        spawn(async move {
+            #[allow(clippy::redundant_closure)] // clippy's recommendation fails to compile
+            let result =
+                futures::stream::iter(delete_file_entries.iter().map(|df| crate::Result::Ok(df)))
+                    .try_for_each_concurrent(concurrency_limit_data_files, |entry| {
+                        let file_io = file_io.clone();
+                        let mut tx = tx.clone();
+                        async move {
+                            let FileScanTaskDeleteFile {
+                                ref file_path,
+                                file_type,
+                            } = entry;
+
+                            let record_batch_stream =
+                                Self::create_parquet_record_batch_stream_builder(
+                                    file_path,
+                                    file_io.clone(),
+                                    false,
+                                )
+                                .await?
+                                .build()?
+                                .map(|item| match item {
+                                    Ok(val) => Ok(val),
+                                    Err(err) => {
+                                        Err(Error::new(ErrorKind::DataInvalid, err.to_string())
+                                            .with_source(err))
+                                    }
+                                })
+                                .boxed();
+
+                            let result = match file_type {
+                                DataContentType::PositionDeletes => {
+                                    parse_positional_delete_file(record_batch_stream).await
+                                }
+                                DataContentType::EqualityDeletes => {
+                                    parse_equality_delete_file(record_batch_stream).await
+                                }
+                                _ => Err(Error::new(
+                                    ErrorKind::Unexpected,
+                                    "Expected equality or positional delete",
+                                )),
+                            }?;
+
+                            tx.send(Ok(result)).await?;
+                            Ok(())
+                        }
+                    })
+                    .await;
+
+            if let Err(error) = result {
+                let _ = channel_for_error.send(Err(error)).await;
+            }
+        });
 
-        Ok(Some(rx.collect::<Vec<_>>().await))
+        let results = rx.try_collect::<Vec<_>>().await?;
+        if results.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(results))
+        }
     }
 
     fn get_positional_delete_indexes(
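The new get_deletes body above follows a bounded fan-out pattern: an mpsc channel sized to the concurrency limit, a spawned task running try_for_each_concurrent, and a cloned sender kept aside to forward the first error into the channel so the receiver surfaces it. Below is a minimal, self-contained sketch of the same pattern, assuming the futures and tokio crates; fetch_deletes, the string payloads, and the String error type are hypothetical stand-ins, not code from this commit.

use futures::channel::mpsc::channel;
use futures::{SinkExt, TryStreamExt};

async fn fetch_deletes(paths: Vec<String>, concurrency: usize) -> Result<Option<Vec<String>>, String> {
    // Bounded channel: senders apply backpressure once `concurrency` results queue up.
    let (tx, rx) = channel::<Result<String, String>>(concurrency);
    let mut channel_for_error = tx.clone();

    tokio::spawn(async move {
        // Process up to `concurrency` files at once, stopping at the first error.
        let result = futures::stream::iter(paths.into_iter().map(Ok))
            .try_for_each_concurrent(concurrency, |path| {
                let mut tx = tx.clone();
                async move {
                    let parsed = format!("parsed {path}"); // stand-in for the real parse
                    tx.send(Ok(parsed)).await.map_err(|e| e.to_string())
                }
            })
            .await;

        // Forward a failure into the channel so the receiver sees it via `?`.
        if let Err(error) = result {
            let _ = channel_for_error.send(Err(error)).await;
        }
        // All senders drop here, closing the channel and ending the receive stream.
    });

    let results: Vec<String> = rx.try_collect().await?;
    Ok(if results.is_empty() { None } else { Some(results) })
}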
@@ -299,7 +314,8 @@ impl ArrowReader {
             async move { Self::get_deletes(delete_file_tasks, file_io, 5).await }
         });
 
-        let should_load_page_index = row_selection_enabled && task.predicate.is_some();
+        let should_load_page_index =
+            (row_selection_enabled && task.predicate.is_some()) || task.deletes.is_some();
         let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
             file_io,
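This hunk widens the page-index condition: the Parquet page index is now loaded whenever the task carries delete files, not only when row selection is enabled with a predicate, because building a RowSelection from positional deletes (next hunks) needs the file's offset index. A minimal sketch of what loading the page index looks like with the parquet crate's synchronous builder; the function name and path parameter are hypothetical.

use std::fs::File;

use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::errors::{ParquetError, Result};

fn open_with_page_index(path: &str) -> Result<ParquetRecordBatchReaderBuilder<File>> {
    let file = File::open(path).map_err(|e| ParquetError::General(e.to_string()))?;

    // The equivalent of `should_load_page_index == true` above: ask the reader
    // to parse the column/offset indexes from the file footer region.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;

    // `offset_index()` is Some only if the index was requested and the file has one.
    let _has_index = builder.metadata().offset_index().is_some();
    Ok(builder)
}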
Expand Down Expand Up @@ -663,7 +679,7 @@ impl ArrowReader {
fn get_deleted_row_selection(
parquet_metadata: &Arc<ParquetMetaData>,
selected_row_groups: &Option<Vec<usize>>,
positional_delete_indexes: &[usize],
positional_deletes: &[usize],
) -> Result<RowSelection> {
let Some(offset_index) = parquet_metadata.offset_index() else {
return Err(Error::new(
Expand All @@ -672,47 +688,77 @@ impl ArrowReader {
             ));
         };
 
-        let /*mut*/ selected_row_groups_idx = 0;
+        let mut selected_row_groups_idx = 0;
+        let mut curr_pos_del_idx = 0;
+        let pos_del_len = positional_deletes.len();
 
         let page_index = offset_index
             .iter()
             .enumerate()
             .zip(parquet_metadata.row_groups());
 
-        let results: Vec<RowSelector> = Vec::new();
+        let mut results: Vec<RowSelector> = Vec::new();
         let mut current_page_base_idx: usize = 0;
         for ((idx, _offset_index), row_group_metadata) in page_index {
             let page_num_rows = row_group_metadata.num_rows() as usize;
-            let _next_page_base_idx = current_page_base_idx + page_num_rows;
+            let next_page_base_idx = current_page_base_idx + page_num_rows;
 
-            // skip any row groups that aren't in the row group selection
             if let Some(selected_row_groups) = selected_row_groups {
+                // skip row groups that aren't present in selected_row_groups
                 if selected_row_groups_idx == selected_row_groups.len() {
                     break;
                 }
                 if idx == selected_row_groups[selected_row_groups_idx] {
-                    // selected_row_groups_idx += 1;
+                    selected_row_groups_idx += 1;
                 } else {
                     current_page_base_idx += page_num_rows;
                     continue;
                 }
             }
 
-            let Some(_next_delete_row_idx) = positional_delete_indexes.last() else {
-                break;
-            };
-
-            // TODO: logic goes here to create `RowSelection`s for the
-            // current page, based on popping delete indices that are
-            // on this page
-            break;
-            // while *next_delete_row_idx < next_page_base_idx {
-            // }
-
-            // if let Some(selected_row_groups) = selected_row_groups {
-            //     if selected_row_groups_idx == selected_row_groups.len() {
-            //         break;
-            //     }
-            // }
-            //
-            // current_page_base_idx += page_num_rows;
+            let mut next_deleted_row_idx = positional_deletes[curr_pos_del_idx];
+
+            // if the index of the next deleted row is beyond this page, skip
+            // to the next page
+            if next_deleted_row_idx >= next_page_base_idx {
+                continue;
+            }
+
+            let mut current_idx = current_page_base_idx;
+            while next_deleted_row_idx < next_page_base_idx {
+                // select all rows that precede the next delete index
+                if current_idx < next_deleted_row_idx {
+                    let run_length = next_deleted_row_idx - current_idx;
+                    results.push(RowSelector::select(run_length));
+                    current_idx += run_length;
+                }
+
+                // skip all consecutive deleted rows
+                let mut run_length = 1;
+                while curr_pos_del_idx < pos_del_len - 1
+                    && positional_deletes[curr_pos_del_idx + 1] == next_deleted_row_idx + 1
+                {
+                    run_length += 1;
+                    curr_pos_del_idx += 1;
+                }
+                results.push(RowSelector::skip(run_length));
+                current_idx += run_length;
+
+                curr_pos_del_idx += 1;
+                if curr_pos_del_idx >= pos_del_len {
+                    break;
+                }
+                next_deleted_row_idx = positional_deletes[curr_pos_del_idx];
+            }
+
+            if let Some(selected_row_groups) = selected_row_groups {
+                if selected_row_groups_idx == selected_row_groups.len() {
+                    break;
+                }
+            }
+
+            current_page_base_idx += page_num_rows;
         }
 
         Ok(results.into())
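The loop above is a run-length encoding: it walks the sorted positional delete indices once, emitting RowSelector::select for each run of live rows and RowSelector::skip for each run of consecutive deletes. Below is a standalone sketch of that encoding over a single contiguous range of rows, assuming the parquet crate and a deletes slice sorted ascending; selection_from_deletes is an illustrative name, not part of this commit.

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn selection_from_deletes(deletes: &[usize], total_rows: usize) -> RowSelection {
    let mut selectors: Vec<RowSelector> = Vec::new();
    let mut current = 0;
    let mut i = 0;

    while i < deletes.len() && deletes[i] < total_rows {
        let start = deletes[i];

        // Select the run of live rows before the next deleted row.
        if current < start {
            selectors.push(RowSelector::select(start - current));
        }

        // Coalesce consecutive deleted rows into a single skip.
        let mut end = start + 1;
        i += 1;
        while i < deletes.len() && deletes[i] == end {
            end += 1;
            i += 1;
        }
        selectors.push(RowSelector::skip(end - start));
        current = end;
    }

    // Select everything that remains after the last delete.
    if current < total_rows {
        selectors.push(RowSelector::select(total_rows - current));
    }

    selectors.into()
}

For example, deletes of [3, 4, 7] over 10 rows encode as select(3), skip(2), select(2), skip(1), select(2).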