Skip to content

Commit

Permalink
[CLN] Propagate errors for block get/fork (#2778)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
	 - These errors were not properly propagated before
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
Non functional changes. Existing tests. We should add tests for these
cases
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
HammadB committed Sep 13, 2024
1 parent 7fbc397 commit 586ae7e
Show file tree
Hide file tree
Showing 24 changed files with 346 additions and 167 deletions.
22 changes: 11 additions & 11 deletions rust/blockstore/src/arrow/block/delta/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ mod test {
values_before_flush.push(read.to_vec());
}
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&block.clone().id).await.unwrap();
let block = block_manager.get(&block.clone().id).await.unwrap().unwrap();
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, &[u32]>("prefix", &key).unwrap();
Expand Down Expand Up @@ -214,7 +214,7 @@ mod test {
}
block_manager.flush(&block).await.unwrap();

let block = block_manager.get(&delta_id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();

assert_eq!(size, block.get_size());
for i in 0..n {
Expand All @@ -232,11 +232,11 @@ mod test {
}

// test fork
let forked_block = block_manager.fork::<&str, String>(&delta_id).await;
let forked_block = block_manager.fork::<&str, String>(&delta_id).await.unwrap();
let new_id = forked_block.id.clone();
let block = block_manager.commit::<&str, String>(forked_block);
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap().unwrap();
for i in 0..n {
let key = format!("key{}", i);
let read = forked_block.get::<&str, &str>("prefix", &key);
Expand Down Expand Up @@ -271,7 +271,7 @@ mod test {
values_before_flush.push(read);
}
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();
assert_eq!(size, block.get_size());
for i in 0..n {
let key = i as f32;
Expand Down Expand Up @@ -303,7 +303,7 @@ mod test {
let delta_id = delta.id.clone();
let block = block_manager.commit::<&str, RoaringBitmap>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();

assert_eq!(size, block.get_size());

Expand Down Expand Up @@ -368,7 +368,7 @@ mod test {
let delta_id = delta.id.clone();
let block = block_manager.commit::<&str, &DataRecord>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();
for i in 0..3 {
let read = block.get::<&str, DataRecord>("", ids[i]).unwrap();
assert_eq!(read.id, ids[i]);
Expand Down Expand Up @@ -403,7 +403,7 @@ mod test {
let delta_id = delta.id.clone();
let block = block_manager.commit::<u32, String>(delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();
assert_eq!(size, block.get_size());

// test save/load
Expand Down Expand Up @@ -437,7 +437,7 @@ mod test {
}
block_manager.flush(&block).await.unwrap();

let block = block_manager.get(&delta_id).await.unwrap();
let block = block_manager.get(&delta_id).await.unwrap().unwrap();

assert_eq!(size, block.get_size());
for i in 0..n {
Expand All @@ -455,11 +455,11 @@ mod test {
}

// test fork
let forked_block = block_manager.fork::<u32, u32>(&delta_id).await;
let forked_block = block_manager.fork::<u32, u32>(&delta_id).await.unwrap();
let new_id = forked_block.id.clone();
let block = block_manager.commit::<u32, u32>(forked_block);
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap().unwrap();
for i in 0..n {
let key = i as u32;
let read = forked_block.get::<u32, u32>("prefix", key);
Expand Down
11 changes: 11 additions & 0 deletions rust/blockstore/src/arrow/block/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,17 @@ pub enum BlockLoadError {
NoRecordBatches,
}

impl ChromaError for BlockLoadError {
fn code(&self) -> ErrorCodes {
match self {
BlockLoadError::IOError(_) => ErrorCodes::Internal,
BlockLoadError::ArrowError(_) => ErrorCodes::Internal,
BlockLoadError::ArrowLayoutVerificationError(_) => ErrorCodes::Internal,
BlockLoadError::NoRecordBatches => ErrorCodes::Internal,
}
}
}

/*
===== Layout Verification =====
*/
Expand Down
Loading

0 comments on commit 586ae7e

Please sign in to comment.