Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add OutputSweeper persisting and spending outputs #152

Merged
merged 2 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::io::sqlite_store::SqliteStore;
use crate::logger::{log_error, FilesystemLogger, Logger};
use crate::payment_store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::sweep::OutputSweeper;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph,
Expand Down Expand Up @@ -777,6 +778,25 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
}
};

let best_block = channel_manager.current_best_block();
let output_sweeper =
match io::utils::read_spendable_outputs(Arc::clone(&kv_store), Arc::clone(&logger)) {
Ok(outputs) => Arc::new(OutputSweeper::new(
outputs,
Arc::clone(&wallet),
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&keys_manager),
Arc::clone(&kv_store),
best_block,
Some(Arc::clone(&tx_sync)),
Arc::clone(&logger),
)),
Err(_) => {
return Err(BuildError::ReadFailed);
}
};

let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());

Ok(Node {
Expand All @@ -791,6 +811,7 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
event_queue,
channel_manager,
chain_monitor,
output_sweeper,
peer_manager,
keys_manager,
network_graph,
Expand Down
66 changes: 12 additions & 54 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::types::{Broadcaster, FeeEstimator, Wallet};
use crate::types::{Sweeper, Wallet};
use crate::{
hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore,
UserChannelId,
hex_utils, ChannelManager, Config, Error, NetworkGraph, PeerInfo, PeerStore, UserChannelId,
};

use crate::payment_store::{
Expand All @@ -12,11 +11,9 @@ use crate::io::{
EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_PRIMARY_NAMESPACE,
EVENT_QUEUE_PERSISTENCE_SECONDARY_NAMESPACE,
};
use crate::logger::{log_debug, log_error, log_info, Logger};
use crate::logger::{log_error, log_info, Logger};

use lightning::chain::chaininterface::{
BroadcasterInterface, ConfirmationTarget, FeeEstimator as LDKFeeEstimator,
};
use lightning::chain::chaininterface::ConfirmationTarget;
use lightning::events::Event as LdkEvent;
use lightning::events::PaymentPurpose;
use lightning::impl_writeable_tlv_based_enum;
Expand All @@ -26,8 +23,8 @@ use lightning::util::errors::APIError;
use lightning::util::persist::KVStore;
use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};

use bitcoin::secp256k1::{PublicKey, Secp256k1};
use bitcoin::{LockTime, OutPoint, PackedLockTime};
use bitcoin::secp256k1::PublicKey;
use bitcoin::{LockTime, OutPoint};
use rand::{thread_rng, Rng};
use std::collections::VecDeque;
use std::ops::Deref;
Expand Down Expand Up @@ -249,10 +246,8 @@ where
event_queue: Arc<EventQueue<K, L>>,
wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>,
tx_broadcaster: Arc<Broadcaster>,
fee_estimator: Arc<FeeEstimator>,
output_sweeper: Arc<Sweeper<K>>,
network_graph: Arc<NetworkGraph>,
keys_manager: Arc<KeysManager>,
payment_store: Arc<PaymentStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>,
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
Expand All @@ -266,20 +261,17 @@ where
{
pub fn new(
event_queue: Arc<EventQueue<K, L>>, wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>, tx_broadcaster: Arc<Broadcaster>,
fee_estimator: Arc<FeeEstimator>, network_graph: Arc<NetworkGraph>,
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
channel_manager: Arc<ChannelManager<K>>, output_sweeper: Arc<Sweeper<K>>,
network_graph: Arc<NetworkGraph>, payment_store: Arc<PaymentStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>, runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
logger: L, config: Arc<Config>,
) -> Self {
Self {
event_queue,
wallet,
channel_manager,
tx_broadcaster,
fee_estimator,
output_sweeper,
network_graph,
keys_manager,
payment_store,
peer_store,
logger,
Expand Down Expand Up @@ -585,42 +577,8 @@ where
});
}
}
LdkEvent::SpendableOutputs { outputs, channel_id: _ } => {
// TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so.
let destination_address = self.wallet.get_new_address().unwrap_or_else(|e| {
log_error!(self.logger, "Failed to get destination address: {}", e);
panic!("Failed to get destination address");
});

let output_descriptors = &outputs.iter().collect::<Vec<_>>();
let tx_feerate = self
.fee_estimator
.get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee);

// We set nLockTime to the current height to discourage fee sniping.
let cur_height = self.channel_manager.current_best_block().height();
let locktime: PackedLockTime =
LockTime::from_height(cur_height).map_or(PackedLockTime::ZERO, |l| l.into());
let res = self.keys_manager.spend_spendable_outputs(
output_descriptors,
Vec::new(),
destination_address.script_pubkey(),
tx_feerate,
Some(locktime),
&Secp256k1::new(),
);

match res {
Ok(Some(spending_tx)) => {
self.tx_broadcaster.broadcast_transactions(&[&spending_tx])
}
Ok(None) => {
log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs);
}
Err(err) => {
log_error!(self.logger, "Error spending outputs: {:?}", err);
}
}
LdkEvent::SpendableOutputs { outputs, channel_id } => {
self.output_sweeper.add_outputs(outputs, channel_id)
}
LdkEvent::OpenChannelRequest {
temporary_channel_id,
Expand Down
4 changes: 4 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
pub(crate) const PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "payments";
pub(crate) const PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// The spendable output information will be persisted under this prefix.
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "spendable_outputs";
pub(crate) const SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_PRIMARY_NAMESPACE: &str = "";
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_SECONDARY_NAMESPACE: &str = "";
Expand Down
31 changes: 31 additions & 0 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::WALLET_KEYS_SEED_LEN;

use crate::logger::log_error;
use crate::peer_store::PeerStore;
use crate::sweep::SpendableOutputInfo;
use crate::{Error, EventQueue, PaymentDetails};

use lightning::routing::gossip::NetworkGraph;
Expand Down Expand Up @@ -199,6 +200,36 @@ where
Ok(res)
}

/// Read previously persisted spendable output information from the store.
pub(crate) fn read_spendable_outputs<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
) -> Result<Vec<SpendableOutputInfo>, std::io::Error>
where
L::Target: Logger,
{
let mut res = Vec::new();

for stored_key in kv_store.list(
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)? {
let mut reader = Cursor::new(kv_store.read(
SPENDABLE_OUTPUT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
SPENDABLE_OUTPUT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&stored_key,
)?);
let output = SpendableOutputInfo::read(&mut reader).map_err(|e| {
log_error!(logger, "Failed to deserialize SpendableOutputInfo: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize SpendableOutputInfo",
)
})?;
res.push(output);
}
Ok(res)
}

pub(crate) fn read_latest_rgs_sync_timestamp<K: KVStore + Sync + Send, L: Deref>(
kv_store: Arc<K>, logger: L,
) -> Result<u32, std::io::Error>
Expand Down
12 changes: 8 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub mod io;
mod logger;
mod payment_store;
mod peer_store;
mod sweep;
#[cfg(test)]
mod test;
mod tx_broadcaster;
Expand Down Expand Up @@ -122,7 +123,7 @@ pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
use peer_store::{PeerInfo, PeerStore};
use types::{
Broadcaster, ChainMonitor, ChannelManager, FeeEstimator, KeysManager, NetworkGraph,
PeerManager, Router, Scorer, Wallet,
PeerManager, Router, Scorer, Sweeper, Wallet,
};
pub use types::{ChannelDetails, PeerDetails, UserChannelId};

Expand Down Expand Up @@ -295,6 +296,7 @@ pub struct Node<K: KVStore + Sync + Send + 'static> {
event_queue: Arc<EventQueue<K, Arc<FilesystemLogger>>>,
channel_manager: Arc<ChannelManager<K>>,
chain_monitor: Arc<ChainMonitor<K>>,
output_sweeper: Arc<Sweeper<K>>,
peer_manager: Arc<PeerManager<K>>,
keys_manager: Arc<KeysManager>,
network_graph: Arc<NetworkGraph>,
Expand Down Expand Up @@ -432,6 +434,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let tx_sync = Arc::clone(&self.tx_sync);
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let mut stop_sync = self.stop_receiver.clone();
let wallet_sync_interval_secs =
Expand All @@ -449,6 +452,7 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
&*sync_sweeper as &(dyn Confirm + Sync + Send),
];
let now = Instant::now();
match tx_sync.sync(confirmables).await {
Expand Down Expand Up @@ -695,10 +699,8 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
Arc::clone(&self.event_queue),
Arc::clone(&self.wallet),
Arc::clone(&self.channel_manager),
Arc::clone(&self.tx_broadcaster),
Arc::clone(&self.fee_estimator),
Arc::clone(&self.output_sweeper),
Arc::clone(&self.network_graph),
Arc::clone(&self.keys_manager),
Arc::clone(&self.payment_store),
Arc::clone(&self.peer_store),
Arc::clone(&self.runtime),
Expand Down Expand Up @@ -1036,10 +1038,12 @@ impl<K: KVStore + Sync + Send + 'static> Node<K> {
let tx_sync = Arc::clone(&self.tx_sync);
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
let sync_logger = Arc::clone(&self.logger);
let confirmables = vec![
&*sync_cman as &(dyn Confirm + Sync + Send),
&*sync_cmon as &(dyn Confirm + Sync + Send),
&*sync_sweeper as &(dyn Confirm + Sync + Send),
];

tokio::task::block_in_place(move || {
Expand Down
Loading
Loading