From a2e3b92ba52b34c4619161e4960ab05420675413 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 25 Jan 2024 11:09:58 +0900 Subject: [PATCH] =?UTF-8?q?Fixing=20bug=20that=20was=20causing=20the=20Ind?= =?UTF-8?q?exer=20to=20stay=20busy=20in=20cooperative=20m=E2=80=A6=20(#445?= =?UTF-8?q?2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixing bug that was causing the Indexer to stay busy in cooperative mode. Removing the difference between Idle and Processing. It was originally introduced with the idea of measuring how long actors were busy or idle, but tokio-console does that job very well now. The code was relying on the enforcement of the non-trivial state machine: calling .idle() in the pause state had no effect for instance. The real bug however showed up with the following sequence of events: - indexer drains its queue - on_drain is called - indexer "goes to sleep" by putting itself in the Pause state. - a low priority message is queued into the low priority queue. - indexer receives a high priority message (Observe is sent every second). - indexer would go through the loop logic consuming the entire pipeline, but only consuming high priority messages. The `inbox.is_empty()` exit condition is never satisfied, so that this loop would keep going, yielding at each iteration, until the Resume Command is received. This is done on the indexer runtime. The task would always be busy, but yield all of the time, consuming whatever room is available on the thread it is running on. 
Closes #4448 Co-authored-by: Adrien Guillo --- quickwit/quickwit-actors/src/actor_context.rs | 10 +-- quickwit/quickwit-actors/src/actor_state.rs | 82 +++++-------------- quickwit/quickwit-actors/src/mailbox.rs | 5 -- quickwit/quickwit-actors/src/spawn_builder.rs | 36 ++++---- quickwit/quickwit-actors/src/supervisor.rs | 6 +- quickwit/quickwit-actors/src/tests.rs | 8 +- .../quickwit-indexing/src/actors/indexer.rs | 25 ++++-- .../src/actors/indexing_service.rs | 2 +- 8 files changed, 65 insertions(+), 109 deletions(-) diff --git a/quickwit/quickwit-actors/src/actor_context.rs b/quickwit/quickwit-actors/src/actor_context.rs index aff023ad602..be68e47bb5e 100644 --- a/quickwit/quickwit-actors/src/actor_context.rs +++ b/quickwit/quickwit-actors/src/actor_context.rs @@ -191,15 +191,7 @@ impl ActorContext { self.actor_state.get_state() } - pub(crate) fn process(&self) { - self.actor_state.process(); - } - - pub(crate) fn idle(&self) { - self.actor_state.idle(); - } - - pub(crate) fn pause(&self) { + pub fn pause(&self) { self.actor_state.pause(); } diff --git a/quickwit/quickwit-actors/src/actor_state.rs b/quickwit/quickwit-actors/src/actor_state.rs index 105a151bef9..3efec933059 100644 --- a/quickwit/quickwit-actors/src/actor_state.rs +++ b/quickwit/quickwit-actors/src/actor_state.rs @@ -22,26 +22,25 @@ use std::sync::atomic::{AtomicU32, Ordering}; #[repr(u32)] #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum ActorState { - /// Processing implies that the actor has some message(s) (this includes commands) to process. - Processing = 0, - /// Idle means that the actor is currently waiting for messages. - Idle = 1, - /// Pause means that the actor processes no message but can process commands. - Paused = 2, + /// Running means that the actor consumes and processes both low priority messages (regular + /// message) and high priority message commands. + Running = 0, + /// Pause means that the actor only consumes and processes high priority messages. 
Typically + /// commands as well as scheduled messages. + Paused = 1, /// Success means that the actor exited and cannot return to any other states. - Success = 3, + Success = 2, /// Failure means that the actor exited with a failure or panicked. - Failure = 4, + Failure = 3, } impl From for ActorState { fn from(actor_state_u32: u32) -> Self { match actor_state_u32 { - 0 => ActorState::Processing, - 1 => ActorState::Idle, - 2 => ActorState::Paused, - 3 => ActorState::Success, - 4 => ActorState::Failure, + 0 => ActorState::Running, + 1 => ActorState::Paused, + 2 => ActorState::Success, + 3 => ActorState::Failure, _ => { panic!( "Found forbidden u32 value for ActorState `{actor_state_u32}`. This should \ @@ -60,12 +59,12 @@ impl From for AtomicState { impl ActorState { pub fn is_running(&self) -> bool { - *self == ActorState::Idle || *self == ActorState::Processing + *self == ActorState::Running } pub fn is_exit(&self) -> bool { match self { - ActorState::Processing | ActorState::Idle | ActorState::Paused => false, + ActorState::Running | ActorState::Paused => false, ActorState::Success | ActorState::Failure => true, } } @@ -75,29 +74,11 @@ pub(crate) struct AtomicState(AtomicU32); impl Default for AtomicState { fn default() -> Self { - AtomicState(AtomicU32::new(ActorState::Processing as u32)) + AtomicState(AtomicU32::new(ActorState::Running as u32)) } } impl AtomicState { - pub(crate) fn process(&self) { - let _ = self.0.compare_exchange( - ActorState::Idle as u32, - ActorState::Processing as u32, - Ordering::SeqCst, - Ordering::SeqCst, - ); - } - - pub(crate) fn idle(&self) { - let _ = self.0.compare_exchange( - ActorState::Processing as u32, - ActorState::Idle as u32, - Ordering::SeqCst, - Ordering::SeqCst, - ); - } - pub(crate) fn pause(&self) { let _ = self .0 @@ -112,7 +93,7 @@ impl AtomicState { pub(crate) fn resume(&self) { let _ = self.0.compare_exchange( ActorState::Paused as u32, - ActorState::Processing as u32, + ActorState::Running as u32, 
Ordering::SeqCst, Ordering::SeqCst, ); @@ -137,8 +118,6 @@ mod tests { use super::*; enum Operation { - Process, - Idle, Pause, Resume, ExitSuccess, @@ -148,12 +127,6 @@ mod tests { impl Operation { fn apply(&self, state: &AtomicState) { match self { - Operation::Process => { - state.process(); - } - Operation::Idle => { - state.idle(); - } Operation::Pause => { state.pause(); } @@ -173,26 +146,15 @@ mod tests { #[test] fn test_atomic_state_from_running() { - test_transition(ActorState::Idle, Operation::Process, ActorState::Processing); - test_transition(ActorState::Processing, Operation::Idle, ActorState::Idle); - test_transition(ActorState::Processing, Operation::Pause, ActorState::Paused); - test_transition(ActorState::Idle, Operation::Pause, ActorState::Paused); + test_transition(ActorState::Running, Operation::Pause, ActorState::Paused); + test_transition(ActorState::Running, Operation::Resume, ActorState::Running); test_transition( - ActorState::Processing, - Operation::Resume, - ActorState::Processing, - ); - test_transition( - ActorState::Processing, + ActorState::Running, Operation::ExitSuccess, ActorState::Success, ); test_transition(ActorState::Paused, Operation::Pause, ActorState::Paused); - test_transition( - ActorState::Paused, - Operation::Resume, - ActorState::Processing, - ); + test_transition(ActorState::Paused, Operation::Resume, ActorState::Running); test_transition( ActorState::Paused, Operation::ExitSuccess, @@ -204,8 +166,6 @@ mod tests { ActorState::Failure, ); - test_transition(ActorState::Success, Operation::Process, ActorState::Success); - test_transition(ActorState::Success, Operation::Idle, ActorState::Success); test_transition(ActorState::Success, Operation::Pause, ActorState::Success); test_transition(ActorState::Success, Operation::Resume, ActorState::Success); test_transition( @@ -214,8 +174,6 @@ mod tests { ActorState::Success, ); - test_transition(ActorState::Failure, Operation::Process, ActorState::Failure); - 
test_transition(ActorState::Failure, Operation::Idle, ActorState::Failure); test_transition(ActorState::Failure, Operation::Pause, ActorState::Failure); test_transition(ActorState::Failure, Operation::Resume, ActorState::Failure); test_transition( diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 785bb6a2496..c0999d82315 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -349,11 +349,6 @@ impl Inbox { None } - #[allow(dead_code)] // temporary - pub(crate) fn try_recv_cmd_and_scheduled_msg_only(&self) -> Result, RecvError> { - self.rx.try_recv_high_priority_message() - } - /// Destroys the inbox and returns the list of pending messages or commands /// in the low priority channel. /// diff --git a/quickwit/quickwit-actors/src/spawn_builder.rs b/quickwit/quickwit-actors/src/spawn_builder.rs index f7641fb9413..4ee673a9436 100644 --- a/quickwit/quickwit-actors/src/spawn_builder.rs +++ b/quickwit/quickwit-actors/src/spawn_builder.rs @@ -236,14 +236,8 @@ async fn recv_envelope(inbox: &mut Inbox, ctx: &ActorContext) -> } } -fn try_recv_envelope(inbox: &mut Inbox, ctx: &ActorContext) -> Option> { - if ctx.state().is_running() { - inbox.try_recv() - } else { - // The actor is paused. We only process command and scheduled message. 
- inbox.try_recv_cmd_and_scheduled_msg_only() - } - .ok() +fn try_recv_envelope(inbox: &mut Inbox) -> Option> { + inbox.try_recv().ok() } struct ActorExecutionEnv { @@ -294,19 +288,25 @@ impl ActorExecutionEnv { async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> { self.yield_and_check_if_killed().await?; let envelope = recv_envelope(&mut self.inbox, &self.ctx).await; - self.ctx.process(); self.process_one_message(envelope).await?; - loop { - while let Some(envelope) = try_recv_envelope(&mut self.inbox, &self.ctx) { - self.process_one_message(envelope).await?; - } - self.ctx.yield_now().await; - if self.inbox.is_empty() { - break; + // If the actor is Running (not Paused), we consume all the messages in the mailbox + // and call `on_drained_message`. + if self.ctx.state().is_running() { + loop { + while let Some(envelope) = try_recv_envelope(&mut self.inbox) { + self.process_one_message(envelope).await?; + } + // We have reached the last message. + // Let's still yield and see if we have more messages: + // an upstream actor might have experienced backpressure, and is now waiting for our + // mailbox to have some room. 
+ self.ctx.yield_now().await; + if self.inbox.is_empty() { + break; + } } + self.actor.get_mut().on_drained_messages(&self.ctx).await?; } - self.actor.get_mut().on_drained_messages(&self.ctx).await?; - self.ctx.idle(); if self.ctx.mailbox().is_last_mailbox() { // We double check here that the mailbox does not contain any messages, // as someone on different runtime thread could have added a last message diff --git a/quickwit/quickwit-actors/src/supervisor.rs b/quickwit/quickwit-actors/src/supervisor.rs index a4e11f1e873..2ebfa539040 100644 --- a/quickwit/quickwit-actors/src/supervisor.rs +++ b/quickwit/quickwit-actors/src/supervisor.rs @@ -22,9 +22,7 @@ use serde::Serialize; use tracing::{info, warn}; use crate::mailbox::Inbox; -use crate::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Health, Supervisable, -}; +use crate::{Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Supervisable}; #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)] pub struct SupervisorMetrics { @@ -151,7 +149,7 @@ impl Supervisor { // The actor is failing we need to restart it. let actor_handle = self.handle_opt.take().unwrap(); let actor_mailbox = actor_handle.mailbox().clone(); - let (actor_exit_status, _last_state) = if actor_handle.state() == ActorState::Processing { + let (actor_exit_status, _last_state) = if !actor_handle.state().is_exit() { // The actor is probably frozen. // Let's kill it. 
warn!("killing"); diff --git a/quickwit/quickwit-actors/src/tests.rs b/quickwit/quickwit-actors/src/tests.rs index 22c322a2dae..6a916b364e1 100644 --- a/quickwit/quickwit-actors/src/tests.rs +++ b/quickwit/quickwit-actors/src/tests.rs @@ -62,7 +62,7 @@ impl Handler for PingReceiverActor { ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { self.ping_count += 1; - assert_eq!(ctx.state(), ActorState::Processing); + assert_eq!(ctx.state(), ActorState::Running); Ok(()) } } @@ -316,14 +316,14 @@ async fn test_actor_running_states() { quickwit_common::setup_logging_for_tests(); let universe = Universe::with_accelerated_time(); let (ping_mailbox, ping_handle) = universe.spawn_builder().spawn(PingReceiverActor::default()); - assert!(ping_handle.state() == ActorState::Processing); + assert_eq!(ping_handle.state(), ActorState::Running); for _ in 0u32..10u32 { assert!(ping_mailbox.send_message(Ping).await.is_ok()); } let obs = ping_handle.process_pending_and_observe().await; assert_eq!(*obs, 10); universe.sleep(Duration::from_millis(1)).await; - assert!(ping_handle.state() == ActorState::Idle); + assert_eq!(ping_handle.state(), ActorState::Running); universe.assert_quit().await; } @@ -505,7 +505,7 @@ impl Actor for BuggyFinalizeActor { async fn test_actor_finalize_error_set_exit_status_to_panicked() -> anyhow::Result<()> { let universe = Universe::with_accelerated_time(); let (mailbox, handle) = universe.spawn_builder().spawn(BuggyFinalizeActor); - assert!(matches!(handle.state(), ActorState::Processing)); + assert!(matches!(handle.state(), ActorState::Running)); drop(mailbox); let (exit, _) = handle.join().await; assert!(matches!(exit, ActorExitStatus::Panicked)); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index f97cdee7eed..930662460e0 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -417,8 +417,9 @@ impl Actor for Indexer { // Time 
to take a nap. let sleep_for = commit_timeout - elapsed; + ctx.pause(); ctx.schedule_self_msg(sleep_for, Command::Resume); - self.handle(Command::Pause, ctx).await?; + Ok(()) } @@ -1078,13 +1079,25 @@ mod tests { }) .await .unwrap(); - universe.sleep(Duration::from_secs(3)).await; - let mut indexer_counters = indexer_handle.observe().await.state; - indexer_counters.pipeline_metrics_opt = None; + let mut indexer_counters: IndexerCounters = Default::default(); + for _ in 0..100 { + // When a lot of unit tests are running concurrently we have a race condition here. + // It is very difficult to assess when drain will actually be called. + // + // Therefore we check that it happens "eventually". + universe.sleep(Duration::from_secs(1)).await; + tokio::task::yield_now().await; + indexer_counters = indexer_handle.observe().await.state; + indexer_counters.pipeline_metrics_opt = None; + // drain was called at least once. + if indexer_counters.num_splits_emitted > 0 { + break; + } + } assert_eq!( - indexer_counters, - IndexerCounters { + &indexer_counters, + &IndexerCounters { num_splits_emitted: 1, num_split_batches_emitted: 1, num_docs_in_workbench: 0, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index d3132ed63a4..81d2553ea44 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -372,7 +372,7 @@ impl IndexingService { self.indexing_pipelines .retain(|pipeline_uid, pipeline_handle| { match pipeline_handle.handle.state() { - ActorState::Idle | ActorState::Paused | ActorState::Processing => true, + ActorState::Paused | ActorState::Running => true, ActorState::Success => { info!( pipeline_uid=%pipeline_uid,