Introduce next_event_async, allowing the event queue to be polled #224

Merged · 3 commits · Feb 19, 2024
1 change: 1 addition & 0 deletions bindings/kotlin/ldk-node-android/lib/build.gradle.kts
@@ -43,6 +43,7 @@ android {
dependencies {
implementation("net.java.dev.jna:jna:5.12.0@aar")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk7")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")
implementation("androidx.appcompat:appcompat:1.4.0")
implementation("androidx.core:core-ktx:1.7.0")
api("org.slf4j:slf4j-api:1.7.30")
1 change: 1 addition & 0 deletions bindings/kotlin/ldk-node-jvm/lib/build.gradle.kts
@@ -46,6 +46,7 @@ dependencies {

// Use the Kotlin JDK 8 standard library.
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")

implementation("net.java.dev.jna:jna:5.12.0")
}
2 changes: 2 additions & 0 deletions bindings/ldk_node.udl
@@ -43,6 +43,8 @@ interface LDKNode {
void stop();
Event? next_event();
Event wait_next_event();
[Async]
Event next_event_async();
void event_handled();
PublicKey node_id();
sequence<SocketAddress>? listening_addresses();
148 changes: 140 additions & 8 deletions src/event.rs
@@ -26,6 +26,8 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
use bitcoin::blockdata::locktime::absolute::LockTime;
use bitcoin::secp256k1::PublicKey;
use bitcoin::OutPoint;
use core::future::Future;
use core::task::{Poll, Waker};
use rand::{thread_rng, Rng};
use std::collections::VecDeque;
use std::ops::Deref;
@@ -125,7 +127,8 @@ pub struct EventQueue<K: KVStore + Sync + Send, L: Deref>
where
L::Target: Logger,
{
queue: Mutex<VecDeque<Event>>,
queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
notifier: Condvar,
kv_store: Arc<K>,
logger: L,
Expand All @@ -136,9 +139,10 @@ where
L::Target: Logger,
{
pub(crate) fn new(kv_store: Arc<K>, logger: L) -> Self {
let queue: Mutex<VecDeque<Event>> = Mutex::new(VecDeque::new());
let queue = Arc::new(Mutex::new(VecDeque::new()));
let waker = Arc::new(Mutex::new(None));
let notifier = Condvar::new();
Self { queue, notifier, kv_store, logger }
Self { queue, waker, notifier, kv_store, logger }
}

pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
@@ -149,6 +153,10 @@
}

self.notifier.notify_one();

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
Ok(())
}

@@ -157,6 +165,10 @@
locked_queue.front().map(|e| e.clone())
}

pub(crate) async fn next_event_async(&self) -> Event {
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
}

pub(crate) fn wait_next_event(&self) -> Event {
let locked_queue =
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
@@ -170,6 +182,10 @@
self.persist_queue(&locked_queue)?;
}
self.notifier.notify_one();

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
Ok(())
}

@@ -207,9 +223,10 @@
) -> Result<Self, lightning::ln::msgs::DecodeError> {
let (kv_store, logger) = args;
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
let queue: Mutex<VecDeque<Event>> = Mutex::new(read_queue.0);
let queue = Arc::new(Mutex::new(read_queue.0));
let waker = Arc::new(Mutex::new(None));
let notifier = Condvar::new();
Ok(Self { queue, notifier, kv_store, logger })
Ok(Self { queue, waker, notifier, kv_store, logger })
}
}

@@ -240,6 +257,26 @@ impl Writeable for EventQueueSerWrapper<'_> {
}
}

struct EventFuture {
event_queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for EventFuture {
type Output = Event;

fn poll(
self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
if let Some(event) = self.event_queue.lock().unwrap().front() {
Poll::Ready(event.clone())
} else {
*self.waker.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending
}
}
}

pub(crate) struct EventHandler<K: KVStore + Sync + Send, L: Deref>
where
L::Target: Logger,
@@ -796,12 +833,14 @@
mod tests {
use super::*;
use lightning::util::test_utils::{TestLogger, TestStore};
use std::sync::atomic::{AtomicU16, Ordering};
use std::time::Duration;

#[test]
fn event_queue_persistence() {
#[tokio::test]
async fn event_queue_persistence() {
let store = Arc::new(TestStore::new(false));
let logger = Arc::new(TestLogger::new());
let event_queue = EventQueue::new(Arc::clone(&store), Arc::clone(&logger));
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
assert_eq!(event_queue.next_event(), None);

let expected_event = Event::ChannelReady {
@@ -814,6 +853,7 @@
// Check we get the expected event and that it is returned until we mark it handled.
for _ in 0..5 {
assert_eq!(event_queue.wait_next_event(), expected_event);
assert_eq!(event_queue.next_event_async().await, expected_event);
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
}

@@ -832,4 +872,96 @@
event_queue.event_handled().unwrap();
assert_eq!(event_queue.next_event(), None);
}

#[tokio::test]
async fn event_queue_concurrency() {
let store = Arc::new(TestStore::new(false));
let logger = Arc::new(TestLogger::new());
let event_queue = Arc::new(EventQueue::new(Arc::clone(&store), Arc::clone(&logger)));
assert_eq!(event_queue.next_event(), None);

let expected_event = Event::ChannelReady {
channel_id: ChannelId([23u8; 32]),
user_channel_id: UserChannelId(2323),
counterparty_node_id: None,
};

// Check `next_event_async` won't return if the queue is empty; the timeout should always fire instead.
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {
// Timeout
}
_ = event_queue.next_event_async() => {
panic!();
}
}

assert_eq!(event_queue.next_event(), None);
// Check we get the expected number of events when polling/enqueuing concurrently.
let enqueued_events = AtomicU16::new(0);
let received_events = AtomicU16::new(0);
let mut delayed_enqueue = false;

for _ in 0..25 {
event_queue.add_event(expected_event.clone()).unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
}

loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
event_queue.add_event(expected_event.clone()).unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
delayed_enqueue = true;
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
event_queue.event_handled().unwrap();
received_events.fetch_add(1, Ordering::SeqCst);

event_queue.add_event(expected_event.clone()).unwrap();
enqueued_events.fetch_add(1, Ordering::SeqCst);
}
e = event_queue.next_event_async() => {
assert_eq!(e, expected_event);
event_queue.event_handled().unwrap();
received_events.fetch_add(1, Ordering::SeqCst);
}
}

if delayed_enqueue
&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
{
break;
}
}
assert_eq!(event_queue.next_event(), None);

// Check we operate correctly, even when mixing and matching blocking and async API calls.
let (tx, mut rx) = tokio::sync::watch::channel(());
let thread_queue = Arc::clone(&event_queue);
let thread_event = expected_event.clone();
std::thread::spawn(move || {
let e = thread_queue.wait_next_event();
assert_eq!(e, thread_event);
thread_queue.event_handled().unwrap();
tx.send(()).unwrap();
});

let thread_queue = Arc::clone(&event_queue);
let thread_event = expected_event.clone();
std::thread::spawn(move || {
// Sleep a bit before we enqueue the events everybody is waiting for.
std::thread::sleep(Duration::from_millis(20));
thread_queue.add_event(thread_event.clone()).unwrap();
thread_queue.add_event(thread_event.clone()).unwrap();
});

let e = event_queue.next_event_async().await;
assert_eq!(e, expected_event.clone());
event_queue.event_handled().unwrap();

rx.changed().await.unwrap();
assert_eq!(event_queue.next_event(), None);
}
}
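
For reference, here is a minimal, self-contained sketch of the waker-slot pattern `EventFuture` implements above. All names (`TinyQueue`, `NextFuture`) are illustrative, a plain `u32` stands in for `Event`, and it assumes the `tokio` crate with its full feature set:

```rust
// Illustrative stand-alone sketch, not code from this PR.
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

#[derive(Clone, Default)]
struct TinyQueue {
    queue: Arc<Mutex<VecDeque<u32>>>,
    // Single waker slot: remembers the most recent pending poller.
    waker: Arc<Mutex<Option<Waker>>>,
}

impl TinyQueue {
    fn add(&self, item: u32) {
        self.queue.lock().unwrap().push_back(item);
        // Wake a parked `NextFuture`, if any, so it re-polls and sees the item.
        if let Some(waker) = self.waker.lock().unwrap().take() {
            waker.wake();
        }
    }

    fn next_async(&self) -> NextFuture {
        NextFuture { queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }
    }
}

struct NextFuture {
    queue: Arc<Mutex<VecDeque<u32>>>,
    waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for NextFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if let Some(item) = self.queue.lock().unwrap().front().copied() {
            Poll::Ready(item)
        } else {
            // Queue is empty: park by storing this task's waker for `add` to take.
            *self.waker.lock().unwrap() = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let q = TinyQueue::default();
    let producer = q.clone();
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        producer.add(42);
    });
    assert_eq!(q.next_async().await, 42);
}
```

In the PR itself, both `add_event` and `event_handled` take and wake the stored waker, so redelivery keeps working for whichever caller polls next.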
9 changes: 9 additions & 0 deletions src/lib.rs
@@ -811,6 +811,15 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
self.event_queue.next_event()
}

/// Returns the next event in the event queue.
///
/// Will asynchronously poll the event queue until the next event is ready.
///
/// **Note:** this will always return the same event until handling is confirmed via [`Node::event_handled`].
pub async fn next_event_async(&self) -> Event {
Contributor:
It's worth following the naming convention of wait_next_event and calling this async_next_event.
Also, it's a bit confusing to have wait and async, as usually they both mean async something.
Maybe wait_next_event should be sync_next_event?

@tnull (Collaborator, Author), Jan 10, 2024:
Mh, I'm not sure: wait_next_event is called that way to follow std::sync::Condvar's naming that indicates it's going to block the current thread. I disagree that wait and async "both mean async something" as blocking or not blocking the thread is a fundamental difference here.

That said, I'm generally also not the biggest fan of the _async suffix here, as it's redundant with the actual return type/async keyword of the method. I considered poll_next_event as an alternative name for next_event_async; however, it may also be a bit misleading, as the semantics of Future's poll are slightly different. As we also use the _async suffix for LDK's process_events_async, I stuck with that for now. Generally, I'm still open to better suggestions though; poll_next_event might be an alternative candidate.

Contributor:
Thanks for the explanation.

I would also go with poll_; it would look better than async fn name_async(). future_next_event could be another option.

Comment:
I don't mind next_event_async, would also consider next_event_future.

self.event_queue.next_event_async().await
}
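
For illustration, a consumer loop for the new method might look as follows. `run_event_loop` is a hypothetical helper, not part of this PR; it assumes a built and started `Node` and a running tokio runtime, with setup and error handling elided:

```rust
// Hypothetical sketch, not code from this PR. Assumes `Event` and `Node` from
// ldk_node and the `KVStore` trait used in its bounds are already in scope.
async fn run_event_loop<K: KVStore + Sync + Send + 'static>(node: &Node<K>) {
    loop {
        // Suspends this task (without blocking the thread) until an event is queued.
        let event = node.next_event_async().await;
        match event {
            Event::ChannelReady { channel_id, .. } => {
                println!("channel {:?} is ready", channel_id);
            }
            other => println!("got event: {:?}", other),
        }
        // Confirm handling, otherwise the same event is returned on the next call.
        node.event_handled();
    }
}
```

Unlike `wait_next_event`, awaiting here only suspends the task, so the calling thread stays free to drive other futures.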

/// Returns the next event in the event queue.
///
/// Will block the current thread until the next event is available.