From 6f7d616d1c880db5cae2831bbd4f2e541cc6fa9f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Wed, 6 Dec 2023 10:46:46 +0100 Subject: [PATCH] f Use `register_output` and remove spend state tracking --- src/sweep.rs | 348 +++++++++++++++++++-------------------------------- 1 file changed, 127 insertions(+), 221 deletions(-) diff --git a/src/sweep.rs b/src/sweep.rs index d239b0eb0..eac8b4dcb 100644 --- a/src/sweep.rs +++ b/src/sweep.rs @@ -8,17 +8,16 @@ use crate::wallet::{Wallet, WalletKeysManager}; use crate::Error; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; -use lightning::chain::{self, BestBlock, Confirm, Filter, Listen}; +use lightning::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput}; +use lightning::impl_writeable_tlv_based; use lightning::ln::ChannelId; use lightning::sign::{EntropySource, SpendableOutputDescriptor}; use lightning::util::persist::KVStore; use lightning::util::ser::Writeable; -use lightning::{impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; use bitcoin::secp256k1::Secp256k1; use bitcoin::{BlockHash, BlockHeader, LockTime, PackedLockTime, Transaction, Txid}; -use std::collections::HashMap; use std::ops::Deref; use std::sync::{Arc, Mutex}; @@ -26,161 +25,53 @@ const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6; const REGENERATE_SPEND_THRESHOLD: u32 = 144; -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) struct BroadcastTx { - tx: Transaction, - broadcast_height: u32, -} - -impl_writeable_tlv_based!(BroadcastTx, { - (0, tx, required), - (2, broadcast_height, required), -}); - -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) struct ConfirmedTx { - tx: Transaction, - broadcast_height: u32, - confirmation_height: u32, - confirmation_hash: BlockHash, -} - -impl_writeable_tlv_based!(ConfirmedTx, { - (0, tx, required), - (2, broadcast_height, required), - (4, confirmation_height, required), - (6, confirmation_hash, required), -}); - -#[derive(Clone, Debug, PartialEq, Eq)] -pub(crate) enum SpendableOutputStatus { - Pending, - Broadcast { txs: HashMap }, - Confirmed { txs: HashMap, confirmed_tx: ConfirmedTx }, -} - -impl_writeable_tlv_based_enum!(SpendableOutputStatus, - (0, Pending) => {}, - (2, Broadcast) => { - (0, txs, required), - }, - (4, Confirmed) => { - (0, txs, required), - (2, confirmed_tx, required), - }; -); - #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct SpendableOutputInfo { id: [u8; 32], descriptor: SpendableOutputDescriptor, channel_id: Option, - status: SpendableOutputStatus, + first_broadcast_hash: Option, + latest_broadcast_height: Option, + latest_spending_tx: Option, + confirmation_height: Option, + confirmation_hash: Option, } impl SpendableOutputInfo { - fn spending_txs(&self) -> Vec<&Transaction> { - match self.status { - SpendableOutputStatus::Pending { .. } => Vec::new(), - SpendableOutputStatus::Broadcast { ref txs, .. } => { - txs.iter().map(|(_, btx)| &btx.tx).collect() - } - SpendableOutputStatus::Confirmed { ref txs, ref confirmed_tx, .. } => { - let mut res: Vec<&Transaction> = txs.iter().map(|(_, btx)| &btx.tx).collect(); - res.push(&confirmed_tx.tx); - res - } + fn to_watched_output(&self) -> WatchedOutput { + match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, output } => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: *outpoint, + script_pubkey: output.script_pubkey.clone(), + }, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, + SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, } } - fn last_spending_tx(&self) -> Option<&Transaction> { - match self.status { - SpendableOutputStatus::Pending { .. } => None, - SpendableOutputStatus::Broadcast { ref txs, .. } => { - txs.iter().max_by_key(|(_, btx)| btx.broadcast_height).map(|(_, btx)| &btx.tx) - } - SpendableOutputStatus::Confirmed { ref confirmed_tx, .. } => Some(&confirmed_tx.tx), - } - } + fn is_spent_in(&self, tx: &Transaction) -> bool { + let prev_outpoint = match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint, + SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint, + }; - fn confirmed_tx(&self) -> Option<&ConfirmedTx> { - match self.status { - SpendableOutputStatus::Pending { .. } => None, - SpendableOutputStatus::Broadcast { .. } => None, - SpendableOutputStatus::Confirmed { ref confirmed_tx, .. } => Some(&confirmed_tx), - } - } - - fn tx_broadcast(&mut self, tx: Transaction, broadcast_height: u32) { - match self.status { - SpendableOutputStatus::Pending => { - let mut txs = HashMap::new(); - let txid = tx.txid(); - let broadcast_tx = BroadcastTx { tx, broadcast_height }; - txs.insert(txid, broadcast_tx); - self.status = SpendableOutputStatus::Broadcast { txs }; - } - SpendableOutputStatus::Broadcast { ref mut txs } => { - let txid = tx.txid(); - let broadcast_tx = BroadcastTx { tx, broadcast_height }; - txs.insert(txid, broadcast_tx); - } - SpendableOutputStatus::Confirmed { .. } => { - debug_assert!(false, "We should never broadcast further transactions if we already saw a confirmation."); - } - } - } - - fn tx_confirmed(&mut self, txid: Txid, confirmation_height: u32, confirmation_hash: BlockHash) { - match self.status { - SpendableOutputStatus::Pending => { - debug_assert!( - false, - "We should never confirm a transaction if we haven't broadcast." - ); - } - SpendableOutputStatus::Broadcast { ref mut txs } => { - if let Some(broadcast_tx) = txs.get_mut(&txid) { - let confirmed_tx = ConfirmedTx { - tx: broadcast_tx.tx.clone(), - broadcast_height: broadcast_tx.broadcast_height, - confirmation_height, - confirmation_hash, - }; - self.status = - SpendableOutputStatus::Confirmed { txs: txs.clone(), confirmed_tx }; - } else { - debug_assert!( - false, - "We should never confirm a transaction if we haven't broadcast." - ); - } - } - SpendableOutputStatus::Confirmed { ref confirmed_tx, .. } => { - if txid != confirmed_tx.tx.txid() - || confirmation_hash != confirmed_tx.confirmation_hash - { - debug_assert!(false, "We should never reconfirm a conflicting transaction without unconfirming the prior one first."); - } + for input in &tx.input { + if input.previous_output == prev_outpoint.into_bitcoin_outpoint() { + return true; } } - } - fn tx_unconfirmed(&mut self, txid: Txid) { - match self.status { - SpendableOutputStatus::Pending => { - debug_assert!( - false, - "We should never unconfirm a transaction if we haven't broadcast." - ); - } - SpendableOutputStatus::Broadcast { .. } => {} - SpendableOutputStatus::Confirmed { ref txs, ref confirmed_tx } => { - if txid == confirmed_tx.tx.txid() { - self.status = SpendableOutputStatus::Broadcast { txs: txs.clone() }; - } - } - } + false } } @@ -188,7 +79,11 @@ impl_writeable_tlv_based!(SpendableOutputInfo, { (0, id, required), (2, descriptor, required), (4, channel_id, option), - (6, status, required), + (6, first_broadcast_hash, option), + (8, latest_broadcast_height, option), + (10, latest_spending_tx, option), + (12, confirmation_height, option), + (14, confirmation_hash, option), }); pub(crate) struct OutputSweeper @@ -227,11 +122,8 @@ where ) -> Self { if let Some(filter) = chain_source.as_ref() { for output_info in &outputs { - for tx in &output_info.spending_txs() { - if let Some(tx_out) = tx.output.first() { - filter.register_tx(&tx.txid(), &tx_out.script_pubkey); - } - } + let watched_output = output_info.to_watched_output(); + filter.register_output(watched_output); } } @@ -271,12 +163,16 @@ where id, descriptor, channel_id, - status: SpendableOutputStatus::Pending, + first_broadcast_hash: None, + latest_broadcast_height: None, + latest_spending_tx: None, + confirmation_height: None, + confirmation_hash: None, }; locked_outputs.push(output_info.clone()); - self.persist_status(&output_info).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting spendable output status: {:?}", e) + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) }); } } @@ -285,7 +181,10 @@ where } fn rebroadcast_if_necessary(&self) { - let cur_height = self.best_block.lock().unwrap().height(); + let (cur_height, cur_hash) = { + let best_block = self.best_block.lock().unwrap(); + (best_block.height(), best_block.block_hash()) + }; let mut respend_descriptors = Vec::new(); let mut respend_ids = Vec::new(); @@ -293,37 +192,39 @@ where { let mut locked_outputs = self.outputs.lock().unwrap(); for output_info in locked_outputs.iter_mut() { - match output_info.status { - SpendableOutputStatus::Pending => { + if output_info.confirmation_height.is_some() { + // Don't rebroadcast confirmed txs + debug_assert!(output_info.confirmation_hash.is_some()); + continue; + } + + if let Some(latest_broadcast_height) = output_info.latest_broadcast_height { + // Re-generate spending tx after REGENERATE_SPEND_THRESHOLD, rebroadcast + // after every block + if latest_broadcast_height + REGENERATE_SPEND_THRESHOLD >= cur_height { respend_descriptors.push(output_info.descriptor.clone()); respend_ids.push(output_info.id); - } - SpendableOutputStatus::Broadcast { ref txs } => { - // Re-generate spending tx after REGENERATE_SPEND_THRESHOLD, rebroadcast - // after every block - if txs.iter().all(|(_, t)| { - t.broadcast_height + REGENERATE_SPEND_THRESHOLD >= cur_height - }) { - respend_descriptors.push(output_info.descriptor.clone()); - respend_ids.push(output_info.id); - } else if txs.iter().all(|(_, t)| t.broadcast_height < cur_height) { - if let Some(last_spending_tx) = output_info.last_spending_tx() { - self.broadcaster.broadcast_transactions(&[&last_spending_tx]); - output_info.tx_broadcast(last_spending_tx.clone(), cur_height); - - self.persist_status(&output_info).unwrap_or_else(|e| { - log_error!( - self.logger, - "Error persisting spendable output status: {:?}", - e - ) - }); - } + } else if latest_broadcast_height < cur_height { + if let Some(latest_spending_tx) = output_info.latest_spending_tx.as_ref() { + self.broadcaster.broadcast_transactions(&[&latest_spending_tx]); + output_info.latest_broadcast_height = Some(cur_height); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!( + self.logger, + "Error persisting SpendableOutputInfo: {:?}", + e + ) + }); } } - SpendableOutputStatus::Confirmed { .. } => { - // Don't broadcast if we already have a transaction pending threshold conf. - } + } else { + // Our first broadcast. + respend_descriptors.push(output_info.descriptor.clone()); + respend_ids.push(output_info.id); + output_info.first_broadcast_hash = Some(cur_hash); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); } } } @@ -332,20 +233,20 @@ where match self.get_spending_tx(&respend_descriptors, cur_height) { Ok(spending_tx) => { self.broadcaster.broadcast_transactions(&[&spending_tx]); - if let Some(filter) = self.chain_source.as_ref() { - if let Some(tx_out) = spending_tx.output.first() { - filter.register_tx(&spending_tx.txid(), &tx_out.script_pubkey); - } - } - let mut locked_outputs = self.outputs.lock().unwrap(); for output_info in locked_outputs.iter_mut() { if respend_ids.contains(&output_info.id) { - output_info.tx_broadcast(spending_tx.clone(), cur_height); - self.persist_status(&output_info).unwrap_or_else(|e| { + if let Some(filter) = self.chain_source.as_ref() { + let watched_output = output_info.to_watched_output(); + filter.register_output(watched_output); + } + + output_info.latest_spending_tx = Some(spending_tx.clone()); + output_info.latest_broadcast_height = Some(cur_height); + self.persist_info(&output_info).unwrap_or_else(|e| { log_error!( self.logger, - "Error persisting spendable output status: {:?}", + "Error persisting SpendableOutputInfo: {:?}", e ) }); @@ -365,8 +266,8 @@ where // Prune all outputs that have sufficient depth by now. locked_outputs.retain(|o| { - if let Some(ctx) = o.confirmed_tx() { - if cur_height >= ctx.confirmation_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 { + if let Some(confirmation_height) = o.confirmation_height { + if cur_height >= confirmation_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 { let key = hex_utils::to_string(&o.id); match self.kv_store.remove( SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, @@ -417,7 +318,7 @@ where ) } - fn persist_status(&self, output: &SpendableOutputInfo) -> Result<(), Error> { + fn persist_info(&self, output: &SpendableOutputInfo) -> Result<(), Error> { let key = hex_utils::to_string(&output.id); let data = output.encode(); self.kv_store @@ -477,14 +378,13 @@ where let mut locked_outputs = self.outputs.lock().unwrap(); for output_info in locked_outputs.iter_mut() { - if let SpendableOutputStatus::Confirmed { ref confirmed_tx, .. } = output_info.status { - if confirmed_tx.confirmation_hash == header.block_hash() { - let txid = confirmed_tx.tx.txid(); - output_info.tx_unconfirmed(txid); - self.persist_status(&output_info).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting spendable output status: {:?}", e) - }); - } + if output_info.confirmation_hash == Some(header.block_hash()) { + debug_assert_eq!(output_info.confirmation_height, Some(height)); + output_info.confirmation_hash = None; + output_info.confirmation_height = None; + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); } } } @@ -503,12 +403,17 @@ where ) { let mut locked_outputs = self.outputs.lock().unwrap(); for (_, tx) in txdata { - locked_outputs.iter_mut().filter(|o| o.spending_txs().contains(tx)).for_each(|o| { - o.tx_confirmed(tx.txid(), height, header.block_hash()); - self.persist_status(&o).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting spendable output status: {:?}", e) - }); - }); + for output_info in locked_outputs.iter_mut() { + if output_info.is_spent_in(*tx) { + debug_assert!(Some(height) > output_info.latest_broadcast_height); + output_info.confirmation_hash = Some(header.block_hash()); + output_info.confirmation_height = Some(height); + output_info.latest_spending_tx = Some((*tx).clone()); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); + } + } } } @@ -518,20 +423,19 @@ where // Get what height was unconfirmed. let unconf_height = locked_outputs .iter() - .find(|o| o.confirmed_tx().map(|ctx| ctx.tx.txid()) == Some(*txid)) - .and_then(|o| o.confirmed_tx()) - .map(|ctx| ctx.confirmation_height); + .find(|o| o.latest_spending_tx.as_ref().map(|tx| tx.txid()) == Some(*txid)) + .and_then(|o| o.confirmation_height); // Unconfirm all >= this height. - locked_outputs - .iter_mut() - .filter(|o| o.confirmed_tx().map(|ctx| ctx.confirmation_height) >= unconf_height) - .for_each(|o| { - o.tx_unconfirmed(*txid); - self.persist_status(&o).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting spendable output status: {:?}", e) + locked_outputs.iter_mut().filter(|o| o.confirmation_height >= unconf_height).for_each( + |o| { + o.confirmation_hash = None; + o.confirmation_height = None; + self.persist_info(&o).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) }); - }); + }, + ); } fn best_block_updated(&self, header: &BlockHeader, height: u32) { @@ -545,11 +449,13 @@ where locked_outputs .iter() .filter_map(|o| { - if let Some(ctx) = o.confirmed_tx().as_ref() { - Some((ctx.tx.txid(), Some(ctx.confirmation_hash))) - } else { - None + if let Some(confirmation_hash) = o.confirmation_hash { + if let Some(latest_spending_tx) = o.latest_spending_tx.as_ref() { + return Some((latest_spending_tx.txid(), Some(confirmation_hash))); + } } + + None }) .collect::>() }