Skip to content

Commit

Permalink
Merge pull request #152 from tnull/2023-08-sweep-sweep-sweep
Browse files Browse the repository at this point in the history
Add `OutputSweeper` persisting and spending outputs
  • Loading branch information
tnull authored Dec 11, 2023
2 parents 4685ad1 + 707c170 commit 15f4f2a
Show file tree
Hide file tree
Showing 8 changed files with 556 additions and 77 deletions.
21 changes: 21 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::io::sqlite_store::SqliteStore;
use crate::logger::{log_error, FilesystemLogger, Logger};
use crate::payment_store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::sweep::OutputSweeper;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph,
Expand Down Expand Up @@ -777,6 +778,25 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
}
};

let best_block = channel_manager.current_best_block();
let output_sweeper =
match io::utils::read_spendable_outputs(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(outputs) => Arc::new(OutputSweeper::new(
outputs,
Arc::clone(&wallet),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
best_block,
Some(Arc::clone(&tx_sync)),
Arc::clone(&logger),
)),
Err(_) => {
return Err(BuildError::ReadFailed);
}
};

let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());

Ok(Node {
Expand All @@ -791,6 +811,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
event_queue,
channel_manager,
chain_monitor,
output_sweeper,
peer_manager,
keys_manager,
network_graph,
Expand Down
66 changes: 12 additions & 54 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::types::{Broadcaster, FeeEstimator, Wallet};
use crate::types::{Sweeper, Wallet};
use crate::{
hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore,
UserChannelId,
hex_utils, ChannelManager, Config, Error, NetworkGraph, PeerInfo, PeerStore, UserChannelId,
};

use crate::payment_store::{
Expand All @@ -12,11 +11,9 @@ use crate::io::{
EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
};
use crate::logger::{log_debug, log_error, log_info, Logger};
use crate::logger::{log_error, log_info, Logger};

use lightning::chain::chaininterface::{
BroadcasterInterface, ConfirmationTarget, FeeEstimator as LDKFeeEstimator,
};
use lightning::chain::chaininterface::ConfirmationTarget;
use lightning::events::Event as LdkEvent;
use lightning::events::PaymentPurpose;
use lightning::impl_writeable_tlv_based_enum;
Expand All @@ -26,8 +23,8 @@ use lightning::util::errors::APIError;
use lightning::util::persist::KVStore;
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};

use bitcoin::secp256k1::{PublicKey, Secp256k1};
use bitcoin::{LockTime, OutPoint, PackedLockTime};
use bitcoin::secp256k1::PublicKey;
use bitcoin::{LockTime, OutPoint};
use rand::{thread_rng, Rng};
use std::collections::VecDeque;
use std::ops::Deref;
Expand Down Expand Up @@ -249,10 +246,8 @@ where
event_queue: Arc<EventQueue<K, L>>,
wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>,
tx_broadcaster: Arc<Broadcaster>,
fee_estimator: Arc<FeeEstimator>,
output_sweeper: Arc<Sweeper<K>>,
network_graph: Arc<NetworkGraph>,
keys_manager: Arc<KeysManager>,
payment_store: Arc<PaymentStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>,
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
Expand All @@ -266,20 +261,17 @@ where
{
pub fn new(
event_queue: Arc<EventQueue<K, L>>, wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>, tx_broadcaster: Arc<Broadcaster>,
fee_estimator: Arc<FeeEstimator>, network_graph: Arc<NetworkGraph>,
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
channel_manager: Arc<ChannelManager<K>>, output_sweeper: Arc<Sweeper<K>>,
network_graph: Arc<NetworkGraph>, payment_store: Arc<PaymentStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>, runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
logger: L, config: Arc<Config>,
) -> Self {
Self {
event_queue,
wallet,
channel_manager,
tx_broadcaster,
fee_estimator,
output_sweeper,
network_graph,
keys_manager,
payment_store,
peer_store,
logger,
Expand Down Expand Up @@ -585,42 +577,8 @@ where
});
}
}
LdkEvent::SpendableOutputs { outputs, channel_id: _ } => {
// TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so.
let destination_address = self.wallet.get_new_address().unwrap_or_else(|e| {
log_error!(self.logger, "Failed to get destination address: {}", e);
panic!("Failed to get destination address");
});

let output_descriptors = &outputs.iter().collect::<Vec<_>>();
let tx_feerate = self
.fee_estimator
.get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee);

// We set nLockTime to the current height to discourage fee sniping.
let cur_height = self.channel_manager.current_best_block().height();
let locktime: PackedLockTime =
LockTime::from_height(cur_height).map_or(PackedLockTime::ZERO, |l| l.into());
let res = self.keys_manager.spend_spendable_outputs(
output_descriptors,
Vec::new(),
destination_address.script_pubkey(),
tx_feerate,
Some(locktime),
&Secp256k1::new(),
);

match res {
Ok(Some(spending_tx)) => {
self.tx_broadcaster.broadcast_transactions(&[&spending_tx])
}
Ok(None) => {
log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs);
}
Err(err) => {
log_error!(self.logger, "Error spending outputs: {:?}", err);
}
}
LdkEvent::SpendableOutputs { outputs, channel_id } => {
self.output_sweeper.add_outputs(outputs, channel_id)
}
LdkEvent::OpenChannelRequest {
temporary_channel_id,
Expand Down
4 changes: 4 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments";
pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The spendable output information will be persisted under this prefix.
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs";
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = "";
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE: &str = "";
Expand Down
31 changes: 31 additions & 0 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::WALLET_KEYS_SEED_LEN;

use crate::logger::log_error;
use crate::peer_store::PeerStore;
use crate::sweep::SpendableOutputInfo;
use crate::{Error, EventQueue, PaymentDetails};

use lightning::routing::gossip::NetworkGraph;
Expand Down Expand Up @@ -199,6 +200,36 @@ where
Ok(res)
}

/// Read previously persisted spendable output information from the store.
pub(crate) fn read_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
) -> Result<Vec<SpendableOutputInfo>, std::io::Error>
where
L::Target: Logger,
{
let mut res = Vec::new();

for stored_key in kv_store.list(
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)? {
let mut reader = Cursor::new(kv_store.read(
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
)?);
let output = SpendableOutputInfo::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize SpendableOutputInfo: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize SpendableOutputInfo",
)
})?;
res.push(output);
}
Ok(res)
}

pub(crate) fn read_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
) -> Result<u32, std::io::Error>
Expand Down
12 changes: 8 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub mod io;
mod logger;
mod payment_store;
mod peer_store;
mod sweep;
#[cfg(test)]
mod test;
mod tx_broadcaster;
Expand Down Expand Up @@ -122,7 +123,7 @@ pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
use peer_store::{PeerInfo, PeerStore};
use types::{
Broadcaster, ChainMonitor, ChannelManager, FeeEstimator, KeysManager, NetworkGraph,
PeerManager, Router, Scorer, Wallet,
PeerManager, Router, Scorer, Sweeper, Wallet,
};
pub use types::{ChannelDetails, PeerDetails, UserChannelId};

Expand Down Expand Up @@ -295,6 +296,7 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
event_queue: Arc<EventQueue<K, Arc<FilesystemLogger>>>,
channel_manager: Arc<ChannelManager<K>>,
chain_monitor: Arc<ChainMonitor<K>>,
output_sweeper: Arc<Sweeper<K>>,
peer_manager: Arc<PeerManager<K>>,
keys_manager: Arc<KeysManager>,
network_graph: Arc<NetworkGraph>,
Expand Down Expand Up @@ -432,6 +434,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let tx_sync = Arc::clone(&self.tx_sync);
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let mut stop_sync = self.stop_receiver.clone();
let wallet_sync_interval_secs =
Expand All @@ -449,6 +452,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
&*sync_sweeper as &(dyn Confirm + Sync + Send),
];
let now = Instant::now();
match tx_sync.sync(confirmables).await {
Expand Down Expand Up @@ -695,10 +699,8 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
Arc::clone(&self.event_queue),
Arc::clone(&self.wallet),
Arc::clone(&self.channel_manager),
Arc::clone(&self.tx_broadcaster),
Arc::clone(&self.fee_estimator),
Arc::clone(&self.output_sweeper),
Arc::clone(&self.network_graph),
Arc::clone(&self.keys_manager),
Arc::clone(&self.payment_store),
Arc::clone(&self.peer_store),
Arc::clone(&self.runtime),
Expand Down Expand Up @@ -1036,10 +1038,12 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let tx_sync = Arc::clone(&self.tx_sync);
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
&*sync_sweeper as &(dyn Confirm + Sync + Send),
];

tokio::task::block_in_place(move || {
Expand Down
Loading

0 comments on commit 15f4f2a

Please sign in to comment.