Skip to content

Commit

Permalink
Compress ListIndexesMetadataResponse payload and list shards in bat…
Browse files Browse the repository at this point in the history
…ches (#4768)

* List shards from metastore in batches

* Compress `ListIndexesMetadataResponse` payload
  • Loading branch information
guilload authored Mar 21, 2024
1 parent 2c2f254 commit 6d629ae
Show file tree
Hide file tree
Showing 23 changed files with 288 additions and 249 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 20 additions & 44 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,9 +948,7 @@ mod tests {
});
mock_metastore
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
.returning(|_| Ok(ListIndexesMetadataResponse::for_test(Vec::new())));
let cluster_config = ClusterConfig::for_test();
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn(
Expand Down Expand Up @@ -992,9 +990,7 @@ mod tests {
.returning(|_| Ok(EmptyResponse {}));
mock_metastore
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
.returning(|_| Ok(ListIndexesMetadataResponse::for_test(Vec::new())));

let cluster_config = ClusterConfig::for_test();
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
Expand Down Expand Up @@ -1044,10 +1040,9 @@ mod tests {
mock_metastore
.expect_list_indexes_metadata()
.returning(move |_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone()
])
.unwrap())
]))
});

let cluster_config = ClusterConfig::for_test();
Expand Down Expand Up @@ -1090,12 +1085,7 @@ mod tests {
index_metadata.add_source(test_source_config).unwrap();
mock_metastore
.expect_list_indexes_metadata()
.return_once(|_| {
Ok(
ListIndexesMetadataResponse::try_from_indexes_metadata(vec![index_metadata])
.unwrap(),
)
});
.return_once(|_| Ok(ListIndexesMetadataResponse::for_test(vec![index_metadata])));

let index_uid_clone = index_uid.clone();
mock_metastore
Expand Down Expand Up @@ -1171,9 +1161,7 @@ mod tests {
.returning(|_| Ok(EmptyResponse {}));
mock_metastore
.expect_list_indexes_metadata()
.returning(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
.returning(|_| Ok(ListIndexesMetadataResponse::for_test(Vec::new())));

let cluster_config = ClusterConfig::for_test();
let cluster_change_stream_factory = ClusterChangeStreamFactoryForTest::default();
Expand Down Expand Up @@ -1217,10 +1205,7 @@ mod tests {
let mut source_config = SourceConfig::ingest_v2();
source_config.enabled = true;
index_metadata.add_source(source_config).unwrap();
Ok(
ListIndexesMetadataResponse::try_from_indexes_metadata(vec![index_metadata])
.unwrap(),
)
Ok(ListIndexesMetadataResponse::for_test(vec![index_metadata]))
});
let index_uid_clone = index_uid.clone();
mock_metastore
Expand Down Expand Up @@ -1300,7 +1285,7 @@ mod tests {
.times(2) // 1 for the first initialization, 1 after the respawn of the control plane.
.returning(|list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::empty())
Ok(ListIndexesMetadataResponse::for_test(Vec::new()))
});
mock_metastore.expect_list_shards().return_once(
|_list_shards_request: ListShardsRequest| {
Expand Down Expand Up @@ -1439,10 +1424,9 @@ mod tests {
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
Ok(ListIndexesMetadataResponse::for_test(vec![
index_0_clone.clone()
])
.unwrap())
]))
},
);
let index_uid_clone = index_0.index_uid.clone();
Expand Down Expand Up @@ -1588,10 +1572,9 @@ mod tests {
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
Ok(ListIndexesMetadataResponse::for_test(vec![
index_0_clone.clone()
])
.unwrap())
]))
},
);

Expand Down Expand Up @@ -1673,10 +1656,9 @@ mod tests {
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
Ok(ListIndexesMetadataResponse::for_test(vec![
index_0_clone.clone()
])
.unwrap())
]))
},
);
let index_uid_clone = index_0.index_uid.clone();
Expand Down Expand Up @@ -1758,10 +1740,9 @@ mod tests {
.in_sequence(&mut seq)
.returning(move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
Ok(ListIndexesMetadataResponse::for_test(vec![
index_0_clone.clone()
])
.unwrap())
]))
});
mock_metastore
.expect_list_shards()
Expand Down Expand Up @@ -1886,10 +1867,9 @@ mod tests {
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
Ok(ListIndexesMetadataResponse::for_test(vec![
index_0_clone.clone()
])
.unwrap())
]))
},
);

Expand Down Expand Up @@ -1955,9 +1935,7 @@ mod tests {

mock_metastore
.expect_list_indexes_metadata()
.return_once(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
.return_once(|_| Ok(ListIndexesMetadataResponse::for_test(Vec::new())));

mock_metastore
.expect_find_index_template_matches()
Expand Down Expand Up @@ -2086,9 +2064,7 @@ mod tests {
let mut mock_metastore = MetastoreServiceClient::mock();
mock_metastore
.expect_list_indexes_metadata()
.return_once(|_| {
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(Vec::new()).unwrap())
});
.return_once(|_| Ok(ListIndexesMetadataResponse::for_test(Vec::new())));
let metastore = MetastoreServiceClient::from(mock_metastore);
let (_control_plane_mailbox, control_plane_handle) = ControlPlane::spawn(
&universe,
Expand Down
77 changes: 40 additions & 37 deletions quickwit/quickwit-control-plane/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod shard_table;

use std::borrow::Cow;
use std::collections::BTreeSet;
use std::mem;
use std::ops::Deref;
use std::time::Instant;

Expand Down Expand Up @@ -82,59 +83,61 @@ impl ControlPlaneModel {
metastore: &mut MetastoreServiceClient,
progress: &Progress,
) -> ControlPlaneResult<()> {
const BATCH_SIZE: usize = 500;

let now = Instant::now();
self.clear();

let index_metadatas = progress
let indexes_metadata = progress
.protect_future(metastore.list_indexes_metadata(ListIndexesMetadataRequest::all()))
.await?
.deserialize_indexes_metadata()?;
.deserialize_indexes_metadata()
.await?;

let num_indexes = index_metadatas.len();
let num_indexes = indexes_metadata.len();
self.index_table.reserve(num_indexes);

for index_metadata in indexes_metadata {
self.add_index(index_metadata);
}
let mut num_sources = 0;
let mut num_shards = 0;

let mut subrequests = Vec::with_capacity(index_metadatas.len());

for index_metadata in index_metadatas {
self.add_index(index_metadata);
}
let mut next_list_shards_request = metastore::ListShardsRequest::default();

for index_metadata in self.index_table.values() {
for (idx, index_metadata) in self.index_table.values().enumerate() {
for source_config in index_metadata.sources.values() {
num_sources += 1;

if source_config.source_type() != SourceType::IngestV2 {
continue;
if source_config.source_type() == SourceType::IngestV2 {
let request = ListShardsSubrequest {
index_uid: index_metadata.index_uid.clone().into(),
source_id: source_config.source_id.clone(),
shard_state: None,
};
next_list_shards_request.subrequests.push(request);
}
let request = ListShardsSubrequest {
index_uid: index_metadata.index_uid.clone().into(),
source_id: source_config.source_id.clone(),
shard_state: None,
};
subrequests.push(request);
}
}
if !subrequests.is_empty() {
// TODO: Limit the number of subrequests and perform multiple requests if needed.
let list_shards_request = metastore::ListShardsRequest { subrequests };
let list_shard_response = progress
.protect_future(metastore.list_shards(list_shards_request))
.await?;

for list_shards_subresponse in list_shard_response.subresponses {
num_shards += list_shards_subresponse.shards.len();

let ListShardsSubresponse {
index_uid,
source_id,
shards,
} = list_shards_subresponse;
let index_uid = index_uid.expect("`index_uid` should be a required field");
self.shard_table
.insert_shards(&index_uid, &source_id, shards);
let num_subrequests = next_list_shards_request.subrequests.len();

if num_subrequests > 0 && (num_subrequests >= BATCH_SIZE || idx == num_indexes - 1) {
let list_shards_request = mem::take(&mut next_list_shards_request);
let list_shards_response = progress
.protect_future(metastore.list_shards(list_shards_request))
.await?;

for list_shards_subresponse in list_shards_response.subresponses {
num_shards += list_shards_subresponse.shards.len();

let ListShardsSubresponse {
index_uid,
source_id,
shards,
} = list_shards_subresponse;
let index_uid = index_uid.expect("`index_uid` should be a required field");
self.shard_table
.insert_shards(&index_uid, &source_id, shards);
}
}
}
info!(
Expand Down Expand Up @@ -402,7 +405,7 @@ mod tests {
index_2.add_source(SourceConfig::cli()).unwrap();

let indexes = vec![index_0, index_1, index_2];
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(indexes).unwrap())
Ok(ListIndexesMetadataResponse::for_test(indexes))
});
let index_uid_clone = index_uid.clone();
let index_uid2_clone = index_uid2.clone();
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async fn start_control_plane(
metastore.expect_list_indexes_metadata().returning(
move |_list_indexes_request: quickwit_proto::metastore::ListIndexesMetadataRequest| {
let indexes_metadata = vec![index_metadata_2.clone(), index_metadata_1.clone()];
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(indexes_metadata).unwrap())
Ok(ListIndexesMetadataResponse::for_test(indexes_metadata))
},
);
metastore.expect_list_shards().returning(|_| {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ impl IndexService {
let indexes_metadata = metastore
.list_indexes_metadata(list_indexes_metadatas_request)
.await?
.deserialize_indexes_metadata()?;
.deserialize_indexes_metadata()
.await?;

if !ignore_missing && indexes_metadata.len() != index_id_patterns.len() {
let found_index_ids: HashSet<&str> = indexes_metadata
Expand Down
19 changes: 8 additions & 11 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,13 +711,13 @@ impl IndexingService {
.collect();
debug!(queues=?queues, "list ingest API queues");

let indexes_metadatas = self
let indexes_metadata = self
.metastore
.clone()
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await?
.deserialize_indexes_metadata()?;
let index_ids: HashSet<String> = indexes_metadatas
.deserialize_indexes_metadata()
.await?;
let index_ids: HashSet<String> = indexes_metadata
.into_iter()
.map(|index_metadata| index_metadata.index_id().to_string())
.collect();
Expand Down Expand Up @@ -1468,13 +1468,10 @@ mod tests {
let index_metadata_clone = index_metadata.clone();
metastore
.expect_list_indexes_metadata()
.returning(move |_request| {
let list_indexes_metadatas_response =
ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
index_metadata_clone.clone(),
])
.unwrap();
Ok(list_indexes_metadatas_response)
.return_once(move |_request| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata_clone,
]))
});
metastore.expect_index_metadata().returning(move |_| {
Ok(IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap())
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-janitor/src/actors/delete_task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ impl DeleteTaskService {
.metastore
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await?
.deserialize_indexes_metadata()?
.deserialize_indexes_metadata()
.await?
.into_iter()
.map(|index_metadata| {
(
Expand Down
Loading

0 comments on commit 6d629ae

Please sign in to comment.