diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index aff023ad602..be68e47bb5e 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -191,15 +191,7 @@ impl ActorContext { self.actor_state.get_state() } - pub(crate) fn process(&self) { - self.actor_state.process(); - } - - pub(crate) fn idle(&self) { - self.actor_state.idle(); - } - - pub(crate) fn pause(&self) { + pub fn pause(&self) { self.actor_state.pause(); } diff --git a/quickwit/quickwit-actors/src/actor_state.rs b/quickwit/quickwit-actors/src/actor_state.rs index 105a151bef9..3efec933059 100644 --- a/quickwit/quickwit-actors/src/actor_state.rs +++ b/quickwit/quickwit-actors/src/actor_state.rs @@ -22,26 +22,25 @@ use std::sync::atomic::{AtomicU32, Ordering}; #[repr(u32)] #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum ActorState { - /// Processing implies that the actor has some message(s) (this includes commands) to process. - Processing = 0, - /// Idle means that the actor is currently waiting for messages. - Idle = 1, - /// Pause means that the actor processes no message but can process commands. - Paused = 2, + /// Running means that the actor consumes and processes both low priority messages (regular + /// message) and high priority message commands. + Running = 0, + /// Pause means that the actor only consumes and processes high priority messages. Typically + /// commands as well as scheduled messages. + Paused = 1, /// Success means that the actor exited and cannot return to any other states. - Success = 3, + Success = 2, /// Failure means that the actor exited with a failure or panicked. 
- Failure = 4, + Failure = 3, } impl From for ActorState { fn from(actor_state_u32: u32) -> Self { match actor_state_u32 { - 0 => ActorState::Processing, - 1 => ActorState::Idle, - 2 => ActorState::Paused, - 3 => ActorState::Success, - 4 => ActorState::Failure, + 0 => ActorState::Running, + 1 => ActorState::Paused, + 2 => ActorState::Success, + 3 => ActorState::Failure, _ => { panic!( "Found forbidden u32 value for ActorState `{actor_state_u32}`. This should \ @@ -60,12 +59,12 @@ impl From for AtomicState { impl ActorState { pub fn is_running(&self) -> bool { - *self == ActorState::Idle || *self == ActorState::Processing + *self == ActorState::Running } pub fn is_exit(&self) -> bool { match self { - ActorState::Processing | ActorState::Idle | ActorState::Paused => false, + ActorState::Running | ActorState::Paused => false, ActorState::Success | ActorState::Failure => true, } } @@ -75,29 +74,11 @@ pub(crate) struct AtomicState(AtomicU32); impl Default for AtomicState { fn default() -> Self { - AtomicState(AtomicU32::new(ActorState::Processing as u32)) + AtomicState(AtomicU32::new(ActorState::Running as u32)) } } impl AtomicState { - pub(crate) fn process(&self) { - let _ = self.0.compare_exchange( - ActorState::Idle as u32, - ActorState::Processing as u32, - Ordering::SeqCst, - Ordering::SeqCst, - ); - } - - pub(crate) fn idle(&self) { - let _ = self.0.compare_exchange( - ActorState::Processing as u32, - ActorState::Idle as u32, - Ordering::SeqCst, - Ordering::SeqCst, - ); - } - pub(crate) fn pause(&self) { let _ = self .0 @@ -112,7 +93,7 @@ impl AtomicState { pub(crate) fn resume(&self) { let _ = self.0.compare_exchange( ActorState::Paused as u32, - ActorState::Processing as u32, + ActorState::Running as u32, Ordering::SeqCst, Ordering::SeqCst, ); @@ -137,8 +118,6 @@ mod tests { use super::*; enum Operation { - Process, - Idle, Pause, Resume, ExitSuccess, @@ -148,12 +127,6 @@ mod tests { impl Operation { fn apply(&self, state: &AtomicState) { match self { - 
Operation::Process => { - state.process(); - } - Operation::Idle => { - state.idle(); - } Operation::Pause => { state.pause(); } @@ -173,26 +146,15 @@ mod tests { #[test] fn test_atomic_state_from_running() { - test_transition(ActorState::Idle, Operation::Process, ActorState::Processing); - test_transition(ActorState::Processing, Operation::Idle, ActorState::Idle); - test_transition(ActorState::Processing, Operation::Pause, ActorState::Paused); - test_transition(ActorState::Idle, Operation::Pause, ActorState::Paused); + test_transition(ActorState::Running, Operation::Pause, ActorState::Paused); + test_transition(ActorState::Running, Operation::Resume, ActorState::Running); test_transition( - ActorState::Processing, - Operation::Resume, - ActorState::Processing, - ); - test_transition( - ActorState::Processing, + ActorState::Running, Operation::ExitSuccess, ActorState::Success, ); test_transition(ActorState::Paused, Operation::Pause, ActorState::Paused); - test_transition( - ActorState::Paused, - Operation::Resume, - ActorState::Processing, - ); + test_transition(ActorState::Paused, Operation::Resume, ActorState::Running); test_transition( ActorState::Paused, Operation::ExitSuccess, @@ -204,8 +166,6 @@ mod tests { ActorState::Failure, ); - test_transition(ActorState::Success, Operation::Process, ActorState::Success); - test_transition(ActorState::Success, Operation::Idle, ActorState::Success); test_transition(ActorState::Success, Operation::Pause, ActorState::Success); test_transition(ActorState::Success, Operation::Resume, ActorState::Success); test_transition( @@ -214,8 +174,6 @@ mod tests { ActorState::Success, ); - test_transition(ActorState::Failure, Operation::Process, ActorState::Failure); - test_transition(ActorState::Failure, Operation::Idle, ActorState::Failure); test_transition(ActorState::Failure, Operation::Pause, ActorState::Failure); test_transition(ActorState::Failure, Operation::Resume, ActorState::Failure); test_transition( diff --git 
a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 785bb6a2496..c0999d82315 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -349,11 +349,6 @@ impl Inbox { None } - #[allow(dead_code)] // temporary - pub(crate) fn try_recv_cmd_and_scheduled_msg_only(&self) -> Result, RecvError> { - self.rx.try_recv_high_priority_message() - } - /// Destroys the inbox and returns the list of pending messages or commands /// in the low priority channel. /// diff --git a/quickwit/quickwit-actors/src/spawn_builder.rs b/quickwit/quickwit-actors/src/spawn_builder.rs index f7641fb9413..2f7f6a2e690 100644 --- a/quickwit/quickwit-actors/src/spawn_builder.rs +++ b/quickwit/quickwit-actors/src/spawn_builder.rs @@ -236,14 +236,8 @@ async fn recv_envelope(inbox: &mut Inbox, ctx: &ActorContext) -> } } -fn try_recv_envelope(inbox: &mut Inbox, ctx: &ActorContext) -> Option> { - if ctx.state().is_running() { - inbox.try_recv() - } else { - // The actor is paused. We only process command and scheduled message. - inbox.try_recv_cmd_and_scheduled_msg_only() - } - .ok() +fn try_recv_envelope(inbox: &mut Inbox) -> Option> { + inbox.try_recv().ok() } struct ActorExecutionEnv { @@ -294,19 +288,25 @@ impl ActorExecutionEnv { async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> { self.yield_and_check_if_killed().await?; let envelope = recv_envelope(&mut self.inbox, &self.ctx).await; - self.ctx.process(); self.process_one_message(envelope).await?; - loop { - while let Some(envelope) = try_recv_envelope(&mut self.inbox, &self.ctx) { - self.process_one_message(envelope).await?; - } - self.ctx.yield_now().await; - if self.inbox.is_empty() { - break; + // If the actor is Running (not Paused), we consume all of the messages in the mailbox + // and call `on_drained_messages`. 
+ if self.ctx.state().is_running() { loop { while let Some(envelope) = try_recv_envelope(&mut self.inbox) { self.process_one_message(envelope).await?; } // We have reached the last message. // Let's still yield and see if we have more messages: // an upstream actor might have been experiencing backpressure, and is now waiting for our mailbox // to have some room. self.ctx.yield_now().await; if self.inbox.is_empty() { break; } } self.actor.get_mut().on_drained_messages(&self.ctx).await?; } - self.actor.get_mut().on_drained_messages(&self.ctx).await?; - self.ctx.idle(); if self.ctx.mailbox().is_last_mailbox() { // We double check here that the mailbox does not contain any messages, // as someone on different runtime thread could have added a last message diff --git a/quickwit/quickwit-actors/src/supervisor.rs b/quickwit/quickwit-actors/src/supervisor.rs index a4e11f1e873..2ebfa539040 100644 --- a/quickwit/quickwit-actors/src/supervisor.rs +++ b/quickwit/quickwit-actors/src/supervisor.rs @@ -22,9 +22,7 @@ use serde::Serialize; use tracing::{info, warn}; use crate::mailbox::Inbox; -use crate::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Health, Supervisable, -}; +use crate::{Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Supervisable}; #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)] pub struct SupervisorMetrics { @@ -151,7 +149,7 @@ impl Supervisor { // The actor is failing we need to restart it. let actor_handle = self.handle_opt.take().unwrap(); let actor_mailbox = actor_handle.mailbox().clone(); - let (actor_exit_status, _last_state) = if actor_handle.state() == ActorState::Processing { + let (actor_exit_status, _last_state) = if !actor_handle.state().is_exit() { // The actor is probably frozen. // Let's kill it. 
warn!("killing"); diff --git a/quickwit/quickwit-actors/src/tests.rs b/quickwit/quickwit-actors/src/tests.rs index 22c322a2dae..6a916b364e1 100644 --- a/quickwit/quickwit-actors/src/tests.rs +++ b/quickwit/quickwit-actors/src/tests.rs @@ -62,7 +62,7 @@ impl Handler for PingReceiverActor { ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { self.ping_count += 1; - assert_eq!(ctx.state(), ActorState::Processing); + assert_eq!(ctx.state(), ActorState::Running); Ok(()) } } @@ -316,14 +316,14 @@ async fn test_actor_running_states() { quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); let (ping_mailbox, ping_handle) = universe.spawn_builder().spawn(PingReceiverActor::default()); - assert!(ping_handle.state() == ActorState::Processing); + assert_eq!(ping_handle.state(), ActorState::Running); for _ in 0u32..10u32 { assert!(ping_mailbox.send_message(Ping).await.is_ok()); } let obs = ping_handle.process_pending_and_observe().await; assert_eq!(*obs, 10); universe.sleep(Duration::from_millis(1)).await; - assert!(ping_handle.state() == ActorState::Idle); + assert_eq!(ping_handle.state(), ActorState::Running); universe.assert_quit().await; } @@ -505,7 +505,7 @@ impl Actor for BuggyFinalizeActor { async fn test_actor_finalize_error_set_exit_status_to_panicked() -> anyhow::Result<()> { let universe = Universe::with_accelerated_time(); let (mailbox, handle) = universe.spawn_builder().spawn(BuggyFinalizeActor); - assert!(matches!(handle.state(), ActorState::Processing)); + assert!(matches!(handle.state(), ActorState::Running)); drop(mailbox); let (exit, _) = handle.join().await; assert!(matches!(exit, ActorExitStatus::Panicked)); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index f97cdee7eed..458647b06f2 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -417,8 +417,9 @@ impl Actor for Indexer { // Time 
to take a nap. let sleep_for = commit_timeout - elapsed; + ctx.pause(); ctx.schedule_self_msg(sleep_for, Command::Resume); - self.handle(Command::Pause, ctx).await?; + Ok(()) } @@ -1079,12 +1080,20 @@ mod tests { .await .unwrap(); universe.sleep(Duration::from_secs(3)).await; - let mut indexer_counters = indexer_handle.observe().await.state; - indexer_counters.pipeline_metrics_opt = None; + let mut indexer_counters: IndexerCounters = Default::default(); + for _ in 0..100 { + tokio::task::yield_now().await; + indexer_counters = indexer_handle.observe().await.state; + indexer_counters.pipeline_metrics_opt = None; + // drain was called at least once. + if indexer_counters.num_splits_emitted > 0 { + break; + } + } assert_eq!( - indexer_counters, - IndexerCounters { + &indexer_counters, + &IndexerCounters { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index d3132ed63a4..81d2553ea44 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -372,7 +372,7 @@ impl IndexingService { self.indexing_pipelines .retain(|pipeline_uid, pipeline_handle| { match pipeline_handle.handle.state() { - ActorState::Idle | ActorState::Paused | ActorState::Processing => true, + ActorState::Paused | ActorState::Running => true, ActorState::Success => { info!( pipeline_uid=%pipeline_uid,