diff --git a/rust/blockstore/src/arrow/block/delta/delta.rs b/rust/blockstore/src/arrow/block/delta/delta.rs
index e21da3f5732..39385b01796 100644
--- a/rust/blockstore/src/arrow/block/delta/delta.rs
+++ b/rust/blockstore/src/arrow/block/delta/delta.rs
@@ -177,7 +177,7 @@ mod test {
             values_before_flush.push(read.to_vec());
         }
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&block.clone().id).await.unwrap();
+        let block = block_manager.get(&block.clone().id).await.unwrap().unwrap();
         for i in 0..n {
             let key = format!("key{}", i);
             let read = block.get::<&str, &[u32]>("prefix", &key).unwrap();
@@ -214,7 +214,7 @@ mod test {
         }

         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta_id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap().unwrap();

         assert_eq!(size, block.get_size());
         for i in 0..n {
@@ -232,11 +232,11 @@ mod test {
         }

         // test fork
-        let forked_block = block_manager.fork::<&str, String>(&delta_id).await;
+        let forked_block = block_manager.fork::<&str, String>(&delta_id).await.unwrap();
         let new_id = forked_block.id.clone();
         let block = block_manager.commit::<&str, String>(forked_block);
         block_manager.flush(&block).await.unwrap();
-        let forked_block = block_manager.get(&new_id).await.unwrap();
+        let forked_block = block_manager.get(&new_id).await.unwrap().unwrap();
         for i in 0..n {
             let key = format!("key{}", i);
             let read = forked_block.get::<&str, &str>("prefix", &key);
@@ -271,7 +271,7 @@ mod test {
             values_before_flush.push(read);
         }
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta_id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap().unwrap();
         assert_eq!(size, block.get_size());
         for i in 0..n {
             let key = i as f32;
@@ -303,7 +303,7 @@ mod test {
         let delta_id = delta.id.clone();
         let block = block_manager.commit::<&str, RoaringBitmap>(delta);
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta_id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap().unwrap();

         assert_eq!(size, block.get_size());

@@ -368,7 +368,7 @@ mod test {
         let delta_id = delta.id.clone();
         let block = block_manager.commit::<&str, &DataRecord>(delta);
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta_id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap().unwrap();
         for i in 0..3 {
             let read = block.get::<&str, DataRecord>("", ids[i]).unwrap();
             assert_eq!(read.id, ids[i]);
@@ -403,7 +403,7 @@ mod test {
         let delta_id = delta.id.clone();
         let block = block_manager.commit::<u32, String>(delta);
         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta_id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap().unwrap();

         assert_eq!(size, block.get_size());

         // test save/load
@@ -437,7 +437,7 @@ mod test {
         }

         block_manager.flush(&block).await.unwrap();
-        let block = block_manager.get(&delta_id).await.unwrap();
+        let block = block_manager.get(&delta_id).await.unwrap().unwrap();

         assert_eq!(size, block.get_size());
         for i in 0..n {
@@ -455,11 +455,11 @@ mod test {
         }

         // test fork
-        let forked_block = block_manager.fork::<u32, String>(&delta_id).await;
+        let forked_block = block_manager.fork::<u32, String>(&delta_id).await.unwrap();
         let new_id = forked_block.id.clone();
         let block = block_manager.commit::<u32, String>(forked_block);
         block_manager.flush(&block).await.unwrap();
-        let forked_block = block_manager.get(&new_id).await.unwrap();
+        let forked_block = block_manager.get(&new_id).await.unwrap().unwrap();
         for i in 0..n {
             let key = i as u32;
             let read = forked_block.get::<u32, &str>("prefix", key);
diff --git a/rust/blockstore/src/arrow/block/types.rs b/rust/blockstore/src/arrow/block/types.rs
index 65e94e5c3cb..6c38f258d57 100644
--- a/rust/blockstore/src/arrow/block/types.rs
+++ b/rust/blockstore/src/arrow/block/types.rs
@@ -564,6 +564,17 @@ pub enum BlockLoadError {
     NoRecordBatches,
 }

+impl ChromaError for BlockLoadError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            BlockLoadError::IOError(_) => ErrorCodes::Internal,
+            BlockLoadError::ArrowError(_) => ErrorCodes::Internal,
+            BlockLoadError::ArrowLayoutVerificationError(_) => ErrorCodes::Internal,
+            BlockLoadError::NoRecordBatches => ErrorCodes::Internal,
+        }
+    }
+}
+
 /*
 ===== Layout Verification =====
 */
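The pattern above recurs throughout this PR: a `thiserror` enum whose variants wrap source errors via `#[from]`, plus a `ChromaError` impl that maps each variant to an `ErrorCodes` value. A minimal sketch of the pattern, using simplified stand-ins for the real `ChromaError` trait and `ErrorCodes` enum from the `chroma_error` crate (neither definition is shown in this diff):

```rust
// Illustrative sketch only; `ErrorCodes` and `ChromaError` here are
// simplified stand-ins for chroma_error's real definitions.
use thiserror::Error;

#[derive(Debug, Clone, Copy)]
pub enum ErrorCodes {
    Internal,
    NotFound,
}

pub trait ChromaError: std::error::Error {
    fn code(&self) -> ErrorCodes;
}

#[derive(Error, Debug)]
pub enum DemoLoadError {
    // `#[from]` derives a From impl, so `?` converts the source error automatically.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error("no record batches")]
    NoRecordBatches,
}

impl ChromaError for DemoLoadError {
    fn code(&self) -> ErrorCodes {
        match self {
            DemoLoadError::Io(_) | DemoLoadError::NoRecordBatches => ErrorCodes::Internal,
        }
    }
}

fn load(path: &str) -> Result<Vec<u8>, DemoLoadError> {
    Ok(std::fs::read(path)?) // io::Error -> DemoLoadError via #[from]
}
```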
diff --git a/rust/blockstore/src/arrow/blockfile.rs b/rust/blockstore/src/arrow/blockfile.rs
index ed7c7f6baad..f2f62d14bd9 100644
--- a/rust/blockstore/src/arrow/blockfile.rs
+++ b/rust/blockstore/src/arrow/blockfile.rs
@@ -1,3 +1,4 @@
+use super::provider::GetError;
 use super::{block::delta::BlockDelta, provider::BlockManager};
 use super::{
     block::Block,
@@ -105,7 +106,6 @@ impl ArrowBlockfileWriter {
             removed = self.sparse_index.remove_block(&delta.id);
         }
         if !removed {
-            // TODO: might these error?
             let block = self.block_manager.commit::<K, V>(delta);
             blocks.push(block);
         }
@@ -119,7 +119,6 @@ impl ArrowBlockfileWriter {
             self.id,
         );

-        // TODO: we need to update the sparse index with the new min keys?
         Ok(flusher)
     }

@@ -142,7 +141,6 @@ impl ArrowBlockfileWriter {

         // See if a delta for the target block already exists, if not create a new one and add it to the transaction state
         // Creating a delta loads the block entirely into memory
-        // TODO: replace with R/W lock
         let delta = {
             let deltas = self.block_deltas.lock();
             let delta = match deltas.get(&target_block_id) {
@@ -154,8 +152,21 @@ impl ArrowBlockfileWriter {

         let delta = match delta {
             None => {
-                let block = self.block_manager.get(&target_block_id).await.unwrap();
-                let new_delta = self.block_manager.fork::<K, V>(&block.id).await;
+                let block = match self.block_manager.get(&target_block_id).await {
+                    Ok(Some(block)) => block,
+                    Ok(None) => {
+                        return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                    }
+                    Err(e) => {
+                        return Err(Box::new(e));
+                    }
+                };
+                let new_delta = match self.block_manager.fork::<K, V>(&block.id).await {
+                    Ok(delta) => delta,
+                    Err(e) => {
+                        return Err(Box::new(e));
+                    }
+                };
                 let new_id = new_delta.id;
                 // Blocks can be empty.
                 self.sparse_index
@@ -206,8 +217,21 @@ impl ArrowBlockfileWriter {

         let delta = match delta {
             None => {
-                let block = self.block_manager.get(&target_block_id).await.unwrap();
-                let new_delta = self.block_manager.fork::<K, V>(&block.id).await;
+                let block = match self.block_manager.get(&target_block_id).await {
+                    Ok(Some(block)) => block,
+                    Ok(None) => {
+                        return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                    }
+                    Err(e) => {
+                        return Err(Box::new(e));
+                    }
+                };
+                let new_delta = match self.block_manager.fork::<K, V>(&block.id).await {
+                    Ok(delta) => delta,
+                    Err(e) => {
+                        return Err(Box::new(e));
+                    }
+                };
                 let new_id = new_delta.id;
                 self.sparse_index
                     .replace_block(target_block_id, new_delta.id);
@@ -254,9 +278,17 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         }
     }

-    pub(super) async fn get_block(&self, block_id: Uuid) -> Option<&Block> {
+    pub async fn get_block(&self, block_id: Uuid) -> Result<Option<&Block>, GetError> {
         if !self.loaded_blocks.lock().contains_key(&block_id) {
-            let block = self.block_manager.get(&block_id).await?;
+            let block = match self.block_manager.get(&block_id).await {
+                Ok(Some(block)) => block,
+                Ok(None) => {
+                    return Ok(None);
+                }
+                Err(e) => {
+                    return Err(e);
+                }
+            };
             self.loaded_blocks.lock().insert(block_id, Box::new(block));
         }

@@ -270,10 +302,10 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
             // We never drop the Box while the reference is still alive
             // We never drop the HashMap while the reference is still alive
             // We never drop the HashMap while the Box is still alive
-            return Some(unsafe { transmute(&**block) });
+            return Ok(Some(unsafe { transmute(&**block) }));
         }

-        None
+        Ok(None)
     }

     /// Load all required blocks into the underlying block manager so that
@@ -319,11 +351,14 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         let target_block_id = self.sparse_index.get_target_block_id(&search_key);
         let block = self.get_block(target_block_id).await;
         let res = match block {
-            Some(block) => block.get(prefix, key.clone()),
-            None => {
+            Ok(Some(block)) => block.get(prefix, key.clone()),
+            Ok(None) => {
                 tracing::error!("Block with id {:?} not found", target_block_id);
                 return Err(Box::new(ArrowBlockfileError::BlockNotFound));
             }
+            Err(e) => {
+                return Err(Box::new(e));
+            }
         };
         match res {
             Some(value) => Ok(value),
@@ -351,7 +386,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
                 let sparse_index_forward = self.sparse_index.forward.lock();
                 *sparse_index_forward.iter().nth(i).unwrap().1
             };
-            block = self.get_block(uuid).await;
+            block = match self.get_block(uuid).await {
+                Ok(Some(block)) => Some(block),
+                Ok(None) => {
+                    tracing::error!("Block with id {:?} not found", uuid);
+                    return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                }
+                Err(e) => {
+                    return Err(Box::new(e));
+                }
+            };
             match block {
                 Some(b) => {
                     if block_offset + b.len() > index {
@@ -392,7 +436,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         let mut result: Vec<(&str, K, V)> = vec![];
         // Read all the blocks individually to get keys > key.
         for block_id in block_ids {
-            let block_opt = self.get_block(block_id).await;
+            let block_opt = match self.get_block(block_id).await {
+                Ok(Some(block)) => Some(block),
+                Ok(None) => {
+                    return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                }
+                Err(e) => {
+                    return Err(Box::new(e));
+                }
+            };
+
             let block = match block_opt {
                 Some(b) => b,
                 None => {
@@ -422,7 +475,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         let mut result: Vec<(&str, K, V)> = vec![];
         // Read all the blocks individually to get keys < key.
         for block_id in block_ids {
-            let block_opt = self.get_block(block_id).await;
+            let block_opt = match self.get_block(block_id).await {
+                Ok(Some(block)) => Some(block),
+                Ok(None) => {
+                    return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                }
+                Err(e) => {
+                    return Err(Box::new(e));
+                }
+            };
+
             let block = match block_opt {
                 Some(b) => b,
                 None => {
@@ -452,7 +514,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         let mut result: Vec<(&str, K, V)> = vec![];
         // Read all the blocks individually to get keys >= key.
         for block_id in block_ids {
-            let block_opt = self.get_block(block_id).await;
+            let block_opt = match self.get_block(block_id).await {
+                Ok(Some(block)) => Some(block),
+                Ok(None) => {
+                    return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                }
+                Err(e) => {
+                    return Err(Box::new(e));
+                }
+            };
+
             let block = match block_opt {
                 Some(b) => b,
                 None => {
@@ -482,7 +553,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         let mut result: Vec<(&str, K, V)> = vec![];
         // Read all the blocks individually to get keys <= key.
         for block_id in block_ids {
-            let block_opt = self.get_block(block_id).await;
+            let block_opt = match self.get_block(block_id).await {
+                Ok(Some(block)) => Some(block),
+                Ok(None) => {
+                    return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                }
+                Err(e) => {
+                    return Err(Box::new(e));
+                }
+            };
+
             let block = match block_opt {
                 Some(b) => b,
                 None => {
@@ -509,7 +589,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         let block_ids = self.sparse_index.get_block_ids_prefix(prefix);
         let mut result: Vec<(&str, K, V)> = vec![];
         for block_id in block_ids {
-            let block_opt = self.get_block(block_id).await;
+            let block_opt = match self.get_block(block_id).await {
+                Ok(Some(block)) => Some(block),
+                Ok(None) => {
+                    return Err(Box::new(ArrowBlockfileError::BlockNotFound));
+                }
+                Err(e) => {
+                    return Err(Box::new(e));
+                }
+            };
+
             let block = match block_opt {
                 Some(b) => b,
                 None => {
@@ -528,19 +617,25 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         Ok(result)
     }

-    pub(crate) async fn contains(&'me self, prefix: &str, key: K) -> bool {
+    pub(crate) async fn contains(
+        &'me self,
+        prefix: &str,
+        key: K,
+    ) -> Result<bool, Box<dyn ChromaError>> {
         let search_key = CompositeKey::new(prefix.to_string(), key.clone());
         let target_block_id = self.sparse_index.get_target_block_id(&search_key);
-        let block = self.get_block(target_block_id).await;
-        let res: Option<V> = match block {
-            Some(block) => block.get(prefix, key),
-            None => {
-                return false;
+        let block = match self.get_block(target_block_id).await {
+            Ok(Some(block)) => block,
+            Ok(None) => {
+                return Ok(false);
+            }
+            Err(e) => {
+                return Err(Box::new(e));
             }
         };
-        match res {
-            Some(_) => true,
-            None => false,
+        match block.get::<K, V>(prefix, key) {
+            Some(_) => Ok(true),
+            None => Ok(false),
         }
     }

@@ -558,13 +653,16 @@ impl<'me, K: ArrowReadableKey<'me> + Into<KeyWrapper>, V: ArrowReadableValue<'me
         self.load_blocks(&block_ids).await;
         let mut result: usize = 0;
         for block_id in block_ids {
-            let block = self.get_block(block_id).await;
-            match block {
-                Some(b) => result = result + b.len(),
-                None => {
+            let block = match self.get_block(block_id).await {
+                Ok(Some(block)) => block,
+                Ok(None) => {
                     return Err(Box::new(ArrowBlockfileError::BlockNotFound));
                 }
-            }
+                Err(e) => {
+                    return Err(Box::new(e));
+                }
+            };
+            result = result + block.len();
         }

         Ok(result)
@@ -1394,7 +1492,7 @@ mod tests {

         for i in 0..delete_end {
             let key = format!("{:04}", i);
-            assert_eq!(reader.contains("key", &key).await, false);
+            assert_eq!(reader.contains("key", &key).await.unwrap(), false);
         }

         for i in delete_end..n * 2 {
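Each call site above repeats the same three-way match, because `get_block` now distinguishes a block that is legitimately absent (`Ok(None)`, surfaced as `BlockNotFound`) from a failed load (`Err`). A self-contained sketch of that consumption pattern; `Block`, `GetError`, and `fetch_block` are hypothetical stand-ins, not the PR's types:

```rust
// Sketch of the three-way match this PR repeats at each call site.
struct Block;

#[derive(Debug)]
struct GetError; // stand-in for an I/O or decode failure

async fn fetch_block(_id: u32) -> Result<Option<Block>, GetError> {
    Ok(None) // placeholder body
}

#[derive(Debug)]
enum ReadError {
    BlockNotFound, // Ok(None): the id resolved to nothing
    Get(GetError), // Err(e): the load itself failed
}

async fn read(id: u32) -> Result<Block, ReadError> {
    match fetch_block(id).await {
        Ok(Some(block)) => Ok(block),
        Ok(None) => Err(ReadError::BlockNotFound),
        Err(e) => Err(ReadError::Get(e)),
    }
}
```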
diff --git a/rust/blockstore/src/arrow/provider.rs b/rust/blockstore/src/arrow/provider.rs
index aca9e341557..a23866ec66a 100644
--- a/rust/blockstore/src/arrow/provider.rs
+++ b/rust/blockstore/src/arrow/provider.rs
@@ -1,5 +1,5 @@
 use super::{
-    block::{delta::BlockDelta, Block},
+    block::{delta::BlockDelta, Block, BlockLoadError},
     blockfile::{ArrowBlockfileReader, ArrowBlockfileWriter},
     config::ArrowBlockfileProviderConfig,
     sparse_index::SparseIndex,
@@ -54,12 +54,15 @@ impl ArrowBlockfileProvider {
     ) -> Result<BlockfileReader<'new, K, V>, Box<OpenError>> {
         let sparse_index = self.sparse_index_manager.get::<K>(id).await;
         match sparse_index {
-            Some(sparse_index) => Ok(BlockfileReader::ArrowBlockfileReader(
+            Ok(Some(sparse_index)) => Ok(BlockfileReader::ArrowBlockfileReader(
                 ArrowBlockfileReader::new(*id, self.block_manager.clone(), sparse_index),
             )),
-            None => {
+            Ok(None) => {
                 return Err(Box::new(OpenError::NotFound));
             }
+            Err(e) => {
+                return Err(Box::new(OpenError::Other(Box::new(e))));
+            }
         }
     }

@@ -91,7 +94,14 @@ impl ArrowBlockfileProvider {
     ) -> Result<BlockfileWriter, Box<CreateError>> {
         tracing::info!("Forking blockfile from {:?}", id);
         let new_id = Uuid::new_v4();
-        let new_sparse_index = self.sparse_index_manager.fork::<K>(id, new_id).await;
+        let new_sparse_index = self
+            .sparse_index_manager
+            .fork::<K>(id, new_id)
+            .await
+            .map_err(|e| {
+                tracing::error!("Error forking sparse index: {:?}", e);
+                Box::new(CreateError::Other(Box::new(e)))
+            })?;
         let file = ArrowBlockfileWriter::from_sparse_index(
             new_id,
             self.block_manager.clone(),
@@ -139,6 +149,40 @@ impl Configurable<(ArrowBlockfileProviderConfig, Storage)> for ArrowBlockfilePro
     }
 }

+#[derive(Error, Debug)]
+pub(super) enum GetError {
+    #[error(transparent)]
+    BlockLoadError(#[from] BlockLoadError),
+    #[error(transparent)]
+    StorageGetError(#[from] chroma_storage::GetError),
+}
+
+impl ChromaError for GetError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            GetError::BlockLoadError(e) => e.code(),
+            GetError::StorageGetError(e) => e.code(),
+        }
+    }
+}
+
+#[derive(Error, Debug)]
+pub(super) enum ForkError {
+    #[error("Block not found")]
+    BlockNotFound,
+    #[error(transparent)]
+    GetError(#[from] GetError),
+}
+
+impl ChromaError for ForkError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            ForkError::BlockNotFound => ErrorCodes::NotFound,
+            ForkError::GetError(e) => e.code(),
+        }
+    }
+}
+
 /// A simple local cache of Arrow-backed blocks, the blockfile provider passes this
 /// to the ArrowBlockfile when it creates a new blockfile. So that the blockfile can manage and access blocks
 /// # Note
@@ -176,19 +220,21 @@ impl BlockManager {
     pub(super) async fn fork<KeyWrite: ArrowWriteableKey, ValueWrite: ArrowWriteableValue>(
         &self,
         block_id: &Uuid,
-    ) -> BlockDelta {
+    ) -> Result<BlockDelta, ForkError> {
         let block = self.get(block_id).await;
         let block = match block {
-            Some(block) => block,
-            None => {
-                // TODO: Err - tried to fork a block not owned by this manager
-                panic!("Tried to fork a block not owned by this manager")
+            Ok(Some(block)) => block,
+            Ok(None) => {
+                return Err(ForkError::BlockNotFound);
+            }
+            Err(e) => {
+                return Err(ForkError::GetError(e));
             }
         };
         let new_block_id = Uuid::new_v4();
         let delta = BlockDelta::new::<KeyWrite, ValueWrite>(new_block_id);
         let populated_delta = self.fork_lifetime_scope::<KeyWrite, ValueWrite>(&block, delta);
-        populated_delta
+        Ok(populated_delta)
     }

     fn fork_lifetime_scope<'new, KeyWrite, ValueWrite>(
@@ -217,10 +263,10 @@ impl BlockManager {
         self.block_cache.get(id).is_some()
     }

-    pub(super) async fn get(&self, id: &Uuid) -> Option<Block> {
+    pub(super) async fn get(&self, id: &Uuid) -> Result<Option<Block>, GetError> {
         let block = self.block_cache.get(id);
         match block {
-            Some(block) => Some(block.clone()),
+            Some(block) => Ok(Some(block.clone())),
             None => async {
                 let key = format!("block/{}", id);
                 let bytes_res = self
@@ -240,29 +286,27 @@ impl BlockManager {
                         let _guard = self.write_mutex.lock().await;
                         match self.block_cache.get(id) {
                             Some(b) => {
-                                return Some(b);
+                                return Ok(Some(b));
                             }
                             None => {
                                 self.block_cache.insert(*id, block.clone());
-                                Some(block)
+                                Ok(Some(block))
                             }
                         }
                     }
                     Err(e) => {
-                        // TODO: Return an error to callsite instead of None.
                         tracing::error!(
                             "Error converting bytes to Block {:?}/{:?}",
                             key,
                             e
                         );
-                        None
+                        return Err(GetError::BlockLoadError(e));
                    }
                 }
             }
             Err(e) => {
                 tracing::error!("Error converting bytes to Block {:?}", e);
-                // TODO: Return error instead of None.
-                return None;
+                return Err(GetError::StorageGetError(e));
             }
         }
     }.instrument(tracing::trace_span!(parent: Span::current(), "BlockManager get cold")).await
@@ -310,6 +354,29 @@ impl ChromaError for BlockFlushError {
     }
 }

+#[derive(Error, Debug)]
+pub(super) enum SparseIndexManagerError {
+    #[error("Not found")]
+    NotFound,
+    #[error(transparent)]
+    BlockLoadError(#[from] BlockLoadError),
+    #[error(transparent)]
+    UUIDParseError(#[from] uuid::Error),
+    #[error(transparent)]
+    StorageGetError(#[from] chroma_storage::GetError),
+}
+
+impl ChromaError for SparseIndexManagerError {
+    fn code(&self) -> ErrorCodes {
+        match self {
+            SparseIndexManagerError::NotFound => ErrorCodes::NotFound,
+            SparseIndexManagerError::BlockLoadError(e) => e.code(),
+            SparseIndexManagerError::StorageGetError(e) => e.code(),
+            SparseIndexManagerError::UUIDParseError(_) => ErrorCodes::DataLoss,
+        }
+    }
+}
+
 #[derive(Clone)]
 pub(super) struct SparseIndexManager {
     cache: Cache<Uuid, SparseIndex>,
@@ -324,12 +391,11 @@ impl SparseIndexManager {
     pub async fn get<'new, K: ArrowReadableKey<'new> + 'new>(
         &self,
         id: &Uuid,
-    ) -> Option<SparseIndex> {
+    ) -> Result<Option<SparseIndex>, SparseIndexManagerError> {
         let index = self.cache.get(id);
         match index {
-            Some(index) => Some(index),
+            Some(index) => Ok(Some(index)),
             None => {
-                // TODO: move this to a separate function
                 tracing::info!("Cache miss - fetching sparse index from storage");
                 let key = format!("sparse_index/{}", id);
                 tracing::debug!("Reading sparse index from storage with key: {}", key);
@@ -348,7 +414,7 @@ impl SparseIndexManager {
                                     "Error reading sparse index from storage: {}",
                                     e
                                 );
-                                return None;
+                                return Err(SparseIndexManagerError::StorageGetError(e));
                             }
                         }
                     }
@@ -365,29 +431,26 @@ impl SparseIndexManager {
                         match index {
                             Ok(index) => {
                                 self.cache.insert(*id, index.clone());
-                                return Some(index);
+                                return Ok(Some(index));
                             }
                             Err(e) => {
-                                // TODO: return error
                                 tracing::error!(
                                     "Error turning block into sparse index: {}",
                                     e
                                 );
-                                return None;
+                                return Err(SparseIndexManagerError::UUIDParseError(e));
                             }
                         }
                     }
                     Err(e) => {
-                        // TODO: return error
                         tracing::error!("Error turning bytes into block: {}", e);
-                        return None;
+                        return Err(SparseIndexManagerError::BlockLoadError(e));
                    }
                 }
             }
             Err(e) => {
-                // TODO: return error
                 tracing::error!("Error reading sparse index from storage: {}", e);
-                return None;
+                return Err(SparseIndexManagerError::StorageGetError(e));
             }
         }
     }
@@ -432,12 +495,16 @@ impl SparseIndexManager {
         &self,
         old_id: &Uuid,
         new_id: Uuid,
-    ) -> SparseIndex {
-        // TODO: error handling
+    ) -> Result<SparseIndex, SparseIndexManagerError> {
         tracing::info!("Forking sparse index from {:?}", old_id);
-        let original = self.get::<K::ReadableKey<'_>>(old_id).await.unwrap();
-        let forked = original.fork(new_id);
-        forked
+        let original = self.get::<K::ReadableKey<'_>>(old_id).await?;
+        match original {
+            Some(original) => {
+                let forked = original.fork(new_id);
+                Ok(forked)
+            }
+            None => Err(SparseIndexManagerError::NotFound),
+        }
     }
 }
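`BlockManager::get` keeps its existing double-checked cache fill: look up the cache, fetch on a miss, then re-check under the lock before inserting so concurrent fetchers do not insert duplicates. Only the return type changes in this PR. A stripped-down sketch of that caching discipline, with a tokio `Mutex` and a plain `HashMap` standing in for the real cache and storage fetch:

```rust
// Minimal sketch of the double-checked cache fill; the cache type,
// storage fetch, and Manager struct are assumptions, not the PR's code.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone)]
struct Block(u32);

struct Manager {
    cache: Arc<Mutex<HashMap<u32, Block>>>,
}

impl Manager {
    async fn get(&self, id: u32) -> Block {
        if let Some(b) = self.cache.lock().await.get(&id) {
            return b.clone(); // fast path: cache hit
        }
        let fetched = Block(id); // stand-in for the storage fetch + decode
        let mut cache = self.cache.lock().await;
        // Re-check: another task may have inserted while we were fetching,
        // so prefer the cached copy and avoid a duplicate insert.
        cache.entry(id).or_insert(fetched).clone()
    }
}
```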
diff --git a/rust/blockstore/src/arrow/sparse_index.rs b/rust/blockstore/src/arrow/sparse_index.rs
index 41f6954fb99..a39522756a6 100644
--- a/rust/blockstore/src/arrow/sparse_index.rs
+++ b/rust/blockstore/src/arrow/sparse_index.rs
@@ -475,8 +475,7 @@ impl SparseIndex {
     pub(super) fn to_block<K: ArrowWriteableKey>(&self) -> Result<Block, Box<dyn ChromaError>> {
         let forward = self.forward.lock();
         if forward.is_empty() {
-            // TODO: error here
-            panic!("No blocks in the sparse index");
+            panic!("Invariant violation. No blocks in the sparse index");
         }

         // TODO: we could save the uuid not as a string to be more space efficient
@@ -512,7 +511,7 @@ impl SparseIndex {

     pub(super) fn from_block<'block, K: ArrowReadableKey<'block> + 'block>(
         block: &'block Block,
-    ) -> Result<Self, Box<dyn ChromaError>> {
+    ) -> Result<Self, uuid::Error> {
         let mut forward = BTreeMap::new();
         let mut reverse = HashMap::new();
         let id = block.id;
@@ -523,7 +522,7 @@ impl SparseIndex {
                 let block_id = Uuid::parse_str(value);
                 match block_id {
                     Ok(block_id) => (SparseIndexDelimiter::Start, block_id),
-                    Err(e) => panic!("Failed to parse block id: {}", e), // TODO: error here
+                    Err(e) => return Err(e),
                 }
             }
             _ => {
@@ -533,7 +532,7 @@ impl SparseIndex {
                         SparseIndexDelimiter::Key(CompositeKey::new(prefix.to_string(), key)),
                         block_id,
                     ),
-                    Err(e) => panic!("Failed to parse block id: {}", e), // TODO: error here
+                    Err(e) => return Err(e),
                 }
             }
         };
diff --git a/rust/blockstore/src/key.rs b/rust/blockstore/src/key.rs
index 6c0cdab5b85..56151c9f9dc 100644
--- a/rust/blockstore/src/key.rs
+++ b/rust/blockstore/src/key.rs
@@ -99,7 +99,9 @@ impl CompositeKey {

 impl Hash for CompositeKey {
     fn hash<H: Hasher>(&self, state: &mut H) {
-        // TODO: Implement a better hash function
+        // TODO: Implement a better hash function. This is only used by the
+        // memory blockfile, so it's not a performance issue, since that
+        // is only used for testing.
         self.prefix.hash(state)
     }
 }
diff --git a/rust/blockstore/src/memory/reader_writer.rs b/rust/blockstore/src/memory/reader_writer.rs
index db7b3061d38..19f07512e3a 100644
--- a/rust/blockstore/src/memory/reader_writer.rs
+++ b/rust/blockstore/src/memory/reader_writer.rs
@@ -78,7 +78,6 @@ impl<
     > MemoryBlockfileReader<K, V>
 {
     pub(crate) fn open(id: uuid::Uuid, storage_manager: StorageManager) -> Self {
-        // TODO: don't unwrap
         let storage = storage_manager.get(id).unwrap();
         Self {
             storage_manager,
diff --git a/rust/blockstore/src/memory/storage.rs b/rust/blockstore/src/memory/storage.rs
index 6a432ac1011..89ead13f314 100644
--- a/rust/blockstore/src/memory/storage.rs
+++ b/rust/blockstore/src/memory/storage.rs
@@ -890,7 +890,6 @@ impl<'referred_data> Readable<'referred_data> for DataRecord<'referred_data> {
             prefix: prefix.to_string(),
             key,
         });
-        // TODO: don't unwrap
         Some(DataRecord {
             id: &id.unwrap(),
             embedding: &embedding.unwrap(),
No blocks in the sparse index"); } // TODO: we could save the uuid not as a string to be more space efficient @@ -512,7 +511,7 @@ impl SparseIndex { pub(super) fn from_block<'block, K: ArrowReadableKey<'block> + 'block>( block: &'block Block, - ) -> Result> { + ) -> Result { let mut forward = BTreeMap::new(); let mut reverse = HashMap::new(); let id = block.id; @@ -523,7 +522,7 @@ impl SparseIndex { let block_id = Uuid::parse_str(value); match block_id { Ok(block_id) => (SparseIndexDelimiter::Start, block_id), - Err(e) => panic!("Failed to parse block id: {}", e), // TODO: error here + Err(e) => return Err(e), } } _ => { @@ -533,7 +532,7 @@ impl SparseIndex { SparseIndexDelimiter::Key(CompositeKey::new(prefix.to_string(), key)), block_id, ), - Err(e) => panic!("Failed to parse block id: {}", e), // TODO: error here + Err(e) => return Err(e), } } }; diff --git a/rust/blockstore/src/key.rs b/rust/blockstore/src/key.rs index 6c0cdab5b85..56151c9f9dc 100644 --- a/rust/blockstore/src/key.rs +++ b/rust/blockstore/src/key.rs @@ -99,7 +99,9 @@ impl CompositeKey { impl Hash for CompositeKey { fn hash(&self, state: &mut H) { - // TODO: Implement a better hash function + // TODO: Implement a better hash function. This is only used by the + // memory blockfile, so its not a performance issue, since that + // is only used for testing. self.prefix.hash(state) } } diff --git a/rust/blockstore/src/memory/reader_writer.rs b/rust/blockstore/src/memory/reader_writer.rs index db7b3061d38..19f07512e3a 100644 --- a/rust/blockstore/src/memory/reader_writer.rs +++ b/rust/blockstore/src/memory/reader_writer.rs @@ -78,7 +78,6 @@ impl< > MemoryBlockfileReader { pub(crate) fn open(id: uuid::Uuid, storage_manager: StorageManager) -> Self { - // TODO: don't unwrap let storage = storage_manager.get(id).unwrap(); Self { storage_manager, diff --git a/rust/blockstore/src/memory/storage.rs b/rust/blockstore/src/memory/storage.rs index 6a432ac1011..89ead13f314 100644 --- a/rust/blockstore/src/memory/storage.rs +++ b/rust/blockstore/src/memory/storage.rs @@ -890,7 +890,6 @@ impl<'referred_data> Readable<'referred_data> for DataRecord<'referred_data> { prefix: prefix.to_string(), key, }); - // TODO: don't unwrap Some(DataRecord { id: &id.unwrap(), embedding: &embedding.unwrap(), diff --git a/rust/blockstore/src/provider.rs b/rust/blockstore/src/provider.rs index 348c9e83558..07c88fb5683 100644 --- a/rust/blockstore/src/provider.rs +++ b/rust/blockstore/src/provider.rs @@ -133,11 +133,16 @@ impl Configurable<(BlockfileProviderConfig, Storage)> for BlockfileProvider { pub enum OpenError { #[error("Blockfile not found")] NotFound, + #[error(transparent)] + Other(#[from] Box), } impl ChromaError for OpenError { fn code(&self) -> ErrorCodes { - ErrorCodes::NotFound + match self { + OpenError::NotFound => ErrorCodes::NotFound, + OpenError::Other(e) => e.code(), + } } } @@ -145,10 +150,15 @@ impl ChromaError for OpenError { pub enum CreateError { #[error("Blockfile already exists")] AlreadyExists, + #[error(transparent)] + Other(#[from] Box), } impl ChromaError for CreateError { fn code(&self) -> ErrorCodes { - ErrorCodes::AlreadyExists + match self { + CreateError::AlreadyExists => ErrorCodes::AlreadyExists, + CreateError::Other(e) => e.code(), + } } } diff --git a/rust/blockstore/src/types.rs b/rust/blockstore/src/types.rs index 2cf924b0a66..8da0bfb72bb 100644 --- a/rust/blockstore/src/types.rs +++ b/rust/blockstore/src/types.rs @@ -252,10 +252,14 @@ impl< } } - pub async fn contains(&'referred_data self, prefix: &str, key: K) -> 
diff --git a/rust/index/src/hnsw_provider.rs b/rust/index/src/hnsw_provider.rs
index 23c79d62d71..f005a5a250a 100644
--- a/rust/index/src/hnsw_provider.rs
+++ b/rust/index/src/hnsw_provider.rs
@@ -307,8 +307,16 @@ impl HnswIndexProvider {
             }
         };

-        // TODO: don't unwrap path conv here
-        match HnswIndex::load(index_storage_path.to_str().unwrap(), &index_config, *id) {
+        let index_storage_path_str = match index_storage_path.to_str() {
+            Some(index_storage_path_str) => index_storage_path_str,
+            None => {
+                return Err(Box::new(HnswIndexProviderOpenError::PathToStringError(
+                    index_storage_path,
+                )));
+            }
+        };
+
+        match HnswIndex::load(index_storage_path_str, &index_config, *id) {
             Ok(index) => {
                 let _guard = self.write_mutex.lock().await;
                 match self.get(id, &segment.collection) {
@@ -458,6 +466,8 @@ pub enum HnswIndexProviderOpenError {
     HnswConfigError(#[from] HnswIndexFromSegmentError),
     #[error("Index load error")]
     IndexLoadError(#[from] Box<dyn ChromaError>),
+    #[error("Path: {0} could not be converted to string")]
+    PathToStringError(PathBuf),
 }

 impl ChromaError for HnswIndexProviderOpenError {
@@ -467,6 +477,7 @@ impl ChromaError for HnswIndexProviderOpenError {
             HnswIndexProviderOpenError::FileError(_) => ErrorCodes::Internal,
             HnswIndexProviderOpenError::HnswConfigError(e) => e.code(),
             HnswIndexProviderOpenError::IndexLoadError(e) => e.code(),
+            HnswIndexProviderOpenError::PathToStringError(_) => ErrorCodes::InvalidArgument,
         }
     }
 }
diff --git a/rust/index/src/metadata/mod.rs b/rust/index/src/metadata/mod.rs
index 1df1e0e55a1..cd408564ea0 100644
--- a/rust/index/src/metadata/mod.rs
+++ b/rust/index/src/metadata/mod.rs
@@ -1,3 +1 @@
 pub mod types;
-
-// TODO reexport the types module
diff --git a/rust/index/src/metadata/types.rs b/rust/index/src/metadata/types.rs
index 11e950c8c99..4ab0e2ed904 100644
--- a/rust/index/src/metadata/types.rs
+++ b/rust/index/src/metadata/types.rs
@@ -806,7 +806,7 @@ impl<'me> MetadataIndexReader<'me> {
             MetadataIndexReader::StringMetadataIndexReader(blockfile_reader) => {
                 match metadata_value {
                     KeyWrapper::String(k) => {
-                        if !blockfile_reader.contains(metadata_key, k).await {
+                        if !blockfile_reader.contains(metadata_key, k).await? {
                             return Ok(RoaringBitmap::new());
                         }
                         let rbm = blockfile_reader.get(metadata_key, k).await;
@@ -820,7 +820,7 @@ impl<'me> MetadataIndexReader<'me> {
             }
             MetadataIndexReader::U32MetadataIndexReader(blockfile_reader) => match metadata_value {
                 KeyWrapper::Uint32(k) => {
-                    if !blockfile_reader.contains(metadata_key, *k).await {
+                    if !blockfile_reader.contains(metadata_key, *k).await? {
                         return Ok(RoaringBitmap::new());
                     }
                     let rbm = blockfile_reader.get(metadata_key, *k).await;
@@ -833,7 +833,7 @@ impl<'me> MetadataIndexReader<'me> {
             },
             MetadataIndexReader::F32MetadataIndexReader(blockfile_reader) => match metadata_value {
                 KeyWrapper::Float32(k) => {
-                    if !blockfile_reader.contains(metadata_key, *k).await {
+                    if !blockfile_reader.contains(metadata_key, *k).await? {
                         return Ok(RoaringBitmap::new());
                     }
                     let rbm = blockfile_reader.get(metadata_key, *k).await;
@@ -847,7 +847,7 @@ impl<'me> MetadataIndexReader<'me> {
             MetadataIndexReader::BoolMetadataIndexReader(blockfile_reader) => {
                 match metadata_value {
                     KeyWrapper::Bool(k) => {
-                        if !blockfile_reader.contains(metadata_key, *k).await {
+                        if !blockfile_reader.contains(metadata_key, *k).await? {
                             return Ok(RoaringBitmap::new());
                         }
                         let rbm = blockfile_reader.get(metadata_key, *k).await;
diff --git a/rust/worker/src/compactor/scheduler.rs b/rust/worker/src/compactor/scheduler.rs
index 258ad52a584..a2e5506c8f3 100644
--- a/rust/worker/src/compactor/scheduler.rs
+++ b/rust/worker/src/compactor/scheduler.rs
@@ -48,12 +48,10 @@ impl Scheduler {
             .log
             .get_collections_with_new_data(self.min_compaction_size as u64)
             .await;
-        // TODO: filter collecitons based on memberlist
         let collections = match collections {
             Ok(collections) => collections,
             Err(e) => {
-                // TODO: Log error
-                println!("Error: {:?}", e);
+                tracing::error!("Error: {:?}", e);
                 return Vec::new();
             }
         };
@@ -68,8 +66,7 @@ impl Scheduler {
         for collection_info in collections {
             let collection_id = Uuid::parse_str(collection_info.collection_id.as_str());
             if collection_id.is_err() {
-                // TODO: Log error
-                println!("Error: {:?}", collection_id.err());
+                tracing::error!("Error: {:?}", collection_id.err());
                 continue;
             }
             let collection_id = Some(collection_id.unwrap());
@@ -82,8 +79,10 @@ impl Scheduler {
             match result {
                 Ok(collection) => {
                     if collection.is_empty() {
-                        // TODO: Log error
-                        println!("Collection not found: {:?}", collection_info.collection_id);
+                        tracing::error!(
+                            "Collection not found: {:?}",
+                            collection_info.collection_id
+                        );
                         continue;
                     }

@@ -95,10 +94,12 @@ impl Scheduler {
                     let last_compaction_time = match tenant {
                         Ok(tenant) => tenant[0].last_compaction_time,
                         Err(e) => {
-                            // TODO: Log error
-                            println!("Error: {:?}", e);
+                            tracing::error!("Error: {:?}", e);
                             // Ignore this collection id for this compaction iteration
-                            println!("Ignoring collection: {:?}", collection_info.collection_id);
+                            tracing::info!(
+                                "Ignoring collection: {:?}",
+                                collection_info.collection_id
+                            );
                             continue;
                         }
                     };
@@ -127,8 +128,7 @@ impl Scheduler {
                     });
                 }
                 Err(e) => {
-                    // TODO: Log error
-                    println!("Error: {:?}", e);
+                    tracing::error!("Error: {:?}", e);
                 }
             }
         }
@@ -150,8 +150,7 @@ impl Scheduler {
                 }
             }
             Err(e) => {
-                // TODO: Log error
-                println!("Error: {:?}", e);
+                tracing::error!("Error: {:?}", e);
                 continue;
             }
         }
@@ -171,8 +170,7 @@ impl Scheduler {
         // For now, we clear the job queue every time, assuming we will not have any pending jobs running
         self.job_queue.clear();
         if self.memberlist.is_none() || self.memberlist.as_ref().unwrap().is_empty() {
-            // TODO: Log error
-            println!("Memberlist is not set or empty. Cannot schedule compaction jobs.");
+            tracing::error!("Memberlist is not set or empty. Cannot schedule compaction jobs.");
             return;
         }
         let collections = self.get_collections_with_new_data().await;
diff --git a/rust/worker/src/execution/orchestration/compact.rs b/rust/worker/src/execution/orchestration/compact.rs
index 697aa40ca88..fe13243462a 100644
--- a/rust/worker/src/execution/orchestration/compact.rs
+++ b/rust/worker/src/execution/orchestration/compact.rs
@@ -196,9 +196,6 @@ impl CompactOrchestrator {
         }
     }

-    // TODO: It is possible that the offset_id from the compaction job is wrong since the log service
-    // can have an outdated view of the offset. We should filter out entries from the log based on the start offset
-    // of the segment, and not fully respect the offset_id from the compaction job
     async fn pull_logs(
         &mut self,
         self_address: Box<dyn ReceiverForMessage<TaskResult<PullLogsOutput, PullLogsError>>>,
diff --git a/rust/worker/src/execution/orchestration/hnsw.rs b/rust/worker/src/execution/orchestration/hnsw.rs
index 338968d1756..b6a4d23061e 100644
--- a/rust/worker/src/execution/orchestration/hnsw.rs
+++ b/rust/worker/src/execution/orchestration/hnsw.rs
@@ -242,6 +242,7 @@ impl HnswQueryOrchestrator {
             Ok(_) => (),
             Err(e) => {
                 // TODO: log an error and reply to caller
+                tracing::error!("Error sending PullLogs task: {:?}", e);
             }
         }
     }
@@ -677,7 +678,8 @@ impl Handler<TaskResult<BruteForceKnnOperatorOutput, BruteForceKnnOperatorError>
                 self.brute_force_results.insert(query_index, output);
             }
             Err(e) => {
-                // TODO: handle this error, technically never happens
+                terminate_with_error(self.result_channel.take(), e.boxed(), ctx);
+                return;
             }
         }

@@ -713,6 +715,7 @@ impl Handler<TaskResult<HnswKnnOperatorOutput, Box<dyn ChromaError>>> for HnswQu
             }
             Err(e) => {
                 terminate_with_error(self.result_channel.take(), e.boxed(), ctx);
+                return;
             }
         }
diff --git a/rust/worker/src/log/log.rs b/rust/worker/src/log/log.rs
index 6d34ea2f8da..afe69500016 100644
--- a/rust/worker/src/log/log.rs
+++ b/rust/worker/src/log/log.rs
@@ -136,8 +136,7 @@ impl Configurable<LogConfig> for GrpcLog {
             LogConfig::Grpc(my_config) => {
                 let host = &my_config.host;
                 let port = &my_config.port;
-                // TODO: switch to logging when logging is implemented
-                println!("Connecting to log service at {}:{}", host, port);
+                tracing::info!("Connecting to log service at {}:{}", host, port);
                 let connection_string = format!("http://{}:{}", host, port);
                 let endpoint_res = match Endpoint::from_shared(connection_string) {
                     Ok(endpoint) => endpoint,
@@ -207,8 +206,7 @@ impl GrpcLog {
                 Ok(result)
             }
             Err(e) => {
-                // TODO: switch to logging when logging is implemented
-                println!("Failed to pull logs: {}", e);
+                tracing::error!("Failed to pull logs: {}", e);
                 Err(PullLogsError::FailedToPullLogs(e))
             }
         }
@@ -242,8 +240,7 @@ impl GrpcLog {
                 Ok(result)
             }
             Err(e) => {
-                // TODO: switch to logging when logging is implemented
-                println!("Failed to get collections: {}", e);
+                tracing::error!("Failed to get collections: {}", e);
                 Err(GetCollectionsWithNewDataError::FailedGetCollectionsWithNewData(e))
             }
         }
diff --git a/rust/worker/src/memberlist/memberlist_provider.rs b/rust/worker/src/memberlist/memberlist_provider.rs
index 4526971d2b3..fccb9995cfd 100644
--- a/rust/worker/src/memberlist/memberlist_provider.rs
+++ b/rust/worker/src/memberlist/memberlist_provider.rs
@@ -1,5 +1,3 @@
-use std::{fmt::Debug, sync::RwLock};
-
 use super::config::MemberlistProviderConfig;
 use crate::system::ReceiverForMessage;
 use crate::system::{Component, ComponentContext, Handler, StreamHandler};
@@ -13,8 +11,10 @@ use kube::{
     runtime::{watcher, WatchStreamExt},
     Client, CustomResource,
 };
+use parking_lot::RwLock;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
+use std::fmt::Debug;
 use thiserror::Error;

 /* =========== Basic Types ============== */
@@ -164,13 +164,7 @@ impl CustomResourceMemberlistProvider {
     }

     async fn notify_subscribers(&self) -> () {
-        let curr_memberlist = match self.current_memberlist.read() {
-            Ok(curr_memberlist) => curr_memberlist.clone(),
-            Err(_err) => {
-                // TODO: Log error and attempt recovery
-                return;
-            }
-        };
+        let curr_memberlist = self.current_memberlist.read().clone();

         for subscriber in self.subscribers.iter() {
             let _ = subscriber.send(curr_memberlist.clone(), None).await;
@@ -208,7 +202,7 @@ impl Handler<Option<MemberListCrd>> for CustomResourceMemberlistProvider {
         let name = match &memberlist.metadata.name {
             Some(name) => name,
             None => {
-                // TODO: Log an error
+                tracing::error!("Memberlist event without memberlist name");
                 return;
             }
         };
@@ -221,15 +215,8 @@ impl Handler<Option<MemberListCrd>> for CustomResourceMemberlistProvider {
             .map(|member| member.member_id.clone())
             .collect::<Vec<String>>();
         {
-            let curr_memberlist_handle = self.current_memberlist.write();
-            match curr_memberlist_handle {
-                Ok(mut curr_memberlist) => {
-                    *curr_memberlist = memberlist;
-                }
-                Err(_err) => {
-                    // TODO: Log an error
-                }
-            }
+            let mut curr_memberlist_handle = self.current_memberlist.write();
+            *curr_memberlist_handle = memberlist;
         }
         // Inform subscribers
         self.notify_subscribers().await;
@@ -252,9 +239,8 @@ impl MemberlistProvider for CustomResourceMemberlistProvider {

 #[cfg(test)]
 mod tests {
-    use crate::system::System;
-
     use super::*;
+    use crate::system::System;

     #[tokio::test]
     // Naming this "test_k8s_integration_" means that the Tilt stack is required. See rust/worker/README.md.
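The memberlist change swaps `std::sync::RwLock` for `parking_lot::RwLock`. parking_lot locks do not poison, so `read()` and `write()` hand back guards directly and the old `Ok`/`Err` matches disappear. A small sketch, assuming `parking_lot` as a dependency (which this diff adds to the imports):

```rust
// Why the Ok/Err matches disappear: parking_lot locks do not poison.
use parking_lot::RwLock;

fn main() {
    let list = RwLock::new(vec!["node-1".to_string()]);

    // std::sync::RwLock::read() returns Result<Guard, PoisonError<_>>;
    // parking_lot's read() returns the guard directly.
    let snapshot = list.read().clone();
    println!("{:?}", snapshot);

    // Same for write(): no unwrap or match needed.
    *list.write() = vec!["node-1".into(), "node-2".into()];
}
```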
diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs
index 779964419af..146a2172d8e 100644
--- a/rust/worker/src/segment/metadata_segment.rs
+++ b/rust/worker/src/segment/metadata_segment.rs
@@ -1,7 +1,6 @@
 use super::record_segment::ApplyMaterializedLogError;
 use super::types::{MaterializedLogRecord, SegmentWriter};
 use super::SegmentFlusher;
-use arrow::array::Int32Array;
 use async_trait::async_trait;
 use chroma_blockstore::provider::{BlockfileProvider, CreateError, OpenError};
 use chroma_error::{ChromaError, ErrorCodes};
@@ -73,8 +72,6 @@ pub enum MetadataSegmentError {
     UuidParseError(String),
     #[error("No writer found")]
     NoWriter,
-    #[error("Could not write to fulltext index blockfiles {0}")]
-    FullTextIndexWriteError(Box<dyn ChromaError>),
     #[error("Path vector exists but is empty?")]
     EmptyPathVector,
     #[error("Failed to write to blockfile")]
@@ -83,14 +80,25 @@ pub enum MetadataSegmentError {
     LimitOffsetNotSupported,
     #[error("Could not query metadata index {0}")]
     MetadataIndexQueryError(#[from] MetadataIndexError),
-    #[error("Attempted to delete a document that does not exist")]
-    DocumentDoesNotExist,
 }

 impl ChromaError for MetadataSegmentError {
     fn code(&self) -> ErrorCodes {
-        // TODO
-        ErrorCodes::Internal
+        match self {
+            MetadataSegmentError::InvalidSegmentType => ErrorCodes::Internal,
+            MetadataSegmentError::FullTextIndexWriterError(e) => e.code(),
+            MetadataSegmentError::BlockfileError(e) => e.code(),
+            MetadataSegmentError::BlockfileOpenError(e) => e.code(),
+            MetadataSegmentError::FullTextIndexFilesIntegrityError => ErrorCodes::Internal,
+            MetadataSegmentError::IncorrectNumberOfFiles => ErrorCodes::Internal,
+            MetadataSegmentError::MissingFile(_) => ErrorCodes::Internal,
+            MetadataSegmentError::UuidParseError(_) => ErrorCodes::Internal,
+            MetadataSegmentError::NoWriter => ErrorCodes::Internal,
+            MetadataSegmentError::EmptyPathVector => ErrorCodes::Internal,
+            MetadataSegmentError::BlockfileWriteError => ErrorCodes::Internal,
+            MetadataSegmentError::LimitOffsetNotSupported => ErrorCodes::Internal,
+            MetadataSegmentError::MetadataIndexQueryError(_) => ErrorCodes::Internal,
+        }
     }
 }
diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs
index 84f6cac6214..a8ec0f4898d 100644
--- a/rust/worker/src/segment/record_segment.rs
+++ b/rust/worker/src/segment/record_segment.rs
@@ -27,9 +27,6 @@ pub(crate) struct RecordSegmentWriter {
     // we should store it in metadata of one of the blockfiles
     max_offset_id: Option<BlockfileWriter>,
     pub(crate) id: Uuid,
-    // If there is an old version of the data, we need to keep it around to be able to
-    // materialize the log records
-    // old_id_to_data: Option>>,
 }

 impl Debug for RecordSegmentWriter {
@@ -773,7 +770,7 @@ impl RecordSegmentReader<'_> {
         &self,
         user_id: &str,
     ) -> Result<bool, Box<dyn ChromaError>> {
-        if !self.user_id_to_id.contains("", user_id).await {
+        if !self.user_id_to_id.contains("", user_id).await? {
             return Ok(false);
         }
         let offset_id = match self.user_id_to_id.get("", user_id).await {
@@ -782,7 +779,7 @@ impl RecordSegmentReader<'_> {
                 return Err(e);
             }
         };
-        Ok(self.id_to_data.contains("", offset_id).await)
+        self.id_to_data.contains("", offset_id).await
     }

     /// Returns all data in the record segment, sorted by
diff --git a/rust/worker/src/sysdb/test_sysdb.rs b/rust/worker/src/sysdb/test_sysdb.rs
index 2aca5e7eac3..9bae76a2f2f 100644
--- a/rust/worker/src/sysdb/test_sysdb.rs
+++ b/rust/worker/src/sysdb/test_sysdb.rs
@@ -155,7 +155,6 @@ impl TestSysDb {
         let last_compaction_time = match inner.tenant_last_compaction_time.get(&tenant_id) {
             Some(last_compaction_time) => *last_compaction_time,
             None => {
-                // TODO: Log an error
                 return Err(GetLastCompactionTimeError::TenantNotFound);
             }
         };
diff --git a/rust/worker/src/system/executor.rs b/rust/worker/src/system/executor.rs
index 29bd15f7015..49c5bae7d50 100644
--- a/rust/worker/src/system/executor.rs
+++ b/rust/worker/src/system/executor.rs
@@ -88,7 +88,7 @@ where
                         task_future.instrument(child_span).await;
                     }
                     None => {
-                        // TODO: Log error
+                        tracing::error!("Channel closed");
                     }
                 }
             }
diff --git a/rust/worker/src/system/scheduler.rs b/rust/worker/src/system/scheduler.rs
index 2e688642eb3..d8a44800ce7 100644
--- a/rust/worker/src/system/scheduler.rs
+++ b/rust/worker/src/system/scheduler.rs
@@ -54,8 +54,7 @@ impl Scheduler {
                             return;
                         },
                         Err(e) => {
-                            // TODO: log error
-                            println!("Error: {:?}", e);
+                            tracing::error!("Error: {:?}", e);
                             return;
                         }
                     }
@@ -101,8 +100,7 @@ impl Scheduler {
                         Ok(_) => {
                         },
                         Err(e) => {
-                            // TODO: log error
-                            println!("Error: {:?}", e);
+                            tracing::error!("Error: {:?}", e);
                         }
                     }
                 }
diff --git a/rust/worker/src/system/system.rs b/rust/worker/src/system/system.rs
index a31735fea7a..c0bb819a935 100644
--- a/rust/worker/src/system/system.rs
+++ b/rust/worker/src/system/system.rs
@@ -115,8 +115,7 @@ where
         match res {
             Ok(_) => {}
             Err(e) => {
-                println!("Failed to send message: {:?}", e);
-                // TODO: switch to logging
+                tracing::error!("Failed to send stream message: {:?}", e);
                 // Terminate the stream
                 break;
             }
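A recurring change in the worker crate is replacing `println!` with `tracing::error!` and `tracing::info!`. Those macros emit nothing until a subscriber is installed; a typical setup looks roughly like the sketch below, assuming the `tracing` and `tracing-subscriber` crates (the subscriber wiring is not part of this diff):

```rust
// Sketch only: without an installed subscriber, tracing macros are no-ops.
fn main() {
    // Install a global formatting subscriber that prints to stderr/stdout.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .init();

    tracing::info!("Connecting to log service at {}:{}", "localhost", 50051);

    let err = std::io::Error::new(std::io::ErrorKind::Other, "boom");
    // Structured field capture: `?err` records the Debug representation.
    tracing::error!(error = ?err, "Failed to pull logs");
}
```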