Skip to content

Commit

Permalink
Fixing bug that was causing the Indexer to stay busy in cooperative m…
Browse files Browse the repository at this point in the history
…ode.

Removing the difference between Idle and Processing.

Closes #4448
  • Loading branch information
fulmicoton committed Jan 24, 2024
1 parent 53f38d6 commit 1a4e1a0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 67 deletions.
12 changes: 2 additions & 10 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,11 @@ impl<A: Actor> ActorContext<A> {
self.progress.record_progress();
}

pub(crate) fn state(&self) -> ActorState {
pub fn state(&self) -> ActorState {
self.actor_state.get_state()
}

pub(crate) fn process(&self) {
self.actor_state.process();
}

pub(crate) fn idle(&self) {
self.actor_state.idle();
}

pub(crate) fn pause(&self) {
pub fn pause(&self) {
self.actor_state.pause();
}

Expand Down
65 changes: 23 additions & 42 deletions quickwit/quickwit-actors/src/actor_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@ use std::sync::atomic::{AtomicU32, Ordering};
#[repr(u32)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ActorState {
/// Processing implies that the actor has some message(s) (this includes commands) to process.
Processing = 0,
/// Idle means that the actor is currently waiting for messages.
Idle = 1,
/// Pause means that the actor processes no message but can process commands.
Paused = 2,
/// Running means that the actor consumes and processes both low priority messages (regular message) and high
/// priority message commands.
Running = 0,
/// Pause means that the actor only consumes and processes high priority messages. Typically commands as well
/// as scheduled messages.
Paused = 1,
/// Success means that the actor exited and cannot return to any other states.
Success = 3,
Success = 2,
/// Failure means that the actor exited with a failure or panicked.
Failure = 4,
Failure = 3,
}

impl From<u32> for ActorState {
fn from(actor_state_u32: u32) -> Self {
match actor_state_u32 {
0 => ActorState::Processing,
1 => ActorState::Idle,
2 => ActorState::Paused,
3 => ActorState::Success,
4 => ActorState::Failure,
0 => ActorState::Running,
1 => ActorState::Paused,
2 => ActorState::Success,
3 => ActorState::Failure,
_ => {
panic!(
"Found forbidden u32 value for ActorState `{actor_state_u32}`. This should \
Expand All @@ -60,12 +59,12 @@ impl From<ActorState> for AtomicState {

impl ActorState {
pub fn is_running(&self) -> bool {
*self == ActorState::Idle || *self == ActorState::Processing
*self == ActorState::Running
}

pub fn is_exit(&self) -> bool {
match self {
ActorState::Processing | ActorState::Idle | ActorState::Paused => false,
ActorState::Running | ActorState::Paused => false,
ActorState::Success | ActorState::Failure => true,
}
}
Expand All @@ -75,29 +74,11 @@ pub(crate) struct AtomicState(AtomicU32);

impl Default for AtomicState {
fn default() -> Self {
AtomicState(AtomicU32::new(ActorState::Processing as u32))
AtomicState(AtomicU32::new(ActorState::Running as u32))
}
}

impl AtomicState {
pub(crate) fn process(&self) {
let _ = self.0.compare_exchange(
ActorState::Idle as u32,
ActorState::Processing as u32,
Ordering::SeqCst,
Ordering::SeqCst,
);
}

pub(crate) fn idle(&self) {
let _ = self.0.compare_exchange(
ActorState::Processing as u32,
ActorState::Idle as u32,
Ordering::SeqCst,
Ordering::SeqCst,
);
}

pub(crate) fn pause(&self) {
let _ = self
.0
Expand All @@ -112,7 +93,7 @@ impl AtomicState {
pub(crate) fn resume(&self) {
let _ = self.0.compare_exchange(
ActorState::Paused as u32,
ActorState::Processing as u32,
ActorState::Running as u32,
Ordering::SeqCst,
Ordering::SeqCst,
);
Expand Down Expand Up @@ -173,25 +154,25 @@ mod tests {

#[test]
fn test_atomic_state_from_running() {
test_transition(ActorState::Idle, Operation::Process, ActorState::Processing);
test_transition(ActorState::Processing, Operation::Idle, ActorState::Idle);
test_transition(ActorState::Processing, Operation::Pause, ActorState::Paused);
test_transition(ActorState::Idle, Operation::Process, ActorState::Running);
test_transition(ActorState::Running, Operation::Idle, ActorState::Idle);
test_transition(ActorState::Running, Operation::Pause, ActorState::Paused);
test_transition(ActorState::Idle, Operation::Pause, ActorState::Paused);
test_transition(
ActorState::Processing,
ActorState::Running,
Operation::Resume,
ActorState::Processing,
ActorState::Running,
);
test_transition(
ActorState::Processing,
ActorState::Running,
Operation::ExitSuccess,
ActorState::Success,
);
test_transition(ActorState::Paused, Operation::Pause, ActorState::Paused);
test_transition(
ActorState::Paused,
Operation::Resume,
ActorState::Processing,
ActorState::Running,
);
test_transition(
ActorState::Paused,
Expand Down
26 changes: 15 additions & 11 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use crate::registry::{ActorJoinHandle, ActorRegistry};
use crate::scheduler::{NoAdvanceTimeGuard, SchedulerClient};
use crate::supervisor::Supervisor;
use crate::{
Actor, ActorContext, ActorExitStatus, ActorHandle, KillSwitch, Mailbox, QueueCapacity,
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, KillSwitch, Mailbox,
QueueCapacity,
};

#[derive(Clone)]
Expand Down Expand Up @@ -294,19 +295,22 @@ impl<A: Actor> ActorExecutionEnv<A> {
async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> {
self.yield_and_check_if_killed().await?;
let envelope = recv_envelope(&mut self.inbox, &self.ctx).await;
self.ctx.process();
self.process_one_message(envelope).await?;
loop {
while let Some(envelope) = try_recv_envelope(&mut self.inbox, &self.ctx) {
self.process_one_message(envelope).await?;
}
self.ctx.yield_now().await;
if self.inbox.is_empty() {
break;
// If the actor is not Paused, we consume all of the message in the mailbox.
if self.ctx.state() == ActorState::Running {
loop {
while let Some(envelope) = try_recv_envelope(&mut self.inbox, &self.ctx) {
self.process_one_message(envelope).await?;
}
self.ctx.yield_now().await;
if self.inbox.is_empty() {
break;
}
}
}
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
self.ctx.idle();
if self.ctx.state().is_running() {
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
}
if self.ctx.mailbox().is_last_mailbox() {
// We double check here that the mailbox does not contain any messages,
// as someone on different runtime thread could have added a last message
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-actors/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tracing::{info, warn};

use crate::mailbox::Inbox;
use crate::{
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Health, Supervisable,
Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Supervisable,
};

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)]
Expand Down Expand Up @@ -151,7 +151,7 @@ impl<A: Actor> Supervisor<A> {
// The actor is failing we need to restart it.
let actor_handle = self.handle_opt.take().unwrap();
let actor_mailbox = actor_handle.mailbox().clone();
let (actor_exit_status, _last_state) = if actor_handle.state() == ActorState::Processing {
let (actor_exit_status, _last_state) = if !actor_handle.state().is_exit() {
// The actor is probably frozen.
// Let's kill it.
warn!("killing");
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,9 @@ impl Actor for Indexer {
// Time to take a nap.
let sleep_for = commit_timeout - elapsed;

ctx.pause();
ctx.schedule_self_msg(sleep_for, Command::Resume);
self.handle(Command::Pause, ctx).await?;

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl IndexingService {
self.indexing_pipelines
.retain(|pipeline_uid, pipeline_handle| {
match pipeline_handle.handle.state() {
ActorState::Idle | ActorState::Paused | ActorState::Processing => true,
ActorState::Paused | ActorState::Running => true,
ActorState::Success => {
info!(
pipeline_uid=%pipeline_uid,
Expand Down

0 comments on commit 1a4e1a0

Please sign in to comment.