Skip to content

Commit

Permalink
Prevents Observe message emitted by Supervisors from stacking up.
Browse files Browse the repository at this point in the history
This introduce a refresh observation method, that does not wait
for anything but just triggers an eventual refresh of the last
observation.

This method includes debouncing logic to avoid stacking up messages.

Closes #3485
  • Loading branch information
fulmicoton committed Aug 18, 2023
1 parent 00ba301 commit 167be6e
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 62 deletions.
12 changes: 12 additions & 0 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::convert::Infallible;
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -67,6 +68,8 @@ pub struct ActorContextInner<A: Actor> {
actor_state: AtomicState,
backpressure_micros_counter_opt: Option<IntCounter>,
observable_state_tx: watch::Sender<A::ObservableState>,
// Counter that gets incremented after each observation.
observe_enqueued: AtomicBool,
}

impl<A: Actor> ActorContext<A> {
Expand All @@ -84,6 +87,7 @@ impl<A: Actor> ActorContext<A> {
actor_state: AtomicState::default(),
observable_state_tx,
backpressure_micros_counter_opt,
observe_enqueued: AtomicBool::new(false),
}
.into(),
}
Expand Down Expand Up @@ -203,8 +207,16 @@ impl<A: Actor> ActorContext<A> {
self.actor_state.resume();
}

/// Sets the queue as observed and returns the previous value.
/// This method is used to make sure we do not have Observe messages
/// stacking up in the observe queue.
pub(crate) fn set_observe_enqueued_and_return_previous(&self) -> bool {
self.observe_enqueued.swap(true, Ordering::Relaxed)
}

pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
let obs_state = actor.observable_state();
self.inner.observe_enqueued.store(false, Ordering::Relaxed);
let _ = self.observable_state_tx.send(obs_state.clone());
obs_state
}
Expand Down
79 changes: 79 additions & 0 deletions quickwit/quickwit-actors/src/actor_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,36 @@ impl<A: Actor> ActorHandle<A> {
///
/// The observation will be scheduled as a high priority message, therefore it will be executed
/// after the current active message and the current command queue have been processed.
///
/// This method does not do anything to avoid Observe messages from stacking up.
/// In supervisors, prefer using `refresh_observation`.
#[doc(hidden)]
pub async fn observe(&self) -> Observation<A::ObservableState> {
self.observe_with_priority(Priority::High).await
}

// Triggers an observation.
// It is scheduled as a high priority
// message, and will hence be executed as soon as possible.
//
// This method does not enqueue an Observe requests if one is already enqueue.
//
// The resulting observation can eventually be accessible using the
// observation watch channel.
//
// This function returning does NOT mean that the observation was executed.
pub fn refresh_observe(&self) {
let observation_already_enqueued = self
.actor_context
.set_observe_enqueued_and_return_previous();
if !observation_already_enqueued {
let _ = self
.actor_context
.mailbox()
.send_message_with_high_priority(Observe);
}
}

async fn observe_with_priority(&self, priority: Priority) -> Observation<A::ObservableState> {
if !self.actor_context.state().is_exit() {
if let Ok(oneshot_rx) = self
Expand Down Expand Up @@ -268,6 +294,9 @@ impl<A: Actor> ActorHandle<A> {

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use async_trait::async_trait;

use super::*;
Expand Down Expand Up @@ -351,4 +380,54 @@ mod tests {
assert!(matches!(count, 1)); //< Upon panick we cannot get a post mortem state.
Ok(())
}

#[derive(Default)]
struct ObserveActor {
observe: AtomicU32,
}

#[async_trait]
impl Actor for ObserveActor {
type ObservableState = u32;

fn observable_state(&self) -> u32 {
self.observe.fetch_add(1, Ordering::Relaxed)
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
ctx.send_self_message(YieldLoop).await?;
Ok(())
}
}

#[derive(Debug)]
struct YieldLoop;

#[async_trait]
impl Handler<YieldLoop> for ObserveActor {
type Reply = ();
async fn handle(
&mut self,
_: YieldLoop,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
ctx.sleep(Duration::from_millis(25)).await; // OBSERVE_TIMEOUT.mul_f32(10.0f32)).await;
ctx.send_self_message(YieldLoop).await?;
Ok(())
}
}

#[tokio::test]
async fn test_observation_debounce() {
// TODO investigate why Universe::with_accelerated_time() does not work here.
let universe = Universe::new();
let (_, actor_handle) = universe.spawn_builder().spawn(ObserveActor::default());
for _ in 0..10 {
let _ = actor_handle.refresh_observe();
universe.sleep(Duration::from_millis(10)).await;
}
let (_last_obs, num_obs) = actor_handle.quit().await;
assert!(num_obs < 8);
universe.assert_quit().await;
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl<A: Actor> Mailbox<A> {
}
}

pub(crate) fn send_message_with_high_priority<M>(
pub fn send_message_with_high_priority<M>(
&self,
message: M,
) -> Result<oneshot::Receiver<A::Reply>, SendError>
Expand Down
16 changes: 7 additions & 9 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use colored::{ColoredString, Colorize};
use humantime::format_duration;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use quickwit_actors::{ActorHandle, ObservationType};
use quickwit_actors::ActorHandle;
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
Expand Down Expand Up @@ -923,18 +923,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Try to receive with a timeout of 1 second.
// 1 second is also the frequency at which we update statistic in the console
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
Expand Down
21 changes: 10 additions & 11 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use chitchat::FailureDetectorConfig;
use clap::{arg, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use quickwit_actors::{ActorExitStatus, ActorHandle, ObservationType, Universe};
use quickwit_actors::{ActorExitStatus, ActorHandle, Universe};
use quickwit_cluster::{Cluster, ClusterMember};
use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::uri::Uri;
Expand Down Expand Up @@ -616,14 +616,15 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
loop {
check_interval.tick().await;

let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();
let observation = pipeline_handle.last_observation();

if observation.num_ongoing_merges == 0 {
info!("Merge pipeline has no more ongoing merges, Exiting.");
break;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
info!("Merge pipeline has exited, Exiting.");
break;
}
Expand Down Expand Up @@ -753,18 +754,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Try to receive with a timeout of 1 second.
// 1 second is also the frequency at which we update statistic in the console
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
Expand Down
19 changes: 8 additions & 11 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, MetastoreError};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_storage::Storage;
use tokio::join;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -236,20 +235,18 @@ impl IndexingPipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!(
handles.doc_processor.observe(),
handles.indexer.observe(),
handles.uploader.observe(),
handles.publisher.observe(),
);
handles.doc_processor.refresh_observe();
handles.indexer.refresh_observe();
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(
&doc_processor_counters,
&indexer_counters,
&uploader_counters,
&publisher_counters,
&handles.doc_processor.last_observation(),
&handles.indexer.last_observation(),
&handles.uploader.last_observation(),
&handles.publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
Expand Down
22 changes: 14 additions & 8 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState};
use quickwit_proto::indexing::IndexingPipelineId;
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};

use crate::actors::indexing_pipeline::wait_duration_before_retry;
Expand Down Expand Up @@ -357,18 +356,25 @@ impl MergePipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (merge_planner_state, merge_uploader_counters, merge_publisher_counters) = join!(
handles.merge_planner.observe(),
handles.merge_uploader.observe(),
handles.merge_publisher.observe(),
);
handles.merge_planner.refresh_observe();
handles.merge_uploader.refresh_observe();
handles.merge_publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(&merge_uploader_counters, &merge_publisher_counters)
.add_actor_counters(
&handles.merge_uploader.last_observation(),
&handles.merge_publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts)
.set_ongoing_merges(merge_planner_state.ongoing_merge_operations.len());
.set_ongoing_merges(
handles
.merge_planner
.last_observation()
.ongoing_merge_operations
.len(),
);
}

async fn perform_health_check(
Expand Down
35 changes: 13 additions & 22 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,28 +254,19 @@ impl Handler<Observe> for DeleteTaskPipeline {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Some(handles) = &self.handles {
let (
delete_task_planner,
downloader,
delete_task_executor,
packager,
uploader,
publisher,
) = join!(
handles.delete_task_planner.observe(),
handles.downloader.observe(),
handles.delete_task_executor.observe(),
handles.packager.observe(),
handles.uploader.observe(),
handles.publisher.observe(),
);
handles.delete_task_planner.refresh_observe();
handles.downloader.refresh_observe();
handles.delete_task_executor.refresh_observe();
handles.packager.refresh_observe();
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.state = DeleteTaskPipelineState {
delete_task_planner: delete_task_planner.state,
downloader: downloader.state,
delete_task_executor: delete_task_executor.state,
packager: packager.state,
uploader: uploader.state,
publisher: publisher.state,
delete_task_planner: handles.delete_task_planner.last_observation(),
downloader: handles.downloader.last_observation(),
delete_task_executor: handles.delete_task_executor.last_observation(),
packager: handles.packager.last_observation(),
uploader: handles.uploader.last_observation(),
publisher: handles.publisher.last_observation(),
}
}
ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe)
Expand Down Expand Up @@ -399,7 +390,7 @@ mod tests {
// for the pipeline state to be updated.
test_sandbox
.universe()
.sleep(OBSERVE_PIPELINE_INTERVAL * 2)
.sleep(OBSERVE_PIPELINE_INTERVAL * 3)
.await;
let pipeline_state = pipeline_handler.process_pending_and_observe().await.state;
assert_eq!(pipeline_state.delete_task_planner.num_errors, 1);
Expand Down

0 comments on commit 167be6e

Please sign in to comment.