Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing bug that was causing the Indexer to stay busy in cooperative m… #4452

Merged
merged 4 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,7 @@ impl<A: Actor> ActorContext<A> {
self.actor_state.get_state()
}

pub(crate) fn process(&self) {
self.actor_state.process();
}

pub(crate) fn idle(&self) {
self.actor_state.idle();
}

pub(crate) fn pause(&self) {
pub fn pause(&self) {
self.actor_state.pause();
}

Expand Down
82 changes: 20 additions & 62 deletions quickwit/quickwit-actors/src/actor_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@ use std::sync::atomic::{AtomicU32, Ordering};
#[repr(u32)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ActorState {
/// Processing implies that the actor has some message(s) (this includes commands) to process.
Processing = 0,
/// Idle means that the actor is currently waiting for messages.
Idle = 1,
/// Pause means that the actor processes no message but can process commands.
Paused = 2,
/// Running means that the actor consumes and processes both low priority messages (regular
/// messages) and high priority messages (commands).
Running = 0,
/// Paused means that the actor only consumes and processes high priority messages, typically
/// commands as well as scheduled messages.
Paused = 1,
/// Success means that the actor exited and cannot return to any other states.
Success = 3,
Success = 2,
/// Failure means that the actor exited with a failure or panicked.
Failure = 4,
Failure = 3,
}

impl From<u32> for ActorState {
fn from(actor_state_u32: u32) -> Self {
match actor_state_u32 {
0 => ActorState::Processing,
1 => ActorState::Idle,
2 => ActorState::Paused,
3 => ActorState::Success,
4 => ActorState::Failure,
0 => ActorState::Running,
1 => ActorState::Paused,
2 => ActorState::Success,
3 => ActorState::Failure,
_ => {
panic!(
"Found forbidden u32 value for ActorState `{actor_state_u32}`. This should \
Expand All @@ -60,12 +59,12 @@ impl From<ActorState> for AtomicState {

impl ActorState {
pub fn is_running(&self) -> bool {
*self == ActorState::Idle || *self == ActorState::Processing
*self == ActorState::Running
}

pub fn is_exit(&self) -> bool {
match self {
ActorState::Processing | ActorState::Idle | ActorState::Paused => false,
ActorState::Running | ActorState::Paused => false,
ActorState::Success | ActorState::Failure => true,
}
}
Expand All @@ -75,29 +74,11 @@ pub(crate) struct AtomicState(AtomicU32);

impl Default for AtomicState {
fn default() -> Self {
AtomicState(AtomicU32::new(ActorState::Processing as u32))
AtomicState(AtomicU32::new(ActorState::Running as u32))
}
}

impl AtomicState {
pub(crate) fn process(&self) {
let _ = self.0.compare_exchange(
ActorState::Idle as u32,
ActorState::Processing as u32,
Ordering::SeqCst,
Ordering::SeqCst,
);
}

pub(crate) fn idle(&self) {
let _ = self.0.compare_exchange(
ActorState::Processing as u32,
ActorState::Idle as u32,
Ordering::SeqCst,
Ordering::SeqCst,
);
}

pub(crate) fn pause(&self) {
let _ = self
.0
Expand All @@ -112,7 +93,7 @@ impl AtomicState {
pub(crate) fn resume(&self) {
let _ = self.0.compare_exchange(
ActorState::Paused as u32,
ActorState::Processing as u32,
ActorState::Running as u32,
Ordering::SeqCst,
Ordering::SeqCst,
);
Expand All @@ -137,8 +118,6 @@ mod tests {
use super::*;

enum Operation {
Process,
Idle,
Pause,
Resume,
ExitSuccess,
Expand All @@ -148,12 +127,6 @@ mod tests {
impl Operation {
fn apply(&self, state: &AtomicState) {
match self {
Operation::Process => {
state.process();
}
Operation::Idle => {
state.idle();
}
Operation::Pause => {
state.pause();
}
Expand All @@ -173,26 +146,15 @@ mod tests {

#[test]
fn test_atomic_state_from_running() {
test_transition(ActorState::Idle, Operation::Process, ActorState::Processing);
test_transition(ActorState::Processing, Operation::Idle, ActorState::Idle);
test_transition(ActorState::Processing, Operation::Pause, ActorState::Paused);
test_transition(ActorState::Idle, Operation::Pause, ActorState::Paused);
test_transition(ActorState::Running, Operation::Pause, ActorState::Paused);
test_transition(ActorState::Running, Operation::Resume, ActorState::Running);
test_transition(
ActorState::Processing,
Operation::Resume,
ActorState::Processing,
);
test_transition(
ActorState::Processing,
ActorState::Running,
Operation::ExitSuccess,
ActorState::Success,
);
test_transition(ActorState::Paused, Operation::Pause, ActorState::Paused);
test_transition(
ActorState::Paused,
Operation::Resume,
ActorState::Processing,
);
test_transition(ActorState::Paused, Operation::Resume, ActorState::Running);
test_transition(
ActorState::Paused,
Operation::ExitSuccess,
Expand All @@ -204,8 +166,6 @@ mod tests {
ActorState::Failure,
);

test_transition(ActorState::Success, Operation::Process, ActorState::Success);
test_transition(ActorState::Success, Operation::Idle, ActorState::Success);
test_transition(ActorState::Success, Operation::Pause, ActorState::Success);
test_transition(ActorState::Success, Operation::Resume, ActorState::Success);
test_transition(
Expand All @@ -214,8 +174,6 @@ mod tests {
ActorState::Success,
);

test_transition(ActorState::Failure, Operation::Process, ActorState::Failure);
test_transition(ActorState::Failure, Operation::Idle, ActorState::Failure);
test_transition(ActorState::Failure, Operation::Pause, ActorState::Failure);
test_transition(ActorState::Failure, Operation::Resume, ActorState::Failure);
test_transition(
Expand Down
5 changes: 0 additions & 5 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,6 @@ impl<A: Actor> Inbox<A> {
None
}

#[allow(dead_code)] // temporary
pub(crate) fn try_recv_cmd_and_scheduled_msg_only(&self) -> Result<Envelope<A>, RecvError> {
self.rx.try_recv_high_priority_message()
}

/// Destroys the inbox and returns the list of pending messages or commands
/// in the low priority channel.
///
Expand Down
36 changes: 18 additions & 18 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,8 @@ async fn recv_envelope<A: Actor>(inbox: &mut Inbox<A>, ctx: &ActorContext<A>) ->
}
}

fn try_recv_envelope<A: Actor>(inbox: &mut Inbox<A>, ctx: &ActorContext<A>) -> Option<Envelope<A>> {
if ctx.state().is_running() {
inbox.try_recv()
} else {
// The actor is paused. We only process command and scheduled message.
inbox.try_recv_cmd_and_scheduled_msg_only()
}
.ok()
fn try_recv_envelope<A: Actor>(inbox: &mut Inbox<A>) -> Option<Envelope<A>> {
inbox.try_recv().ok()
}

struct ActorExecutionEnv<A: Actor> {
Expand Down Expand Up @@ -294,19 +288,25 @@ impl<A: Actor> ActorExecutionEnv<A> {
async fn process_all_available_messages(&mut self) -> Result<(), ActorExitStatus> {
self.yield_and_check_if_killed().await?;
let envelope = recv_envelope(&mut self.inbox, &self.ctx).await;
self.ctx.process();
self.process_one_message(envelope).await?;
loop {
while let Some(envelope) = try_recv_envelope(&mut self.inbox, &self.ctx) {
self.process_one_message(envelope).await?;
}
self.ctx.yield_now().await;
if self.inbox.is_empty() {
break;
// If the actor is Running (not Paused), we consume all the messages in the mailbox
// and call `on_drained_messages`.
if self.ctx.state().is_running() {
loop {
while let Some(envelope) = try_recv_envelope(&mut self.inbox) {
self.process_one_message(envelope).await?;
}
// We have reached the last message.
// Let's still yield and see if we have more messages:
// an upstream actor might have experienced backpressure, and is now waiting for our
// mailbox to have some room.
self.ctx.yield_now().await;
if self.inbox.is_empty() {
break;
}
}
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
}
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
self.ctx.idle();
if self.ctx.mailbox().is_last_mailbox() {
// We double check here that the mailbox does not contain any messages,
// as someone on different runtime thread could have added a last message
Expand Down
6 changes: 2 additions & 4 deletions quickwit/quickwit-actors/src/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use serde::Serialize;
use tracing::{info, warn};

use crate::mailbox::Inbox;
use crate::{
Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Health, Supervisable,
};
use crate::{Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Supervisable};

#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Serialize)]
pub struct SupervisorMetrics {
Expand Down Expand Up @@ -151,7 +149,7 @@ impl<A: Actor> Supervisor<A> {
// The actor is failing: we need to restart it.
let actor_handle = self.handle_opt.take().unwrap();
let actor_mailbox = actor_handle.mailbox().clone();
let (actor_exit_status, _last_state) = if actor_handle.state() == ActorState::Processing {
let (actor_exit_status, _last_state) = if !actor_handle.state().is_exit() {
// The actor is probably frozen.
// Let's kill it.
warn!("killing");
Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-actors/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Handler<Ping> for PingReceiverActor {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.ping_count += 1;
assert_eq!(ctx.state(), ActorState::Processing);
assert_eq!(ctx.state(), ActorState::Running);
Ok(())
}
}
Expand Down Expand Up @@ -316,14 +316,14 @@ async fn test_actor_running_states() {
quickwit_common::setup_logging_for_tests();
let universe = Universe::with_accelerated_time();
let (ping_mailbox, ping_handle) = universe.spawn_builder().spawn(PingReceiverActor::default());
assert!(ping_handle.state() == ActorState::Processing);
assert_eq!(ping_handle.state(), ActorState::Running);
for _ in 0u32..10u32 {
assert!(ping_mailbox.send_message(Ping).await.is_ok());
}
let obs = ping_handle.process_pending_and_observe().await;
assert_eq!(*obs, 10);
universe.sleep(Duration::from_millis(1)).await;
assert!(ping_handle.state() == ActorState::Idle);
assert_eq!(ping_handle.state(), ActorState::Running);
universe.assert_quit().await;
}

Expand Down Expand Up @@ -505,7 +505,7 @@ impl Actor for BuggyFinalizeActor {
async fn test_actor_finalize_error_set_exit_status_to_panicked() -> anyhow::Result<()> {
let universe = Universe::with_accelerated_time();
let (mailbox, handle) = universe.spawn_builder().spawn(BuggyFinalizeActor);
assert!(matches!(handle.state(), ActorState::Processing));
assert!(matches!(handle.state(), ActorState::Running));
drop(mailbox);
let (exit, _) = handle.join().await;
assert!(matches!(exit, ActorExitStatus::Panicked));
Expand Down
25 changes: 19 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,9 @@ impl Actor for Indexer {
// Time to take a nap.
let sleep_for = commit_timeout - elapsed;

ctx.pause();
ctx.schedule_self_msg(sleep_for, Command::Resume);
self.handle(Command::Pause, ctx).await?;

Ok(())
}

Expand Down Expand Up @@ -1078,13 +1079,25 @@ mod tests {
})
.await
.unwrap();
universe.sleep(Duration::from_secs(3)).await;
let mut indexer_counters = indexer_handle.observe().await.state;
indexer_counters.pipeline_metrics_opt = None;
let mut indexer_counters: IndexerCounters = Default::default();
for _ in 0..100 {
// When a lot of unit tests are running concurrently we have a race condition here.
// It is very difficult to assess when drain will actually be called.
//
// Therefore we check that it happens "eventually".
universe.sleep(Duration::from_secs(1)).await;
tokio::task::yield_now().await;
indexer_counters = indexer_handle.observe().await.state;
indexer_counters.pipeline_metrics_opt = None;
// drain was called at least once.
if indexer_counters.num_splits_emitted > 0 {
break;
}
}

assert_eq!(
indexer_counters,
IndexerCounters {
&indexer_counters,
&IndexerCounters {
num_splits_emitted: 1,
num_split_batches_emitted: 1,
num_docs_in_workbench: 0,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl IndexingService {
self.indexing_pipelines
.retain(|pipeline_uid, pipeline_handle| {
match pipeline_handle.handle.state() {
ActorState::Idle | ActorState::Paused | ActorState::Processing => true,
ActorState::Paused | ActorState::Running => true,
ActorState::Success => {
info!(
pipeline_uid=%pipeline_uid,
Expand Down