Allow events processing without holding total_consistency_lock
Unfortunately, the RAII types used by `RwLock` are not `Send`, which is
why they can't be held over `await` boundaries. In order to allow
asynchronous event processing in multi-threaded environments, we here
allow processing events without holding the `total_consistency_lock`.
tnull committed Apr 20, 2023 · 1 parent 36bf817 · commit d7de357
Showing 2 changed files with 50 additions and 33 deletions.
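The underlying constraint: `std::sync::RwLock`'s RAII guards are `!Send`, so a future that keeps one alive across an `.await` point is itself `!Send` and cannot be handed to a multi-threaded executor. A minimal standalone sketch of the problem (plain tokio, not LDK code):

```rust
use std::sync::RwLock;

static LOCK: RwLock<u32> = RwLock::new(42);

async fn holds_guard_across_await() {
    // `RwLockReadGuard` is `!Send`; keeping it alive across an `.await`
    // point makes this whole future `!Send`.
    let guard = LOCK.read().unwrap();
    tokio::task::yield_now().await;
    println!("value: {}", *guard);
}

#[tokio::main]
async fn main() {
    // Awaiting the future on the current task is fine...
    holds_guard_across_await().await;

    // ...but spawning it onto the multi-threaded runtime would not compile,
    // because `tokio::spawn` requires `F: Send + 'static`:
    // tokio::spawn(holds_guard_across_await()); // error: future cannot be sent between threads safely
}
```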
lightning-background-processor/src/lib.rs (12 additions, 10 deletions)
@@ -1480,10 +1480,9 @@ mod tests {
 				})
 			}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
 				let mut i = 0;
 				loop {
@@ -1495,7 +1494,9 @@ mod tests {
 			}, tokio::time::sleep(Duration::from_millis(1)).await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 
 	macro_rules! do_test_payment_path_scoring {
@@ -1649,13 +1650,14 @@ mod tests {
 				})
 			}, false,
 		);
-		// TODO: Drop _local and simply spawn after #2003
-		let local_set = tokio::task::LocalSet::new();
-		local_set.spawn_local(bp_future);
-		local_set.spawn_local(async move {
+		let t1 = tokio::spawn(bp_future);
+		let t2 = tokio::spawn(async move {
 			do_test_payment_path_scoring!(nodes, receiver.recv().await);
 			exit_sender.send(()).unwrap();
 		});
-		local_set.await;
+
+		let (r1, r2) = tokio::join!(t1, t2);
+		r1.unwrap().unwrap();
+		r2.unwrap()
 	}
 }
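With the guard no longer held across `.await`, `bp_future` is `Send`, so the tests can drop the `LocalSet`/`spawn_local` workaround (which runs `!Send` futures on a single thread) in favor of plain `tokio::spawn`. A reduced sketch of the new join pattern (stand-in futures, not the actual test bodies):

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (exit_sender, exit_receiver) = oneshot::channel::<()>();

    // Stand-in for `bp_future`: a `Send` future returning a `Result`.
    let t1 = tokio::spawn(async move {
        exit_receiver.await.map_err(|_| "exit sender dropped")
    });
    // Stand-in for the test driver that signals shutdown.
    let t2 = tokio::spawn(async move {
        exit_sender.send(()).unwrap();
    });

    // `tokio::join!` awaits both `JoinHandle`s concurrently. The outer
    // `unwrap` surfaces task panics (`JoinError`); the inner one surfaces
    // the background processor's own `Result`.
    let (r1, r2) = tokio::join!(t1, t2);
    r1.unwrap().unwrap();
    r2.unwrap();
}
```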
lightning/src/ln/channelmanager.rs (38 additions, 23 deletions)
@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -926,6 +926,8 @@ where
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1680,30 +1682,42 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = $self.total_consistency_lock.read().unwrap();
+		let mut processed_all_events = false;
+		while !processed_all_events {
+			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+				return;
+			}
 
-		let mut result = NotifyOption::SkipPersist;
+			let mut result = NotifyOption::SkipPersist;
 
-		// TODO: This behavior should be documented. It's unintuitive that we query
-		// ChannelMonitors when clearing other events.
-		if $self.process_pending_monitor_events() {
-			result = NotifyOption::DoPersist;
-		}
+			// TODO: This behavior should be documented. It's unintuitive that we query
+			// ChannelMonitors when clearing other events.
+			if $self.process_pending_monitor_events() {
+				result = NotifyOption::DoPersist;
+			}
 
-		let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-		if !pending_events.is_empty() {
-			result = NotifyOption::DoPersist;
-		}
+			let pending_events = $self.pending_events.lock().unwrap().clone();
+			let num_events = pending_events.len();
+			if !pending_events.is_empty() {
+				result = NotifyOption::DoPersist;
+			}
 
-		for event in pending_events {
-			$event_to_handle = event;
-			$handle_event;
-		}
+			for event in pending_events {
+				$event_to_handle = event;
+				$handle_event;
+			}
 
-		if result == NotifyOption::DoPersist {
-			$self.persistence_notifier.notify();
+			{
+				let mut pending_events = $self.pending_events.lock().unwrap();
+				pending_events.drain(..num_events);
+				processed_all_events = pending_events.is_empty();
+			}
+
+			if result == NotifyOption::DoPersist {
+				$self.persistence_notifier.notify();
+			}
+
+			$self.pending_events_processor.store(false, Ordering::Release);
 		}
 	}
 }
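The scheme replacing the consistency lock, in isolation: an `AtomicBool` claimed via `compare_exchange(Acquire)` makes exactly one task the events processor without blocking; events are snapshotted with `clone()` rather than drained, so a panic mid-handling cannot lose them; afterwards only the handled prefix is drained, and the outer loop re-runs whenever new events arrived concurrently. A self-contained sketch of that pattern (simplified types, not the macro itself):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct EventQueue {
    pending: Mutex<Vec<String>>,
    processing: AtomicBool,
}

impl EventQueue {
    fn process_events(&self, handle: impl Fn(&str)) {
        let mut processed_all = false;
        while !processed_all {
            // Claim the single-processor slot without blocking; if another
            // task holds it, its loop will also pick up our queued events.
            if self.processing
                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_err()
            {
                return;
            }

            // Snapshot rather than drain, so handling can't lose events
            // (mirrors the `clone()` in the diff above).
            let batch = self.pending.lock().unwrap().clone();
            let num_events = batch.len();
            for event in &batch {
                handle(event);
            }

            // Drop only the prefix we handled; anything enqueued concurrently
            // stays, and `processed_all == false` forces another pass.
            {
                let mut pending = self.pending.lock().unwrap();
                pending.drain(..num_events);
                processed_all = pending.is_empty();
            }

            self.processing.store(false, Ordering::Release);
        }
    }
}

fn main() {
    let queue = EventQueue {
        pending: Mutex::new(vec!["open".into(), "close".into()]),
        processing: AtomicBool::new(false),
    };
    queue.process_events(|event| println!("handled: {event}"));
}
```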
@@ -1771,6 +1785,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),
 
 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -4369,8 +4384,6 @@ where
 	}
 
 	fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
-		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
-
 		let counterparty_node_id = match counterparty_node_id {
 			Some(cp_id) => cp_id.clone(),
 			None => {
@@ -5312,7 +5325,8 @@ where
 
 	/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
 	fn process_pending_monitor_events(&self) -> bool {
-		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
+		debug_assert!(self.total_consistency_lock.try_write().is_err() ||
+			self.pending_events_processor.load(Ordering::Relaxed)); // Caller holds read lock or processes events asynchronously.
 
 		let mut failed_channels = Vec::new();
 		let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
@@ -7916,6 +7930,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),
 
 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
