Skip to content

Commit

Permalink
Prevent Observe messages from stacking up (#3765)
Browse files Browse the repository at this point in the history
* Prevents `Observe` messages emitted by supervisors from stacking up.

This introduces a refresh-observation method that does not wait
for anything: it just triggers an eventual refresh of the last
observation.

This method includes debouncing logic to avoid stacking up messages.

We then use it in supervisors, which has the additional merit of making the
Supervise handler return much faster.

Before, we were waiting for the observation to time out.

Closes #3485

* CR comments
  • Loading branch information
fulmicoton authored Aug 20, 2023
1 parent f71466b commit c4ca94a
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 62 deletions.
12 changes: 12 additions & 0 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::convert::Infallible;
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -67,6 +68,8 @@ pub struct ActorContextInner<A: Actor> {
actor_state: AtomicState,
backpressure_micros_counter_opt: Option<IntCounter>,
observable_state_tx: watch::Sender<A::ObservableState>,
// Boolean marking the presence of an observe message in the actor's high priority queue.
observe_enqueued: AtomicBool,
}

impl<A: Actor> ActorContext<A> {
Expand All @@ -84,6 +87,7 @@ impl<A: Actor> ActorContext<A> {
actor_state: AtomicState::default(),
observable_state_tx,
backpressure_micros_counter_opt,
observe_enqueued: AtomicBool::new(false),
}
.into(),
}
Expand Down Expand Up @@ -203,8 +207,16 @@ impl<A: Actor> ActorContext<A> {
self.actor_state.resume();
}

/// Raises the "observe enqueued" flag and reports whether it was already raised.
///
/// A return value of `true` means the flag was set before this call; `false` means
/// this call is the one that set it. Callers rely on this to avoid enqueueing a second
/// `Observe` message while one is still flagged as pending.
pub(crate) fn set_observe_enqueued_and_return_previous(&self) -> bool {
    // OR-ing with `true` always leaves the flag set and hands back its prior value.
    self.observe_enqueued.fetch_or(true, Ordering::Relaxed)
}

/// Captures the actor's observable state, publishes it, and returns it.
///
/// After reading the state from `actor`, the "observe enqueued" flag is cleared and a
/// clone of the state is sent on `observable_state_tx`. The outcome of that send is
/// deliberately ignored.
pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
    let current_state = actor.observable_state();
    // With the flag cleared, the next refresh request is allowed to enqueue a new
    // `Observe` message.
    self.inner.observe_enqueued.store(false, Ordering::Relaxed);
    let _ = self.observable_state_tx.send(current_state.clone());
    current_state
}
Expand Down
79 changes: 79 additions & 0 deletions quickwit/quickwit-actors/src/actor_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,36 @@ impl<A: Actor> ActorHandle<A> {
///
/// The observation will be scheduled as a high priority message, therefore it will be executed
/// after the current active message and the current command queue have been processed.
///
/// This method does nothing to prevent `Observe` messages from stacking up.
/// In supervisors, prefer using `refresh_observe`.
pub async fn observe(&self) -> Observation<A::ObservableState> {
self.observe_with_priority(Priority::High).await
}

/// Requests a fresh observation without waiting for it.
///
/// An `Observe` message is sent to the actor's mailbox with high priority, unless the
/// actor context reports that one is already flagged as enqueued, in which case this
/// call does nothing. This keeps `Observe` messages from piling up.
///
/// Nothing is returned: once the actor has handled the message, the new observation
/// becomes available through the observation watch channel.
///
/// Returning from this function does NOT mean that the observation was executed.
/// A failure to send the message is silently ignored.
pub fn refresh_observe(&self) {
    // The flag is raised unconditionally; only the caller that flipped it sends.
    if self.actor_context.set_observe_enqueued_and_return_previous() {
        return;
    }
    let _ = self
        .actor_context
        .mailbox()
        .send_message_with_high_priority(Observe);
}

async fn observe_with_priority(&self, priority: Priority) -> Observation<A::ObservableState> {
if !self.actor_context.state().is_exit() {
if let Ok(oneshot_rx) = self
Expand Down Expand Up @@ -268,6 +294,9 @@ impl<A: Actor> ActorHandle<A> {

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use async_trait::async_trait;

use super::*;
Expand Down Expand Up @@ -351,4 +380,54 @@ mod tests {
assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state.
Ok(())
}

/// Test actor whose observable state counts how many times it has been observed.
#[derive(Default)]
struct ObserveActor {
    // Bumped by one on every call to `observable_state`.
    num_observations: AtomicU32,
}

#[async_trait]
impl Actor for ObserveActor {
    type ObservableState = u32;

    /// Returns the number of observations made before this one, then counts this one.
    fn observable_state(&self) -> u32 {
        self.num_observations.fetch_add(1, Ordering::Relaxed)
    }

    /// Kicks off the self-messaging loop by sending a first `YieldLoop` to itself.
    async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
        ctx.send_self_message(YieldLoop).await?;
        Ok(())
    }
}

/// Message that `ObserveActor` keeps re-sending to itself, forming an endless
/// sleep-then-send loop.
#[derive(Debug)]
struct YieldLoop;

#[async_trait]
impl Handler<YieldLoop> for ObserveActor {
    type Reply = ();

    /// Sleeps for 25ms, then enqueues the next `YieldLoop` message.
    async fn handle(
        &mut self,
        _message: YieldLoop,
        ctx: &ActorContext<Self>,
    ) -> Result<Self::Reply, ActorExitStatus> {
        let pause = Duration::from_millis(25);
        ctx.sleep(pause).await;
        ctx.send_self_message(YieldLoop).await?;
        Ok(())
    }
}

/// Checks that `refresh_observe` debounces: ten refresh requests issued 10ms apart
/// must result in an observation count below 8.
#[tokio::test]
async fn test_observation_debounce() {
    // TODO investigate why Universe::with_accelerated_time() does not work here.
    let universe = Universe::new();
    let (_, observed_handle) = universe.spawn_builder().spawn(ObserveActor::default());
    let refresh_period = Duration::from_millis(10);
    for _attempt in 0..10 {
        observed_handle.refresh_observe();
        universe.sleep(refresh_period).await;
    }
    // The second element is the actor's last observable state, i.e. its observation counter.
    let (_last_obs, observation_count) = observed_handle.quit().await;
    assert!(observation_count < 8);
    universe.assert_quit().await;
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl<A: Actor> Mailbox<A> {
}
}

pub(crate) fn send_message_with_high_priority<M>(
pub fn send_message_with_high_priority<M>(
&self,
message: M,
) -> Result<oneshot::Receiver<A::Reply>, SendError>
Expand Down
16 changes: 7 additions & 9 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use colored::{ColoredString, Colorize};
use humantime::format_duration;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use quickwit_actors::{ActorHandle, ObservationType};
use quickwit_actors::ActorHandle;
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
Expand Down Expand Up @@ -923,18 +923,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Try to receive with a timeout of 1 second.
// 1 second is also the frequency at which we update statistic in the console
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
Expand Down
21 changes: 10 additions & 11 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use chitchat::FailureDetectorConfig;
use clap::{arg, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use quickwit_actors::{ActorExitStatus, ActorHandle, ObservationType, Universe};
use quickwit_actors::{ActorExitStatus, ActorHandle, Universe};
use quickwit_cluster::{Cluster, ClusterMember};
use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::uri::Uri;
Expand Down Expand Up @@ -616,14 +616,15 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
loop {
check_interval.tick().await;

let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();
let observation = pipeline_handle.last_observation();

if observation.num_ongoing_merges == 0 {
info!("Merge pipeline has no more ongoing merges, Exiting.");
break;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
info!("Merge pipeline has exited, Exiting.");
break;
}
Expand Down Expand Up @@ -753,18 +754,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Try to receive with a timeout of 1 second.
// 1 second is also the frequency at which we update statistic in the console
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
Expand Down
19 changes: 8 additions & 11 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use quickwit_metastore::Metastore;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use quickwit_storage::Storage;
use tokio::join;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -237,20 +236,18 @@ impl IndexingPipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!(
handles.doc_processor.observe(),
handles.indexer.observe(),
handles.uploader.observe(),
handles.publisher.observe(),
);
handles.doc_processor.refresh_observe();
handles.indexer.refresh_observe();
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(
&doc_processor_counters,
&indexer_counters,
&uploader_counters,
&publisher_counters,
&handles.doc_processor.last_observation(),
&handles.indexer.last_observation(),
&handles.uploader.last_observation(),
&handles.publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
Expand Down
22 changes: 14 additions & 8 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use quickwit_metastore::{ListSplitsQuery, Metastore, SplitState};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};

use crate::actors::indexing_pipeline::wait_duration_before_retry;
Expand Down Expand Up @@ -358,18 +357,25 @@ impl MergePipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (merge_planner_state, merge_uploader_counters, merge_publisher_counters) = join!(
handles.merge_planner.observe(),
handles.merge_uploader.observe(),
handles.merge_publisher.observe(),
);
handles.merge_planner.refresh_observe();
handles.merge_uploader.refresh_observe();
handles.merge_publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(&merge_uploader_counters, &merge_publisher_counters)
.add_actor_counters(
&handles.merge_uploader.last_observation(),
&handles.merge_publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts)
.set_ongoing_merges(merge_planner_state.ongoing_merge_operations.len());
.set_ongoing_merges(
handles
.merge_planner
.last_observation()
.ongoing_merge_operations
.len(),
);
}

async fn perform_health_check(
Expand Down
35 changes: 13 additions & 22 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,28 +254,19 @@ impl Handler<Observe> for DeleteTaskPipeline {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Some(handles) = &self.handles {
let (
delete_task_planner,
downloader,
delete_task_executor,
packager,
uploader,
publisher,
) = join!(
handles.delete_task_planner.observe(),
handles.downloader.observe(),
handles.delete_task_executor.observe(),
handles.packager.observe(),
handles.uploader.observe(),
handles.publisher.observe(),
);
handles.delete_task_planner.refresh_observe();
handles.downloader.refresh_observe();
handles.delete_task_executor.refresh_observe();
handles.packager.refresh_observe();
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.state = DeleteTaskPipelineState {
delete_task_planner: delete_task_planner.state,
downloader: downloader.state,
delete_task_executor: delete_task_executor.state,
packager: packager.state,
uploader: uploader.state,
publisher: publisher.state,
delete_task_planner: handles.delete_task_planner.last_observation(),
downloader: handles.downloader.last_observation(),
delete_task_executor: handles.delete_task_executor.last_observation(),
packager: handles.packager.last_observation(),
uploader: handles.uploader.last_observation(),
publisher: handles.publisher.last_observation(),
}
}
ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe)
Expand Down Expand Up @@ -403,7 +394,7 @@ mod tests {
// for the pipeline state to be updated.
test_sandbox
.universe()
.sleep(OBSERVE_PIPELINE_INTERVAL * 2)
.sleep(OBSERVE_PIPELINE_INTERVAL * 3)
.await;
let pipeline_state = pipeline_handler.process_pending_and_observe().await.state;
assert_eq!(pipeline_state.delete_task_planner.num_errors, 1);
Expand Down

0 comments on commit c4ca94a

Please sign in to comment.