Skip to content

Commit

Permalink
Observe
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Aug 18, 2023
1 parent 7dec7cc commit a20fece
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 97 deletions.
59 changes: 39 additions & 20 deletions quickwit/quickwit-actors/src/actor_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,33 @@ impl<A: Actor> ActorHandle<A> {
///
/// The observation will be scheduled as a high priority message, therefore it will be executed
/// after the current active message and the current command queue have been processed.
///
/// This method does nothing to prevent Observe messages from stacking up.
/// In supervisors, prefer using `refresh_observe`.
#[doc(hidden)]
pub async fn observe(&self) -> Observation<A::ObservableState> {
let observation_already_enqueued = self.actor_context.set_observe_enqueued_and_return_previous();
if observation_already_enqueued {
// There is already an observation message in the priority channel.
// We avoid stacking them up here.
let last_obs = self.last_observation();
return Observation {
obs_type: ObservationType::Timeout,
state: last_obs,
};
} else {
self.observe_with_priority(Priority::High).await
self.observe_with_priority(Priority::High).await
}

/// Requests a fresh observation of the actor without waiting for it.
///
/// The `Observe` message is sent through the mailbox as a high priority
/// message, so it will be handled as soon as possible.
///
/// If an `Observe` request is already enqueued, this method does not enqueue
/// another one.
///
/// The resulting observation eventually becomes accessible through the
/// observation watch channel.
///
/// This function returning does NOT mean that the observation was executed.
pub fn refresh_observe(&self) {
    let already_pending = self
        .actor_context
        .set_observe_enqueued_and_return_previous();
    if already_pending {
        // An `Observe` request is already on its way: do not stack another one.
        return;
    }
    // Fire and forget: both a send failure and the reply receiver are
    // deliberately discarded.
    let _ = self
        .actor_context
        .mailbox()
        .send_message_with_high_priority(Observe);
}

Expand Down Expand Up @@ -279,12 +294,13 @@ impl<A: Actor> ActorHandle<A> {

#[cfg(test)]
mod tests {
use std::sync::atomic::{Ordering, AtomicU32};
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use async_trait::async_trait;

use super::*;
use crate::{Handler, Universe, OBSERVE_TIMEOUT};
use crate::{Handler, Universe};

#[derive(Default)]
struct PanickingActor {
Expand Down Expand Up @@ -378,7 +394,7 @@ mod tests {
self.observe.fetch_add(1, Ordering::Relaxed)
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(),ActorExitStatus> {
async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
ctx.send_self_message(YieldLoop).await?;
Ok(())
}
Expand All @@ -395,20 +411,23 @@ mod tests {
_: YieldLoop,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
tokio::time::sleep(OBSERVE_TIMEOUT.mul_f32(2.0f32)).await;
ctx.sleep(Duration::from_millis(25)).await; // OBSERVE_TIMEOUT.mul_f32(10.0f32)).await;
ctx.send_self_message(YieldLoop).await?;
Ok(())
}
}

#[tokio::test]
async fn test_observation_debounce() {
let universe = Universe::default();
// TODO investigate why Universe::with_accelerated_time() does not work here.
let universe = Universe::new();
let (_, actor_handle) = universe.spawn_builder().spawn(ObserveActor::default());
let first_tick = actor_handle.observe().await;
assert_eq!(first_tick.state, 1);
let tick_debounced = actor_handle.observe().await;
assert_eq!(tick_debounced.state, 1);
for _ in 0..10 {
let _ = actor_handle.refresh_observe();
universe.sleep(Duration::from_millis(10)).await;
}
let (_last_obs, num_obs) = actor_handle.quit().await;
assert!(num_obs < 8);
universe.assert_quit().await;
}
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl<A: Actor> Mailbox<A> {
}
}

pub(crate) fn send_message_with_high_priority<M>(
pub fn send_message_with_high_priority<M>(
&self,
message: M,
) -> Result<oneshot::Receiver<A::Reply>, SendError>
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-actors/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ mod tests {
1
);
assert_eq!(
*supervisor_handle.observe().await,
*supervisor_handle.process_pending_and_observe().await,
SupervisorState {
num_panics: 1,
num_errors: 0,
Expand Down Expand Up @@ -312,7 +312,7 @@ mod tests {
1
);
assert_eq!(
*supervisor_handle.observe().await,
*supervisor_handle.process_pending_and_observe().await,
SupervisorState {
num_panics: 0,
num_errors: 1,
Expand All @@ -339,7 +339,7 @@ mod tests {
2
);
assert_eq!(
*supervisor_handle.observe().await,
*supervisor_handle.process_pending_and_observe().await,
SupervisorState {
num_panics: 0,
num_errors: 0,
Expand All @@ -357,7 +357,7 @@ mod tests {
1
);
assert_eq!(
*supervisor_handle.observe().await,
*supervisor_handle.process_pending_and_observe().await,
SupervisorState {
num_panics: 0,
num_errors: 0,
Expand Down
21 changes: 15 additions & 6 deletions quickwit/quickwit-actors/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use serde::Serialize;
use crate::observation::ObservationType;
use crate::{
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Command, Handler, Health,
Mailbox, Observation, Supervisable, Universe,
Mailbox, Observation, Observe, Supervisable, Universe,
};

// An actor that receives ping messages.
Expand Down Expand Up @@ -150,7 +150,7 @@ async fn test_ping_actor() {
let (ping_sender_mailbox, ping_sender_handle) =
universe.spawn_builder().spawn(PingerSenderActor::default());
assert_eq!(
ping_recv_handle.observe().await,
ping_recv_handle.process_pending_and_observe().await,
Observation {
obs_type: ObservationType::Alive,
state: 0
Expand Down Expand Up @@ -267,12 +267,12 @@ async fn test_timeouting_actor() {
let (buggy_mailbox, buggy_handle) = universe.spawn_builder().spawn(BuggyActor);
let buggy_mailbox = buggy_mailbox;
assert_eq!(
buggy_handle.observe().await.obs_type,
buggy_handle.process_pending_and_observe().await.obs_type,
ObservationType::Alive
);
assert!(buggy_mailbox.send_message(DoNothing).await.is_ok());
assert_eq!(
buggy_handle.observe().await.obs_type,
buggy_handle.process_pending_and_observe().await.obs_type,
ObservationType::Alive
);
assert!(buggy_mailbox.send_message(Block).await.is_ok());
Expand All @@ -299,9 +299,18 @@ async fn test_pause_actor() {
assert!(ping_mailbox
.send_message_with_high_priority(Command::Pause)
.is_ok());
let first_state = ping_handle.observe().await.state;

let obs = ping_mailbox
.send_message_with_high_priority(Observe)
.unwrap();
let first_state = obs.await.unwrap();

assert!(first_state < 1000);
let second_state = ping_handle.observe().await.state;
let obs = ping_mailbox
.send_message_with_high_priority(Observe)
.unwrap();
let second_state = obs.await.unwrap();

assert_eq!(first_state, second_state);
assert!(ping_mailbox
.send_message_with_high_priority(Command::Resume)
Expand Down
16 changes: 7 additions & 9 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use colored::{ColoredString, Colorize};
use humantime::format_duration;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use quickwit_actors::{ActorHandle, ObservationType};
use quickwit_actors::ActorHandle;
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
Expand Down Expand Up @@ -923,18 +923,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Request a fresh observation without blocking, then report the latest one available.
// 1 second is the frequency at which we update statistics in the console.
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
Expand Down
21 changes: 10 additions & 11 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use chitchat::FailureDetectorConfig;
use clap::{arg, ArgMatches, Command};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use quickwit_actors::{ActorExitStatus, ActorHandle, ObservationType, Universe};
use quickwit_actors::{ActorExitStatus, ActorHandle, Universe};
use quickwit_cluster::{Cluster, ClusterMember};
use quickwit_common::runtimes::RuntimesConfig;
use quickwit_common::uri::Uri;
Expand Down Expand Up @@ -616,14 +616,15 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
loop {
check_interval.tick().await;

let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();
let observation = pipeline_handle.last_observation();

if observation.num_ongoing_merges == 0 {
info!("Merge pipeline has no more ongoing merges, Exiting.");
break;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
info!("Merge pipeline has exited, Exiting.");
break;
}
Expand Down Expand Up @@ -753,18 +754,16 @@ pub async fn start_statistics_reporting_loop(
report_interval.tick().await;
// Request a fresh observation without blocking, then report the latest one available.
// 1 second is the frequency at which we update statistics in the console.
let observation = pipeline_handle.observe().await;
pipeline_handle.refresh_observe();

let observation = pipeline_handle.last_observation();

// Let's not display live statistics to allow screen to scroll.
if observation.state.num_docs > 0 {
display_statistics(
&mut stdout_handle,
&mut throughput_calculator,
&observation.state,
)?;
if observation.num_docs > 0 {
display_statistics(&mut stdout_handle, &mut throughput_calculator, &observation)?;
}

if observation.obs_type == ObservationType::PostMortem {
if pipeline_handle.state().is_exit() {
break;
}
}
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,9 @@ mod tests {
.await
.unwrap();
universe.sleep(Duration::from_secs(3)).await;
let indexer_counters = indexer_handle.observe().await.state;
indexer_handle.refresh_observe();
universe.sleep(Duration::from_secs(3)).await;
let indexer_counters = indexer_handle.process_pending_and_observe().await.state;
assert_eq!(
indexer_counters,
IndexerCounters {
Expand Down
19 changes: 8 additions & 11 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{Metastore, MetastoreError};
use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_storage::Storage;
use tokio::join;
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};

Expand Down Expand Up @@ -236,20 +235,18 @@ impl IndexingPipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (doc_processor_counters, indexer_counters, uploader_counters, publisher_counters) = join!(
handles.doc_processor.observe(),
handles.indexer.observe(),
handles.uploader.observe(),
handles.publisher.observe(),
);
handles.doc_processor.refresh_observe();
handles.indexer.refresh_observe();
handles.uploader.refresh_observe();
handles.publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(
&doc_processor_counters,
&indexer_counters,
&uploader_counters,
&publisher_counters,
&handles.doc_processor.last_observation(),
&handles.indexer.last_observation(),
&handles.uploader.last_observation(),
&handles.publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,7 @@ mod tests {
})
.await
.unwrap();
let observation = indexing_service_handle.observe().await;
let observation = indexing_service_handle.process_pending_and_observe().await;
assert_eq!(observation.num_running_pipelines, 1);
assert_eq!(observation.num_failed_pipelines, 0);
assert_eq!(observation.num_successful_pipelines, 0);
Expand All @@ -1312,7 +1312,7 @@ mod tests {
.unwrap();
universe.sleep(*HEARTBEAT * 5).await;
// Check that indexing and merge pipelines are still running.
let observation = indexing_service_handle.observe().await;
let observation = indexing_service_handle.process_pending_and_observe().await;
assert_eq!(observation.num_running_pipelines, 1);
assert_eq!(observation.num_failed_pipelines, 0);
assert_eq!(observation.num_running_merge_pipelines, 1);
Expand Down
22 changes: 14 additions & 8 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{ListSplitsQuery, Metastore, MetastoreError, SplitState};
use quickwit_proto::indexing::IndexingPipelineId;
use time::OffsetDateTime;
use tokio::join;
use tracing::{debug, error, info, instrument};

use crate::actors::indexing_pipeline::wait_duration_before_retry;
Expand Down Expand Up @@ -357,18 +356,25 @@ impl MergePipeline {
let Some(handles) = &self.handles_opt else {
return;
};
let (merge_planner_state, merge_uploader_counters, merge_publisher_counters) = join!(
handles.merge_planner.observe(),
handles.merge_uploader.observe(),
handles.merge_publisher.observe(),
);
handles.merge_planner.refresh_observe();
handles.merge_uploader.refresh_observe();
handles.merge_publisher.refresh_observe();
self.statistics = self
.previous_generations_statistics
.clone()
.add_actor_counters(&merge_uploader_counters, &merge_publisher_counters)
.add_actor_counters(
&handles.merge_uploader.last_observation(),
&handles.merge_publisher.last_observation(),
)
.set_generation(self.statistics.generation)
.set_num_spawn_attempts(self.statistics.num_spawn_attempts)
.set_ongoing_merges(merge_planner_state.ongoing_merge_operations.len());
.set_ongoing_merges(
handles
.merge_planner
.last_observation()
.ongoing_merge_operations
.len(),
);
}

async fn perform_health_check(
Expand Down
Loading

0 comments on commit a20fece

Please sign in to comment.