From 80ee07c5d3a3c4b81532061f5505dfbd0d58de05 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Mon, 26 Aug 2024 12:53:55 +0200 Subject: [PATCH] Deserialize splits outside of tokio runtime --- .../src/actors/indexing_service.rs | 2 +- .../src/actors/delete_task_planner.rs | 3 +- .../src/metastore/file_backed/mod.rs | 4 +- .../quickwit-metastore/src/metastore/mod.rs | 66 +++++++++++-------- .../src/metastore/postgres/metastore.rs | 2 +- .../src/tests/list_splits.rs | 6 ++ .../quickwit-metastore/src/tests/split.rs | 3 + 7 files changed, 55 insertions(+), 31 deletions(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index aa8d616bd35..87d827dd2e7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -435,7 +435,7 @@ impl IndexingService { let mut num_immature_splits = 0usize; while let Some(list_splits_response) = immature_splits_stream.try_next().await? { - for split_metadata in list_splits_response.deserialize_splits_metadata()? { + for split_metadata in list_splits_response.deserialize_splits_metadata().await? { num_immature_splits += 1; let merge_pipeline_id = MergePipelineId { diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index a8adce51b0b..4166634c072 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -357,7 +357,8 @@ impl DeleteTaskPlanner { let stale_splits = ctx .protect_future(self.metastore.list_stale_splits(list_stale_splits_request)) .await? - .deserialize_splits()?; + .deserialize_splits() + .await?; debug!( index_id = index_uid.index_id, last_delete_opstamp = last_delete_opstamp, diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs index 1fbfd57e08d..9780cbc386e 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs @@ -582,10 +582,10 @@ impl MetastoreService for FileBackedMetastore { /// Mutations over a single index async fn stage_splits(&self, request: StageSplitsRequest) -> MetastoreResult { + let index_uid = request.index_uid().clone(); let splits_metadata = request.deserialize_splits_metadata()?; - let index_uid = request.index_uid(); - self.mutate(index_uid, |index| { + self.mutate(&index_uid, |index| { let mut failed_split_ids = Vec::new(); for split_metadata in splits_metadata { diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 89822b549cc..148d8de342e 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -87,7 +87,7 @@ impl MetastoreServiceStreamSplitsExt for MetastoreServiceStream MetastoreResult> { let mut all_splits = Vec::new(); while let Some(list_splits_response) = self.try_next().await? { - let splits = list_splits_response.deserialize_splits()?; + let splits = list_splits_response.deserialize_splits().await?; all_splits.extend(splits); } Ok(all_splits) @@ -96,7 +96,7 @@ impl MetastoreServiceStreamSplitsExt for MetastoreServiceStream MetastoreResult> { let mut all_splits_metadata = Vec::new(); while let Some(list_splits_response) = self.try_next().await? { - let splits_metadata = list_splits_response.deserialize_splits_metadata()?; + let splits_metadata = list_splits_response.deserialize_splits_metadata().await?; all_splits_metadata.extend(splits_metadata); } Ok(all_splits_metadata) @@ -105,7 +105,7 @@ impl MetastoreServiceStreamSplitsExt for MetastoreServiceStream MetastoreResult> { let mut all_splits = Vec::new(); while let Some(list_splits_response) = self.try_next().await? { - let splits = list_splits_response.deserialize_split_ids()?; + let splits = list_splits_response.deserialize_split_ids().await?; all_splits.extend(splits); } Ok(all_splits) @@ -414,7 +414,7 @@ impl AddSourceRequestExt for AddSourceRequest { ) -> MetastoreResult { let source_config_json = serde_utils::to_json_str(&source_config)?; let request = Self { - index_uid: index_uid.into().into(), + index_uid: Some(index_uid.into()), source_config_json, }; Ok(request) @@ -451,7 +451,7 @@ impl StageSplitsRequestExt for StageSplitsRequest { ) -> MetastoreResult { let split_metadata_list_serialized_json = serde_utils::to_json_str(&[split_metadata])?; let request = Self { - index_uid: index_uid.into().into(), + index_uid: Some(index_uid.into()), split_metadata_list_serialized_json, }; Ok(request) @@ -464,7 +464,7 @@ impl StageSplitsRequestExt for StageSplitsRequest { let splits_metadata: Vec = splits_metadata.into_iter().collect(); let split_metadata_list_serialized_json = serde_utils::to_json_str(&splits_metadata)?; let request = Self { - index_uid: index_uid.into().into(), + index_uid: Some(index_uid.into()), split_metadata_list_serialized_json, }; Ok(request) @@ -510,6 +510,7 @@ impl ListSplitsRequestExt for ListSplitsRequest { } /// Helper trait to build a [`ListSplitsResponse`] and deserialize its payload. +#[async_trait] pub trait ListSplitsResponseExt { /// Creates a new [`ListSplitsResponse`] from a list of [`Split`]. fn try_from_splits( @@ -518,27 +519,15 @@ pub trait ListSplitsResponseExt { /// Deserializes the `splits_serialized_json` field of a [`ListSplitsResponse`] into a list of /// [`Split`]. - fn deserialize_splits(&self) -> MetastoreResult>; + async fn deserialize_splits(self) -> MetastoreResult>; /// Deserializes the `splits_serialized_json` field of a [`ListSplitsResponse`] into a list of /// [`SplitMetadata`]. - fn deserialize_splits_metadata(&self) -> MetastoreResult> { - let splits = self.deserialize_splits()?; - Ok(splits - .into_iter() - .map(|split| split.split_metadata) - .collect()) - } + async fn deserialize_splits_metadata(self) -> MetastoreResult>; /// Deserializes the `splits_serialized_json` field of a [`ListSplitsResponse`] into a list of /// [`SplitId`]. - fn deserialize_split_ids(&self) -> MetastoreResult> { - let splits = self.deserialize_splits()?; - Ok(splits - .into_iter() - .map(|split| split.split_metadata.split_id) - .collect()) - } + async fn deserialize_split_ids(self) -> MetastoreResult>; /// Creates an empty [`ListSplitsResponse`]. fn empty() -> Self; @@ -560,6 +549,7 @@ impl PublishSplitsRequestExt for PublishSplitsRequest { } } +#[async_trait] impl ListSplitsResponseExt for ListSplitsResponse { fn empty() -> Self { Self { @@ -575,8 +565,31 @@ impl ListSplitsResponseExt for ListSplitsResponse { Ok(request) } - fn deserialize_splits(&self) -> MetastoreResult> { - serde_utils::from_json_str(&self.splits_serialized_json) + async fn deserialize_splits(self) -> MetastoreResult> { + run_cpu_intensive(move || serde_utils::from_json_str(&self.splits_serialized_json)) + .await + .map_err(|join_error| MetastoreError::Internal { + message: "failed to deserialize splits".to_string(), + cause: join_error.to_string(), + })? + } + + async fn deserialize_splits_metadata(self) -> MetastoreResult> { + let splits = self.deserialize_splits().await?; + let splits_metadata = splits + .into_iter() + .map(|split| split.split_metadata) + .collect(); + Ok(splits_metadata) + } + + async fn deserialize_split_ids(self) -> MetastoreResult> { + let splits = self.deserialize_splits().await?; + let split_ids = splits + .into_iter() + .map(|split| split.split_metadata.split_id) + .collect(); + Ok(split_ids) } } @@ -1014,10 +1027,11 @@ mod tests { assert!(!filter.overlaps_with(75..=124)); } - #[test] - fn test_list_splits_response_empty() { + #[tokio::test] + async fn test_list_splits_response_empty() { let response = ListSplitsResponse::empty(); - assert_eq!(response.deserialize_splits().unwrap(), Vec::new()); + let splits = response.deserialize_splits().await.unwrap(); + assert!(splits.is_empty()); } #[tokio::test] diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 7d916fcd29f..831551c6a97 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -569,12 +569,12 @@ impl MetastoreService for PostgresqlMetastore { #[instrument(skip_all, fields(split_ids))] async fn stage_splits(&self, request: StageSplitsRequest) -> MetastoreResult { + let index_uid: IndexUid = request.index_uid().clone(); let splits_metadata = request.deserialize_splits_metadata()?; if splits_metadata.is_empty() { return Ok(Default::default()); } - let index_uid: IndexUid = request.index_uid().clone(); let mut split_ids = Vec::with_capacity(splits_metadata.len()); let mut time_range_start_list = Vec::with_capacity(splits_metadata.len()); let mut time_range_end_list = Vec::with_capacity(splits_metadata.len()); diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs index e2955db2fc0..cd1cc1712f3 100644 --- a/quickwit/quickwit-metastore/src/tests/list_splits.rs +++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs @@ -225,6 +225,7 @@ pub async fn test_metastore_stream_splits