From b56ab2aac1f875e3386985526f1ee0ea5946fe6c Mon Sep 17 00:00:00 2001 From: fmassot Date: Fri, 10 Nov 2023 14:54:29 +0100 Subject: [PATCH] Remove existence checks in strema_splits method, fix tests. --- .../metastore/file_backed_metastore/mod.rs | 22 ++++++++++--- .../src/metastore/postgresql_metastore.rs | 12 +------ quickwit/quickwit-metastore/src/tests/mod.rs | 31 +++++++------------ 3 files changed, 29 insertions(+), 36 deletions(-) diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs index f34a9deb6f0..2f32ff315dc 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs @@ -319,15 +319,25 @@ impl FileBackedMetastore { Ok(index_mutex) } + /// Returns the list of splits for the given request. + /// No error is returned if one of the requested `index_uid` does not exist. async fn inner_list_splits(&self, request: ListSplitsRequest) -> MetastoreResult> { let list_splits_query = request.deserialize_list_splits_query()?; let mut all_splits = Vec::new(); for index_uid in &list_splits_query.index_uids { - let splits = self + let splits = match self .read(index_uid.clone(), |index| { index.list_splits(&list_splits_query) }) - .await?; + .await + { + Ok(splits) => splits, + Err(MetastoreError::NotFound(_)) => { + // If the index does not exist, we just skip it. + continue; + } + Err(error) => return Err(error), + }; all_splits.extend(splits); } Ok(all_splits) @@ -625,16 +635,18 @@ impl MetastoreService for FileBackedMetastore { /// ------------------------------------------------------------------------------- /// Read-only accessors + /// Streams of splits for the given request. + /// No error is returned if one of the requested `index_uid` does not exist. async fn stream_splits( &mut self, request: ListSplitsRequest, ) -> MetastoreResult> { let splits = self.inner_list_splits(request).await?; - let chunks = splits + let splits_responses: Vec> = splits .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|chunk| ListSplitsResponse::try_from_splits(chunk.to_vec())) - .collect_vec(); - Ok(ServiceStream::from(chunks)) + .collect(); + Ok(ServiceStream::from(splits_responses)) } async fn list_stale_splits( diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 5d22515c116..13a6738a035 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -827,7 +827,7 @@ impl MetastoreService for PostgresqlMetastore { split_stream .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|pg_splits_res| { - let mut splits = Vec::new(); + let mut splits = Vec::with_capacity(pg_splits_res.len()); for pg_split_res in pg_splits_res { let pg_split = match pg_split_res { Ok(pg_split) => pg_split, @@ -1274,16 +1274,6 @@ impl MetastoreService for PostgresqlMetastore { .fetch_all(&self.connection_pool) .await?; - // If no splits were returned, maybe the index does not exist in the first place? - if pg_stale_splits.is_empty() - && index_opt_for_uid(&self.connection_pool, index_uid.clone()) - .await? - .is_none() - { - return Err(MetastoreError::NotFound(EntityKind::Index { - index_id: index_uid.index_id().to_string(), - })); - } let splits = pg_stale_splits .into_iter() .map(|pg_split| pg_split.try_into()) diff --git a/quickwit/quickwit-metastore/src/tests/mod.rs b/quickwit/quickwit-metastore/src/tests/mod.rs index b9b665b5b6a..fdc74b5c8a5 100644 --- a/quickwit/quickwit-metastore/src/tests/mod.rs +++ b/quickwit/quickwit-metastore/src/tests/mod.rs @@ -2314,7 +2314,7 @@ pub async fn test_metastore_list_all_splits< ..Default::default() }; - let error = metastore + let no_splits = metastore .list_splits( ListSplitsRequest::try_from_index_uid(IndexUid::new_with_random_ulid( "index-not-found", @@ -2322,12 +2322,8 @@ pub async fn test_metastore_list_all_splits< .unwrap(), ) .await - .unwrap_err(); - assert!(matches!( - error, - // TODO: This discrepancy is tracked in #3760. - MetastoreError::NotFound(EntityKind::Index { .. } | EntityKind::Indexes { .. }) - )); + .unwrap(); + assert!(no_splits.is_empty()); let create_index_request = CreateIndexRequest::try_from_index_config(index_config).unwrap(); let index_uid: IndexUid = metastore @@ -2466,15 +2462,11 @@ pub async fn test_metastore_list_splits