diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index e86885a83db..f7755715b88 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -21,6 +21,7 @@ pub mod message_signing; pub mod invoice; pub mod persist; pub mod string; +pub mod sweep; pub mod wakers; #[cfg(fuzzing)] pub mod base32; diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index e6329062051..1b2744a992e 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -65,6 +65,17 @@ pub const SCORER_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The key under which the [`WriteableScore`] will be persisted. pub const SCORER_PERSISTENCE_KEY: &str = "scorer"; +/// The primary namespace under which [`TrackedSpendableOutput`]s will be persisted by [`OutputSweeper`]. +/// +/// [`TrackedSpendableOutput`]: crate::util::sweep::TrackedSpendableOutput +/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper +pub const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs"; +/// The secondary namespace under which [`TrackedSpendableOutput`]s will be persisted by [`OutputSweeper`]. +/// +/// [`TrackedSpendableOutput`]: crate::util::sweep::TrackedSpendableOutput +/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper +pub const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + /// A sentinel value to be prepended to monitors persisted by the [`MonitorUpdatingPersister`]. /// /// This serves to prevent someone from accidentally loading such monitors (which may need diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs new file mode 100644 index 00000000000..188db188e03 --- /dev/null +++ b/lightning/src/util/sweep.rs @@ -0,0 +1,670 @@ +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! This module contains an [`OutputSweeper`] utility that keeps track of +//! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries +//! sweeping them. + +use crate::chain::chaininterface::BroadcasterInterface; +use crate::chain::channelmonitor::ANTI_REORG_DELAY; +use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput}; +use crate::{impl_writeable_tlv_based, log_error, log_debug}; +use crate::io; +use crate::ln::ChannelId; +use crate::prelude::{String, Vec, Box}; +use crate::sign::{EntropySource, SpendableOutputDescriptor}; +use crate::sync::Mutex; +use crate::util::persist::{KVStore, SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE}; +use crate::util::ser::{Readable, Writeable}; +use crate::util::logger::Logger; + +use bitcoin::blockdata::block::Header; +use bitcoin::{BlockHash, Transaction, Txid}; + +use core::ops::Deref; +use core::fmt::Write; + +/// The default interval after which we regenerate output spending transactions. +pub const DEFAULT_REGENERATE_SPEND_THRESHOLD: u32 = 144; + +/// The state of a spendable output currently tracked by an [`OutputSweeper`]. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TrackedSpendableOutput { + /// The output's unique identifier. + /// + /// A hex-encoding of this field is used as a key for serialization. + id: [u8; 32], + /// The tracked output descriptor. + pub descriptor: SpendableOutputDescriptor, + /// The channel this output belongs to. + /// + /// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`] + pub channel_id: Option, + /// The hash of the chain tip when we first broadcast a transaction spending this output. + /// + /// Will be `None` if it hasn't been broadcast yet. + pub first_broadcast_hash: Option, + /// The best height when we last broadcast a transaction spending this output. + /// + /// Will be `None` if it hasn't been broadcast yet. + pub latest_broadcast_height: Option, + /// The transaction spending this output we last broadcast. + /// + /// After confirmation, this will be set to the confirmed transaction. + /// + /// Will be `None` if it hasn't been broadcast yet. + pub latest_spending_tx: Option, + /// The height at which the spending transaction was confirmed. + /// + /// Will be `None` if it hasn't been confirmed yet. + pub confirmation_height: Option, + /// The hash of the block in which the spending transaction was confirmed. + /// + /// Will be `None` if it hasn't been confirmed yet. + pub confirmation_hash: Option, +} + +impl TrackedSpendableOutput { + fn to_watched_output(&self) -> WatchedOutput { + match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => { + WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: *outpoint, + script_pubkey: output.script_pubkey.clone(), + } + } + SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, + SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput { + block_hash: self.first_broadcast_hash, + outpoint: output.outpoint, + script_pubkey: output.output.script_pubkey.clone(), + }, + } + } + + fn is_spent_in(&self, tx: &Transaction) -> bool { + let prev_outpoint = match &self.descriptor { + SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint, + SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint, + SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint, + }; + + for input in &tx.input { + if input.previous_output == prev_outpoint.into_bitcoin_outpoint() { + return true; + } + } + + false + } +} + +impl_writeable_tlv_based!(TrackedSpendableOutput, { + (0, id, required), + (2, descriptor, required), + (4, channel_id, option), + (6, first_broadcast_hash, option), + (8, latest_broadcast_height, option), + (10, latest_spending_tx, option), + (12, confirmation_height, option), + (14, confirmation_hash, option), +}); + +/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given +/// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor +/// methods. +/// +/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`]. +/// +/// This needs to notified of chain state changes either via its [`Listen`] or [`Confirm`] +/// implementation and hence has to be connected with the utilized chain data sources. +/// +/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are +/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective +/// constructor. +/// +/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs +pub struct OutputSweeper +where + B::Target: BroadcasterInterface, + ES::Target: EntropySource, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + outputs: Mutex>, + broadcaster: B, + entropy_source: ES, + kv_store: K, + best_block: Mutex, + chain_data_source: Option, + logger: L, + regenerate_spend_threshold: u32, + spend_outputs_callback: Box< + dyn Fn(&[&SpendableOutputDescriptor]) -> Result + Send + Sync + 'static, + >, +} + +impl OutputSweeper +where + B::Target: BroadcasterInterface, + ES::Target: EntropySource, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + /// Constructs a new [`OutputSweeper`]. + /// + /// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also + /// need to register their [`Filter`] implementation via the given `chain_data_source`. + /// + /// The given `regenerate_spend_threshold` allows to override the interval in which the sweeper + /// will regenerate new spending transactions using updated fee estimates. If `None`, the + /// [`DEFAULT_REGENERATE_SPEND_THRESHOLD`] will be used. + /// + /// The given `spend_outputs_callback` is a function takes a list of + /// [`SpendableOutputDescriptor`] and returns a fully signed ready-for-broadcast + /// [`Transaction`]. Usually, this should retrieve a change address from the on-chain wallet + /// and call [`KeysManager::spend_spendable_outputs`]. + /// + /// [`KeysManager::spend_spendable_outputs`]: crate::sign::KeysManager::spend_spendable_outputs + /// + /// #### Example: + /// ``` + /// # use bitcoin::key::Secp256k1; + /// # use bitcoin::{Network, Script, ScriptBuf, Transaction, Txid}; + /// # use std::sync::Arc; + /// # use lightning::sign::SpendableOutputDescriptor; + /// # use lightning::sign::KeysManager; + /// # use lightning::chain::chaininterface::{ConfirmationTarget, FeeEstimator, BroadcasterInterface}; + /// # use lightning::chain::{BestBlock, Filter, WatchedOutput}; + /// # use lightning::util::sweep::OutputSweeper; + /// # use lightning::util::logger::{Logger, Record}; + /// # use lightning::io; + /// # struct MyStore {} + /// # impl lightning::util::persist::KVStore for MyStore { + /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } + /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } + /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } + /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } + /// # } + /// # struct MyWallet {} + /// # impl MyWallet { + /// # fn get_new_address(&self) -> ScriptBuf { ScriptBuf::new() } + /// # } + /// # struct MyFeeEstimator {} + /// # impl FeeEstimator for MyFeeEstimator { + /// # fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { 0 } + /// # } + /// # struct MyBroadcaster {} + /// # impl BroadcasterInterface for MyBroadcaster { + /// # fn broadcast_transactions(&self, txs: &[&Transaction]) {} + /// # } + /// # struct MyFilter {} + /// # impl Filter for MyFilter { + /// # fn register_tx(&self, txid: &Txid, script_pubkey: &Script) {} + /// # fn register_output(&self, output: WatchedOutput) {} + /// # } + /// # struct MyLogger {} + /// # impl Logger for MyLogger { + /// # fn log(&self, record: Record<'_>) {} + /// # } + /// # let my_wallet = Arc::new(MyWallet{}); + /// # let my_fee_estimator = Arc::new(MyFeeEstimator{}); + /// # let my_broadcaster = Arc::new(MyBroadcaster {}); + /// # let my_keys_manager = Arc::new(KeysManager::new(&[42u8; 32], 0, 0)); + /// # let my_store = Arc::new(MyStore {}); + /// # let my_logger = Arc::new(MyLogger {}); + /// # let my_best_block = BestBlock::from_network(Network::Regtest); + /// # let my_chain_data_source = Arc::new(MyFilter {}); + /// let spend_wallet = Arc::clone(&my_wallet); + /// let spend_keys_manager = Arc::clone(&my_keys_manager); + /// let spend_fee_estimator = Arc::clone(&my_fee_estimator); + /// let spend_outputs_callback = move |output_descriptors: &[&SpendableOutputDescriptor]| { + /// let change_destination_script = spend_wallet.get_new_address(); + /// let fee_rate = spend_fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee); + /// spend_keys_manager.spend_spendable_outputs( + /// output_descriptors, + /// Vec::new(), + /// change_destination_script, + /// fee_rate, + /// None, + /// &Secp256k1::new(), + /// ) + /// }; + /// + /// let sweeper = OutputSweeper::new(my_broadcaster, my_keys_manager, my_store, my_best_block, Some(my_chain_data_source), my_logger, None, spend_outputs_callback); + ///``` + pub fn new( + broadcaster: B, + entropy_source: ES, + kv_store: K, + best_block: BestBlock, + chain_data_source: Option, + logger: L, + regenerate_spend_threshold: Option, + spend_outputs_callback: impl Fn(&[&SpendableOutputDescriptor]) -> Result + Send + Sync + 'static, + ) -> Self { + let outputs = Vec::new(); + Self::from_outputs(outputs, broadcaster, entropy_source, kv_store, best_block, + chain_data_source, logger, + regenerate_spend_threshold, spend_outputs_callback) + } + + /// Constructs an [`OutputSweeper`] from the given list of [`TrackedSpendableOutput`]s. + /// + /// Outputs may be read from the given [`KVStore`] via [`read_spendable_outputs`]. + /// + /// See [`Self::new`] for more information regarding the remaining arguments. + pub fn from_outputs( + outputs: Vec, + broadcaster: B, + entropy_source: ES, + kv_store: K, + best_block: BestBlock, + chain_data_source: Option, + logger: L, + regenerate_spend_threshold: Option, + spend_outputs_callback: impl Fn(&[&SpendableOutputDescriptor]) -> Result + Send + Sync + 'static, + ) -> Self { + if let Some(filter) = chain_data_source.as_ref() { + for output_info in &outputs { + let watched_output = output_info.to_watched_output(); + filter.register_output(watched_output); + } + } + + let outputs = Mutex::new(outputs); + let best_block = Mutex::new(best_block); + let regenerate_spend_threshold = regenerate_spend_threshold + .unwrap_or(DEFAULT_REGENERATE_SPEND_THRESHOLD); + let spend_outputs_callback = Box::new(spend_outputs_callback); + Self { + outputs, + broadcaster, + entropy_source, + kv_store, + best_block, + chain_data_source, + logger, + spend_outputs_callback, + regenerate_spend_threshold, + } + } + + /// Tells the sweeper to track the given outputs descriptors. + /// + /// Usually, this should be called based on the values emitted by the + /// [`Event::SpendableOutputs`] event. + /// + /// The given `exclude_static_ouputs` flag controls whether the sweeper will filter out + /// [`SpendableOutputDescriptor::StaticOutput`]s that might be handled directly by the on-chain + /// wallet implementation. + /// + /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs + pub fn track_spendable_outputs( + &self, mut output_descriptors: Vec, + channel_id: Option, exclude_static_ouputs: bool, + ) { + let relevant_descriptors = if exclude_static_ouputs { + output_descriptors + .drain(..) + .filter(|desc| !matches!(desc, SpendableOutputDescriptor::StaticOutput { .. })) + .collect::>() + } else { + output_descriptors + }; + + if relevant_descriptors.is_empty() { + return; + } + + { + let mut locked_outputs = self.outputs.lock().unwrap(); + for descriptor in relevant_descriptors { + let id = self.entropy_source.get_secure_random_bytes(); + let output_info = TrackedSpendableOutput { + id, + descriptor, + channel_id, + first_broadcast_hash: None, + latest_broadcast_height: None, + latest_spending_tx: None, + confirmation_height: None, + confirmation_hash: None, + }; + + locked_outputs.push(output_info.clone()); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting TrackedSpendableOutput: {:?}", e); + }); + } + } + + self.rebroadcast_if_necessary(); + } + + /// Returns a list of the currently tracked spendable outputs. + pub fn tracked_spendable_outputs(&self) -> Vec { + self.outputs.lock().unwrap().clone() + } + + fn rebroadcast_if_necessary(&self) { + let (cur_height, cur_hash) = { + let best_block = self.best_block.lock().unwrap(); + (best_block.height(), best_block.block_hash()) + }; + + let mut respend_descriptors = Vec::new(); + let mut respend_ids = Vec::new(); + + { + let mut locked_outputs = self.outputs.lock().unwrap(); + for output_info in locked_outputs.iter_mut() { + if output_info.confirmation_height.is_some() { + // Don't rebroadcast confirmed txs + debug_assert!(output_info.confirmation_hash.is_some()); + continue; + } + + if let Some(latest_broadcast_height) = output_info.latest_broadcast_height { + // Re-generate spending tx after regenerate_spend_threshold, rebroadcast + // after every block + if latest_broadcast_height + self.regenerate_spend_threshold >= cur_height { + respend_descriptors.push(output_info.descriptor.clone()); + respend_ids.push(output_info.id); + } else if latest_broadcast_height < cur_height { + if let Some(latest_spending_tx) = output_info.latest_spending_tx.as_ref() { + log_debug!(self.logger, "Rebroadcasting output sweeping transaction {}", latest_spending_tx.txid()); + self.broadcaster.broadcast_transactions(&[&latest_spending_tx]); + output_info.latest_broadcast_height = Some(cur_height); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!( + self.logger, + "Error persisting TrackedSpendableOutput: {:?}", + e + ); + }); + } + } + } else { + // Our first broadcast. + respend_descriptors.push(output_info.descriptor.clone()); + respend_ids.push(output_info.id); + output_info.first_broadcast_hash = Some(cur_hash); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting TrackedSpendableOutput: {:?}", e); + }); + } + } + } + + if !respend_descriptors.is_empty() { + let respend_descriptors = + respend_descriptors.iter().collect::>(); + match (self.spend_outputs_callback)(&respend_descriptors) { + Ok(spending_tx) => { + log_debug!(self.logger, "Regenerating and broadcasting sweeping transaction {}", spending_tx.txid()); + self.broadcaster.broadcast_transactions(&[&spending_tx]); + let mut locked_outputs = self.outputs.lock().unwrap(); + for output_info in locked_outputs.iter_mut() { + if respend_ids.contains(&output_info.id) { + if let Some(filter) = self.chain_data_source.as_ref() { + let watched_output = output_info.to_watched_output(); + filter.register_output(watched_output); + } + + output_info.latest_spending_tx = Some(spending_tx.clone()); + output_info.latest_broadcast_height = Some(cur_height); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!( + self.logger, + "Error persisting TrackedSpendableOutput: {:?}", + e + ); + }); + } + } + } + Err(e) => { + log_error!(self.logger, "Error spending outputs: {:?}", e); + } + }; + } + } + + fn prune_confirmed_outputs(&self) { + let cur_height = self.best_block.lock().unwrap().height(); + let mut locked_outputs = self.outputs.lock().unwrap(); + + // Prune all outputs that have sufficient depth by now. + locked_outputs.retain(|o| { + if let Some(confirmation_height) = o.confirmation_height { + if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 { + let key = id_to_hex_string(&o.id); + match self.kv_store.remove( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + false, + ) { + Ok(_) => { + log_debug!(self.logger, "Pruning swept output as sufficiently confirmed via spend in transaction {:?}", + o.latest_spending_tx.as_ref().map(|t| t.txid())); + return false; + } + Err(e) => { + log_error!( + self.logger, + "Removal of key {}/{}/{} failed due to: {}", + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + key, + e + ); + return true; + } + } + } + } + true + }); + } + + fn persist_info(&self, output: &TrackedSpendableOutput) -> Result<(), io::Error> { + let key = id_to_hex_string(&output.id); + let data = output.encode(); + self.kv_store + .write( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + &data, + ) + .map_err(|e| { + log_error!( + self.logger, + "Write for key {}/{}/{} failed due to: {}", + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + key, + e + ); + e + }) + } +} + +impl Listen for OutputSweeper +where + B::Target: BroadcasterInterface, + ES::Target: EntropySource, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + fn filtered_block_connected( + &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, + ) { + { + let best_block = self.best_block.lock().unwrap(); + assert_eq!(best_block.block_hash(), header.prev_blockhash, + "Blocks must be connected in chain-order - the connected header must build on the last connected header"); + assert_eq!(best_block.height(), height - 1, + "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); + } + + self.transactions_confirmed(header, txdata, height); + self.best_block_updated(header, height); + } + + fn block_disconnected(&self, header: &Header, height: u32) { + let new_height = height - 1; + { + let mut best_block = self.best_block.lock().unwrap(); + assert_eq!(best_block.block_hash(), header.block_hash(), + "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header"); + assert_eq!(best_block.height(), height, + "Blocks must be disconnected in chain-order - the disconnected block must have the correct height"); + *best_block = BestBlock::new(header.prev_blockhash, new_height) + } + + let mut locked_outputs = self.outputs.lock().unwrap(); + for output_info in locked_outputs.iter_mut() { + if output_info.confirmation_hash == Some(header.block_hash()) { + debug_assert_eq!(output_info.confirmation_height, Some(height)); + output_info.confirmation_hash = None; + output_info.confirmation_height = None; + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting TrackedSpendableOutput: {:?}", e); + }); + } + } + } +} + +impl Confirm for OutputSweeper +where + B::Target: BroadcasterInterface, + ES::Target: EntropySource, + F::Target: Filter, + K::Target: KVStore, + L::Target: Logger, +{ + fn transactions_confirmed( + &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, + ) { + let mut locked_outputs = self.outputs.lock().unwrap(); + for (_, tx) in txdata { + for output_info in locked_outputs.iter_mut() { + if output_info.is_spent_in(*tx) { + debug_assert!(Some(height) > output_info.latest_broadcast_height); + output_info.confirmation_hash = Some(header.block_hash()); + output_info.confirmation_height = Some(height); + output_info.latest_spending_tx = Some((*tx).clone()); + self.persist_info(&output_info).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting TrackedSpendableOutput: {:?}", e); + }); + } + } + } + } + + fn transaction_unconfirmed(&self, txid: &Txid) { + let mut locked_outputs = self.outputs.lock().unwrap(); + + // Get what height was unconfirmed. + let unconf_height = locked_outputs + .iter() + .find(|o| o.latest_spending_tx.as_ref().map(|tx| tx.txid()) == Some(*txid)) + .and_then(|o| o.confirmation_height); + + // Unconfirm all >= this height. + locked_outputs.iter_mut().filter(|o| o.confirmation_height >= unconf_height).for_each( + |o| { + o.confirmation_hash = None; + o.confirmation_height = None; + self.persist_info(&o).unwrap_or_else(|e| { + log_error!(self.logger, "Error persisting TrackedSpendableOutput: {:?}", e); + }); + }, + ); + } + + fn best_block_updated(&self, header: &Header, height: u32) { + *self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height); + self.prune_confirmed_outputs(); + self.rebroadcast_if_necessary(); + } + + fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { + let locked_outputs = self.outputs.lock().unwrap(); + locked_outputs + .iter() + .filter_map(|o| { + if let Some(confirmation_hash) = o.confirmation_hash { + if let Some(confirmation_height) = o.confirmation_height { + if let Some(latest_spending_tx) = o.latest_spending_tx.as_ref() { + return Some(( + latest_spending_tx.txid(), + confirmation_height, + Some(confirmation_hash), + )); + } + } + } + + None + }) + .collect::>() + } +} + +/// Reads previously persisted spendable output information from the store. +pub fn read_spendable_outputs( + kv_store: K, logger: L, +) -> Result, io::Error> +where + K::Target: KVStore, + L::Target: Logger, +{ + let mut res = Vec::new(); + + for stored_key in kv_store.list( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + )? { + let mut reader = io::Cursor::new(kv_store.read( + SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &stored_key, + )?); + let output = TrackedSpendableOutput::read(&mut reader).map_err(|e| { + log_error!(logger, "Failed to deserialize TrackedSpendableOutput: {}", e); + io::Error::new( + io::ErrorKind::InvalidData, + "Failed to deserialize TrackedSpendableOutput", + ) + })?; + res.push(output); + } + Ok(res) +} + +#[inline] +fn id_to_hex_string(value: &[u8]) -> String { + let mut res = String::with_capacity(2 * value.len()); + for v in value { + write!(&mut res, "{:02x}", v).expect("Unable to write"); + } + res +}