Skip to content

Commit

Permalink
Implement Confirm/Listen interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Nov 1, 2023
1 parent 801353e commit a7a979c
Showing 1 changed file with 226 additions and 6 deletions.
232 changes: 226 additions & 6 deletions src/sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,23 @@ use crate::wallet::Wallet;
use crate::{Error, KeysManager};

use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::BestBlock;
use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
use lightning::impl_writeable_tlv_based;
use lightning::sign::{EntropySource, SpendableOutputDescriptor};
use lightning::util::persist::KVStore;
use lightning::util::ser::Writeable;

use bitcoin::secp256k1::Secp256k1;
use bitcoin::{BlockHash, LockTime, PackedLockTime, Transaction};
use bitcoin::{BlockHash, BlockHeader, LockTime, PackedLockTime, Transaction, Txid};

use std::collections::HashSet;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

const CONSIDERED_SPENT_THRESHOLD_CONF: u32 = 6;

const REGENERATE_SPEND_THRESHOLD: u32 = 144;

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct SpendableOutputInfo {
id: [u8; 32],
Expand All @@ -37,29 +42,43 @@ impl_writeable_tlv_based!(SpendableOutputInfo, {
(8, confirmed_in_block, option),
});

pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, L: Deref>
pub(crate) struct OutputSweeper<K: KVStore + Sync + Send, F: Deref, L: Deref>
where
F::Target: Filter,
L::Target: Logger,
{
outputs: Mutex<Vec<SpendableOutputInfo>>,
wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
keys_manager: Arc<KeysManager>,
kv_store: Arc<K>,
best_block: Mutex<BestBlock>,
chain_source: Option<F>,
logger: L,
}

impl<K: KVStore + Sync + Send, L: Deref> OutputSweeper<K, L>
impl<K: KVStore + Sync + Send, F: Deref, L: Deref> OutputSweeper<K, F, L>
where
F::Target: Filter,
L::Target: Logger,
{
pub(crate) fn new(
outputs: Vec<SpendableOutputInfo>, wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock, logger: L,
keys_manager: Arc<KeysManager>, kv_store: Arc<K>, best_block: BestBlock,
chain_source: Option<F>, logger: L,
) -> Self {
if let Some(filter) = chain_source.as_ref() {
for o in &outputs {
if let Some(tx) = o.spending_tx.as_ref() {
if let Some(tx_out) = tx.output.first() {
filter.register_tx(&tx.txid(), &tx_out.script_pubkey);
}
}
}
}

let outputs = Mutex::new(outputs);
let best_block = Mutex::new(best_block);
Self { outputs, wallet, keys_manager, kv_store, best_block, logger }
Self { outputs, wallet, keys_manager, kv_store, best_block, chain_source, logger }
}

pub(crate) fn add_outputs(&self, mut output_descriptors: Vec<SpendableOutputDescriptor>) {
Expand All @@ -76,6 +95,11 @@ where
match self.get_spending_tx(&non_static_outputs, cur_height) {
Ok(spending_tx) => {
self.wallet.broadcast_transactions(&[&spending_tx]);
if let Some(filter) = self.chain_source.as_ref() {
if let Some(tx_out) = spending_tx.output.first() {
filter.register_tx(&spending_tx.txid(), &tx_out.script_pubkey);
}
}
(Some(spending_tx), Some(cur_height))
}
Err(e) => {
Expand Down Expand Up @@ -150,3 +174,199 @@ where
})
}
}

impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Listen for OutputSweeper<K, F, L>
where
F::Target: Filter,
L::Target: Logger,
{
fn filtered_block_connected(
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
) {
{
let best_block = self.best_block.lock().unwrap();
assert_eq!(best_block.block_hash(), header.prev_blockhash,
"Blocks must be connected in chain-order - the connected header must build on the last connected header");
assert_eq!(best_block.height(), height - 1,
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
}

self.transactions_confirmed(header, txdata, height);
self.best_block_updated(header, height);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
let new_height = height - 1;
{
let mut best_block = self.best_block.lock().unwrap();
assert_eq!(best_block.block_hash(), header.block_hash(),
"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
assert_eq!(best_block.height(), height,
"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
*best_block = BestBlock::new(header.prev_blockhash, new_height)
}

let mut locked_outputs = self.outputs.lock().unwrap();
for output_info in locked_outputs.iter_mut() {
if output_info.confirmed_in_block == Some((height, header.block_hash())) {
output_info.confirmed_in_block = None;
match self.persist_info(output_info) {
Ok(()) => {}
Err(e) => {
log_error!(self.logger, "Error persisting spendable output info: {:?}", e)
}
}
}
}
}
}

impl<K: KVStore + Sync + Send, F: Deref, L: Deref> Confirm for OutputSweeper<K, F, L>
where
F::Target: Filter,
L::Target: Logger,
{
fn transactions_confirmed(
&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32,
) {
let mut locked_outputs = self.outputs.lock().unwrap();
for (_, tx) in txdata {
locked_outputs
.iter_mut()
.filter(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(tx.txid()))
.for_each(|o| {
o.confirmed_in_block = Some((height, header.block_hash()));
match self.persist_info(o) {
Ok(()) => {}
Err(e) => {
log_error!(
self.logger,
"Error persisting spendable output info: {:?}",
e
)
}
}
});
}
}

fn transaction_unconfirmed(&self, txid: &Txid) {
let mut locked_outputs = self.outputs.lock().unwrap();

// Get what height was unconfirmed.
let unconf_height = locked_outputs
.iter()
.find(|o| o.spending_tx.as_ref().map(|t| t.txid()) == Some(*txid))
.and_then(|o| o.confirmed_in_block)
.map(|t| t.0);

// Unconfirm all >= this height.
locked_outputs
.iter_mut()
.filter(|o| o.confirmed_in_block.map(|t| t.0) >= unconf_height)
.for_each(|o| {
o.confirmed_in_block = None;
match self.persist_info(o) {
Ok(()) => {}
Err(e) => {
log_error!(self.logger, "Error persisting spendable output info: {:?}", e)
}
}
});
}

fn best_block_updated(&self, header: &BlockHeader, height: u32) {
*self.best_block.lock().unwrap() = BestBlock::new(header.block_hash(), height);

let mut locked_outputs = self.outputs.lock().unwrap();

// Regenerate spending tx and fee bump all outputs that didn't get confirmed by now.
for output_info in locked_outputs.iter_mut().filter(|o| o.confirmed_in_block.is_none()) {
let bcast_height = output_info.broadcast_height.unwrap_or(0);
if height >= bcast_height + REGENERATE_SPEND_THRESHOLD {
let output_descriptors = vec![output_info.descriptor.clone()];
match self.get_spending_tx(&output_descriptors, height) {
Ok(Some(spending_tx)) => {
if let Some(filter) = self.chain_source.as_ref() {
if let Some(tx_out) = spending_tx.output.first() {
filter.register_tx(&spending_tx.txid(), &tx_out.script_pubkey);
}
}
output_info.spending_tx = Some(spending_tx);
output_info.broadcast_height = Some(height);
match self.persist_info(output_info) {
Ok(()) => {}
Err(e) => {
log_error!(
self.logger,
"Error persisting spendable output info: {:?}",
e
)
}
}
}
Ok(None) => {
log_debug!(
self.logger,
"Omitted spending static outputs: {:?}",
output_descriptors
);
}
Err(err) => {
log_error!(self.logger, "Error spending outputs: {:?}", err);
}
};
}
}

// Prune all outputs that have sufficient depth by now.
locked_outputs.retain(|o| {
if let Some((conf_height, _)) = o.confirmed_in_block {
if height >= conf_height + CONSIDERED_SPENT_THRESHOLD_CONF - 1 {
let key = hex_utils::to_string(&o.id);
match self.kv_store.remove(
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
false,
) {
Ok(_) => return false,
Err(e) => {
log_error!(
self.logger,
"Removal of key {}/{}/{} failed due to: {}",
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
key,
e
);
return true;
}
}
}
}
true
});

// Rebroadcast all pending spending txs
let mut txs = locked_outputs
.iter()
.filter_map(|o| o.spending_tx.as_ref())
.collect::<HashSet<&Transaction>>();
self.wallet.broadcast_transactions(&txs.drain().collect::<Vec<_>>());
}

fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
let locked_outputs = self.outputs.lock().unwrap();
locked_outputs
.iter()
.filter_map(|o| {
if let Some(tx) = o.spending_tx.as_ref() {
Some((tx.txid(), o.confirmed_in_block.map(|c| c.1)))
} else {
None
}
})
.collect::<Vec<_>>()
}
}

0 comments on commit a7a979c

Please sign in to comment.