From a70b101ad30cb02c28ac6d46cdd0afae66e3af01 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 4 Oct 2024 17:49:44 +0900 Subject: [PATCH] Introduce an explicit message to terminate merge pipelines, and introducing finalization merge policy. --- .../quickwit-config/src/index_config/mod.rs | 3 + .../src/merge_policy_config.rs | 4 ++ .../src/actors/indexing_pipeline.rs | 3 +- .../src/actors/indexing_service.rs | 11 +++- .../src/actors/merge_pipeline.rs | 56 +++++++++++++++- .../src/actors/merge_planner.rs | 66 ++++++++++++++++--- .../quickwit-indexing/src/actors/publisher.rs | 20 ++++++ .../quickwit-indexing/src/merge_policy/mod.rs | 14 +++- .../src/actors/delete_task_pipeline.rs | 5 +- 9 files changed, 165 insertions(+), 17 deletions(-) diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs index 60ea2a9429e..8ef52c80656 100644 --- a/quickwit/quickwit-config/src/index_config/mod.rs +++ b/quickwit/quickwit-config/src/index_config/mod.rs @@ -121,6 +121,8 @@ pub struct IndexingSettings { pub merge_policy: MergePolicyConfig, #[serde(default)] pub resources: IndexingResources, + #[serde(default = "MergePolicyConfig::noop")] + pub finalize_merge_policy: MergePolicyConfig, } impl IndexingSettings { @@ -161,6 +163,7 @@ impl Default for IndexingSettings { docstore_compression_level: Self::default_docstore_compression_level(), split_num_docs_target: Self::default_split_num_docs_target(), merge_policy: MergePolicyConfig::default(), + finalize_merge_policy: MergePolicyConfig::noop(), resources: IndexingResources::default(), } } diff --git a/quickwit/quickwit-config/src/merge_policy_config.rs b/quickwit/quickwit-config/src/merge_policy_config.rs index 2f96ed49d78..757af7571b5 100644 --- a/quickwit/quickwit-config/src/merge_policy_config.rs +++ b/quickwit/quickwit-config/src/merge_policy_config.rs @@ -146,6 +146,10 @@ impl Default for MergePolicyConfig { } impl MergePolicyConfig { + pub fn noop() -> Self { + MergePolicyConfig::Nop + } + pub fn validate(&self) -> anyhow::Result<()> { let (merge_factor, max_merge_factor) = match self { MergePolicyConfig::Nop => { diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 7ab58bb873f..643ed0498cb 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -619,7 +619,7 @@ mod tests { use super::{IndexingPipeline, *}; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; - use crate::merge_policy::default_merge_policy; + use crate::merge_policy::{default_merge_policy, NopMergePolicy}; #[test] fn test_wait_duration() { @@ -908,6 +908,7 @@ mod tests { metastore: metastore.clone(), split_store: split_store.clone(), merge_policy: default_merge_policy(), + finalize_merge_policy: Arc::new(NopMergePolicy), max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, merge_scheduler_service: universe.get_or_spawn_one(), diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 26756c92086..7654c9e55e2 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -284,8 +284,14 @@ impl IndexingService { let message = format!("failed to spawn indexing pipeline: {error}"); IndexingError::Internal(message) })?; - let merge_policy = - crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings); + let merge_policy = crate::merge_policy::merge_policy_from_settings( + index_config.indexing_settings.merge_policy.clone(), + &index_config.indexing_settings, + ); + let finalize_merge_policy = crate::merge_policy::merge_policy_from_settings( + index_config.indexing_settings.finalize_merge_policy.clone(), + &index_config.indexing_settings, + ); let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) @@ -300,6 +306,7 @@ impl IndexingService { split_store: split_store.clone(), merge_scheduler_service: self.merge_scheduler_service.clone(), merge_policy: merge_policy.clone(), + finalize_merge_policy, merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), max_concurrent_split_uploads: self.max_concurrent_split_uploads, event_broker: self.event_broker.clone(), diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index a539329a833..61f8fa194c7 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -42,6 +42,8 @@ use time::OffsetDateTime; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; +use super::merge_planner::RunFinalizeMergePolicy; +use super::publisher::DisconnectMergePlanner; use super::MergeSchedulerService; use crate::actors::indexing_pipeline::wait_duration_before_retry; use crate::actors::merge_split_downloader::MergeSplitDownloader; @@ -56,6 +58,22 @@ use crate::split_store::IndexingSplitStore; /// concurrently. static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10); +/// Instructs the merge pipeline that it should stops itself. +/// Merge that have already been scheduled are not aborted. +/// +/// In addition, the finalizer merge policy will be executed to schedule a few +/// additional merges. +/// +/// After reception the `FinalizeAndClosePipeline`, the merge pipeline loop will +/// be disconnected. In other words, the connection from the merge publisher to +/// the merge planner will be cut, so that the merge pipeline will terminate naturally. +/// +/// Supervisation will still exist. However it will not restart the pipeline +/// in case of failure, it will just kill all of the merge pipeline actors. (for +/// instance, if one of the actor is stuck). +#[derive(Debug, Clone, Copy)] +pub struct FinishPendingMergesAndShutdownPipeline; + struct MergePipelineHandles { merge_planner: ActorHandle, merge_split_downloader: ActorHandle, @@ -96,6 +114,8 @@ pub struct MergePipeline { kill_switch: KillSwitch, /// Immature splits passed to the merge planner the first time the pipeline is spawned. initial_immature_splits_opt: Option>, + // After it is set to true, we don't respawn. + shutdown_initiated: bool, } #[async_trait] @@ -141,6 +161,7 @@ impl MergePipeline { merge_planner_inbox, merge_planner_mailbox, initial_immature_splits_opt, + shutdown_initiated: false, } } @@ -331,6 +352,7 @@ impl MergePipeline { &self.params.pipeline_id, immature_splits, self.params.merge_policy.clone(), + self.params.finalize_merge_policy.clone(), merge_split_downloader_mailbox, self.params.merge_scheduler_service.clone(), ); @@ -467,6 +489,33 @@ impl Handler for MergePipeline { } } +#[async_trait] +impl Handler for MergePipeline { + type Reply = (); + async fn handle( + &mut self, + _: FinishPendingMergesAndShutdownPipeline, + _ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + // From now on, we will not respawn the pipeline if it fails. + self.shutdown_initiated = true; + if let Some(handles) = &self.handles_opt { + let _ = handles + .merge_publisher + .mailbox() + .send_message(DisconnectMergePlanner); + let _ = handles + .merge_planner + .mailbox() + .send_message(RunFinalizeMergePolicy); + } else { + // we won't respawn the pipeline in the future, so there is nothing + // to do here. + } + Ok(()) + } +} + #[async_trait] impl Handler for MergePipeline { type Reply = (); @@ -476,6 +525,9 @@ impl Handler for MergePipeline { spawn: Spawn, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { + if self.shutdown_initiated { + return Ok(()); + } if self.handles_opt.is_some() { return Ok(()); } @@ -509,6 +561,7 @@ pub struct MergePipelineParams { pub merge_scheduler_service: Mailbox, pub split_store: IndexingSplitStore, pub merge_policy: Arc, + pub finalize_merge_policy: Arc, pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline. pub merge_io_throughput_limiter_opt: Option, pub event_broker: EventBroker, @@ -530,7 +583,7 @@ mod tests { use quickwit_storage::RamStorage; use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; - use crate::merge_policy::default_merge_policy; + use crate::merge_policy::{default_merge_policy, nop_merge_policy}; use crate::IndexingSplitStore; #[tokio::test] @@ -571,6 +624,7 @@ mod tests { merge_scheduler_service: universe.get_or_spawn_one(), split_store, merge_policy: default_merge_policy(), + finalize_merge_policy: nop_merge_policy(), max_concurrent_split_uploads: 2, merge_io_throughput_limiter_opt: None, event_broker: Default::default(), diff --git a/quickwit/quickwit-indexing/src/actors/merge_planner.rs b/quickwit/quickwit-indexing/src/actors/merge_planner.rs index 7b04513056d..7cb692affc6 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_planner.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_planner.rs @@ -38,6 +38,9 @@ use crate::merge_policy::MergeOperation; use crate::models::NewSplits; use crate::MergePolicy; +#[derive(Debug)] +pub(crate) struct RunFinalizeMergePolicy; + #[derive(Debug, Clone, PartialEq, Eq, Hash)] struct MergePartition { partition_id: u64, @@ -78,6 +81,8 @@ pub struct MergePlanner { known_split_ids_recompute_attempt_id: usize, merge_policy: Arc, + finalize_merge_policy: Arc, + merge_split_downloader_mailbox: Mailbox, merge_scheduler_service: Mailbox, @@ -126,6 +131,22 @@ impl Actor for MergePlanner { } } +#[async_trait] +impl Handler for MergePlanner { + type Reply = (); + + async fn handle( + &mut self, + _plan_merge: RunFinalizeMergePolicy, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + // Note we ignore messages that could be coming from a different incarnation. + // (See comment on `Self::incarnation_start_at`.) + self.send_merge_ops(true, ctx).await?; + Err(ActorExitStatus::Success) + } +} + #[async_trait] impl Handler for MergePlanner { type Reply = (); @@ -138,7 +159,7 @@ impl Handler for MergePlanner { if plan_merge.incarnation_started_at == self.incarnation_started_at { // Note we ignore messages that could be coming from a different incarnation. // (See comment on `Self::incarnation_start_at`.) - self.send_merge_ops(ctx).await?; + self.send_merge_ops(false, ctx).await?; } self.recompute_known_splits_if_necessary(); Ok(()) @@ -155,7 +176,7 @@ impl Handler for MergePlanner { ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { self.record_splits_if_necessary(new_splits.new_splits); - self.send_merge_ops(ctx).await?; + self.send_merge_ops(false, ctx).await?; self.recompute_known_splits_if_necessary(); Ok(()) } @@ -172,6 +193,7 @@ impl MergePlanner { pipeline_id: &MergePipelineId, immature_splits: Vec, merge_policy: Arc, + finalize_merge_policy: Arc, merge_split_downloader_mailbox: Mailbox, merge_scheduler_service: Mailbox, ) -> MergePlanner { @@ -184,6 +206,7 @@ impl MergePlanner { known_split_ids_recompute_attempt_id: 0, partitioned_young_splits: Default::default(), merge_policy, + finalize_merge_policy, merge_split_downloader_mailbox, merge_scheduler_service, ongoing_merge_operations_inventory: Inventory::default(), @@ -273,12 +296,18 @@ impl MergePlanner { } async fn compute_merge_ops( &mut self, + is_finalize: bool, ctx: &ActorContext, ) -> Result, ActorExitStatus> { let mut merge_operations = Vec::new(); for young_splits in self.partitioned_young_splits.values_mut() { if !young_splits.is_empty() { - merge_operations.extend(self.merge_policy.operations(young_splits)); + let merge_policy = if is_finalize { + &self.finalize_merge_policy + } else { + &self.merge_policy + }; + merge_operations.extend(merge_policy.operations(young_splits)); } ctx.record_progress(); ctx.yield_now().await; @@ -289,13 +318,17 @@ impl MergePlanner { Ok(merge_operations) } - async fn send_merge_ops(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + async fn send_merge_ops( + &mut self, + is_finalize: bool, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { // We identify all of the merge operations we want to run and leave it // to the merge scheduler to decide in which order these should be scheduled. // // The merge scheduler has the merit of knowing about merge operations from other // index as well. - let merge_ops = self.compute_merge_ops(ctx).await?; + let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?; for merge_operation in merge_ops { info!(merge_operation=?merge_operation, "schedule merge operation"); let tracked_merge_operation = self @@ -347,7 +380,7 @@ mod tests { use crate::actors::MergePlanner; use crate::merge_policy::{ - merge_policy_from_settings, MergePolicy, MergeTask, StableLogMergePolicy, + merge_policy_from_settings, MergePolicy, MergeTask, NopMergePolicy, StableLogMergePolicy, }; use crate::models::NewSplits; @@ -408,6 +441,7 @@ mod tests { &pipeline_id, Vec::new(), merge_policy, + Arc::new(NopMergePolicy), merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -528,11 +562,15 @@ mod tests { 2, ), ]; - let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); + let merge_policy: Arc = + merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); + let finalize_merge_policy: Arc = + merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, + finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -620,11 +658,17 @@ mod tests { 2, ), ]; - let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); + let merge_policy: Arc = + merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); + let finalize_merge_policy: Arc = merge_policy_from_settings( + indexing_settings.finalize_merge_policy.clone(), + &indexing_settings, + ); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, + finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); @@ -685,11 +729,15 @@ mod tests { 2, ), ]; - let merge_policy: Arc = merge_policy_from_settings(&indexing_settings); + let merge_policy: Arc = + merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); + let finalize_merge_policy: Arc = + merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings); let merge_planner = MergePlanner::new( &pipeline_id, immature_splits.clone(), merge_policy, + finalize_merge_policy, merge_split_downloader_mailbox, universe.get_or_spawn_one(), ); diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs index 4e999e7f7a2..481c4971b71 100644 --- a/quickwit/quickwit-indexing/src/actors/publisher.rs +++ b/quickwit/quickwit-indexing/src/actors/publisher.rs @@ -51,6 +51,11 @@ impl PublisherType { } } +/// Disconnect the merge planner loop back. +/// This message is used to cut the merge pipeline loop, and let it terminate. +#[derive(Debug)] +pub(crate) struct DisconnectMergePlanner; + #[derive(Clone)] pub struct Publisher { publisher_type: PublisherType, @@ -97,6 +102,21 @@ impl Actor for Publisher { } } +#[async_trait] +impl Handler for Publisher { + type Reply = (); + + async fn handle( + &mut self, + _: DisconnectMergePlanner, + _ctx: &ActorContext, + ) -> Result<(), quickwit_actors::ActorExitStatus> { + info!("disconnecting merge planner mailbox."); + self.merge_planner_mailbox_opt.take(); + Ok(()) + } +} + #[async_trait] impl Handler for Publisher { type Reply = (); diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 69fccbbede0..95622a11460 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -166,8 +166,10 @@ pub trait MergePolicy: Send + Sync + fmt::Debug { fn check_is_valid(&self, _merge_op: &MergeOperation, _remaining_splits: &[SplitMetadata]) {} } -pub fn merge_policy_from_settings(settings: &IndexingSettings) -> Arc { - let merge_policy_config = settings.merge_policy.clone(); +pub fn merge_policy_from_settings( + merge_policy_config: MergePolicyConfig, + settings: &IndexingSettings, +) -> Arc { match merge_policy_config { MergePolicyConfig::Nop => Arc::new(NopMergePolicy), MergePolicyConfig::ConstWriteAmplification(config) => { @@ -183,7 +185,12 @@ pub fn merge_policy_from_settings(settings: &IndexingSettings) -> Arc Arc { - merge_policy_from_settings(&IndexingSettings::default()) + let indexing_settings = IndexingSettings::default(); + merge_policy_from_settings(indexing_settings.merge_policy.clone(), &indexing_settings) +} + +pub fn nop_merge_policy() -> Arc { + Arc::new(NopMergePolicy) } struct SplitShortDebug<'a>(&'a SplitMetadata); @@ -414,6 +421,7 @@ pub mod tests { &pipeline_id.merge_pipeline_id(), Vec::new(), merge_policy.clone(), + Arc::new(NopMergePolicy), merge_task_mailbox, universe.get_or_spawn_one::(), ); diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index 5c25c7ae1c7..072f7338af3 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -176,7 +176,10 @@ impl DeleteTaskPipeline { ctx.spawn_actor().supervise(publisher); let split_store = IndexingSplitStore::create_without_local_store_for_test(self.index_storage.clone()); - let merge_policy = merge_policy_from_settings(&index_config.indexing_settings); + let merge_policy = merge_policy_from_settings( + index_config.indexing_settings.merge_policy.clone(), + &index_config.indexing_settings, + ); let uploader = Uploader::new( UploaderType::DeleteUploader, self.metastore.clone(),