Skip to content

Commit

Permalink
Remove two bugs linked to the last mailbox in the actor framework (#4251
Browse files Browse the repository at this point in the history
)

The weak mailbox was messing up the refcounting,
and we technically had a race condition.

The bug could surface on any actor looping alone,
like the control plane.

Closes #4248
  • Loading branch information
fulmicoton authored Dec 11, 2023
1 parent 041d9b5 commit 9f20660
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 41 deletions.
53 changes: 17 additions & 36 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::Instant;

use async_trait::async_trait;
use quickwit_common::metrics::IntCounter;
use tokio::sync::oneshot;

use crate::channel_with_priority::{Receiver, Sender, TrySendError};
use crate::envelope::{wrap_in_envelope, Envelope};
use crate::scheduler::SchedulerClient;
use crate::{
Actor, ActorContext, ActorExitStatus, AskError, DeferableReplyHandler, Handler, QueueCapacity,
RecvError, SendError,
};
use crate::{Actor, AskError, Command, DeferableReplyHandler, QueueCapacity, RecvError, SendError};

/// A mailbox is the object that makes it possible to send a message
/// to an actor.
Expand Down Expand Up @@ -76,40 +72,11 @@ impl<A: Actor> Drop for Mailbox<A> {
// This was the last mailbox.
// `ref_count == 1` means that only the mailbox in the ActorContext
// is remaining.
let _ = self.send_message_with_high_priority(LastMailbox);
let _ = self.send_message_with_high_priority(Command::Nudge);
}
}
}

/// Unit message type carrying no payload.
///
/// In this file it is passed to `send_message_with_high_priority` from the
/// `Drop` implementation of `Mailbox`, in the branch where only the mailbox
/// held by the `ActorContext` remains.
#[derive(Debug)]
struct LastMailbox;

/// Blanket handler for the `LastMailbox` message, implemented for every actor.
///
/// The handler is a no-op: it replies with `()` and never requests an exit.
#[async_trait]
impl<A: Actor> Handler<LastMailbox> for A {
type Reply = ();

/// Accepts the `LastMailbox` message and always returns `Ok(())`.
async fn handle(
&mut self,
_: LastMailbox,
_ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
// Being the last mailbox does not necessarily mean that we
// want to stop the processing.
//
// There could be pending messages in the queue that will
// spawn actors which will get a new copy of the mailbox,
// etc.
//
// For that reason, the logic that really detects
// the last mailbox happens when all messages have been drained.
//
// The `LastMailbox` message is just here to make sure the actor
// loop does not get stuck waiting for a message that
// will never come.
Ok(())
}
}

#[derive(Copy, Clone)]
pub(crate) enum Priority {
High,
Expand Down Expand Up @@ -451,6 +418,7 @@ impl<A: Actor> WeakMailbox<A> {
/// Attempts to turn this weak handle back into a strong [`Mailbox`].
///
/// Returns `None` if either the inner state or the shared reference counter
/// can no longer be upgraded. On success the shared counter is incremented
/// by one before the new mailbox is returned.
pub fn upgrade(&self) -> Option<Mailbox<A>> {
    self.inner.upgrade().and_then(|inner| {
        self.ref_count.upgrade().map(|ref_count| {
            // The mailbox we hand out is one more strong handle: record it.
            ref_count.fetch_add(1, Ordering::SeqCst);
            Mailbox { inner, ref_count }
        })
    })
}
}
Expand All @@ -462,7 +430,7 @@ mod tests {

use super::*;
use crate::tests::{Ping, PingReceiverActor};
use crate::Universe;
use crate::{ActorContext, ActorExitStatus, Handler, Universe};

#[tokio::test]
async fn test_weak_mailbox_downgrade_upgrade() {
Expand Down Expand Up @@ -623,4 +591,17 @@ mod tests {
TrySendError::Disconnected
));
}

// Upgrading a weak mailbox must be visible through `is_last_mailbox`, and
// dropping the upgraded mailbox must restore the previous state.
#[tokio::test]
async fn test_weak_mailbox_ref_count() {
    let universe = Universe::with_accelerated_time();
    let actor_name = "hello".to_string();
    // `_inbox` stays bound until the end of the test.
    let (strong_mailbox, _inbox) =
        universe.create_mailbox::<PingReceiverActor>(actor_name, QueueCapacity::Bounded(1));
    assert!(strong_mailbox.is_last_mailbox());

    // A second strong mailbox obtained through the weak handle.
    let weak_handle = strong_mailbox.downgrade();
    let upgraded_mailbox = weak_handle.upgrade().unwrap();
    assert!(!strong_mailbox.is_last_mailbox());

    // Once it is gone, the first mailbox is the last one again.
    drop(upgraded_mailbox);
    assert!(strong_mailbox.is_last_mailbox());
}
}
14 changes: 10 additions & 4 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,17 +285,23 @@ impl<A: Actor> ActorExecutionEnv<A> {
self.process_one_message(envelope).await?;
}
self.ctx.yield_now().await;

if self.inbox.is_empty() {
break;
}
}
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
self.ctx.idle();
if self.ctx.mailbox().is_last_mailbox() {
// No one will be able to send us more messages.
// We can exit the actor.
return Err(ActorExitStatus::Success);
// We double check here that the mailbox does not contain any messages,
// as someone on a different runtime thread could have added a last message
// and dropped the last mailbox right before this block.
// See #4248
if self.inbox.is_empty() {
// No one will be able to send us more messages.
// We can exit the actor.
info!(actor = self.ctx.actor_instance_id(), "no more messages");
return Err(ActorExitStatus::Success);
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Ingester {
rate_limiter_settings: RateLimiterSettings,
replication_factor: usize,
) -> IngestV2Result<Self> {
let self_node_id: NodeId = cluster.self_node_id().clone().into();
let self_node_id: NodeId = cluster.self_node_id().into();
let mrecordlog = MultiRecordLog::open_with_prefs(
wal_dir_path,
mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)),
Expand Down

0 comments on commit 9f20660

Please sign in to comment.