Skip to content

Commit

Permalink
Adding an actor inbox gauge.
Browse files Browse the repository at this point in the history
The goal here is to offer a way to spot a possible leak
(diverging number of actors).
  • Loading branch information
fulmicoton committed Oct 9, 2024
1 parent cc7a97f commit ac678de
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions quickwit/quickwit-actors/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use std::any::Any;
use std::convert::Infallible;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::sync::{Arc, OnceLock, Weak};
use std::time::Instant;

use quickwit_common::metrics::IntCounter;
use quickwit_common::metrics::{GaugeGuard, IntCounter, IntGauge};
use tokio::sync::oneshot;

use crate::channel_with_priority::{Receiver, Sender, TrySendError};
Expand Down Expand Up @@ -311,39 +311,45 @@ impl<A: Actor> Mailbox<A> {
}
}

struct InboxInner<A: Actor> {
rx: Receiver<Envelope<A>>,
_actors_count_gauge_guard: GaugeGuard<'static>,

}

pub struct Inbox<A: Actor> {
rx: Arc<Receiver<Envelope<A>>>,
inner: Arc<InboxInner<A>>,
}

impl<A: Actor> Clone for Inbox<A> {
fn clone(&self) -> Self {
Inbox {
rx: self.rx.clone(),
inner: self.inner.clone(),
}
}
}

impl<A: Actor> Inbox<A> {
pub(crate) fn is_empty(&self) -> bool {
self.rx.is_empty()
self.inner.rx.is_empty()
}

pub(crate) async fn recv(&self) -> Result<Envelope<A>, RecvError> {
self.rx.recv().await
self.inner.rx.recv().await
}

pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope<A> {
self.rx.recv_high_priority().await
self.inner.rx.recv_high_priority().await
}

pub(crate) fn try_recv(&self) -> Result<Envelope<A>, RecvError> {
self.rx.try_recv()
self.inner.rx.try_recv()
}

#[cfg(any(test, feature = "testsuite"))]
pub async fn recv_typed_message<M: 'static>(&self) -> Result<M, RecvError> {
loop {
match self.rx.recv().await {
match self.inner.rx.recv().await {
Ok(mut envelope) => {
if let Some(msg) = envelope.message_typed() {
return Ok(msg);
Expand All @@ -362,7 +368,7 @@ impl<A: Actor> Inbox<A> {
/// Warning this iterator might never be exhausted if there is a living
/// mailbox associated to it.
pub fn drain_for_test(&self) -> Vec<Box<dyn Any>> {
self.rx
self.inner.rx
.drain_low_priority()
.into_iter()
.map(|mut envelope| envelope.message())
Expand All @@ -375,19 +381,29 @@ impl<A: Actor> Inbox<A> {
/// Warning this iterator might never be exhausted if there is a living
/// mailbox associated to it.
pub fn drain_for_test_typed<M: 'static>(&self) -> Vec<M> {
self.rx
self.inner.rx
.drain_low_priority()
.into_iter()
.flat_map(|mut envelope| envelope.message_typed())
.collect()
}
}


fn get_actors_inbox_count_gauge_guard() -> GaugeGuard<'static> {
static INBOX_GAUGE: std::sync::OnceLock<IntGauge> = OnceLock::new();
let gauge = INBOX_GAUGE.get_or_init(|| {
quickwit_common::metrics::new_gauge("inbox_count", "overall count of actors", "actors", &[])
});
GaugeGuard::from_gauge(gauge)
}

pub(crate) fn create_mailbox<A: Actor>(
actor_name: String,
queue_capacity: QueueCapacity,
scheduler_client_opt: Option<SchedulerClient>,
) -> (Mailbox<A>, Inbox<A>) {

let (tx, rx) = crate::channel_with_priority::channel(queue_capacity);
let ref_count = Arc::new(AtomicUsize::new(1));
let mailbox = Mailbox {
Expand All @@ -398,7 +414,13 @@ pub(crate) fn create_mailbox<A: Actor>(
}),
ref_count,
};
let inbox = Inbox { rx: Arc::new(rx) };
let actors_count_gauge_guard = get_actors_inbox_count_gauge_guard();
let inner = InboxInner {
rx,
_actors_count_gauge_guard: actors_count_gauge_guard,

};
let inbox = Inbox { inner: Arc::new(inner) };
(mailbox, inbox)
}

Expand Down

0 comments on commit ac678de

Please sign in to comment.