Skip to content

Commit

Permalink
Introduce next_event_async allowing to poll event queue
Browse files Browse the repository at this point in the history
We implement a way to asynchronously poll the queue for new events,
providing an async alternative to `wait_next_event`.
  • Loading branch information
tnull committed Jan 24, 2024
1 parent 9026795 commit 83c0531
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
47 changes: 42 additions & 5 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use bitcoin::blockdata::locktime::absolute::LockTime;
use bitcoin::secp256k1::PublicKey;
use bitcoin::OutPoint;
use core::future::Future;
use core::task::{Poll, Waker};
use rand::{thread_rng, Rng};
use std::collections::VecDeque;
use std::ops::Deref;
Expand Down Expand Up @@ -125,7 +127,8 @@ pub struct EventQueue<K: KVStore + Sync + Send, L: Deref>
where
L::Target: Logger,
{
queue: Mutex<VecDeque<Event>>,
queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
notifier: Condvar,
kv_store: Arc<K>,
logger: L,
Expand All @@ -136,9 +139,10 @@ where
L::Target: Logger,
{
pub(crate) fn new(kv_store: Arc<K>, logger: L) -> Self {
let queue: Mutex<VecDeque<Event>> = Mutex::new(VecDeque::new());
let queue = Arc::new(Mutex::new(VecDeque::new()));
let waker = Arc::new(Mutex::new(None));
let notifier = Condvar::new();
Self { queue, notifier, kv_store, logger }
Self { queue, waker, notifier, kv_store, logger }
}

pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
Expand All @@ -149,6 +153,10 @@ where
}

self.notifier.notify_one();

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
Ok(())
}

Expand All @@ -157,6 +165,10 @@ where
locked_queue.front().map(|e| e.clone())
}

pub(crate) async fn next_event_async(&self) -> Event {
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
}

pub(crate) fn wait_next_event(&self) -> Event {
let locked_queue =
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
Expand All @@ -170,6 +182,10 @@ where
self.persist_queue(&locked_queue)?;
}
self.notifier.notify_one();

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
Ok(())
}

Expand Down Expand Up @@ -207,9 +223,10 @@ where
) -> Result<Self, lightning::ln::msgs::DecodeError> {
let (kv_store, logger) = args;
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
let queue: Mutex<VecDeque<Event>> = Mutex::new(read_queue.0);
let queue = Arc::new(Mutex::new(read_queue.0));
let waker = Arc::new(Mutex::new(None));
let notifier = Condvar::new();
Ok(Self { queue, notifier, kv_store, logger })
Ok(Self { queue, waker, notifier, kv_store, logger })
}
}

Expand Down Expand Up @@ -240,6 +257,26 @@ impl Writeable for EventQueueSerWrapper<'_> {
}
}

struct EventFuture {
event_queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for EventFuture {
type Output = Event;

fn poll(
self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
if let Some(event) = self.event_queue.lock().unwrap().front() {
Poll::Ready(event.clone())
} else {
*self.waker.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending
}
}
}

pub(crate) struct EventHandler<K: KVStore + Sync + Send, L: Deref>
where
L::Target: Logger,
Expand Down
9 changes: 9 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,15 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
self.event_queue.next_event()
}

/// Returns the next event in the event queue.
///
/// Will asynchronously poll the event queue until the next event is ready.
///
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
pub async fn next_event_async(&self) -> Event {
self.event_queue.next_event_async().await
}

/// Returns the next event in the event queue.
///
/// Will block the current thread until the next event is available.
Expand Down

0 comments on commit 83c0531

Please sign in to comment.