diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs index 2aeab590dbe..04f82926828 100644 --- a/quickwit/quickwit-common/src/pubsub.rs +++ b/quickwit/quickwit-common/src/pubsub.rs @@ -19,7 +19,6 @@ use std::collections::HashMap; use std::fmt; -use std::marker::PhantomData; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; @@ -60,7 +59,7 @@ struct InnerEventBroker { impl EventBroker { /// Subscribes to an event type. - pub fn subscribe(&self, subscriber: impl EventSubscriber) -> EventSubscriptionHandle + pub fn subscribe(&self, subscriber: impl EventSubscriber) -> EventSubscriptionHandle where E: Event { let mut subscriptions = self .inner @@ -84,11 +83,19 @@ impl EventBroker { .get_mut::>() .expect("The subscription map should exist."); typed_subscriptions.insert(subscription_id, subscription); - EventSubscriptionHandle { subscription_id, broker: Arc::downgrade(&self.inner), - _phantom: PhantomData, + drop_me: |subscription_id, broker| { + let mut subscriptions = broker + .subscriptions + .lock() + .expect("the lock should not be poisoned"); + if let Some(typed_subscriptions) = subscriptions.get_mut::>() + { + typed_subscriptions.remove(&subscription_id); + } + }, } } @@ -120,31 +127,20 @@ struct EventSubscription { subscriber: Box>, } -#[derive(Debug)] -pub struct EventSubscriptionHandle { +pub struct EventSubscriptionHandle { subscription_id: usize, broker: Weak, - _phantom: PhantomData, + drop_me: fn(usize, &InnerEventBroker), } -impl EventSubscriptionHandle -where E: Event -{ +impl EventSubscriptionHandle { pub fn cancel(self) {} } -impl Drop for EventSubscriptionHandle -where E: Event -{ +impl Drop for EventSubscriptionHandle { fn drop(&mut self) { if let Some(broker) = self.broker.upgrade() { - let mut subscriptions = broker - .subscriptions - .lock() - .expect("the lock should not be poisoned"); - if let Some(typed_subscriptions) = subscriptions.get_mut::>() { - typed_subscriptions.remove(&self.subscription_id); - } + (self.drop_me)(self.subscription_id, &broker); } } } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index b3581873045..e2cd13ce49d 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -133,7 +133,7 @@ struct QuickwitServices { /// The control plane listens to metastore events. /// We must maintain a reference to the subscription handles to continue receiving /// notifications. Otherwise, the subscriptions are dropped. - _report_splits_subscription_handle_opt: Option>, + _report_splits_subscription_handle_opt: Option, } fn has_node_with_metastore_service(members: &[ClusterMember]) -> bool {