Skip to content

Commit

Permalink
Remove existence checks in strema_splits method, fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Nov 10, 2023
1 parent 3603577 commit b56ab2a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,15 +319,25 @@ impl FileBackedMetastore {
Ok(index_mutex)
}

/// Returns the list of splits for the given request.
/// No error is returned if one of the requested `index_uid` does not exist.
async fn inner_list_splits(&self, request: ListSplitsRequest) -> MetastoreResult<Vec<Split>> {
let list_splits_query = request.deserialize_list_splits_query()?;
let mut all_splits = Vec::new();
for index_uid in &list_splits_query.index_uids {
let splits = self
let splits = match self
.read(index_uid.clone(), |index| {
index.list_splits(&list_splits_query)
})
.await?;
.await
{
Ok(splits) => splits,
Err(MetastoreError::NotFound(_)) => {
// If the index does not exist, we just skip it.
continue;
}
Err(error) => return Err(error),
};
all_splits.extend(splits);
}
Ok(all_splits)
Expand Down Expand Up @@ -625,16 +635,18 @@ impl MetastoreService for FileBackedMetastore {
/// -------------------------------------------------------------------------------
/// Read-only accessors

/// Streams of splits for the given request.
/// No error is returned if one of the requested `index_uid` does not exist.
async fn stream_splits(
&mut self,
request: ListSplitsRequest,
) -> MetastoreResult<MetastoreServiceStream<ListSplitsResponse>> {
let splits = self.inner_list_splits(request).await?;
let chunks = splits
let splits_responses: Vec<MetastoreResult<ListSplitsResponse>> = splits
.chunks(STREAM_SPLITS_CHUNK_SIZE)
.map(|chunk| ListSplitsResponse::try_from_splits(chunk.to_vec()))
.collect_vec();
Ok(ServiceStream::from(chunks))
.collect();
Ok(ServiceStream::from(splits_responses))
}

async fn list_stale_splits(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ impl MetastoreService for PostgresqlMetastore {
split_stream
.chunks(STREAM_SPLITS_CHUNK_SIZE)
.map(|pg_splits_res| {
let mut splits = Vec::new();
let mut splits = Vec::with_capacity(pg_splits_res.len());
for pg_split_res in pg_splits_res {
let pg_split = match pg_split_res {
Ok(pg_split) => pg_split,
Expand Down Expand Up @@ -1274,16 +1274,6 @@ impl MetastoreService for PostgresqlMetastore {
.fetch_all(&self.connection_pool)
.await?;

// If no splits were returned, maybe the index does not exist in the first place?
if pg_stale_splits.is_empty()
&& index_opt_for_uid(&self.connection_pool, index_uid.clone())
.await?
.is_none()
{
return Err(MetastoreError::NotFound(EntityKind::Index {
index_id: index_uid.index_id().to_string(),
}));
}
let splits = pg_stale_splits
.into_iter()
.map(|pg_split| pg_split.try_into())
Expand Down
31 changes: 11 additions & 20 deletions quickwit/quickwit-metastore/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2314,20 +2314,16 @@ pub async fn test_metastore_list_all_splits<
..Default::default()
};

let error = metastore
let no_splits = metastore
.list_splits(
ListSplitsRequest::try_from_index_uid(IndexUid::new_with_random_ulid(
"index-not-found",
))
.unwrap(),
)
.await
.unwrap_err();
assert!(matches!(
error,
// TODO: This discrepancy is tracked in #3760.
MetastoreError::NotFound(EntityKind::Index { .. } | EntityKind::Indexes { .. })
));
.unwrap();
assert!(no_splits.is_empty());

let create_index_request = CreateIndexRequest::try_from_index_config(index_config).unwrap();
let index_uid: IndexUid = metastore
Expand Down Expand Up @@ -2466,15 +2462,11 @@ pub async fn test_metastore_list_splits<MetastoreToTest: MetastoreServiceExt + D
{
let query =
ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged);
let error = metastore
let splits = metastore
.list_splits(ListSplitsRequest::try_from_list_splits_query(query).unwrap())
.await
.unwrap_err();
assert!(matches!(
error,
// TODO: This discrepancy is tracked in #3760.
MetastoreError::NotFound(EntityKind::Index { .. } | EntityKind::Indexes { .. })
));
.unwrap();
assert!(splits.is_empty());
}
{
let create_index_request =
Expand Down Expand Up @@ -3372,14 +3364,13 @@ pub async fn test_metastore_list_stale_splits<
delete_opstamp: 0,
num_splits: 100,
};
let error = metastore
let no_splits = metastore
.list_stale_splits(list_stale_splits_request)
.await
.unwrap_err();
assert!(matches!(
error,
MetastoreError::NotFound(EntityKind::Index { .. })
));
.unwrap()
.deserialize_splits()
.unwrap();
assert!(no_splits.is_empty());

{
info!("List stale splits on an index");
Expand Down

0 comments on commit b56ab2a

Please sign in to comment.