diff --git a/src/sweep.rs b/src/sweep.rs index 37e267889..4826c763a 100644 --- a/src/sweep.rs +++ b/src/sweep.rs @@ -8,18 +8,23 @@ use crate::wallet::Wallet; use crate::{Error, KeysManager}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; -use lightning::chain::BestBlock; +use lightning::chain::{self, BestBlock, Confirm, Filter, Listen}; use lightning::impl_writeable_tlv_based; use lightning::sign::{EntropySource, SpendableOutputDescriptor}; use lightning::util::persist::KVStore; use lightning::util::ser::Writeable; use bitcoin::secp256k1::Secp256k1; -use bitcoin::{BlockHash, LockTime, PackedLockTime, Transaction}; +use bitcoin::{BlockHash, BlockHeader, LockTime, PackedLockTime, Transaction, Txid}; +use std::collections::HashSet; use std::ops::Deref; use std::sync::{Arc, Mutex}; +const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6; + +const REGENERATE_SPEND_THRESHOLD: u32 = 144; + #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct SpendableOutputInfo { id: [u8; 32], @@ -37,8 +42,9 @@ impl_writeable_tlv_based!(SpendableOutputInfo, { (8, confirmed_in_block, option), }); -pub(crate) struct OutputSweeper +pub(crate) struct OutputSweeper where + F::Target: Filter, L::Target: Logger, { outputs: Mutex>, @@ -46,20 +52,33 @@ where keys_manager: Arc, kv_store: Arc, best_block: Mutex, + chain_source: Option, logger: L, } -impl OutputSweeper +impl OutputSweeper where + F::Target: Filter, L::Target: Logger, { pub(crate) fn new( outputs: Vec, wallet: Arc>, - keys_manager: Arc, kv_store: Arc, best_block: BestBlock, logger: L, + keys_manager: Arc, kv_store: Arc, best_block: BestBlock, + chain_source: Option, logger: L, ) -> Self { + if let Some(filter) = chain_source.as_ref() { + for o in &outputs { + if let Some(tx) = o.spending_tx.as_ref() { + if let Some(tx_out) = tx.output.first() { + filter.register_tx(&tx.txid(), &tx_out.script_pubkey); + } + } + } + } + let outputs = Mutex::new(outputs); let best_block = Mutex::new(best_block); - Self { outputs, wallet, keys_manager, kv_store, best_block, logger } + Self { outputs, wallet, keys_manager, kv_store, best_block, chain_source, logger } } pub(crate) fn add_outputs(&self, mut output_descriptors: Vec) { @@ -76,6 +95,11 @@ where match self.get_spending_tx(&non_static_outputs, cur_height) { Ok(spending_tx) => { self.wallet.broadcast_transactions(&[&spending_tx]); + if let Some(filter) = self.chain_source.as_ref() { + if let Some(tx_out) = spending_tx.output.first() { + filter.register_tx(&spending_tx.txid(), &tx_out.script_pubkey); + } + } (Some(spending_tx), Some(cur_height)) } Err(e) => { @@ -150,3 +174,199 @@ where }) } } + +impl Listen for OutputSweeper +where + F::Target: Filter, + L::Target: Logger, +{ + fn filtered_block_connected( + &self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32, + ) { + { + let best_block = self.best_block.lock().unwrap(); + assert_eq!(best_block.block_hash(), header.prev_blockhash, + "Blocks must be connected in chain-order - the connected header must build on the last connected header"); + assert_eq!(best_block.height(), height - 1, + "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); + } + + self.transactions_confirmed(header, txdata, height); + self.best_block_updated(header, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + let new_height = height - 1; + { + let mut best_block = self.best_block.lock().unwrap(); + assert_eq!(best_block.block_hash(), header.block_hash(), + "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); + assert_eq!(best_block.height(), height, + "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); + *best_block = BestBlock::new(header.prev_blockhash, new_height) + } + + let mut locked_outputs = self.outputs.lock().unwrap(); + for output_info in locked_outputs.iter_mut() { + if output_info.confirmed_in_block == Some((height, header.block_hash())) { + output_info.confirmed_in_block = None; + match self.persist_info(output_info) { + Ok(()) => {} + Err(e) => { + log_error!(self.logger, "Error persisting spendable output info: {:?}", e) + } + } + } + } + } +} + +impl Confirm for OutputSweeper +where + F::Target: Filter, + L::Target: Logger, +{ + fn transactions_confirmed( + &self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32, + ) { + let mut locked_outputs = self.outputs.lock().unwrap(); + for (_, tx) in txdata { + locked_outputs + .iter_mut() + .filter(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(tx.txid())) + .for_each(|o| { + o.confirmed_in_block = Some((height, header.block_hash())); + match self.persist_info(o) { + Ok(()) => {} + Err(e) => { + log_error!( + self.logger, + "Error persisting spendable output info: {:?}", + e + ) + } + } + }); + } + } + + fn transaction_unconfirmed(&self, txid: &Txid) { + let mut locked_outputs = self.outputs.lock().unwrap(); + + // Get what height was unconfirmed. + let unconf_height = locked_outputs + .iter() + .find(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(*txid)) + .and_then(|o| o.confirmed_in_block) + .map(|t| t.0); + + // Unconfirm all >= this height. + locked_outputs + .iter_mut() + .filter(|o| o.confirmed_in_block.map(|t| t.0) >= unconf_height) + .for_each(|o| { + o.confirmed_in_block = None; + match self.persist_info(o) { + Ok(()) => {} + Err(e) => { + log_error!(self.logger, "Error persisting spendable output info: {:?}", e) + } + } + }); + } + + fn best_block_updated(&self, header: &BlockHeader, height: u32) { + *self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height); + + let mut locked_outputs = self.outputs.lock().unwrap(); + + // Regenerate spending tx and fee bump all outputs that didn't get confirmed by now. + for output_info in locked_outputs.iter_mut().filter(|o| o.confirmed_in_block.is_none()) { + let bcast_height = output_info.broadcast_height.unwrap_or(0); + if height >= bcast_height + REGENERATE_SPEND_THRESHOLD { + let output_descriptors = vec![output_info.descriptor.clone()]; + match self.get_spending_tx(&output_descriptors, height) { + Ok(Some(spending_tx)) => { + if let Some(filter) = self.chain_source.as_ref() { + if let Some(tx_out) = spending_tx.output.first() { + filter.register_tx(&spending_tx.txid(), &tx_out.script_pubkey); + } + } + output_info.spending_tx = Some(spending_tx); + output_info.broadcast_height = Some(height); + match self.persist_info(output_info) { + Ok(()) => {} + Err(e) => { + log_error!( + self.logger, + "Error persisting spendable output info: {:?}", + e + ) + } + } + } + Ok(None) => { + log_debug!( + self.logger, + "Omitted spending static outputs: {:?}", + output_descriptors + ); + } + Err(err) => { + log_error!(self.logger, "Error spending outputs: {:?}", err); + } + }; + } + } + + // Prune all outputs that have sufficient depth by now. + locked_outputs.retain(|o| { + if let Some((conf_height, _)) = o.confirmed_in_block { + if height >= conf_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 { + let key = hex_utils::to_string(&o.id); + match self.kv_store.remove( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + false, + ) { + Ok(_) => return false, + Err(e) => { + log_error!( + self.logger, + "Removal of key {}/{}/{} failed due to: {}", + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + key, + e + ); + return true; + } + } + } + } + true + }); + + // Rebroadcast all pending spending txs + let mut txs = locked_outputs + .iter() + .filter_map(|o| o.spending_tx.as_ref()) + .collect::>(); + self.wallet.broadcast_transactions(&txs.drain().collect::>()); + } + + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { + let locked_outputs = self.outputs.lock().unwrap(); + locked_outputs + .iter() + .filter_map(|o| { + if let Some(tx) = o.spending_tx.as_ref() { + Some((tx.txid(), o.confirmed_in_block.map(|c| c.1))) + } else { + None + } + }) + .collect::>() + } +}