Skip to content

Commit

Permalink
Add infra to block ChannelMonitorUpdates on forwarded claims
Browse files Browse the repository at this point in the history
When we forward a payment and receive an `update_fulfill_htlc`
message from the downstream channel, we immediately claim the HTLC
on the upstream channel, before even doing a `commitment_signed`
dance on the downstream channel. This implies that our
`ChannelMonitorUpdate`s "go out" in the right order - first we
ensure we'll get our money by writing the preimage down, then we
write the update that resolves giving money on the downstream node.

This is safe as long as `ChannelMonitorUpdate`s complete in the
order in which they are generated, but of course looking forward we
want to support asynchronous updates, which may complete in any
order.

Here we add infrastructure to handle downstream
`ChannelMonitorUpdate`s which are blocked on an upstream
preimage-containing one. We don't yet actually do the blocking which
will come in a future commit.
  • Loading branch information
TheBlueMatt committed May 30, 2023
1 parent 785bdb8 commit 394f54d
Showing 1 changed file with 131 additions and 27 deletions.
158 changes: 131 additions & 27 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,13 +532,31 @@ pub(crate) enum MonitorUpdateCompletionAction {
/// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
/// event can be generated.
PaymentClaimed { payment_hash: PaymentHash },
/// Indicates an [`events::Event`] should be surfaced to the user.
EmitEvent { event: events::Event },
/// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
/// operation of another channel.
///
/// This is usually generated when we've forwarded an HTLC and want to block the outbound edge
/// from completing a monitor update which removes the payment preimage until the inbound edge
/// completes a monitor update containing the payment preimage. In that case, after the inbound
/// edge completes, we will surface an [`Event::PaymentForwarded`] as well as unblock the
/// outbound edge.
EmitEventAndFreeOtherChannel {
event: events::Event,
downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>,
},
}

impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
(0, PaymentClaimed) => { (0, payment_hash, required) },
(2, EmitEvent) => { (0, event, upgradable_required) },
(2, EmitEventAndFreeOtherChannel) => {
(0, event, upgradable_required),
// LDK prior to 0.0.116 did not have this field as the monitor update application order was
// required by clients. If we downgrade to something prior to 0.0.116 this may result in
// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
// support async monitor updates even in LDK 0.0.116 and once we do we'll require no
// downgrades to prior versions.
(1, downstream_counterparty_and_funding_outpoint, option),
},
);

#[derive(Clone, Debug, PartialEq, Eq)]
Expand All @@ -555,6 +573,36 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
};
);

#[derive(Clone, PartialEq, Eq, Debug)]
/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
/// the blocked action here. See enum variants for more info.
pub(crate) enum RAAMonitorUpdateBlockingAction {
/// A forwarded payment was claimed. We block the downstream channel completing its monitor
/// update which removes the HTLC preimage until the upstream channel has gotten the preimage
/// durably to disk.
ForwardedPaymentInboundClaim {
/// The upstream channel ID (i.e. the inbound edge).
channel_id: [u8; 32],
/// The HTLC ID on the inbound edge.
htlc_id: u64,
},
}

impl RAAMonitorUpdateBlockingAction {
#[allow(unused)]
fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self {
Self::ForwardedPaymentInboundClaim {
channel_id: prev_hop.outpoint.to_channel_id(),
htlc_id: prev_hop.htlc_id,
}
}
}

impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
(0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
;);


/// State we hold per-peer.
pub(super) struct PeerState<Signer: ChannelSigner> {
/// `temporary_channel_id` or `channel_id` -> `channel`.
Expand Down Expand Up @@ -583,6 +631,11 @@ pub(super) struct PeerState<Signer: ChannelSigner> {
/// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure
/// duplicates do not occur, so such channels should fail without a monitor update completing.
monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec<MonitorUpdateCompletionAction>>,
/// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have
/// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update
/// will remove a preimage that needs to be durably in an upstream channel first), we put an
/// entry here to note that the channel with the key's ID is blocked on a set of actions.
actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
/// The peer is currently connected (i.e. we've seen a
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
/// [`ChannelMessageHandler::peer_disconnected`].
Expand Down Expand Up @@ -4490,16 +4543,16 @@ where
Some(claimed_htlc_value - forwarded_htlc_value)
} else { None };

let prev_channel_id = Some(prev_outpoint.to_channel_id());
let next_channel_id = Some(next_channel_id);

Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded {
fee_earned_msat,
claim_from_onchain_tx: from_onchain,
prev_channel_id,
next_channel_id,
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
}})
Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
event: events::Event::PaymentForwarded {
fee_earned_msat,
claim_from_onchain_tx: from_onchain,
prev_channel_id: Some(prev_outpoint.to_channel_id()),
next_channel_id: Some(next_channel_id),
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
},
downstream_counterparty_and_funding_outpoint: None,
})
} else { None }
});
if let Err((pk, err)) = res {
Expand All @@ -4526,8 +4579,13 @@ where
}, None));
}
},
MonitorUpdateCompletionAction::EmitEvent { event } => {
MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
event, downstream_counterparty_and_funding_outpoint
} => {
self.pending_events.lock().unwrap().push_back((event, None));
if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint {
self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker));
}
},
}
}
Expand Down Expand Up @@ -5374,6 +5432,24 @@ where
}
}

/// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote
/// [`msgs::RevokeAndACK`] should be held for the given channel until some other event
/// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of
/// the [`ChannelMonitorUpdate`] in question.
fn raa_monitor_updates_held(&self,
actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec<RAAMonitorUpdateBlockingAction>>,
channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey
) -> bool {
actions_blocking_raa_monitor_updates
.get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false)
|| self.pending_events.lock().unwrap().iter().any(|(_, action)| {
action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
channel_funding_outpoint,
counterparty_node_id,
})
})
}

fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> {
let (htlcs_to_fail, res) = {
let per_peer_state = self.per_peer_state.read().unwrap();
Expand Down Expand Up @@ -6038,25 +6114,37 @@ where
self.pending_outbound_payments.clear_pending_payments()
}

fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
/// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
/// [`Event`] being handled) completes, this should be called to restore the channel to normal
/// operation. It will double-check that nothing *else* is also blocking the same channel from
/// making progress and then any blocked [`ChannelMonitorUpdate`]s fly.
fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option<RAAMonitorUpdateBlockingAction>) {
let mut errors = Vec::new();
loop {
let per_peer_state = self.per_peer_state.read().unwrap();
if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
let mut peer_state_lck = peer_state_mtx.lock().unwrap();
let peer_state = &mut *peer_state_lck;
if self.pending_events.lock().unwrap().iter()
.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
channel_funding_outpoint, counterparty_node_id
}))
{
// Check that, while holding the peer lock, we don't have another event
// blocking any monitor updates for this channel. If we do, let those
// events be the ones that ultimately release the monitor update(s).
log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",

if let Some(blocker) = completed_blocker.take() {
// Only do this on the first iteration of the loop.
if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates
.get_mut(&channel_funding_outpoint.to_channel_id())
{
blockers.retain(|iter| iter != &blocker);
}
}

if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates,
channel_funding_outpoint, counterparty_node_id) {
// Check that, while holding the peer lock, we don't have anything else
// blocking monitor updates for this channel. If we do, release the monitor
// update(s) when those blockers complete.
log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first",
log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
break;
}

if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
Expand Down Expand Up @@ -6098,7 +6186,7 @@ where
EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
channel_funding_outpoint, counterparty_node_id
} => {
self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None);
}
}
}
Expand Down Expand Up @@ -6774,6 +6862,7 @@ where
latest_features: init_msg.features.clone(),
pending_msg_events: Vec::new(),
monitor_update_blocked_actions: BTreeMap::new(),
actions_blocking_raa_monitor_updates: BTreeMap::new(),
is_connected: true,
}));
},
Expand Down Expand Up @@ -7970,6 +8059,7 @@ where
latest_features: Readable::read(reader)?,
pending_msg_events: Vec::new(),
monitor_update_blocked_actions: BTreeMap::new(),
actions_blocking_raa_monitor_updates: BTreeMap::new(),
is_connected: false,
};
per_peer_state.insert(peer_pubkey, Mutex::new(peer_state));
Expand Down Expand Up @@ -8051,7 +8141,7 @@ where
let mut claimable_htlc_purposes = None;
let mut claimable_htlc_onion_fields = None;
let mut pending_claiming_payments = Some(HashMap::new());
let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
let mut monitor_update_blocked_actions_per_peer: Option<Vec<(_, BTreeMap<_, Vec<_>>)>> = Some(Vec::new());
let mut events_override = None;
read_tlv_fields!(reader, {
(1, pending_outbound_payments_no_retry, option),
Expand Down Expand Up @@ -8376,7 +8466,21 @@ where
}

for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() {
if let Some(peer_state) = per_peer_state.get_mut(&node_id) {
if let Some(peer_state) = per_peer_state.get(&node_id) {
for (_, actions) in monitor_update_blocked_actions.iter() {
for action in actions.iter() {
if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
downstream_counterparty_and_funding_outpoint:
Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), ..
} = action {
if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) {
blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates
.entry(blocked_channel_outpoint.to_channel_id())
.or_insert_with(Vec::new).push(blocking_action.clone());
}
}
}
}
peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions;
} else {
log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);
Expand Down

0 comments on commit 394f54d

Please sign in to comment.