diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index 7d11fc353fc..e3d6795499f 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -21,6 +21,7 @@ use std::convert::Infallible; use std::fmt; use std::future::Future; use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -67,6 +68,8 @@ pub struct ActorContextInner { actor_state: AtomicState, backpressure_micros_counter_opt: Option, observable_state_tx: watch::Sender, + // Flag set when an Observe message is enqueued and cleared once the observation is performed. + observe_enqueued: AtomicBool, } impl ActorContext { @@ -84,6 +87,7 @@ impl ActorContext { actor_state: AtomicState::default(), observable_state_tx, backpressure_micros_counter_opt, + observe_enqueued: AtomicBool::new(false), } .into(), } @@ -203,8 +207,16 @@ impl ActorContext { self.actor_state.resume(); } + /// Marks an Observe message as enqueued and returns the previous value of the flag. + /// This method is used to make sure we do not have Observe messages + /// stacking up in the observe queue. + pub(crate) fn set_observe_enqueued_and_return_previous(&self) -> bool { + self.observe_enqueued.swap(true, Ordering::Relaxed) + } + pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState { let obs_state = actor.observable_state(); + self.inner.observe_enqueued.store(false, Ordering::Relaxed); let _ = self.observable_state_tx.send(obs_state.clone()); obs_state } diff --git a/quickwit/quickwit-actors/src/actor_handle.rs b/quickwit/quickwit-actors/src/actor_handle.rs index 276230dab4f..27dbe029cd1 100644 --- a/quickwit/quickwit-actors/src/actor_handle.rs +++ b/quickwit/quickwit-actors/src/actor_handle.rs @@ -144,10 +144,36 @@ impl ActorHandle { /// /// The observation will be scheduled as a high priority message, therefore it will be executed /// after the current active message and the current command queue have been processed. 
+ /// + /// This method does not do anything to prevent Observe messages from stacking up. + /// In supervisors, prefer using `refresh_observe`. + #[doc(hidden)] pub async fn observe(&self) -> Observation { self.observe_with_priority(Priority::High).await } + // Triggers an observation. + // It is scheduled as a high priority + // message, and will hence be executed as soon as possible. + // + // This method does not enqueue an Observe request if one is already enqueued. + // + // The resulting observation eventually becomes accessible through the + // observation watch channel. + // + // This function returning does NOT mean that the observation was executed. + pub fn refresh_observe(&self) { + let observation_already_enqueued = self + .actor_context + .set_observe_enqueued_and_return_previous(); + if !observation_already_enqueued { + let _ = self + .actor_context + .mailbox() + .send_message_with_high_priority(Observe); + } + } + async fn observe_with_priority(&self, priority: Priority) -> Observation { if !self.actor_context.state().is_exit() { if let Ok(oneshot_rx) = self @@ -268,6 +294,9 @@ impl ActorHandle { #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + use std::time::Duration; + use async_trait::async_trait; use super::*; @@ -351,4 +380,54 @@ mod tests { assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state. 
Ok(()) } + + #[derive(Default)] + struct ObserveActor { + observe: AtomicU32, + } + + #[async_trait] + impl Actor for ObserveActor { + type ObservableState = u32; + + fn observable_state(&self) -> u32 { + self.observe.fetch_add(1, Ordering::Relaxed) + } + + async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + ctx.send_self_message(YieldLoop).await?; + Ok(()) + } + } + + #[derive(Debug)] + struct YieldLoop; + + #[async_trait] + impl Handler for ObserveActor { + type Reply = (); + async fn handle( + &mut self, + _: YieldLoop, + ctx: &ActorContext, + ) -> Result { + ctx.sleep(Duration::from_millis(25)).await; // OBSERVE_TIMEOUT.mul_f32(10.0f32)).await; + ctx.send_self_message(YieldLoop).await?; + Ok(()) + } + } + + #[tokio::test] + async fn test_observation_debounce() { + // TODO investigate why Universe::with_accelerated_time() does not work here. + let universe = Universe::new(); + let (_, actor_handle) = universe.spawn_builder().spawn(ObserveActor::default()); + for _ in 0..10 { + let _ = actor_handle.refresh_observe(); + universe.sleep(Duration::from_millis(10)).await; + } + let (_last_obs, num_obs) = actor_handle.quit().await; + assert!(num_obs < 8); + universe.assert_quit().await; + } } diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index e1571682bc4..efe404eef02 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -253,7 +253,7 @@ impl Mailbox { } } - pub(crate) fn send_message_with_high_priority( + pub fn send_message_with_high_priority( &self, message: M, ) -> Result, SendError> diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs index db9cfdf655e..e7995d41d59 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -33,7 +33,7 @@ use colored::{ColoredString, Colorize}; use humantime::format_duration; use indicatif::{ProgressBar, ProgressStyle}; use 
itertools::Itertools; -use quickwit_actors::{ActorHandle, ObservationType}; +use quickwit_actors::ActorHandle; use quickwit_common::uri::Uri; use quickwit_config::{ConfigFormat, IndexConfig}; use quickwit_indexing::models::IndexingStatistics; @@ -923,18 +923,16 @@ pub async fn start_statistics_reporting_loop( report_interval.tick().await; // Try to receive with a timeout of 1 second. // 1 second is also the frequency at which we update statistic in the console - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + + let observation = pipeline_handle.last_observation(); // Let's not display live statistics to allow screen to scroll. - if observation.state.num_docs > 0 { - display_statistics( - &mut stdout_handle, - &mut throughput_calculator, - &observation.state, - )?; + if observation.num_docs > 0 { + display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { break; } } diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 8a0daabeff2..4ede1b03120 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -32,7 +32,7 @@ use chitchat::FailureDetectorConfig; use clap::{arg, ArgMatches, Command}; use colored::{ColoredString, Colorize}; use humantime::format_duration; -use quickwit_actors::{ActorExitStatus, ActorHandle, ObservationType, Universe}; +use quickwit_actors::{ActorExitStatus, ActorHandle, Universe}; use quickwit_cluster::{Cluster, ClusterMember}; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::Uri; @@ -616,14 +616,15 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { loop { check_interval.tick().await; - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + let observation = pipeline_handle.last_observation(); if observation.num_ongoing_merges == 0 { 
info!("Merge pipeline has no more ongoing merges, Exiting."); break; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { info!("Merge pipeline has exited, Exiting."); break; } @@ -753,18 +754,16 @@ pub async fn start_statistics_reporting_loop( report_interval.tick().await; // Try to receive with a timeout of 1 second. // 1 second is also the frequency at which we update statistic in the console - let observation = pipeline_handle.observe().await; + pipeline_handle.refresh_observe(); + + let observation = pipeline_handle.last_observation(); // Let's not display live statistics to allow screen to scroll. - if observation.state.num_docs > 0 { - display_statistics( - &mut stdout_handle, - &mut throughput_calculator, - &observation.state, - )?; + if observation.num_docs > 0 { + display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?; } - if observation.obs_type == ObservationType::PostMortem { + if pipeline_handle.state().is_exit() { break; } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 736497d3076..1b29aa59e64 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{Metastore, MetastoreError}; use quickwit_proto::indexing::IndexingPipelineId; use quickwit_storage::Storage; -use tokio::join; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; @@ -236,20 +235,18 @@ impl IndexingPipeline { let Some(handles) = &self.handles_opt else { return; }; - let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!( - handles.doc_processor.observe(), - handles.indexer.observe(), - handles.uploader.observe(), - handles.publisher.observe(), - ); + handles.doc_processor.refresh_observe(); + 
handles.indexer.refresh_observe(); + handles.uploader.refresh_observe(); + handles.publisher.refresh_observe(); self.statistics = self .previous_generations_statistics .clone() .add_actor_counters( - &doc_processor_counters, - &indexer_counters, - &uploader_counters, - &publisher_counters, + &handles.doc_processor.last_observation(), + &handles.indexer.last_observation(), + &handles.uploader.last_observation(), + &handles.publisher.last_observation(), ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts); diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 51236d97632..2e5a7652f40 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState}; use quickwit_proto::indexing::IndexingPipelineId; use time::OffsetDateTime; -use tokio::join; use tracing::{debug, error, info, instrument}; use crate::actors::indexing_pipeline::wait_duration_before_retry; @@ -357,18 +356,25 @@ impl MergePipeline { let Some(handles) = &self.handles_opt else { return; }; - let (merge_planner_state, merge_uploader_counters, merge_publisher_counters) = join!( - handles.merge_planner.observe(), - handles.merge_uploader.observe(), - handles.merge_publisher.observe(), - ); + handles.merge_planner.refresh_observe(); + handles.merge_uploader.refresh_observe(); + handles.merge_publisher.refresh_observe(); self.statistics = self .previous_generations_statistics .clone() - .add_actor_counters(&merge_uploader_counters, &merge_publisher_counters) + .add_actor_counters( + &handles.merge_uploader.last_observation(), + &handles.merge_publisher.last_observation(), + ) .set_generation(self.statistics.generation) .set_num_spawn_attempts(self.statistics.num_spawn_attempts) - 
.set_ongoing_merges(merge_planner_state.ongoing_merge_operations.len()); + .set_ongoing_merges( + handles + .merge_planner + .last_observation() + .ongoing_merge_operations + .len(), + ); } async fn perform_health_check( diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index d22024b171f..a6e755a7c1e 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -254,28 +254,19 @@ impl Handler for DeleteTaskPipeline { ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { if let Some(handles) = &self.handles { - let ( - delete_task_planner, - downloader, - delete_task_executor, - packager, - uploader, - publisher, - ) = join!( - handles.delete_task_planner.observe(), - handles.downloader.observe(), - handles.delete_task_executor.observe(), - handles.packager.observe(), - handles.uploader.observe(), - handles.publisher.observe(), - ); + handles.delete_task_planner.refresh_observe(); + handles.downloader.refresh_observe(); + handles.delete_task_executor.refresh_observe(); + handles.packager.refresh_observe(); + handles.uploader.refresh_observe(); + handles.publisher.refresh_observe(); self.state = DeleteTaskPipelineState { - delete_task_planner: delete_task_planner.state, - downloader: downloader.state, - delete_task_executor: delete_task_executor.state, - packager: packager.state, - uploader: uploader.state, - publisher: publisher.state, + delete_task_planner: handles.delete_task_planner.last_observation(), + downloader: handles.downloader.last_observation(), + delete_task_executor: handles.delete_task_executor.last_observation(), + packager: handles.packager.last_observation(), + uploader: handles.uploader.last_observation(), + publisher: handles.publisher.last_observation(), } } ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe) @@ -399,7 +390,7 @@ mod tests { // for the pipeline state to be 
updated. test_sandbox .universe() - .sleep(OBSERVE_PIPELINE_INTERVAL * 2) + .sleep(OBSERVE_PIPELINE_INTERVAL * 3) .await; let pipeline_state = pipeline_handler.process_pending_and_observe().await.state; assert_eq!(pipeline_state.delete_task_planner.num_errors, 1);