Allow events processing without holding total_consistency_lock
Unfortunately, the RAII guard types returned by `RwLock` are not `Send`, which is why they can't be held across `await` boundaries. In order to allow asynchronous event processing in multi-threaded environments, we now allow events to be processed without holding the `total_consistency_lock`.
tnull committed Apr 21, 2023
1 parent b546822 commit f2453b7
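
For context on the commit message: a minimal sketch (not taken from this commit; names are illustrative) of the compile failure it describes. A future that holds `std::sync::RwLock`'s RAII read guard across an `.await` point is itself `!Send`, so a multi-threaded executor like `tokio::spawn` rejects it:

```rust
use std::sync::{Arc, RwLock};

async fn handle_event() { /* some asynchronous event handler */ }

// Holding the RAII read guard across `.await` makes this future `!Send`,
// because `RwLockReadGuard` is `!Send`.
async fn process_events(lock: Arc<RwLock<()>>) {
    let _read_guard = lock.read().unwrap();
    handle_event().await; // the guard is still alive across this await point
}

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let lock = Arc::new(RwLock::new(()));
    // Fails to compile: `tokio::spawn` requires `Send` futures.
    // rt.spawn(process_events(lock.clone()));
    rt.block_on(process_events(lock)); // running it on the current thread is fine
}
```

The diff below therefore narrows the guard's scope so it is dropped before any (possibly async) user-provided event handler runs, and replaces the lock-based exclusion with an atomic flag.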
Showing 2 changed files with 53 additions and 30 deletions.
22 changes: 12 additions & 10 deletions lightning-background-processor/src/lib.rs
```diff
@@ -1485,10 +1485,9 @@ mod tests {
             })
         }, false,
     );
-    // TODO: Drop _local and simply spawn after #2003
-    let local_set = tokio::task::LocalSet::new();
-    local_set.spawn_local(bp_future);
-    local_set.spawn_local(async move {
+
+    let t1 = tokio::spawn(bp_future);
+    let t2 = tokio::spawn(async move {
         do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
             let mut i = 0;
             loop {
@@ -1500,7 +1499,9 @@
         }, tokio::time::sleep(Duration::from_millis(1)).await);
         exit_sender.send(()).unwrap();
     });
-    local_set.await;
+    let (r1, r2) = tokio::join!(t1, t2);
+    r1.unwrap().unwrap();
+    r2.unwrap()
 }
 
 macro_rules! do_test_payment_path_scoring {
@@ -1654,13 +1655,14 @@
             })
         }, false,
     );
-    // TODO: Drop _local and simply spawn after #2003
-    let local_set = tokio::task::LocalSet::new();
-    local_set.spawn_local(bp_future);
-    local_set.spawn_local(async move {
+    let t1 = tokio::spawn(bp_future);
+    let t2 = tokio::spawn(async move {
         do_test_payment_path_scoring!(nodes, receiver.recv().await);
         exit_sender.send(()).unwrap();
     });
-    local_set.await;
+
+    let (r1, r2) = tokio::join!(t1, t2);
+    r1.unwrap().unwrap();
+    r2.unwrap()
 }
 }
```
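
The test changes above drop `tokio::task::LocalSet`, which exists to run `!Send` futures on the current thread, in favor of plain `tokio::spawn`, which the now-`Send` background-processor future permits. A minimal sketch of the resulting pattern (with a stand-in for the real `bp_future`), showing why the first task is unwrapped twice: awaiting a `JoinHandle` yields `Result<T, JoinError>`, and the processor future itself resolves to a `Result` as well:

```rust
use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // Stand-in for the real background-processor future, which resolves to a Result.
    let bp_future = async { Ok::<(), std::io::Error>(()) };

    let t1: JoinHandle<Result<(), std::io::Error>> = tokio::spawn(bp_future);
    let t2 = tokio::spawn(async {
        // ... drive the test, then signal the processor to exit ...
    });

    let (r1, r2) = tokio::join!(t1, t2);
    r1.unwrap()  // panics on JoinError, i.e. if the task itself panicked
      .unwrap(); // panics if the background processor returned an error
    r2.unwrap();
}
```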
61 changes: 41 additions & 20 deletions lightning/src/ln/channelmanager.rs
```diff
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -926,6 +926,8 @@
 
     /// See `ChannelManager` struct-level documentation for lock order requirements.
     pending_events: Mutex<Vec<events::Event>>,
+    /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+    pending_events_processor: AtomicBool,
     /// See `ChannelManager` struct-level documentation for lock order requirements.
     pending_background_events: Mutex<Vec<BackgroundEvent>>,
     /// Used when we have to take a BIG lock to make sure everything is self-consistent.
```
```diff
@@ -1680,30 +1682,47 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
     ($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-        // We'll acquire our total consistency lock until the returned future completes so that
-        // we can be sure no other persists happen while processing events.
-        let _read_guard = $self.total_consistency_lock.read().unwrap();
+        let mut processed_all_events = false;
+        while !processed_all_events {
+            if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+                return;
+            }
 
-        let mut result = NotifyOption::SkipPersist;
+            let mut result = NotifyOption::SkipPersist;
 
-        // TODO: This behavior should be documented. It's unintuitive that we query
-        // ChannelMonitors when clearing other events.
-        if $self.process_pending_monitor_events() {
-            result = NotifyOption::DoPersist;
-        }
+            {
+                // We'll acquire our total consistency lock so that we can be sure no other
+                // persists happen while processing monitor events.
+                let _read_guard = $self.total_consistency_lock.read().unwrap();
+
+                // TODO: This behavior should be documented. It's unintuitive that we query
+                // ChannelMonitors when clearing other events.
+                if $self.process_pending_monitor_events() {
+                    result = NotifyOption::DoPersist;
+                }
+            }
 
-        let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-        if !pending_events.is_empty() {
-            result = NotifyOption::DoPersist;
-        }
+            let pending_events = $self.pending_events.lock().unwrap().clone();
+            let num_events = pending_events.len();
+            if !pending_events.is_empty() {
+                result = NotifyOption::DoPersist;
+            }
 
-        for event in pending_events {
-            $event_to_handle = event;
-            $handle_event;
-        }
+            for event in pending_events {
+                $event_to_handle = event;
+                $handle_event;
+            }
 
-        if result == NotifyOption::DoPersist {
-            $self.persistence_notifier.notify();
+            {
+                let mut pending_events = $self.pending_events.lock().unwrap();
+                pending_events.drain(..num_events);
+                processed_all_events = pending_events.is_empty();
+                $self.pending_events_processor.store(false, Ordering::Release);
+            }
+
+            if result == NotifyOption::DoPersist {
+                $self.persistence_notifier.notify();
+            }
         }
     }
 }
```
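
To see the new control flow in isolation, here is a standalone sketch (hypothetical names, simplified event type) of the pattern `process_events_body` now implements. The `AtomicBool` admits exactly one processor at a time; events are cloned rather than drained up front, so a failure partway through never loses queued events; and the outer loop re-runs whenever new events arrived while the flag was held, so an enqueuer racing the `compare_exchange` is never silently dropped:

```rust
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

struct EventQueue {
    pending: Mutex<Vec<u32>>, // stand-in for Mutex<Vec<events::Event>>
    processing: AtomicBool,   // stand-in for pending_events_processor
}

impl EventQueue {
    fn process_events(&self, mut handle: impl FnMut(u32)) {
        let mut processed_all_events = false;
        while !processed_all_events {
            // Admit a single processor; concurrent callers return and rely on
            // the winner's re-check below to pick up events they enqueue.
            if self.processing
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_err()
            {
                return;
            }

            // Clone instead of draining so events survive a mid-processing failure.
            let events = self.pending.lock().unwrap().clone();
            let num_events = events.len();
            for event in events {
                handle(event);
            }

            // Only now drop what was actually handled, then release the flag
            // while still holding the queue lock, mirroring the macro above.
            let mut pending = self.pending.lock().unwrap();
            pending.drain(..num_events);
            processed_all_events = pending.is_empty();
            self.processing.store(false, Ordering::Release);
        }
    }
}

fn main() {
    let q = EventQueue { pending: Mutex::new(vec![1, 2, 3]), processing: AtomicBool::new(false) };
    q.process_events(|e| println!("handled event {e}"));
}
```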
```diff
@@ -1771,6 +1790,7 @@
             per_peer_state: FairRwLock::new(HashMap::new()),
 
             pending_events: Mutex::new(Vec::new()),
+            pending_events_processor: AtomicBool::new(false),
             pending_background_events: Mutex::new(Vec::new()),
             total_consistency_lock: RwLock::new(()),
             persistence_notifier: Notifier::new(),
@@ -7916,6 +7936,7 @@
             per_peer_state: FairRwLock::new(per_peer_state),
 
             pending_events: Mutex::new(pending_events_read),
+            pending_events_processor: AtomicBool::new(false),
             pending_background_events: Mutex::new(pending_background_events),
             total_consistency_lock: RwLock::new(()),
             persistence_notifier: Notifier::new(),
```
