Skip to content

Commit

Permalink
Deserialize splits outside of tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Aug 26, 2024
1 parent 85994e2 commit 80ee07c
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 31 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ impl IndexingService {
let mut num_immature_splits = 0usize;

while let Some(list_splits_response) = immature_splits_stream.try_next().await? {
for split_metadata in list_splits_response.deserialize_splits_metadata()? {
for split_metadata in list_splits_response.deserialize_splits_metadata().await? {
num_immature_splits += 1;

let merge_pipeline_id = MergePipelineId {
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ impl DeleteTaskPlanner {
let stale_splits = ctx
.protect_future(self.metastore.list_stale_splits(list_stale_splits_request))
.await?
.deserialize_splits()?;
.deserialize_splits()
.await?;
debug!(
index_id = index_uid.index_id,
last_delete_opstamp = last_delete_opstamp,
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,10 @@ impl MetastoreService for FileBackedMetastore {
/// Mutations over a single index

async fn stage_splits(&self, request: StageSplitsRequest) -> MetastoreResult<EmptyResponse> {
let index_uid = request.index_uid().clone();
let splits_metadata = request.deserialize_splits_metadata()?;
let index_uid = request.index_uid();

self.mutate(index_uid, |index| {
self.mutate(&index_uid, |index| {
let mut failed_split_ids = Vec::new();

for split_metadata in splits_metadata {
Expand Down
66 changes: 40 additions & 26 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl MetastoreServiceStreamSplitsExt for MetastoreServiceStream<ListSplitsRespon
async fn collect_splits(mut self) -> MetastoreResult<Vec<Split>> {
let mut all_splits = Vec::new();
while let Some(list_splits_response) = self.try_next().await? {
let splits = list_splits_response.deserialize_splits()?;
let splits = list_splits_response.deserialize_splits().await?;
all_splits.extend(splits);
}
Ok(all_splits)
Expand All @@ -96,7 +96,7 @@ impl MetastoreServiceStreamSplitsExt for MetastoreServiceStream<ListSplitsRespon
async fn collect_splits_metadata(mut self) -> MetastoreResult<Vec<SplitMetadata>> {
let mut all_splits_metadata = Vec::new();
while let Some(list_splits_response) = self.try_next().await? {
let splits_metadata = list_splits_response.deserialize_splits_metadata()?;
let splits_metadata = list_splits_response.deserialize_splits_metadata().await?;
all_splits_metadata.extend(splits_metadata);
}
Ok(all_splits_metadata)
Expand All @@ -105,7 +105,7 @@ impl MetastoreServiceStreamSplitsExt for MetastoreServiceStream<ListSplitsRespon
async fn collect_split_ids(mut self) -> MetastoreResult<Vec<SplitId>> {
let mut all_splits = Vec::new();
while let Some(list_splits_response) = self.try_next().await? {
let splits = list_splits_response.deserialize_split_ids()?;
let splits = list_splits_response.deserialize_split_ids().await?;
all_splits.extend(splits);
}
Ok(all_splits)
Expand Down Expand Up @@ -414,7 +414,7 @@ impl AddSourceRequestExt for AddSourceRequest {
) -> MetastoreResult<AddSourceRequest> {
let source_config_json = serde_utils::to_json_str(&source_config)?;
let request = Self {
index_uid: index_uid.into().into(),
index_uid: Some(index_uid.into()),
source_config_json,
};
Ok(request)
Expand Down Expand Up @@ -451,7 +451,7 @@ impl StageSplitsRequestExt for StageSplitsRequest {
) -> MetastoreResult<StageSplitsRequest> {
let split_metadata_list_serialized_json = serde_utils::to_json_str(&[split_metadata])?;
let request = Self {
index_uid: index_uid.into().into(),
index_uid: Some(index_uid.into()),
split_metadata_list_serialized_json,
};
Ok(request)
Expand All @@ -464,7 +464,7 @@ impl StageSplitsRequestExt for StageSplitsRequest {
let splits_metadata: Vec<SplitMetadata> = splits_metadata.into_iter().collect();
let split_metadata_list_serialized_json = serde_utils::to_json_str(&splits_metadata)?;
let request = Self {
index_uid: index_uid.into().into(),
index_uid: Some(index_uid.into()),
split_metadata_list_serialized_json,
};
Ok(request)
Expand Down Expand Up @@ -510,6 +510,7 @@ impl ListSplitsRequestExt for ListSplitsRequest {
}

/// Helper trait to build a [`ListSplitsResponse`] and deserialize its payload.
#[async_trait]
pub trait ListSplitsResponseExt {
/// Creates a new [`ListSplitsResponse`] from a list of [`Split`].
fn try_from_splits(
Expand All @@ -518,27 +519,15 @@ pub trait ListSplitsResponseExt {

/// Deserializes the `splits_serialized_json` field of a [`ListSplitsResponse`] into a list of
/// [`Split`].
fn deserialize_splits(&self) -> MetastoreResult<Vec<Split>>;
async fn deserialize_splits(self) -> MetastoreResult<Vec<Split>>;

/// Deserializes the `splits_serialized_json` field of a [`ListSplitsResponse`] into a list of
/// [`SplitMetadata`].
fn deserialize_splits_metadata(&self) -> MetastoreResult<Vec<SplitMetadata>> {
let splits = self.deserialize_splits()?;
Ok(splits
.into_iter()
.map(|split| split.split_metadata)
.collect())
}
async fn deserialize_splits_metadata(self) -> MetastoreResult<Vec<SplitMetadata>>;

/// Deserializes the `splits_serialized_json` field of a [`ListSplitsResponse`] into a list of
/// [`SplitId`].
fn deserialize_split_ids(&self) -> MetastoreResult<Vec<SplitId>> {
let splits = self.deserialize_splits()?;
Ok(splits
.into_iter()
.map(|split| split.split_metadata.split_id)
.collect())
}
async fn deserialize_split_ids(self) -> MetastoreResult<Vec<SplitId>>;

/// Creates an empty [`ListSplitsResponse`].
fn empty() -> Self;
Expand All @@ -560,6 +549,7 @@ impl PublishSplitsRequestExt for PublishSplitsRequest {
}
}

#[async_trait]
impl ListSplitsResponseExt for ListSplitsResponse {
fn empty() -> Self {
Self {
Expand All @@ -575,8 +565,31 @@ impl ListSplitsResponseExt for ListSplitsResponse {
Ok(request)
}

fn deserialize_splits(&self) -> MetastoreResult<Vec<Split>> {
serde_utils::from_json_str(&self.splits_serialized_json)
async fn deserialize_splits(self) -> MetastoreResult<Vec<Split>> {
run_cpu_intensive(move || serde_utils::from_json_str(&self.splits_serialized_json))
.await
.map_err(|join_error| MetastoreError::Internal {
message: "failed to deserialize splits".to_string(),
cause: join_error.to_string(),
})?
}

async fn deserialize_splits_metadata(self) -> MetastoreResult<Vec<SplitMetadata>> {
let splits = self.deserialize_splits().await?;
let splits_metadata = splits
.into_iter()
.map(|split| split.split_metadata)
.collect();
Ok(splits_metadata)
}

async fn deserialize_split_ids(self) -> MetastoreResult<Vec<SplitId>> {
let splits = self.deserialize_splits().await?;
let split_ids = splits
.into_iter()
.map(|split| split.split_metadata.split_id)
.collect();
Ok(split_ids)
}
}

Expand Down Expand Up @@ -1014,10 +1027,11 @@ mod tests {
assert!(!filter.overlaps_with(75..=124));
}

#[test]
fn test_list_splits_response_empty() {
#[tokio::test]
async fn test_list_splits_response_empty() {
let response = ListSplitsResponse::empty();
assert_eq!(response.deserialize_splits().unwrap(), Vec::new());
let splits = response.deserialize_splits().await.unwrap();
assert!(splits.is_empty());
}

#[tokio::test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,12 +569,12 @@ impl MetastoreService for PostgresqlMetastore {

#[instrument(skip_all, fields(split_ids))]
async fn stage_splits(&self, request: StageSplitsRequest) -> MetastoreResult<EmptyResponse> {
let index_uid: IndexUid = request.index_uid().clone();
let splits_metadata = request.deserialize_splits_metadata()?;

if splits_metadata.is_empty() {
return Ok(Default::default());
}
let index_uid: IndexUid = request.index_uid().clone();
let mut split_ids = Vec::with_capacity(splits_metadata.len());
let mut time_range_start_list = Vec::with_capacity(splits_metadata.len());
let mut time_range_end_list = Vec::with_capacity(splits_metadata.len());
Expand Down
6 changes: 6 additions & 0 deletions quickwit/quickwit-metastore/src/tests/list_splits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub async fn test_metastore_stream_splits<MetastoreToTest: MetastoreServiceExt +
.unwrap()
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert_eq!(splits.len(), 100);
all_splits.append(&mut splits);
Expand Down Expand Up @@ -1025,6 +1026,7 @@ pub async fn test_metastore_list_stale_splits<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert!(no_splits.is_empty());

Expand Down Expand Up @@ -1090,6 +1092,7 @@ pub async fn test_metastore_list_stale_splits<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(
Expand All @@ -1107,6 +1110,7 @@ pub async fn test_metastore_list_stale_splits<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert_eq!(splits.len(), 3);
assert_eq!(splits[0].split_id(), split_metadata_2.split_id());
Expand All @@ -1127,6 +1131,7 @@ pub async fn test_metastore_list_stale_splits<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert_eq!(splits.len(), 1);
assert_eq!(
Expand All @@ -1144,6 +1149,7 @@ pub async fn test_metastore_list_stale_splits<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert!(splits.is_empty());
cleanup_index(&mut metastore, index_uid).await;
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-metastore/src/tests/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,7 @@ pub async fn test_metastore_update_splits_delete_opstamp<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert_eq!(splits.len(), 2);

Expand All @@ -1787,6 +1788,7 @@ pub async fn test_metastore_update_splits_delete_opstamp<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert_eq!(splits.len(), 0);

Expand All @@ -1800,6 +1802,7 @@ pub async fn test_metastore_update_splits_delete_opstamp<
.await
.unwrap()
.deserialize_splits()
.await
.unwrap();
assert_eq!(splits.len(), 2);
assert_eq!(splits[0].split_metadata.delete_opstamp, 100);
Expand Down

0 comments on commit 80ee07c

Please sign in to comment.