Prevent Observe messages from stacking up #3765

Merged: 3 commits merged on Aug 20, 2023
12 changes: 12 additions & 0 deletions quickwit/quickwit-actors/src/actor_context.rs
@@ -21,6 +21,7 @@ use std::convert::Infallible;
use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

@@ -67,6 +68,8 @@ pub struct ActorContextInner<A: Actor> {
actor_state: AtomicState,
backpressure_micros_counter_opt: Option<IntCounter>,
observable_state_tx: watch::Sender<A::ObservableState>,
// Boolean marking the presence of an Observe message in the actor's high-priority queue.
observe_enqueued: AtomicBool,
}

impl<A: Actor> ActorContext<A> {
@@ -84,6 +87,7 @@ impl<A: Actor> ActorContext<A> {
actor_state: AtomicState::default(),
observable_state_tx,
backpressure_micros_counter_opt,
observe_enqueued: AtomicBool::new(false),
}
.into(),
}
@@ -203,8 +207,16 @@ impl<A: Actor> ActorContext<A> {
self.actor_state.resume();
}

/// Marks an Observe message as enqueued and returns the previous value of the flag.
/// This method is used to make sure we do not have Observe messages
/// stacking up in the actor's high-priority queue.
pub(crate) fn set_observe_enqueued_and_return_previous(&self) -> bool {
self.observe_enqueued.swap(true, Ordering::Relaxed)
}

pub(crate) fn observe(&self, actor: &mut A) -> A::ObservableState {
let obs_state = actor.observable_state();
self.inner.observe_enqueued.store(false, Ordering::Relaxed);
let _ = self.observable_state_tx.send(obs_state.clone());
obs_state
}
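The debouncing hinges on `AtomicBool::swap`: it sets the flag and returns the previous value in a single atomic step, so only the caller that flips it from `false` to `true` actually enqueues an Observe message, and `observe` clears the flag once the message is processed. Below is a minimal, self-contained sketch of that primitive; the `ObserveDebouncer` type and its method names are illustrative only, not part of the crate.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Standalone sketch of the debounce primitive (simplified, hypothetical type).
struct ObserveDebouncer {
    observe_enqueued: AtomicBool,
}

impl ObserveDebouncer {
    fn new() -> Self {
        Self { observe_enqueued: AtomicBool::new(false) }
    }

    /// Returns true if the caller should enqueue an Observe message.
    /// `swap` atomically sets the flag and hands back the previous value,
    /// so only one caller can win the false -> true transition.
    fn try_enqueue(&self) -> bool {
        !self.observe_enqueued.swap(true, Ordering::Relaxed)
    }

    /// Called once the Observe message has been processed, allowing the next
    /// refresh request to enqueue a new one.
    fn mark_processed(&self) {
        self.observe_enqueued.store(false, Ordering::Relaxed);
    }
}

fn main() {
    let debouncer = ObserveDebouncer::new();
    assert!(debouncer.try_enqueue());  // first request enqueues
    assert!(!debouncer.try_enqueue()); // duplicates are dropped
    debouncer.mark_processed();        // the actor handled Observe
    assert!(debouncer.try_enqueue());  // next request enqueues again
}
```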
79 changes: 79 additions & 0 deletions quickwit/quickwit-actors/src/actor_handle.rs
@@ -144,10 +144,36 @@ impl<A: Actor> ActorHandle<A> {
///
/// The observation will be scheduled as a high priority message, therefore it will be executed
/// after the current active message and the current command queue have been processed.
///
/// This method does nothing to prevent Observe messages from stacking up.
/// In supervisors, prefer using `refresh_observe`.
pub async fn observe(&self) -> Observation<A::ObservableState> {
self.observe_with_priority(Priority::High).await
}

/// Triggers an observation.
/// It is scheduled as a high-priority message and will hence be executed
/// as soon as possible.
///
/// This method does not enqueue an Observe request if one is already in
/// the queue.
///
/// The resulting observation will eventually be accessible through the
/// observation watch channel.
///
/// Note that this function returning does NOT mean that the observation has
/// been executed.
pub fn refresh_observe(&self) {
let observation_already_enqueued = self
.actor_context
.set_observe_enqueued_and_return_previous();
if !observation_already_enqueued {
let _ = self
.actor_context
.mailbox()
.send_message_with_high_priority(Observe);
}
}

async fn observe_with_priority(&self, priority: Priority) -> Observation<A::ObservableState> {
if !self.actor_context.state().is_exit() {
if let Ok(oneshot_rx) = self
@@ -268,6 +294,9 @@ impl<A: Actor> ActorHandle<A> {

#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use async_trait::async_trait;

use super::*;
@@ -351,4 +380,54 @@ mod tests {
assert!(matches!(count, 1)); //< Upon panic, we cannot get a post-mortem state.
Ok(())
}

#[derive(Default)]
struct ObserveActor {
observe: AtomicU32,
}

#[async_trait]
impl Actor for ObserveActor {
type ObservableState = u32;

fn observable_state(&self) -> u32 {
self.observe.fetch_add(1, Ordering::Relaxed)
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
ctx.send_self_message(YieldLoop).await?;
Ok(())
}
}

#[derive(Debug)]
struct YieldLoop;

#[async_trait]
impl Handler<YieldLoop> for ObserveActor {
type Reply = ();
async fn handle(
&mut self,
_: YieldLoop,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
ctx.sleep(Duration::from_millis(25)).await; // OBSERVE_TIMEOUT.mul_f32(10.0f32)).await;
ctx.send_self_message(YieldLoop).await?;
Ok(())
}
}

#[tokio::test]
async fn test_observation_debounce() {
// TODO investigate why Universe::with_accelerated_time() does not work here.
let universe = Universe::new();
let (_, actor_handle) = universe.spawn_builder().spawn(ObserveActor::default());
for _ in 0..10 {
actor_handle.refresh_observe();
universe.sleep(Duration::from_millis(10)).await;
}
let (_last_obs, num_obs) = actor_handle.quit().await;
assert!(num_obs < 8);
universe.assert_quit().await;
}
}
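From a caller's perspective, the blocking `observe().await` is replaced by a non-blocking `refresh_observe()` followed by a read of the last value published on the observation watch channel, as the CLI and supervisor changes below show. Here is a rough sketch of that polling pattern, assuming `last_observation()` hands back a clone of the observable state; the helper name and the `period` parameter are hypothetical.

```rust
use std::time::Duration;

use quickwit_actors::{Actor, ActorHandle};

/// Hypothetical polling helper illustrating the new pattern: request a
/// refresh (a no-op if an Observe message is already queued), then read
/// the last value published on the observation watch channel.
async fn poll_until_exit<A: Actor>(handle: &ActorHandle<A>, period: Duration) {
    loop {
        handle.refresh_observe();
        // Non-blocking read: this may lag the refresh we just requested by
        // one tick, which is acceptable for periodic reporting.
        let _latest_state = handle.last_observation();
        // ... feed `_latest_state` into a reporter or metrics here ...
        if handle.state().is_exit() {
            break;
        }
        tokio::time::sleep(period).await;
    }
}
```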
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/mailbox.rs
@@ -253,7 +253,7 @@ impl<A: Actor> Mailbox<A> {
}
}

pub(crate) fn send_message_with_high_priority<M>(
pub fn send_message_with_high_priority<M>(
&self,
message: M,
) -> Result<oneshot::Receiver<A::Reply>, SendError>
16 changes: 7 additions & 9 deletions quickwit/quickwit-cli/src/index.rs
@@ -33,7 +33,7 @@ use colored::{ColoredString, Colorize};
use humantime::format_duration;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use quickwit_actors::{ActorHandle, ObservationType};
use quickwit_actors::ActorHandle;
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
@@ -923,18 +923,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Try to receive with a timeout of 1 second.
// 1 second is also the frequency at which we update statistics in the console.
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
21 changes: 10 additions & 11 deletions quickwit/quickwit-cli/src/tool.rs
@@ -32,7 +32,7 @@ use chitchat::FailureDetectorConfig;
use clap::{arg, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use quickwit_actors::{ActorExitStatus, ActorHandle, ObservationType, Universe};
use quickwit_actors::{ActorExitStatus, ActorHandle, Universe};
use quickwit_cluster::{Cluster, ClusterMember};
use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::uri::Uri;
@@ -616,14 +616,15 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
loop {
check_interval.tick().await;

let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();
let observation = pipeline_handle.last_observation();

if observation.num_ongoing_merges == 0 {
info!("Merge pipeline has no more ongoing merges, Exiting.");
break;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
info!("Merge pipeline has exited, Exiting.");
break;
}
@@ -753,18 +754,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Try to receive with a timeout of 1 second.
// 1 second is also the frequency at which we update statistics in the console.
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
19 changes: 8 additions & 11 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -34,7 +34,6 @@ use quickwit_metastore::Metastore;
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use quickwit_storage::Storage;
use tokio::join;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

@@ -237,20 +236,18 @@ impl IndexingPipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!(
handles.doc_processor.observe(),
handles.indexer.observe(),
handles.uploader.observe(),
handles.publisher.observe(),
);
handles.doc_processor.refresh_observe();
handles.indexer.refresh_observe();
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(
&doc_processor_counters,
&indexer_counters,
&uploader_counters,
&publisher_counters,
&handles.doc_processor.last_observation(),
&handles.indexer.last_observation(),
&handles.uploader.last_observation(),
&handles.publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
22 changes: 14 additions & 8 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -34,7 +34,6 @@ use quickwit_metastore::{ListSplitsQuery, Metastore, SplitState};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::MetastoreError;
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};

use crate::actors::indexing_pipeline::wait_duration_before_retry;
@@ -358,18 +357,25 @@ impl MergePipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (merge_planner_state, merge_uploader_counters, merge_publisher_counters) = join!(
handles.merge_planner.observe(),
handles.merge_uploader.observe(),
handles.merge_publisher.observe(),
);
handles.merge_planner.refresh_observe();
handles.merge_uploader.refresh_observe();
handles.merge_publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(&merge_uploader_counters, &merge_publisher_counters)
.add_actor_counters(
&handles.merge_uploader.last_observation(),
&handles.merge_publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts)
.set_ongoing_merges(merge_planner_state.ongoing_merge_operations.len());
.set_ongoing_merges(
handles
.merge_planner
.last_observation()
.ongoing_merge_operations
.len(),
);
}

async fn perform_health_check(
35 changes: 13 additions & 22 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
@@ -254,28 +254,19 @@ impl Handler<Observe> for DeleteTaskPipeline {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Some(handles) = &self.handles {
let (
delete_task_planner,
downloader,
delete_task_executor,
packager,
uploader,
publisher,
) = join!(
handles.delete_task_planner.observe(),
handles.downloader.observe(),
handles.delete_task_executor.observe(),
handles.packager.observe(),
handles.uploader.observe(),
handles.publisher.observe(),
);
handles.delete_task_planner.refresh_observe();
handles.downloader.refresh_observe();
handles.delete_task_executor.refresh_observe();
handles.packager.refresh_observe();
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.state = DeleteTaskPipelineState {
delete_task_planner: delete_task_planner.state,
downloader: downloader.state,
delete_task_executor: delete_task_executor.state,
packager: packager.state,
uploader: uploader.state,
publisher: publisher.state,
delete_task_planner: handles.delete_task_planner.last_observation(),
downloader: handles.downloader.last_observation(),
delete_task_executor: handles.delete_task_executor.last_observation(),
packager: handles.packager.last_observation(),
uploader: handles.uploader.last_observation(),
publisher: handles.publisher.last_observation(),
}
}
ctx.schedule_self_msg(OBSERVE_PIPELINE_INTERVAL, Observe)
@@ -403,7 +394,7 @@ mod tests {
// for the pipeline state to be updated.
test_sandbox
.universe()
.sleep(OBSERVE_PIPELINE_INTERVAL * 2)
.sleep(OBSERVE_PIPELINE_INTERVAL * 3)
.await;
let pipeline_state = pipeline_handler.process_pending_and_observe().await.state;
assert_eq!(pipeline_state.delete_task_planner.num_errors, 1);