Skip to content

Commit

Permalink
Add stream splits gRPC. (#4109)
Browse files Browse the repository at this point in the history
* Add stream splits gRPC.

* Remove existence checks in strema_splits method, fix tests.

* Fix tests.

* Add test on metastore split stream method.

* Add metastore suite test for MetastoreServiceGrpcClientAdapter.

* Reorganize metastore test suite.

* Take review comments into account.

* Apply suggestions from code review

Co-authored-by: Adrien Guillo <[email protected]>

* Clean metastore proto.

* Fix metatore split tests.

---------

Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
fmassot and guilload authored Dec 7, 2023
1 parent 012ff87 commit 6eb0767
Show file tree
Hide file tree
Showing 37 changed files with 4,746 additions and 4,126 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ openssl = { version = "0.10.60", default-features = false }
openssl-probe = "0.1.5"
opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = "0.12.0"
ouroboros = "0.18.0"
pin-project = "1.1.0"
pnet = { version = "0.33.0", features = ["std"] }
postcard = { version = "1.0.4", features = [
Expand Down
65 changes: 35 additions & 30 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID};
use quickwit_metastore::{
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreResolver, MetastoreServiceExt,
SplitState, StageSplitsRequestExt,
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt,
SplitMetadata, SplitState, StageSplitsRequestExt,
};
use quickwit_proto::metastore::{
DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListSplitsRequest,
Expand Down Expand Up @@ -249,17 +249,18 @@ async fn test_ingest_docs_cli() {

local_ingest_docs_cli(args).await.unwrap();

let splits: Vec<_> = test_env
let splits_metadata: Vec<SplitMetadata> = test_env
.metastore()
.await
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();

assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_metadata.num_docs, 5);
assert_eq!(splits_metadata.len(), 1);
assert_eq!(splits_metadata[0].num_docs, 5);

// Ensure cache directory is empty.
let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path);
Expand Down Expand Up @@ -665,13 +666,14 @@ async fn test_garbage_collect_cli_no_grace() {
dry_run,
};

let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

let args = create_gc_args(false);

Expand All @@ -681,7 +683,7 @@ async fn test_garbage_collect_cli_no_grace() {
let index_path = test_env.indexes_dir_path.join(&test_env.index_id);
assert_eq!(index_path.try_exists().unwrap(), true);

let split_ids = vec![splits[0].split_id().to_string()];
let split_ids = vec![splits_metadata[0].split_id().to_string()];
let mut metastore = refresh_metastore(metastore).await.unwrap();
let mark_for_deletion_request =
MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids.clone());
Expand Down Expand Up @@ -718,7 +720,8 @@ async fn test_garbage_collect_cli_no_grace() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap()
.len(),
0
Expand Down Expand Up @@ -776,16 +779,17 @@ async fn test_garbage_collect_index_cli() {
.await
.unwrap();

let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

let index_path = test_env.indexes_dir_path.join(&test_env.index_id);
let split_filename = quickwit_common::split_file(splits[0].split_metadata.split_id.as_str());
let split_filename = quickwit_common::split_file(splits_metadata[0].split_id.as_str());
let split_path = index_path.join(&split_filename);
assert_eq!(split_path.try_exists().unwrap(), true);

Expand All @@ -795,41 +799,39 @@ async fn test_garbage_collect_index_cli() {

// Split should still exists within grace period.
let mut metastore = refresh_metastore(metastore).await.unwrap();
let splits = metastore
let splits_metadata = metastore
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits_metadata()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits_metadata.len(), 1);

// The following steps help turn an existing published split into a staged one
// without deleting the files.
let split = splits[0].clone();
let split_metadata = splits_metadata[0].clone();
metastore
.mark_splits_for_deletion(MarkSplitsForDeletionRequest::new(
index_uid.clone(),
vec![split.split_metadata.split_id.to_string()],
vec![split_metadata.split_id.to_string()],
))
.await
.unwrap();
metastore
.delete_splits(DeleteSplitsRequest {
index_uid: index_uid.to_string(),
split_ids: splits
split_ids: splits_metadata
.into_iter()
.map(|split| split.split_metadata.split_id)
.map(|split_metadata| split_metadata.split_id)
.collect(),
})
.await
.unwrap();
metastore
.stage_splits(
StageSplitsRequest::try_from_split_metadata(
index_uid.clone(),
split.split_metadata.clone(),
)
.unwrap(),
StageSplitsRequest::try_from_split_metadata(index_uid.clone(), split_metadata.clone())
.unwrap(),
)
.await
.unwrap();
Expand All @@ -840,7 +842,8 @@ async fn test_garbage_collect_index_cli() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits()
.await
.unwrap();
assert_eq!(splits[0].split_state, SplitState::Staged);

Expand All @@ -855,7 +858,8 @@ async fn test_garbage_collect_index_cli() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(splits[0].split_state, SplitState::Staged);
Expand All @@ -873,7 +877,8 @@ async fn test_garbage_collect_index_cli() {
.list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap())
.await
.unwrap()
.deserialize_splits()
.collect_splits()
.await
.unwrap();
// Splits should be deleted from both metastore and file system.
assert_eq!(splits.len(), 0);
Expand Down
25 changes: 25 additions & 0 deletions quickwit/quickwit-common/src/stream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ pub struct ServiceStream<T> {
inner: BoxStream<T>,
}

impl<T> ServiceStream<T>
where T: Send + 'static
{
pub fn new(inner: BoxStream<T>) -> Self {
Self { inner }
}

pub fn empty() -> Self {
Self {
inner: Box::pin(stream::empty()),
}
}
}

impl<T> fmt::Debug for ServiceStream<T>
where T: 'static
{
Expand Down Expand Up @@ -159,3 +173,14 @@ where T: Send + 'static
}
}
}

#[cfg(any(test, feature = "testsuite"))]
impl<T> From<Vec<T>> for ServiceStream<T>
where T: Send + 'static
{
fn from(values: Vec<T>) -> Self {
Self {
inner: Box::pin(stream::iter(values)),
}
}
}
Loading

0 comments on commit 6eb0767

Please sign in to comment.