diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index df71cc92ea4..26756c92086 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -27,8 +27,8 @@ use async_trait::async_trait; use futures::TryStreamExt; use itertools::Itertools; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox, - Observation, + Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Command, Handler, Healthz, + Mailbox, Observation, }; use quickwit_cluster::Cluster; use quickwit_common::fs::get_cache_directory_path; @@ -504,21 +504,27 @@ impl IndexingService { .merge_pipeline_handles .remove_entry(&merge_pipeline_to_shutdown) { - // We kill the merge pipeline to avoid waiting a merge operation to finish as it can - // be long. + // We gracefully shutdown the merge pipeline, so we can complete the in-flight + // merges. info!( index_uid=%merge_pipeline_to_shutdown.index_uid, source_id=%merge_pipeline_to_shutdown.source_id, - "no more indexing pipeline on this index and source, killing merge pipeline" + "shutting down orphan merge pipeline" ); - merge_pipeline_handle.handle.kill().await; + // The queue capacity of the merge pipeline is unbounded, so `.send_message(...)` + // should not block. + // We avoid using `.quit()` here because it waits for the actor to exit. + merge_pipeline_handle + .handle + .mailbox() + .send_message(Command::Quit) + .await + .expect("merge pipeline mailbox should not be full"); } } - // Finally remove the merge pipeline with an exit status. + // Finally, we remove the completed or failed merge pipelines. self.merge_pipeline_handles - .retain(|_, merge_pipeline_mailbox_handle| { - merge_pipeline_mailbox_handle.handle.state().is_running() - }); + .retain(|_, merge_pipeline_handle| merge_pipeline_handle.handle.state().is_running()); self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len(); self.update_chitchat_running_plan().await; @@ -543,23 +549,23 @@ impl IndexingService { immature_splits_opt: Option>, ctx: &ActorContext, ) -> Result, IndexingError> { - if let Some(merge_pipeline_mailbox_handle) = self + if let Some(merge_pipeline_handle) = self .merge_pipeline_handles .get(&merge_pipeline_params.pipeline_id) { - return Ok(merge_pipeline_mailbox_handle.mailbox.clone()); + return Ok(merge_pipeline_handle.mailbox.clone()); } let merge_pipeline_id = merge_pipeline_params.pipeline_id.clone(); let merge_pipeline = MergePipeline::new(merge_pipeline_params, immature_splits_opt, ctx.spawn_ctx()); let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline); - let merge_pipeline_mailbox_handle = MergePipelineHandle { + let merge_pipeline_handle = MergePipelineHandle { mailbox: merge_planner_mailbox.clone(), handle: pipeline_handle, }; self.merge_pipeline_handles - .insert(merge_pipeline_id, merge_pipeline_mailbox_handle); + .insert(merge_pipeline_id, merge_pipeline_handle); self.counters.num_running_merge_pipelines += 1; Ok(merge_planner_mailbox) } diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 2e3475f5759..a539329a833 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -251,7 +251,7 @@ impl MergePipeline { Some(self.merge_planner_mailbox.clone()), None, ); - let (merge_publisher_mailbox, merge_publisher_handler) = ctx + let (merge_publisher_mailbox, merge_publisher_handle) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) .set_backpressure_micros_counter( @@ -271,7 +271,7 @@ impl MergePipeline { self.params.max_concurrent_split_uploads, self.params.event_broker.clone(), ); - let (merge_uploader_mailbox, merge_uploader_handler) = ctx + let (merge_uploader_mailbox, merge_uploader_handle) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) .spawn(merge_uploader); @@ -279,7 +279,7 @@ impl MergePipeline { // Merge Packager let tag_fields = self.params.doc_mapper.tag_named_fields()?; let merge_packager = Packager::new("MergePackager", tag_fields, merge_uploader_mailbox); - let (merge_packager_mailbox, merge_packager_handler) = ctx + let (merge_packager_mailbox, merge_packager_handle) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) .spawn(merge_packager); @@ -300,7 +300,7 @@ impl MergePipeline { merge_executor_io_controls, merge_packager_mailbox, ); - let (merge_executor_mailbox, merge_executor_handler) = ctx + let (merge_executor_mailbox, merge_executor_handle) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) .set_backpressure_micros_counter( @@ -316,7 +316,7 @@ impl MergePipeline { executor_mailbox: merge_executor_mailbox, io_controls: split_downloader_io_controls, }; - let (merge_split_downloader_mailbox, merge_split_downloader_handler) = ctx + let (merge_split_downloader_mailbox, merge_split_downloader_handle) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) .set_backpressure_micros_counter( @@ -334,7 +334,7 @@ impl MergePipeline { merge_split_downloader_mailbox, self.params.merge_scheduler_service.clone(), ); - let (_, merge_planner_handler) = ctx + let (_, merge_planner_handle) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) .set_mailboxes( @@ -346,12 +346,12 @@ impl MergePipeline { self.previous_generations_statistics = self.statistics.clone(); self.statistics.generation += 1; self.handles_opt = Some(MergePipelineHandles { - merge_planner: merge_planner_handler, - merge_split_downloader: merge_split_downloader_handler, - merge_executor: merge_executor_handler, - merge_packager: merge_packager_handler, - merge_uploader: merge_uploader_handler, - merge_publisher: merge_publisher_handler, + merge_planner: merge_planner_handle, + merge_split_downloader: merge_split_downloader_handle, + merge_executor: merge_executor_handle, + merge_packager: merge_packager_handle, + merge_uploader: merge_uploader_handle, + merge_publisher: merge_publisher_handle, next_check_for_progress: Instant::now() + *HEARTBEAT, }); Ok(()) @@ -359,14 +359,14 @@ impl MergePipeline { async fn terminate(&mut self) { self.kill_switch.kill(); - if let Some(handlers) = self.handles_opt.take() { + if let Some(handles) = self.handles_opt.take() { tokio::join!( - handlers.merge_planner.kill(), - handlers.merge_split_downloader.kill(), - handlers.merge_executor.kill(), - handlers.merge_packager.kill(), - handlers.merge_uploader.kill(), - handlers.merge_publisher.kill(), + handles.merge_planner.kill(), + handles.merge_split_downloader.kill(), + handles.merge_executor.kill(), + handles.merge_packager.kill(), + handles.merge_uploader.kill(), + handles.merge_publisher.kill(), ); } } @@ -576,8 +576,8 @@ mod tests { event_broker: Default::default(), }; let pipeline = MergePipeline::new(pipeline_params, None, universe.spawn_ctx()); - let (_pipeline_mailbox, pipeline_handler) = universe.spawn_builder().spawn(pipeline); - let (pipeline_exit_status, pipeline_statistics) = pipeline_handler.quit().await; + let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline); + let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.quit().await; assert_eq!(pipeline_statistics.generation, 1); assert_eq!(pipeline_statistics.num_spawn_attempts, 1); assert_eq!(pipeline_statistics.num_published_splits, 0);