Skip to content

Commit

Permalink
Added event subscriber's name in presence of a timeout. (#4462)
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton authored Jan 26, 2024
1 parent 1c9bc78 commit 11d917b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 10 deletions.
27 changes: 22 additions & 5 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,13 @@ struct InnerEventBroker {
}

impl EventBroker {
/// Subscribes to an event type.
#[must_use]
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
where E: Event {
// The point of this private method is to allow the public subscribe method to have only one
// generic argument and avoid the ugly `::<E, _>` syntax.
fn subscribe_aux<E, S>(&self, subscriber: S) -> EventSubscriptionHandle
where
E: Event,
S: EventSubscriber<E> + Send + Sync + 'static,
{
let mut subscriptions = self
.inner
.subscriptions
Expand All @@ -87,7 +90,9 @@ impl EventBroker {
.subscription_sequence
.fetch_add(1, Ordering::Relaxed);

let subscriber_name = std::any::type_name::<S>();
let subscription = EventSubscription {
subscriber_name,
subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))),
};
let typed_subscriptions = subscriptions
Expand All @@ -111,6 +116,13 @@ impl EventBroker {
}
}

/// Subscribes to an event type.
#[must_use]
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
where E: Event {
self.subscribe_aux(subscriber)
}

/// Publishes an event.
pub fn publish<E>(&self, event: E)
where E: Event {
Expand All @@ -123,6 +135,7 @@ impl EventBroker {
if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
for subscription in typed_subscriptions.values() {
let event = event.clone();
let subscriber_name = subscription.subscriber_name;
let subscriber_clone = subscription.subscriber.clone();
let handle_event_fut = async move {
if tokio::time::timeout(Duration::from_secs(1), async {
Expand All @@ -131,7 +144,8 @@ impl EventBroker {
.await
.is_err()
{
warn!("`{}` event handler timed out", std::any::type_name::<E>());
let event_name = std::any::type_name::<E>();
warn!("{}'s handler for {event_name} timed out", subscriber_name);
}
};
tokio::spawn(handle_event_fut);
Expand All @@ -141,6 +155,9 @@ impl EventBroker {
}

struct EventSubscription<E> {
// We put that in the subscription in order to avoid having to take the lock
// to access it.
subscriber_name: &'static str,
subscriber: Arc<TokioMutex<Box<dyn EventSubscriber<E>>>>,
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl ControlPlane {
/// This method includes debouncing logic. Every call will be followed by a cooldown period.
fn rebuild_plan_debounced(&mut self, ctx: &ActorContext<Self>) {
self.rebuild_plan_debouncer
.self_send_with_cooldown::<_, RebuildPlan>(ctx);
.self_send_with_cooldown::<RebuildPlan>(ctx);
}

/// Deletes a set of shards from the metastore and the control plane model.
Expand Down
9 changes: 5 additions & 4 deletions quickwit/quickwit-control-plane/src/debouncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ impl Debouncer {
.schedule_event(callback, self.cooldown_period);
}

pub fn self_send_with_cooldown<A, M>(&self, ctx: &ActorContext<A>)
where
A: Actor + Handler<M> + DeferableReplyHandler<M>,
pub fn self_send_with_cooldown<M>(
&self,
ctx: &ActorContext<impl Actor + Handler<M> + DeferableReplyHandler<M>>,
) where
M: Default + std::fmt::Debug + Send + Sync + 'static,
{
let cooldown_state = self.accept_transition(Transition::Emit);
Expand Down Expand Up @@ -194,7 +195,7 @@ mod tests {
_message: DebouncedIncrement,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
self.debouncer.self_send_with_cooldown::<_, Increment>(ctx);
self.debouncer.self_send_with_cooldown::<Increment>(ctx);
Ok(())
}
}
Expand Down

0 comments on commit 11d917b

Please sign in to comment.