From c4ca94a5d2e10a0598d3dd87df21adc5cf69e32f Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 20 Aug 2023 12:42:52 +0900 Subject: [PATCH] Prevent `Observe` messages from stacking up (#3765) * Prevents `Observe` messages emitted by supervisors from stacking up. This introduces a refresh-observation method that does not wait for anything but just triggers an eventual refresh of the last observation. This method includes debouncing logic to avoid stacking up messages. We then use it in supervisors, which has the additional merit of making the Supervise handler return much faster. Previously, we were waiting for the observation to time out. Closes #3485 * CR comments --- quickwit/quickwit-actors/src/actor_context.rs | 12 +++ quickwit/quickwit-actors/src/actor_handle.rs | 79 +++++++++++++++++++ quickwit/quickwit-actors/src/mailbox.rs | 2 +- quickwit/quickwit-cli/src/index.rs | 16 ++-- quickwit/quickwit-cli/src/tool.rs | 21 +++-- .../src/actors/indexing_pipeline.rs | 19 ++--- .../src/actors/merge_pipeline.rs | 22 ++++-- .../src/actors/delete_task_pipeline.rs | 35 +++----- 8 files changed, 144 insertions(+), 62 deletions(-) diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index 7d11fc353fc..b0201d92a9d 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -21,6 +21,7 @@ use std::convert::Infallible; use std::fmt; use std::future::Future; use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -67,6 +68,8 @@ pub struct ActorContextInner { actor_state: AtomicState, backpressure_micros_counter_opt: Option, observable_state_tx: watch::Sender, + // Boolean marking the presence of an observe message in the actor's high priority queue. 
+ observe_enqueued: AtomicBool, } impl ActorContext { @@ -84,6 +87,7 @@ impl ActorContext { actor_state: AtomicState::default(), observable_state_tx, backpressure_micros_counter_opt, + observe_enqueued: AtomicBool::new(false), } .into(), } @@ -203,8 +207,16 @@ impl ActorContext { self.actor_state.resume(); } + /// Sets the queue as observed and returns the previous value. + /// This method is used to make sure we do not have Observe messages + /// stacking up in the observe queue. + pub(crate) fn set_observe_enqueued_and_return_previous(&self) -> bool { + self.observe_enqueued.swap(true, Ordering::Relaxed) + } + pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState { let obs_state = actor.observable_state(); + self.inner.observe_enqueued.store(false, Ordering::Relaxed); let _ = self.observable_state_tx.send(obs_state.clone()); obs_state } diff --git a/quickwit/quickwit-actors/src/actor_handle.rs b/quickwit/quickwit-actors/src/actor_handle.rs index 276230dab4f..f800e886c5f 100644 --- a/quickwit/quickwit-actors/src/actor_handle.rs +++ b/quickwit/quickwit-actors/src/actor_handle.rs @@ -144,10 +144,36 @@ impl ActorHandle { /// /// The observation will be scheduled as a high priority message, therefore it will be executed /// after the current active message and the current command queue have been processed. + /// + /// This method does not do anything to prevent Observe messages from stacking up. + /// In supervisors, prefer using `refresh_observe`. pub async fn observe(&self) -> Observation { self.observe_with_priority(Priority::High).await } + /// Triggers an observation. + /// It is scheduled as a high priority + /// message, and will hence be executed as soon as possible. + /// + /// This method does not enqueue an Observe request if there is already one in + /// the queue. + /// + /// The resulting observation will eventually be accessible through the + /// observation watch channel. 
+ /// + /// This function returning does NOT mean that the observation was executed. + pub fn refresh_observe(&self) { + let observation_already_enqueued = self + .actor_context + .set_observe_enqueued_and_return_previous(); + if !observation_already_enqueued { + let _ = self + .actor_context + .mailbox() + .send_message_with_high_priority(Observe); + } + } + async fn observe_with_priority(&self, priority: Priority) -> Observation { if !self.actor_context.state().is_exit() { if let Ok(oneshot_rx) = self @@ -268,6 +294,9 @@ impl ActorHandle { #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + use std::time::Duration; + use async_trait::async_trait; use super::*; @@ -351,4 +380,54 @@ mod tests { assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state. Ok(()) } + + #[derive(Default)] + struct ObserveActor { + observe: AtomicU32, + } + + #[async_trait] + impl Actor for ObserveActor { + type ObservableState = u32; + + fn observable_state(&self) -> u32 { + self.observe.fetch_add(1, Ordering::Relaxed) + } + + async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + ctx.send_self_message(YieldLoop).await?; + Ok(()) + } + } + + #[derive(Debug)] + struct YieldLoop; + + #[async_trait] + impl Handler for ObserveActor { + type Reply = (); + async fn handle( + &mut self, + _: YieldLoop, + ctx: &ActorContext, + ) -> Result { + ctx.sleep(Duration::from_millis(25)).await; // OBSERVE_TIMEOUT.mul_f32(10.0f32)).await; + ctx.send_self_message(YieldLoop).await?; + Ok(()) + } + } + + #[tokio::test] + async fn test_observation_debounce() { + // TODO investigate why Universe::with_accelerated_time() does not work here. 
+ let universe = Universe::new(); + let (_, actor_handle) = universe.spawn_builder().spawn(ObserveActor::default()); + for _ in 0..10 { + actor_handle.refresh_observe(); + universe.sleep(Duration::from_millis(10)).await; + } + let (_last_obs, num_obs) = actor_handle.quit().await; + assert!(num_obs < 8); + universe.assert_quit().await; + } } diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index e1571682bc4..efe404eef02 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -253,7 +253,7 @@ impl Mailbox { } } - pub(crate) fn send_message_with_high_priority( + pub fn send_message_with_high_priority( &self, message: M, ) -> Result, SendError> diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs index db9cfdf655e..e7995d41d59 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -33,7 +33,7 @@ use colored::{ColoredString, Colorize}; use humantime::format_duration; use indicatif::{ProgressBar, ProgressStyle}; use itertools::Itertools; -use quickwit_actors::{ActorHandle, ObservationType}; +use quickwit_actors::ActorHandle; use quickwit_common::uri::Uri; use quickwit_config::{ConfigFormat, IndexConfig}; use quickwit_indexing::models::IndexingStatistics; @@ -923,18 +923,16 @@ pub async fn start_statistics_reporting_loop( report_interval.tick().await; // Try to receive with a timeout of 1 second. // 1 second is also the frequency at which we update statistic in the console - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + + let observation = pipeline_handle.last_observation(); // Let's not display live statistics to allow screen to scroll. 
- if observation.state.num_docs > 0 { - display_statistics( - &mut stdout_handle, - &mut throughput_calculator, - &observation.state, - )?; + if observation.num_docs > 0 { + display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { break; } } diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index c84d7dd184c..6cf78ec24f5 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -32,7 +32,7 @@ use chitchat::FailureDetectorConfig; use clap::{arg, ArgMatches, Command}; use colored::{ColoredString, Colorize}; use humantime::format_duration; -use quickwit_actors::{ActorExitStatus, ActorHandle, ObservationType, Universe}; +use quickwit_actors::{ActorExitStatus, ActorHandle, Universe}; use quickwit_cluster::{Cluster, ClusterMember}; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::Uri; @@ -616,14 +616,15 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { loop { check_interval.tick().await; - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + let observation = pipeline_handle.last_observation(); if observation.num_ongoing_merges == 0 { info!("Merge pipeline has no more ongoing merges, Exiting."); break; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { info!("Merge pipeline has exited, Exiting."); break; } @@ -753,18 +754,16 @@ pub async fn start_statistics_reporting_loop( report_interval.tick().await; // Try to receive with a timeout of 1 second. // 1 second is also the frequency at which we update statistic in the console - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + + let observation = pipeline_handle.last_observation(); // Let's not display live statistics to allow screen to scroll. 
- if observation.state.num_docs > 0 { - display_statistics( - &mut stdout_handle, - &mut throughput_calculator, - &observation.state, - )?; + if observation.num_docs > 0 { + display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { break; } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index a00218de0b4..9b48297125c 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -34,7 +34,6 @@ use quickwit_metastore::Metastore; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::MetastoreError; use quickwit_storage::Storage; -use tokio::join; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; @@ -237,20 +236,18 @@ impl IndexingPipeline { let Some(handles) = &self.handles_opt else { return; }; - let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!( - handles.doc_processor.observe(), - handles.indexer.observe(), - handles.uploader.observe(), - handles.publisher.observe(), - ); + handles.doc_processor.refresh_observe(); + handles.indexer.refresh_observe(); + handles.uploader.refresh_observe(); + handles.publisher.refresh_observe(); self.statistics = self .previous_generations_statistics .clone() .add_actor_counters( - &doc_processor_counters, - &indexer_counters, - &uploader_counters, - &publisher_counters, + &handles.doc_processor.last_observation(), + &handles.indexer.last_observation(), + &handles.uploader.last_observation(), + &handles.publisher.last_observation(), ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts); diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs 
b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 0e5fcbf4fdd..aa2ea214e97 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -34,7 +34,6 @@ use quickwit_metastore::{ListSplitsQuery, Metastore, SplitState}; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::metastore::MetastoreError; use time::OffsetDateTime; -use tokio::join; use tracing::{debug, error, info, instrument}; use crate::actors::indexing_pipeline::wait_duration_before_retry; @@ -358,18 +357,25 @@ impl MergePipeline { let Some(handles) = &self.handles_opt else { return; }; - let (merge_planner_state, merge_uploader_counters, merge_publisher_counters) = join!( - handles.merge_planner.observe(), - handles.merge_uploader.observe(), - handles.merge_publisher.observe(), - ); + handles.merge_planner.refresh_observe(); + handles.merge_uploader.refresh_observe(); + handles.merge_publisher.refresh_observe(); self.statistics = self .previous_generations_statistics .clone() - .add_actor_counters(&merge_uploader_counters, &merge_publisher_counters) + .add_actor_counters( + &handles.merge_uploader.last_observation(), + &handles.merge_publisher.last_observation(), + ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts) - .set_ongoing_merges(merge_planner_state.ongoing_merge_operations.len()); + .set_ongoing_merges( + handles + .merge_planner + .last_observation() + .ongoing_merge_operations + .len(), + ); } async fn perform_health_check( diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index ecf98b8a6c1..7a4cc08fd23 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -254,28 +254,19 @@ impl Handler for DeleteTaskPipeline { ctx: &ActorContext, ) -> Result<(), ActorExitStatus> 
{ if let Some(handles) = &self.handles { - let ( - delete_task_planner, - downloader, - delete_task_executor, - packager, - uploader, - publisher, - ) = join!( - handles.delete_task_planner.observe(), - handles.downloader.observe(), - handles.delete_task_executor.observe(), - handles.packager.observe(), - handles.uploader.observe(), - handles.publisher.observe(), - ); + handles.delete_task_planner.refresh_observe(); + handles.downloader.refresh_observe(); + handles.delete_task_executor.refresh_observe(); + handles.packager.refresh_observe(); + handles.uploader.refresh_observe(); + handles.publisher.refresh_observe(); self.state = DeleteTaskPipelineState { - delete_task_planner: delete_task_planner.state, - downloader: downloader.state, - delete_task_executor: delete_task_executor.state, - packager: packager.state, - uploader: uploader.state, - publisher: publisher.state, + delete_task_planner: handles.delete_task_planner.last_observation(), + downloader: handles.downloader.last_observation(), + delete_task_executor: handles.delete_task_executor.last_observation(), + packager: handles.packager.last_observation(), + uploader: handles.uploader.last_observation(), + publisher: handles.publisher.last_observation(), } } ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe) @@ -403,7 +394,7 @@ mod tests { // for the pipeline state to be updated. test_sandbox .universe() - .sleep(OBSERVE_PIPELINE_INTERVAL * 2) + .sleep(OBSERVE_PIPELINE_INTERVAL * 3) .await; let pipeline_state = pipeline_handler.process_pending_and_observe().await.state; assert_eq!(pipeline_state.delete_task_planner.num_errors, 1);