diff --git a/src/builder.rs b/src/builder.rs index e48af2cf3..a44a05f0c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -6,6 +6,7 @@ use crate::io::sqlite_store::SqliteStore; use crate::logger::{log_error, FilesystemLogger, Logger}; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; +use crate::sweep::OutputSweeper; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph, @@ -777,6 +778,25 @@ fn build_with_store_internal( } }; + let best_block = channel_manager.current_best_block(); + let output_sweeper = + match io::utils::read_spendable_outputs(Arc::clone(&kv_store), Arc::clone(&logger)) { + Ok(outputs) => Arc::new(OutputSweeper::new( + outputs, + Arc::clone(&wallet), + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&keys_manager), + Arc::clone(&kv_store), + best_block, + Some(Arc::clone(&tx_sync)), + Arc::clone(&logger), + )), + Err(_) => { + return Err(BuildError::ReadFailed); + } + }; + let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); Ok(Node { @@ -791,6 +811,7 @@ fn build_with_store_internal( event_queue, channel_manager, chain_monitor, + output_sweeper, peer_manager, keys_manager, network_graph, diff --git a/src/event.rs b/src/event.rs index 9cb0f7070..3d7e4599b 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,7 +1,6 @@ -use crate::types::{Broadcaster, FeeEstimator, Wallet}; +use crate::types::{Sweeper, Wallet}; use crate::{ - hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore, - UserChannelId, + hex_utils, ChannelManager, Config, Error, NetworkGraph, PeerInfo, PeerStore, UserChannelId, }; use crate::payment_store::{ @@ -12,11 +11,9 @@ use crate::io::{ EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE, EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE, }; -use crate::logger::{log_debug, log_error, log_info, Logger}; +use crate::logger::{log_error, log_info, Logger}; -use lightning::chain::chaininterface::{ - BroadcasterInterface, ConfirmationTarget, FeeEstimator as LDKFeeEstimator, -}; +use lightning::chain::chaininterface::ConfirmationTarget; use lightning::events::Event as LdkEvent; use lightning::events::PaymentPurpose; use lightning::impl_writeable_tlv_based_enum; @@ -26,8 +23,8 @@ use lightning::util::errors::APIError; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; -use bitcoin::secp256k1::{PublicKey, Secp256k1}; -use bitcoin::{LockTime, OutPoint, PackedLockTime}; +use bitcoin::secp256k1::PublicKey; +use bitcoin::{LockTime, OutPoint}; use rand::{thread_rng, Rng}; use std::collections::VecDeque; use std::ops::Deref; @@ -249,10 +246,8 @@ where event_queue: Arc>, wallet: Arc, channel_manager: Arc>, - tx_broadcaster: Arc, - fee_estimator: Arc, + output_sweeper: Arc>, network_graph: Arc, - keys_manager: Arc, payment_store: Arc>, peer_store: Arc>, runtime: Arc>>, @@ -266,9 +261,8 @@ where { pub fn new( event_queue: Arc>, wallet: Arc, - channel_manager: Arc>, tx_broadcaster: Arc, - fee_estimator: Arc, network_graph: Arc, - keys_manager: Arc, payment_store: Arc>, + channel_manager: Arc>, output_sweeper: Arc>, + network_graph: Arc, payment_store: Arc>, peer_store: Arc>, runtime: Arc>>, logger: L, config: Arc, ) -> Self { @@ -276,10 +270,8 @@ where event_queue, wallet, channel_manager, - tx_broadcaster, - fee_estimator, + output_sweeper, network_graph, - keys_manager, payment_store, peer_store, logger, @@ -585,42 +577,8 @@ where }); } } - LdkEvent::SpendableOutputs { outputs, channel_id: _ } => { - // TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so. - let destination_address = self.wallet.get_new_address().unwrap_or_else(|e| { - log_error!(self.logger, "Failed to get destination address: {}", e); - panic!("Failed to get destination address"); - }); - - let output_descriptors = &outputs.iter().collect::>(); - let tx_feerate = self - .fee_estimator - .get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee); - - // We set nLockTime to the current height to discourage fee sniping. - let cur_height = self.channel_manager.current_best_block().height(); - let locktime: PackedLockTime = - LockTime::from_height(cur_height).map_or(PackedLockTime::ZERO, |l| l.into()); - let res = self.keys_manager.spend_spendable_outputs( - output_descriptors, - Vec::new(), - destination_address.script_pubkey(), - tx_feerate, - Some(locktime), - &Secp256k1::new(), - ); - - match res { - Ok(Some(spending_tx)) => { - self.tx_broadcaster.broadcast_transactions(&[&spending_tx]) - } - Ok(None) => { - log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs); - } - Err(err) => { - log_error!(self.logger, "Error spending outputs: {:?}", err); - } - } + LdkEvent::SpendableOutputs { outputs, channel_id } => { + self.output_sweeper.add_outputs(outputs, channel_id) } LdkEvent::OpenChannelRequest { temporary_channel_id, diff --git a/src/io/mod.rs b/src/io/mod.rs index a7d1085c5..d9dab440c 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -21,6 +21,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers"; pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments"; pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; +/// The spendable output information will be persisted under this prefix. +pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs"; +pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key. pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = ""; pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index f722d8668..603b3344e 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -3,6 +3,7 @@ use crate::WALLET_KEYS_SEED_LEN; use crate::logger::log_error; use crate::peer_store::PeerStore; +use crate::sweep::SpendableOutputInfo; use crate::{Error, EventQueue, PaymentDetails}; use lightning::routing::gossip::NetworkGraph; @@ -199,6 +200,36 @@ where Ok(res) } +/// Read previously persisted spendable output information from the store. +pub(crate) fn read_spendable_outputs( + kv_store: Arc, logger: L, +) -> Result, std::io::Error> +where + L::Target: Logger, +{ + let mut res = Vec::new(); + + for stored_key in kv_store.list( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + )? { + let mut reader = Cursor::new(kv_store.read( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &stored_key, + )?); + let output = SpendableOutputInfo::read(&mut reader).map_err(|e| { + log_error!(logger, "Failed to deserialize SpendableOutputInfo: {}", e); + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Failed to deserialize SpendableOutputInfo", + ) + })?; + res.push(output); + } + Ok(res) +} + pub(crate) fn read_latest_rgs_sync_timestamp( kv_store: Arc, logger: L, ) -> Result diff --git a/src/lib.rs b/src/lib.rs index c3b778db5..0ffd7d062 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,6 +85,7 @@ pub mod io; mod logger; mod payment_store; mod peer_store; +mod sweep; #[cfg(test)] mod test; mod tx_broadcaster; @@ -122,7 +123,7 @@ pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus}; use peer_store::{PeerInfo, PeerStore}; use types::{ Broadcaster, ChainMonitor, ChannelManager, FeeEstimator, KeysManager, NetworkGraph, - PeerManager, Router, Scorer, Wallet, + PeerManager, Router, Scorer, Sweeper, Wallet, }; pub use types::{ChannelDetails, PeerDetails, UserChannelId}; @@ -295,6 +296,7 @@ pub struct Node { event_queue: Arc>>, channel_manager: Arc>, chain_monitor: Arc>, + output_sweeper: Arc>, peer_manager: Arc>, keys_manager: Arc, network_graph: Arc, @@ -432,6 +434,7 @@ impl Node { let tx_sync = Arc::clone(&self.tx_sync); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); + let sync_sweeper = Arc::clone(&self.output_sweeper); let sync_logger = Arc::clone(&self.logger); let mut stop_sync = self.stop_receiver.clone(); let wallet_sync_interval_secs = @@ -449,6 +452,7 @@ impl Node { let confirmables = vec![ &*sync_cman as &(dyn Confirm + Sync + Send), &*sync_cmon as &(dyn Confirm + Sync + Send), + &*sync_sweeper as &(dyn Confirm + Sync + Send), ]; let now = Instant::now(); match tx_sync.sync(confirmables).await { @@ -695,10 +699,8 @@ impl Node { Arc::clone(&self.event_queue), Arc::clone(&self.wallet), Arc::clone(&self.channel_manager), - Arc::clone(&self.tx_broadcaster), - Arc::clone(&self.fee_estimator), + Arc::clone(&self.output_sweeper), Arc::clone(&self.network_graph), - Arc::clone(&self.keys_manager), Arc::clone(&self.payment_store), Arc::clone(&self.peer_store), Arc::clone(&self.runtime), @@ -1036,10 +1038,12 @@ impl Node { let tx_sync = Arc::clone(&self.tx_sync); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); + let sync_sweeper = Arc::clone(&self.output_sweeper); let sync_logger = Arc::clone(&self.logger); let confirmables = vec![ &*sync_cman as &(dyn Confirm + Sync + Send), &*sync_cmon as &(dyn Confirm + Sync + Send), + &*sync_sweeper as &(dyn Confirm + Sync + Send), ]; tokio::task::block_in_place(move || { diff --git a/src/sweep.rs b/src/sweep.rs new file mode 100644 index 000000000..eac8b4dcb --- /dev/null +++ b/src/sweep.rs @@ -0,0 +1,462 @@ +use crate::hex_utils; +use crate::io::{ + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, +}; +use crate::logger::{log_error, Logger}; +use crate::wallet::{Wallet, WalletKeysManager}; +use crate::Error; + +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput}; +use lightning::impl_writeable_tlv_based; +use lightning::ln::ChannelId; +use lightning::sign::{EntropySource, SpendableOutputDescriptor}; +use lightning::util::persist::KVStore; +use lightning::util::ser::Writeable; + +use bitcoin::secp256k1::Secp256k1; +use bitcoin::{BlockHash, BlockHeader, LockTime, PackedLockTime, Transaction, Txid}; + +use std::ops::Deref; +use std::sync::{Arc, Mutex}; + +const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6; + +const REGENERATE_SPEND_THRESHOLD: u32 = 144; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct SpendableOutputInfo { + id: [u8; 32], + descriptor: SpendableOutputDescriptor, + channel_id: Option, + first_broadcast_hash: Option, + latest_broadcast_height: Option, + latest_spending_tx: Option, + confirmation_height: Option, + confirmation_hash: Option, +} + +impl SpendableOutputInfo { + fn to_watched_output(&self) -> WatchedOutput { + match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, output } => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: *outpoint, + script_pubkey: output.script_pubkey.clone(), + }, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, + SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, + } + } + + fn is_spent_in(&self, tx: &Transaction) -> bool { + let prev_outpoint = match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint, + SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint, + }; + + for input in &tx.input { + if input.previous_output == prev_outpoint.into_bitcoin_outpoint() { + return true; + } + } + + false + } +} + +impl_writeable_tlv_based!(SpendableOutputInfo, { + (0, id, required), + (2, descriptor, required), + (4, channel_id, option), + (6, first_broadcast_hash, option), + (8, latest_broadcast_height, option), + (10, latest_spending_tx, option), + (12, confirmation_height, option), + (14, confirmation_hash, option), +}); + +pub(crate) struct OutputSweeper +where + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + outputs: Mutex>, + wallet: Arc>, + broadcaster: B, + fee_estimator: E, + keys_manager: Arc>, + kv_store: K, + best_block: Mutex, + chain_source: Option, + logger: L, +} + +impl OutputSweeper +where + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + pub(crate) fn new( + outputs: Vec, + wallet: Arc>, broadcaster: B, + fee_estimator: E, + keys_manager: Arc>, kv_store: K, + best_block: BestBlock, chain_source: Option, logger: L, + ) -> Self { + if let Some(filter) = chain_source.as_ref() { + for output_info in &outputs { + let watched_output = output_info.to_watched_output(); + filter.register_output(watched_output); + } + } + + let outputs = Mutex::new(outputs); + let best_block = Mutex::new(best_block); + Self { + outputs, + wallet, + broadcaster, + fee_estimator, + keys_manager, + kv_store, + best_block, + chain_source, + logger, + } + } + + pub(crate) fn add_outputs( + &self, mut output_descriptors: Vec, + channel_id: Option, + ) { + let non_static_outputs = output_descriptors + .drain(..) + .filter(|desc| !matches!(desc, SpendableOutputDescriptor::StaticOutput { .. })) + .collect::>(); + + if non_static_outputs.is_empty() { + return; + } + + { + let mut locked_outputs = self.outputs.lock().unwrap(); + for descriptor in non_static_outputs { + let id = self.keys_manager.get_secure_random_bytes(); + let output_info = SpendableOutputInfo { + id, + descriptor, + channel_id, + first_broadcast_hash: None, + latest_broadcast_height: None, + latest_spending_tx: None, + confirmation_height: None, + confirmation_hash: None, + }; + + locked_outputs.push(output_info.clone()); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); + } + } + + self.rebroadcast_if_necessary(); + } + + fn rebroadcast_if_necessary(&self) { + let (cur_height, cur_hash) = { + let best_block = self.best_block.lock().unwrap(); + (best_block.height(), best_block.block_hash()) + }; + + let mut respend_descriptors = Vec::new(); + let mut respend_ids = Vec::new(); + + { + let mut locked_outputs = self.outputs.lock().unwrap(); + for output_info in locked_outputs.iter_mut() { + if output_info.confirmation_height.is_some() { + // Don't rebroadcast confirmed txs + debug_assert!(output_info.confirmation_hash.is_some()); + continue; + } + + if let Some(latest_broadcast_height) = output_info.latest_broadcast_height { + // Re-generate spending tx after REGENERATE_SPEND_THRESHOLD, rebroadcast + // after every block + if latest_broadcast_height + REGENERATE_SPEND_THRESHOLD >= cur_height { + respend_descriptors.push(output_info.descriptor.clone()); + respend_ids.push(output_info.id); + } else if latest_broadcast_height < cur_height { + if let Some(latest_spending_tx) = output_info.latest_spending_tx.as_ref() { + self.broadcaster.broadcast_transactions(&[&latest_spending_tx]); + output_info.latest_broadcast_height = Some(cur_height); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!( + self.logger, + "Error persisting SpendableOutputInfo: {:?}", + e + ) + }); + } + } + } else { + // Our first broadcast. + respend_descriptors.push(output_info.descriptor.clone()); + respend_ids.push(output_info.id); + output_info.first_broadcast_hash = Some(cur_hash); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); + } + } + } + + if !respend_descriptors.is_empty() { + match self.get_spending_tx(&respend_descriptors, cur_height) { + Ok(spending_tx) => { + self.broadcaster.broadcast_transactions(&[&spending_tx]); + let mut locked_outputs = self.outputs.lock().unwrap(); + for output_info in locked_outputs.iter_mut() { + if respend_ids.contains(&output_info.id) { + if let Some(filter) = self.chain_source.as_ref() { + let watched_output = output_info.to_watched_output(); + filter.register_output(watched_output); + } + + output_info.latest_spending_tx = Some(spending_tx.clone()); + output_info.latest_broadcast_height = Some(cur_height); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!( + self.logger, + "Error persisting SpendableOutputInfo: {:?}", + e + ) + }); + } + } + } + Err(e) => { + log_error!(self.logger, "Error spending outputs: {:?}", e); + } + }; + } + } + + fn prune_confirmed_outputs(&self) { + let cur_height = self.best_block.lock().unwrap().height(); + let mut locked_outputs = self.outputs.lock().unwrap(); + + // Prune all outputs that have sufficient depth by now. + locked_outputs.retain(|o| { + if let Some(confirmation_height) = o.confirmation_height { + if cur_height >= confirmation_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 { + let key = hex_utils::to_string(&o.id); + match self.kv_store.remove( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + false, + ) { + Ok(_) => return false, + Err(e) => { + log_error!( + self.logger, + "Removal of key {}/{}/{} failed due to: {}", + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + key, + e + ); + return true; + } + } + } + } + true + }); + } + + fn get_spending_tx( + &self, output_descriptors: &Vec, cur_height: u32, + ) -> Result { + let tx_feerate = + self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee); + + let destination_address = self.wallet.get_new_address().map_err(|e| { + log_error!(self.logger, "Failed to get destination address from wallet: {}", e); + })?; + + let locktime: PackedLockTime = + LockTime::from_height(cur_height).map_or(PackedLockTime::ZERO, |l| l.into()); + + let output_descriptors = output_descriptors.iter().collect::>(); + self.keys_manager.spend_spendable_outputs( + &output_descriptors, + Vec::new(), + destination_address.script_pubkey(), + tx_feerate, + Some(locktime), + &Secp256k1::new(), + ) + } + + fn persist_info(&self, output: &SpendableOutputInfo) -> Result<(), Error> { + let key = hex_utils::to_string(&output.id); + let data = output.encode(); + self.kv_store + .write( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + &data, + ) + .map_err(|e| { + log_error!( + self.logger, + "Write for key {}/{}/{} failed due to: {}", + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + key, + e + ); + Error::PersistenceFailed + }) + } +} + +impl Listen for OutputSweeper +where + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + fn filtered_block_connected( + &self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32, + ) { + { + let best_block = self.best_block.lock().unwrap(); + assert_eq!(best_block.block_hash(), header.prev_blockhash, + "Blocks must be connected in chain-order - the connected header must build on the last connected header"); + assert_eq!(best_block.height(), height - 1, + "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); + } + + self.transactions_confirmed(header, txdata, height); + self.best_block_updated(header, height); + } + + fn block_disconnected(&self, header: &BlockHeader, height: u32) { + let new_height = height - 1; + { + let mut best_block = self.best_block.lock().unwrap(); + assert_eq!(best_block.block_hash(), header.block_hash(), + "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); + assert_eq!(best_block.height(), height, + "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); + *best_block = BestBlock::new(header.prev_blockhash, new_height) + } + + let mut locked_outputs = self.outputs.lock().unwrap(); + for output_info in locked_outputs.iter_mut() { + if output_info.confirmation_hash == Some(header.block_hash()) { + debug_assert_eq!(output_info.confirmation_height, Some(height)); + output_info.confirmation_hash = None; + output_info.confirmation_height = None; + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); + } + } + } +} + +impl Confirm for OutputSweeper +where + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + fn transactions_confirmed( + &self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32, + ) { + let mut locked_outputs = self.outputs.lock().unwrap(); + for (_, tx) in txdata { + for output_info in locked_outputs.iter_mut() { + if output_info.is_spent_in(*tx) { + debug_assert!(Some(height) > output_info.latest_broadcast_height); + output_info.confirmation_hash = Some(header.block_hash()); + output_info.confirmation_height = Some(height); + output_info.latest_spending_tx = Some((*tx).clone()); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); + } + } + } + } + + fn transaction_unconfirmed(&self, txid: &Txid) { + let mut locked_outputs = self.outputs.lock().unwrap(); + + // Get what height was unconfirmed. + let unconf_height = locked_outputs + .iter() + .find(|o| o.latest_spending_tx.as_ref().map(|tx| tx.txid()) == Some(*txid)) + .and_then(|o| o.confirmation_height); + + // Unconfirm all >= this height. + locked_outputs.iter_mut().filter(|o| o.confirmation_height >= unconf_height).for_each( + |o| { + o.confirmation_hash = None; + o.confirmation_height = None; + self.persist_info(&o).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting SpendableOutputInfo: {:?}", e) + }); + }, + ); + } + + fn best_block_updated(&self, header: &BlockHeader, height: u32) { + *self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height); + self.prune_confirmed_outputs(); + self.rebroadcast_if_necessary(); + } + + fn get_relevant_txids(&self) -> Vec<(Txid, Option)> { + let locked_outputs = self.outputs.lock().unwrap(); + locked_outputs + .iter() + .filter_map(|o| { + if let Some(confirmation_hash) = o.confirmation_hash { + if let Some(latest_spending_tx) = o.latest_spending_tx.as_ref() { + return Some((latest_spending_tx.txid(), Some(confirmation_hash))); + } + } + + None + }) + .collect::>() + } +} diff --git a/src/types.rs b/src/types.rs index ead397a75..4edba65c3 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,4 +1,5 @@ use crate::logger::FilesystemLogger; +use crate::sweep::OutputSweeper; use lightning::chain::chainmonitor; use lightning::ln::channelmanager::ChannelDetails as LdkChannelDetails; @@ -118,6 +119,14 @@ impl lightning::onion_message::MessageRouter for FakeMessageRouter { } } +pub(crate) type Sweeper = OutputSweeper< + Arc, + Arc, + Arc>>, + Arc, + Arc, +>; + /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. diff --git a/src/wallet.rs b/src/wallet.rs index 0279225cd..0f60adccc 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -282,25 +282,15 @@ where &self, descriptors: &[&SpendableOutputDescriptor], outputs: Vec, change_destination_script: Script, feerate_sat_per_1000_weight: u32, locktime: Option, secp_ctx: &Secp256k1, - ) -> Result, ()> { - let only_non_static = &descriptors - .iter() - .filter(|desc| !matches!(desc, SpendableOutputDescriptor::StaticOutput { .. })) - .copied() - .collect::>(); - if only_non_static.is_empty() { - return Ok(None); - } - self.inner - .spend_spendable_outputs( - only_non_static, - outputs, - change_destination_script, - feerate_sat_per_1000_weight, - locktime, - secp_ctx, - ) - .map(Some) + ) -> Result { + self.inner.spend_spendable_outputs( + descriptors, + outputs, + change_destination_script, + feerate_sat_per_1000_weight, + locktime, + secp_ctx, + ) } pub fn sign_message(&self, msg: &[u8]) -> Result {