From f1cfeab86c77475732a781e9a08b609bcd5519e8 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 17 Aug 2023 13:26:38 +0900 Subject: [PATCH] Prevents `Observe` messages emitted by Supervisors from stacking up. This introduces a refresh observation method that does not wait for anything but just triggers an eventual refresh of the last observation. This method includes debouncing logic to avoid stacking up messages. We then use it in supervisors, which has the added merit of making the Supervise handler return much faster. Before, we were waiting for the observation to time out. Closes #3485 --- quickwit/quickwit-actors/src/actor_context.rs | 12 +++ quickwit/quickwit-actors/src/actor_handle.rs | 78 +++++++++++++++++++ quickwit/quickwit-actors/src/mailbox.rs | 2 +- quickwit/quickwit-cli/src/index.rs | 16 ++-- quickwit/quickwit-cli/src/tool.rs | 21 +++-- .../src/actors/indexing_pipeline.rs | 19 ++--- .../src/actors/merge_pipeline.rs | 22 ++++-- .../src/actors/delete_task_pipeline.rs | 35 ++++----- 8 files changed, 143 insertions(+), 62 deletions(-) diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index 7d11fc353fc..e3d6795499f 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -21,6 +21,7 @@ use std::convert::Infallible; use std::fmt; use std::future::Future; use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -67,6 +68,8 @@ pub struct ActorContextInner { actor_state: AtomicState, backpressure_micros_counter_opt: Option, observable_state_tx: watch::Sender, // Flag set when an `Observe` message is enqueued, and cleared once it is processed. 
+ observe_enqueued: AtomicBool, } impl ActorContext { @@ -84,6 +87,7 @@ impl ActorContext { actor_state: AtomicState::default(), observable_state_tx, backpressure_micros_counter_opt, + observe_enqueued: AtomicBool::new(false), } .into(), } @@ -203,8 +207,16 @@ impl ActorContext { self.actor_state.resume(); } + /// Marks an Observe message as enqueued and returns the previous value of the flag. + /// This method is used to make sure we do not have Observe messages + /// stacking up in the observe queue. + pub(crate) fn set_observe_enqueued_and_return_previous(&self) -> bool { + self.observe_enqueued.swap(true, Ordering::Relaxed) + } + pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState { let obs_state = actor.observable_state(); + self.inner.observe_enqueued.store(false, Ordering::Relaxed); let _ = self.observable_state_tx.send(obs_state.clone()); obs_state } diff --git a/quickwit/quickwit-actors/src/actor_handle.rs b/quickwit/quickwit-actors/src/actor_handle.rs index 276230dab4f..77b3ed9f3a8 100644 --- a/quickwit/quickwit-actors/src/actor_handle.rs +++ b/quickwit/quickwit-actors/src/actor_handle.rs @@ -144,10 +144,35 @@ impl ActorHandle { /// /// The observation will be scheduled as a high priority message, therefore it will be executed /// after the current active message and the current command queue have been processed. + /// + /// This method does not do anything to prevent Observe messages from stacking up. + /// In supervisors, prefer using `refresh_observe`. pub async fn observe(&self) -> Observation { self.observe_with_priority(Priority::High).await } + // Triggers an observation. // It is scheduled as a high priority // message, and will hence be executed as soon as possible. // // This method does not enqueue an Observe request if one is already enqueued. // // The resulting observation will eventually be accessible using the // observation watch channel. // // This function returning does NOT mean that the observation was executed. 
+ pub fn refresh_observe(&self) { + let observation_already_enqueued = self + .actor_context + .set_observe_enqueued_and_return_previous(); + if !observation_already_enqueued { + let _ = self + .actor_context + .mailbox() + .send_message_with_high_priority(Observe); + } + } + async fn observe_with_priority(&self, priority: Priority) -> Observation { if !self.actor_context.state().is_exit() { if let Ok(oneshot_rx) = self @@ -268,6 +293,9 @@ impl ActorHandle { #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + use std::time::Duration; + use async_trait::async_trait; use super::*; @@ -351,4 +379,54 @@ mod tests { assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state. Ok(()) } + + #[derive(Default)] + struct ObserveActor { + observe: AtomicU32, + } + + #[async_trait] + impl Actor for ObserveActor { + type ObservableState = u32; + + fn observable_state(&self) -> u32 { + self.observe.fetch_add(1, Ordering::Relaxed) + } + + async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + ctx.send_self_message(YieldLoop).await?; + Ok(()) + } + } + + #[derive(Debug)] + struct YieldLoop; + + #[async_trait] + impl Handler for ObserveActor { + type Reply = (); + async fn handle( + &mut self, + _: YieldLoop, + ctx: &ActorContext, + ) -> Result { + ctx.sleep(Duration::from_millis(25)).await; // OBSERVE_TIMEOUT.mul_f32(10.0f32)).await; + ctx.send_self_message(YieldLoop).await?; + Ok(()) + } + } + + #[tokio::test] + async fn test_observation_debounce() { + // TODO investigate why Universe::with_accelerated_time() does not work here. 
+ let universe = Universe::new(); + let (_, actor_handle) = universe.spawn_builder().spawn(ObserveActor::default()); + for _ in 0..10 { + let _ = actor_handle.refresh_observe(); + universe.sleep(Duration::from_millis(10)).await; + } + let (_last_obs, num_obs) = actor_handle.quit().await; + assert!(num_obs < 8); + universe.assert_quit().await; + } } diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index e1571682bc4..efe404eef02 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -253,7 +253,7 @@ impl Mailbox { } } - pub(crate) fn send_message_with_high_priority( + pub fn send_message_with_high_priority( &self, message: M, ) -> Result, SendError> diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs index db9cfdf655e..e7995d41d59 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -33,7 +33,7 @@ use colored::{ColoredString, Colorize}; use humantime::format_duration; use indicatif::{ProgressBar, ProgressStyle}; use itertools::Itertools; -use quickwit_actors::{ActorHandle, ObservationType}; +use quickwit_actors::ActorHandle; use quickwit_common::uri::Uri; use quickwit_config::{ConfigFormat, IndexConfig}; use quickwit_indexing::models::IndexingStatistics; @@ -923,18 +923,16 @@ pub async fn start_statistics_reporting_loop( report_interval.tick().await; // Try to receive with a timeout of 1 second. // 1 second is also the frequency at which we update statistic in the console - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + + let observation = pipeline_handle.last_observation(); // Let's not display live statistics to allow screen to scroll. 
- if observation.state.num_docs > 0 { - display_statistics( - &mut stdout_handle, - &mut throughput_calculator, - &observation.state, - )?; + if observation.num_docs > 0 { + display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { break; } } diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 8a0daabeff2..4ede1b03120 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -32,7 +32,7 @@ use chitchat::FailureDetectorConfig; use clap::{arg, ArgMatches, Command}; use colored::{ColoredString, Colorize}; use humantime::format_duration; -use quickwit_actors::{ActorExitStatus, ActorHandle, ObservationType, Universe}; +use quickwit_actors::{ActorExitStatus, ActorHandle, Universe}; use quickwit_cluster::{Cluster, ClusterMember}; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::Uri; @@ -616,14 +616,15 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { loop { check_interval.tick().await; - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + let observation = pipeline_handle.last_observation(); if observation.num_ongoing_merges == 0 { info!("Merge pipeline has no more ongoing merges, Exiting."); break; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { info!("Merge pipeline has exited, Exiting."); break; } @@ -753,18 +754,16 @@ pub async fn start_statistics_reporting_loop( report_interval.tick().await; // Try to receive with a timeout of 1 second. // 1 second is also the frequency at which we update statistic in the console - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + + let observation = pipeline_handle.last_observation(); // Let's not display live statistics to allow screen to scroll. 
- if observation.state.num_docs > 0 { - display_statistics( - &mut stdout_handle, - &mut throughput_calculator, - &observation.state, - )?; + if observation.num_docs > 0 { + display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { break; } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 736497d3076..1b29aa59e64 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{Metastore, MetastoreError}; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_storage::Storage; -use tokio::join; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; @@ -236,20 +235,18 @@ impl IndexingPipeline { let Some(handles) = &self.handles_opt else { return; }; - let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!( - handles.doc_processor.observe(), - handles.indexer.observe(), - handles.uploader.observe(), - handles.publisher.observe(), - ); + handles.doc_processor.refresh_observe(); + handles.indexer.refresh_observe(); + handles.uploader.refresh_observe(); + handles.publisher.refresh_observe(); self.statistics = self .previous_generations_statistics .clone() .add_actor_counters( - &doc_processor_counters, - &indexer_counters, - &uploader_counters, - &publisher_counters, + &handles.doc_processor.last_observation(), + &handles.indexer.last_observation(), + &handles.uploader.last_observation(), + &handles.publisher.last_observation(), ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts); diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs 
b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 51236d97632..2e5a7652f40 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState}; use quickwit_proto::indexing::IndexingPipelineId; use time::OffsetDateTime; -use tokio::join; use tracing::{debug, error, info, instrument}; use crate::actors::indexing_pipeline::wait_duration_before_retry; @@ -357,18 +356,25 @@ impl MergePipeline { let Some(handles) = &self.handles_opt else { return; }; - let (merge_planner_state, merge_uploader_counters, merge_publisher_counters) = join!( - handles.merge_planner.observe(), - handles.merge_uploader.observe(), - handles.merge_publisher.observe(), - ); + handles.merge_planner.refresh_observe(); + handles.merge_uploader.refresh_observe(); + handles.merge_publisher.refresh_observe(); self.statistics = self .previous_generations_statistics .clone() - .add_actor_counters(&merge_uploader_counters, &merge_publisher_counters) + .add_actor_counters( + &handles.merge_uploader.last_observation(), + &handles.merge_publisher.last_observation(), + ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts) - .set_ongoing_merges(merge_planner_state.ongoing_merge_operations.len()); + .set_ongoing_merges( + handles + .merge_planner + .last_observation() + .ongoing_merge_operations + .len(), + ); } async fn perform_health_check( diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index d22024b171f..a6e755a7c1e 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -254,28 +254,19 @@ impl Handler for DeleteTaskPipeline { ctx: &ActorContext, ) -> Result<(), 
ActorExitStatus> { if let Some(handles) = &self.handles { - let ( - delete_task_planner, - downloader, - delete_task_executor, - packager, - uploader, - publisher, - ) = join!( - handles.delete_task_planner.observe(), - handles.downloader.observe(), - handles.delete_task_executor.observe(), - handles.packager.observe(), - handles.uploader.observe(), - handles.publisher.observe(), - ); + handles.delete_task_planner.refresh_observe(); + handles.downloader.refresh_observe(); + handles.delete_task_executor.refresh_observe(); + handles.packager.refresh_observe(); + handles.uploader.refresh_observe(); + handles.publisher.refresh_observe(); self.state = DeleteTaskPipelineState { - delete_task_planner: delete_task_planner.state, - downloader: downloader.state, - delete_task_executor: delete_task_executor.state, - packager: packager.state, - uploader: uploader.state, - publisher: publisher.state, + delete_task_planner: handles.delete_task_planner.last_observation(), + downloader: handles.downloader.last_observation(), + delete_task_executor: handles.delete_task_executor.last_observation(), + packager: handles.packager.last_observation(), + uploader: handles.uploader.last_observation(), + publisher: handles.publisher.last_observation(), } } ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe) @@ -399,7 +390,7 @@ mod tests { // for the pipeline state to be updated. test_sandbox .universe() - .sleep(OBSERVE_PIPELINE_INTERVAL * 2) + .sleep(OBSERVE_PIPELINE_INTERVAL * 3) .await; let pipeline_state = pipeline_handler.process_pending_and_observe().await.state; assert_eq!(pipeline_state.delete_task_planner.num_errors, 1);