diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 90f95e7b8b3..6c46eb4c07c 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -21,10 +21,10 @@ use std::any::Any; use std::convert::Infallible; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, OnceLock, Weak}; use std::time::Instant; -use quickwit_common::metrics::IntCounter; +use quickwit_common::metrics::{GaugeGuard, IntCounter, IntGauge}; use tokio::sync::oneshot; use crate::channel_with_priority::{Receiver, Sender, TrySendError}; @@ -311,39 +311,45 @@ impl Mailbox { } } +struct InboxInner { + rx: Receiver>, + _actors_count_gauge_guard: GaugeGuard<'static>, + +} + pub struct Inbox { - rx: Arc>>, + inner: Arc>, } impl Clone for Inbox { fn clone(&self) -> Self { Inbox { - rx: self.rx.clone(), + inner: self.inner.clone(), } } } impl Inbox { pub(crate) fn is_empty(&self) -> bool { - self.rx.is_empty() + self.inner.rx.is_empty() } pub(crate) async fn recv(&self) -> Result, RecvError> { - self.rx.recv().await + self.inner.rx.recv().await } pub(crate) async fn recv_cmd_and_scheduled_msg_only(&self) -> Envelope { - self.rx.recv_high_priority().await + self.inner.rx.recv_high_priority().await } pub(crate) fn try_recv(&self) -> Result, RecvError> { - self.rx.try_recv() + self.inner.rx.try_recv() } #[cfg(any(test, feature = "testsuite"))] pub async fn recv_typed_message(&self) -> Result { loop { - match self.rx.recv().await { + match self.inner.rx.recv().await { Ok(mut envelope) => { if let Some(msg) = envelope.message_typed() { return Ok(msg); @@ -362,7 +368,7 @@ impl Inbox { /// Warning this iterator might never be exhausted if there is a living /// mailbox associated to it. pub fn drain_for_test(&self) -> Vec> { - self.rx + self.inner.rx .drain_low_priority() .into_iter() .map(|mut envelope| envelope.message()) @@ -375,7 +381,7 @@ impl Inbox { /// Warning this iterator might never be exhausted if there is a living /// mailbox associated to it. pub fn drain_for_test_typed(&self) -> Vec { - self.rx + self.inner.rx .drain_low_priority() .into_iter() .flat_map(|mut envelope| envelope.message_typed()) @@ -383,11 +389,21 @@ impl Inbox { } } + +fn get_actors_inbox_count_gauge_guard() -> GaugeGuard<'static> { + static INBOX_GAUGE: std::sync::OnceLock = OnceLock::new(); + let gauge = INBOX_GAUGE.get_or_init(|| { + quickwit_common::metrics::new_gauge("inbox_count", "overall count of actors", "actors", &[]) + }); + GaugeGuard::from_gauge(gauge) +} + pub(crate) fn create_mailbox( actor_name: String, queue_capacity: QueueCapacity, scheduler_client_opt: Option, ) -> (Mailbox, Inbox) { + let (tx, rx) = crate::channel_with_priority::channel(queue_capacity); let ref_count = Arc::new(AtomicUsize::new(1)); let mailbox = Mailbox { @@ -398,7 +414,13 @@ pub(crate) fn create_mailbox( }), ref_count, }; - let inbox = Inbox { rx: Arc::new(rx) }; + let actors_count_gauge_guard = get_actors_inbox_count_gauge_guard(); + let inner = InboxInner { + rx, + _actors_count_gauge_guard: actors_count_gauge_guard, + + }; + let inbox = Inbox { inner: Arc::new(inner) }; (mailbox, inbox) }