Skip to content

Commit

Permalink
Removing needless generics of event subscription handle. (#4126)
Browse files Browse the repository at this point in the history
The dispatch is made using a function pointer.
  • Loading branch information
fulmicoton authored Nov 15, 2023
1 parent 69e92f3 commit fb903e6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 21 deletions.
36 changes: 16 additions & 20 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use std::collections::HashMap;
use std::fmt;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
Expand Down Expand Up @@ -60,7 +59,7 @@ struct InnerEventBroker {

impl EventBroker {
/// Subscribes to an event type.
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle<E>
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
where E: Event {
let mut subscriptions = self
.inner
Expand All @@ -84,11 +83,19 @@ impl EventBroker {
.get_mut::<EventSubscriptions<E>>()
.expect("The subscription map should exist.");
typed_subscriptions.insert(subscription_id, subscription);

EventSubscriptionHandle {
subscription_id,
broker: Arc::downgrade(&self.inner),
_phantom: PhantomData,
drop_me: |subscription_id, broker| {
let mut subscriptions = broker
.subscriptions
.lock()
.expect("the lock should not be poisoned");
if let Some(typed_subscriptions) = subscriptions.get_mut::<EventSubscriptions<E>>()
{
typed_subscriptions.remove(&subscription_id);
}
},
}
}

Expand Down Expand Up @@ -120,31 +127,20 @@ struct EventSubscription<E> {
subscriber: Box<dyn EventSubscriber<E>>,
}

#[derive(Debug)]
pub struct EventSubscriptionHandle<E: Event> {
pub struct EventSubscriptionHandle {
subscription_id: usize,
broker: Weak<InnerEventBroker>,
_phantom: PhantomData<E>,
drop_me: fn(usize, &InnerEventBroker),
}

impl<E> EventSubscriptionHandle<E>
where E: Event
{
impl EventSubscriptionHandle {
pub fn cancel(self) {}
}

impl<E> Drop for EventSubscriptionHandle<E>
where E: Event
{
impl Drop for EventSubscriptionHandle {
fn drop(&mut self) {
if let Some(broker) = self.broker.upgrade() {
let mut subscriptions = broker
.subscriptions
.lock()
.expect("the lock should not be poisoned");
if let Some(typed_subscriptions) = subscriptions.get_mut::<EventSubscriptions<E>>() {
typed_subscriptions.remove(&self.subscription_id);
}
(self.drop_me)(self.subscription_id, &broker);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ struct QuickwitServices {
/// The control plane listens to metastore events.
/// We must maintain a reference to the subscription handles to continue receiving
/// notifications. Otherwise, the subscriptions are dropped.
_report_splits_subscription_handle_opt: Option<EventSubscriptionHandle<ReportSplitsRequest>>,
_report_splits_subscription_handle_opt: Option<EventSubscriptionHandle>,
}

fn has_node_with_metastore_service(members: &[ClusterMember]) -> bool {
Expand Down

0 comments on commit fb903e6

Please sign in to comment.