diff --git a/Cargo.toml b/Cargo.toml index ad50a1cc4..bb21f2772 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ lightning-persister = { version = "0.0.121" } lightning-background-processor = { version = "0.0.121", features = ["futures"] } lightning-rapid-gossip-sync = { version = "0.0.121" } lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-https", "time"] } +lightning-liquidity = { version = "0.1.0-alpha", features = ["std"] } #lightning = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["std"] } #lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } @@ -43,6 +44,7 @@ lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-h #lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["futures"] } #lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } #lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["esplora-async"] } +#lightning-liquidity = { git = "https://github.com/lightningdevkit/lightning-liquidity", branch="main", features = ["std"] } #lightning = { path = "../rust-lightning/lightning", features = ["std"] } #lightning-invoice = { path = "../rust-lightning/lightning-invoice" } @@ -51,6 +53,7 @@ lightning-transaction-sync = { version = "0.0.121", features = ["esplora-async-h #lightning-background-processor = { path = "../rust-lightning/lightning-background-processor", features = ["futures"] } #lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } #lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] } +#lightning-liquidity = { path = "../lightning-liquidity", features = ["std"] } bdk = { version = "0.29.0", default-features = false, features = ["std", "async-interface", "use-esplora-async", "sqlite-bundled", "keys-bip39"]} diff --git a/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt b/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt index 936fa558e..a65d4678d 100644 --- a/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt +++ b/bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt @@ -217,8 +217,8 @@ class LibraryTest { assert(channelReadyEvent2 is Event.ChannelReady) node2.eventHandled() - val channelId = when (channelReadyEvent2) { - is Event.ChannelReady -> channelReadyEvent2.channelId + val userChannelId = when (channelReadyEvent2) { + is Event.ChannelReady -> channelReadyEvent2.userChannelId else -> return } @@ -239,7 +239,7 @@ class LibraryTest { assert(node1.listPayments().size == 1) assert(node2.listPayments().size == 1) - node2.closeChannel(channelId, nodeId1) + node2.closeChannel(userChannelId, nodeId1) val channelClosedEvent1 = node1.waitNextEvent() println("Got event: $channelClosedEvent1") diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index b5c459d67..393ca3326 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -28,6 +28,7 @@ interface Builder { void set_esplora_server(string esplora_server_url); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); + void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token); void set_storage_dir_path(string storage_dir_path); void set_network(Network network); [Throws=BuildError] @@ -63,11 +64,11 @@ interface LDKNode { [Throws=NodeError] void disconnect(PublicKey node_id); [Throws=NodeError] - void connect_open_channel(PublicKey node_id, SocketAddress address, u64 channel_amount_sats, u64? push_to_counterparty_msat, ChannelConfig? channel_config, boolean announce_channel); + UserChannelId connect_open_channel(PublicKey node_id, SocketAddress address, u64 channel_amount_sats, u64? push_to_counterparty_msat, ChannelConfig? channel_config, boolean announce_channel); [Throws=NodeError] - void close_channel([ByRef]ChannelId channel_id, PublicKey counterparty_node_id); + void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id); [Throws=NodeError] - void update_channel_config([ByRef]ChannelId channel_id, PublicKey counterparty_node_id, ChannelConfig channel_config); + void update_channel_config([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, ChannelConfig channel_config); [Throws=NodeError] void sync_wallets(); [Throws=NodeError] @@ -86,6 +87,10 @@ interface LDKNode { Bolt11Invoice receive_payment(u64 amount_msat, [ByRef]string description, u32 expiry_secs); [Throws=NodeError] Bolt11Invoice receive_variable_amount_payment([ByRef]string description, u32 expiry_secs); + [Throws=NodeError] + Bolt11Invoice receive_payment_via_jit_channel(u64 amount_msat, [ByRef]string description, u32 expiry_secs, u64? max_lsp_fee_limit_msat); + [Throws=NodeError] + Bolt11Invoice receive_variable_amount_payment_via_jit_channel([ByRef]string description, u32 expiry_secs, u64? max_proportional_lsp_fee_limit_ppm_msat); PaymentDetails? payment([ByRef]PaymentHash payment_hash); [Throws=NodeError] void remove_payment([ByRef]PaymentHash payment_hash); @@ -117,6 +122,7 @@ enum NodeError { "MessageSigningFailed", "TxSyncFailed", "GossipUpdateFailed", + "LiquidityRequestFailed", "InvalidAddress", "InvalidSocketAddress", "InvalidPublicKey", @@ -130,6 +136,8 @@ enum NodeError { "InvalidNetwork", "DuplicatePayment", "InsufficientFunds", + "LiquiditySourceUnavailable", + "LiquidityFeeTooHigh", }; [Error] @@ -168,12 +176,9 @@ enum PaymentStatus { "Failed", }; -[NonExhaustive] -enum Network { - "Bitcoin", - "Testnet", - "Signet", - "Regtest", +dictionary LSPFeeLimits { + u64? max_total_opening_fee_msat; + u64? max_proportional_opening_fee_ppm_msat; }; dictionary PaymentDetails { @@ -183,6 +188,15 @@ dictionary PaymentDetails { u64? amount_msat; PaymentDirection direction; PaymentStatus status; + LSPFeeLimits? lsp_fee_limits; +}; + +[NonExhaustive] +enum Network { + "Bitcoin", + "Testnet", + "Signet", + "Regtest", }; dictionary OutPoint { diff --git a/bindings/python/src/ldk_node/test_ldk_node.py b/bindings/python/src/ldk_node/test_ldk_node.py index 7b8ea7239..555ffbcc7 100644 --- a/bindings/python/src/ldk_node/test_ldk_node.py +++ b/bindings/python/src/ldk_node/test_ldk_node.py @@ -198,7 +198,7 @@ def test_channel_full_cycle(self): print("EVENT:", payment_received_event_2) node_2.event_handled() - node_2.close_channel(channel_ready_event_2.channel_id, node_id_1) + node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) channel_closed_event_1 = node_1.wait_next_event() assert isinstance(channel_closed_event_1, Event.CHANNEL_CLOSED) diff --git a/src/builder.rs b/src/builder.rs index d8519815e..bfe557229 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -3,7 +3,9 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io; use crate::io::sqlite_store::SqliteStore; +use crate::liquidity::LiquiditySource; use crate::logger::{log_error, FilesystemLogger, Logger}; +use crate::message_handler::NodeCustomMessageHandler; use crate::payment_store::PaymentStore; use crate::peer_store::PeerStore; use crate::sweep::OutputSweeper; @@ -40,6 +42,9 @@ use lightning_persister::fs_store::FilesystemStore; use lightning_transaction_sync::EsploraSyncClient; +use lightning_liquidity::lsps2::client::LSPS2ClientConfig; +use lightning_liquidity::{LiquidityClientConfig, LiquidityManager}; + #[cfg(any(vss, vss_test))] use crate::io::vss_store::VssStore; use bdk::bitcoin::secp256k1::Secp256k1; @@ -49,6 +54,7 @@ use bdk::template::Bip84; use bip39::Mnemonic; +use bitcoin::secp256k1::PublicKey; use bitcoin::{BlockHash, Network}; #[cfg(any(vss, vss_test))] @@ -80,6 +86,18 @@ enum GossipSourceConfig { RapidGossipSync(String), } +#[derive(Debug, Clone)] +struct LiquiditySourceConfig { + // LSPS2 service's (address, node_id, token) + lsps2_service: Option<(SocketAddress, PublicKey, Option)>, +} + +impl Default for LiquiditySourceConfig { + fn default() -> Self { + Self { lsps2_service: None } + } +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -146,16 +164,14 @@ pub struct NodeBuilder { entropy_source_config: Option, chain_data_source_config: Option, gossip_source_config: Option, + liquidity_source_config: Option, } impl NodeBuilder { /// Creates a new builder instance with the default configuration. pub fn new() -> Self { let config = Config::default(); - let entropy_source_config = None; - let chain_data_source_config = None; - let gossip_source_config = None; - Self { config, entropy_source_config, chain_data_source_config, gossip_source_config } + Self::from_config(config) } /// Creates a new builder instance from an [`Config`]. @@ -164,7 +180,14 @@ impl NodeBuilder { let entropy_source_config = None; let chain_data_source_config = None; let gossip_source_config = None; - Self { config, entropy_source_config, chain_data_source_config, gossip_source_config } + let liquidity_source_config = None; + Self { + config, + entropy_source_config, + chain_data_source_config, + gossip_source_config, + liquidity_source_config, + } } /// Configures the [`Node`] instance to source its wallet entropy from a seed file on disk. @@ -218,6 +241,25 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its inbound liquidity from the given + /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) + /// service. + /// + /// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`]. + /// + /// The given `token` will be used by the LSP to authenticate the user. + pub fn set_liquidity_source_lsps2( + &mut self, address: SocketAddress, node_id: PublicKey, token: Option, + ) -> &mut Self { + // Mark the LSP as trusted for 0conf + self.config.trusted_peers_0conf.push(node_id.clone()); + + let liquidity_source_config = + self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); + liquidity_source_config.lsps2_service = Some((address, node_id, token)); + self + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self { self.config.storage_dir_path = storage_dir_path; @@ -318,6 +360,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.liquidity_source_config.as_ref(), seed_bytes, logger, vss_store, @@ -340,6 +383,7 @@ impl NodeBuilder { config, self.chain_data_source_config.as_ref(), self.gossip_source_config.as_ref(), + self.liquidity_source_config.as_ref(), seed_bytes, logger, kv_store, @@ -413,6 +457,19 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url); } + /// Configures the [`Node`] instance to source its inbound liquidity from the given + /// [LSPS2](https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md) + /// service. + /// + /// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`]. + /// + /// The given `token` will be used by the LSP to authenticate the user. + pub fn set_liquidity_source_lsps2( + &self, address: SocketAddress, node_id: PublicKey, token: Option, + ) { + self.inner.write().unwrap().set_liquidity_source_lsps2(address, node_id, token); + } + /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); @@ -463,7 +520,8 @@ impl ArcedNodeBuilder { /// Builds a [`Node`] instance according to the options previously configured. fn build_with_store_internal( config: Arc, chain_data_source_config: Option<&ChainDataSourceConfig>, - gossip_source_config: Option<&GossipSourceConfig>, seed_bytes: [u8; 64], + gossip_source_config: Option<&GossipSourceConfig>, + liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64], logger: Arc, kv_store: Arc, ) -> Result, BuildError> { // Initialize the on-chain wallet and chain access @@ -636,6 +694,12 @@ fn build_with_store_internal( // generating the events otherwise. user_config.manually_accept_inbound_channels = true; } + + if liquidity_source_config.is_some() { + // Generally allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll + // check that they don't take too much before claiming. + user_config.channel_config.accept_underpaying_htlcs = true; + } let channel_manager = { if let Ok(res) = kv_store.read( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, @@ -746,20 +810,51 @@ fn build_with_store_internal( } }; + let liquidity_source = liquidity_source_config.as_ref().and_then(|lsc| { + lsc.lsps2_service.as_ref().map(|(address, node_id, token)| { + let lsps2_client_config = Some(LSPS2ClientConfig {}); + let liquidity_client_config = Some(LiquidityClientConfig { lsps2_client_config }); + let liquidity_manager = Arc::new(LiquidityManager::new( + Arc::clone(&keys_manager), + Arc::clone(&channel_manager), + Some(Arc::clone(&tx_sync)), + None, + None, + liquidity_client_config, + )); + Arc::new(LiquiditySource::new_lsps2( + address.clone(), + *node_id, + token.clone(), + Arc::clone(&channel_manager), + Arc::clone(&keys_manager), + liquidity_manager, + Arc::clone(&config), + Arc::clone(&logger), + )) + }) + }); + + let custom_message_handler = if let Some(liquidity_source) = liquidity_source.as_ref() { + Arc::new(NodeCustomMessageHandler::new_liquidity(Arc::clone(&liquidity_source))) + } else { + Arc::new(NodeCustomMessageHandler::new_ignoring()) + }; + let msg_handler = match gossip_source.as_gossip_sync() { GossipSync::P2P(p2p_gossip_sync) => MessageHandler { chan_handler: Arc::clone(&channel_manager), route_handler: Arc::clone(&p2p_gossip_sync) as Arc, onion_message_handler: onion_messenger, - custom_message_handler: IgnoringMessageHandler {}, + custom_message_handler, }, GossipSync::Rapid(_) => MessageHandler { chan_handler: Arc::clone(&channel_manager), route_handler: Arc::new(IgnoringMessageHandler {}) as Arc, onion_message_handler: onion_messenger, - custom_message_handler: IgnoringMessageHandler {}, + custom_message_handler, }, GossipSync::None => { unreachable!("We must always have a gossip sync!"); @@ -782,6 +877,8 @@ fn build_with_store_internal( Arc::clone(&keys_manager), )); + liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager))); + // Init payment info storage let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) { Ok(payments) => { @@ -853,6 +950,7 @@ fn build_with_store_internal( keys_manager, network_graph, gossip_source, + liquidity_source, kv_store, logger, _router: router, diff --git a/src/error.rs b/src/error.rs index 267239e04..6bb743e6f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,7 +21,7 @@ pub enum Error { ChannelCreationFailed, /// A channel could not be closed. ChannelClosingFailed, - /// A channel config could not be updated. + /// A channel configuration could not be updated. ChannelConfigUpdateFailed, /// Persistence failed. PersistenceFailed, @@ -37,6 +37,8 @@ pub enum Error { TxSyncFailed, /// A gossip updating operation failed. GossipUpdateFailed, + /// A liquidity request operation failed. + LiquidityRequestFailed, /// The given address is invalid. InvalidAddress, /// The given network address is invalid. @@ -47,7 +49,7 @@ pub enum Error { InvalidSecretKey, /// The given payment hash is invalid. InvalidPaymentHash, - /// The given payment preimage is invalid. + /// The given payment pre-image is invalid. InvalidPaymentPreimage, /// The given payment secret is invalid. InvalidPaymentSecret, @@ -59,10 +61,14 @@ pub enum Error { InvalidChannelId, /// The given network is invalid. InvalidNetwork, - /// A payment with the given hash has already been intiated. + /// A payment with the given hash has already been initiated. DuplicatePayment, - /// There are insufficient funds to complete the given operation. + /// The available funds are insufficient to complete the given operation. InsufficientFunds, + /// The given operation failed due to the required liquidity source being unavailable. + LiquiditySourceUnavailable, + /// The given operation failed due to the LSP's required opening fee being too high. + LiquidityFeeTooHigh, } impl fmt::Display for Error { @@ -89,6 +95,7 @@ impl fmt::Display for Error { Self::MessageSigningFailed => write!(f, "Failed to sign given message."), Self::TxSyncFailed => write!(f, "Failed to sync transactions."), Self::GossipUpdateFailed => write!(f, "Failed to update gossip data."), + Self::LiquidityRequestFailed => write!(f, "Failed to request inbound liquidity."), Self::InvalidAddress => write!(f, "The given address is invalid."), Self::InvalidSocketAddress => write!(f, "The given network address is invalid."), Self::InvalidPublicKey => write!(f, "The given public key is invalid."), @@ -104,7 +111,13 @@ impl fmt::Display for Error { write!(f, "A payment with the given hash has already been initiated.") } Self::InsufficientFunds => { - write!(f, "There are insufficient funds to complete the given operation.") + write!(f, "The available funds are insufficient to complete the given operation.") + } + Self::LiquiditySourceUnavailable => { + write!(f, "The given operation failed due to the required liquidity source being unavailable.") + } + Self::LiquidityFeeTooHigh => { + write!(f, "The given operation failed due to the LSP's required opening fee being too high.") } } } diff --git a/src/event.rs b/src/event.rs index a22607f7f..3745801fe 100644 --- a/src/event.rs +++ b/src/event.rs @@ -23,12 +23,16 @@ use lightning::util::errors::APIError; use lightning::util::persist::KVStore; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; +use lightning_liquidity::lsps2::utils::compute_opening_fee; + use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; use bitcoin::OutPoint; + +use rand::{thread_rng, Rng}; + use core::future::Future; use core::task::{Poll, Waker}; -use rand::{thread_rng, Rng}; use std::collections::VecDeque; use std::ops::Deref; use std::sync::{Arc, Condvar, Mutex, RwLock}; @@ -395,7 +399,7 @@ where via_user_channel_id: _, claim_deadline: _, onion_fields: _, - counterparty_skimmed_fee_msat: _, + counterparty_skimmed_fee_msat, } => { if let Some(info) = self.payment_store.get(&payment_hash) { if info.status == PaymentStatus::Succeeded { @@ -417,6 +421,39 @@ where }); return; } + + let max_total_opening_fee_msat = info + .lsp_fee_limits + .and_then(|l| { + l.max_total_opening_fee_msat.or_else(|| { + l.max_proportional_opening_fee_ppm_msat.and_then(|max_prop_fee| { + // If it's a variable amount payment, compute the actual fee. + compute_opening_fee(amount_msat, 0, max_prop_fee) + }) + }) + }) + .unwrap_or(0); + + if counterparty_skimmed_fee_msat > max_total_opening_fee_msat { + log_info!( + self.logger, + "Refusing inbound payment with hash {} as the counterparty-withheld fee of {}msat exceeds our limit of {}msat", + hex_utils::to_string(&payment_hash.0), + counterparty_skimmed_fee_msat, + max_total_opening_fee_msat, + ); + self.channel_manager.fail_htlc_backwards(&payment_hash); + + let update = PaymentDetailsUpdate { + status: Some(PaymentStatus::Failed), + ..PaymentDetailsUpdate::new(payment_hash) + }; + self.payment_store.update(&update).unwrap_or_else(|e| { + log_error!(self.logger, "Failed to access payment store: {}", e); + panic!("Failed to access payment store"); + }); + return; + } } log_info!( @@ -510,6 +547,7 @@ where amount_msat: Some(amount_msat), direction: PaymentDirection::Inbound, status: PaymentStatus::Succeeded, + lsp_fee_limits: None, }; match self.payment_store.insert(payment) { diff --git a/src/lib.rs b/src/lib.rs index 6b5b66f69..ccb9010ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -82,7 +82,9 @@ mod fee_estimator; mod gossip; mod hex_utils; pub mod io; +mod liquidity; mod logger; +mod message_handler; mod payment_store; mod peer_store; mod sweep; @@ -116,8 +118,9 @@ pub use builder::NodeBuilder as Builder; use event::{EventHandler, EventQueue}; use gossip::GossipSource; +use liquidity::LiquiditySource; use payment_store::PaymentStore; -pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus}; +pub use payment_store::{LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentStatus}; use peer_store::{PeerInfo, PeerStore}; use types::{ Broadcaster, ChainMonitor, ChannelManager, FeeEstimator, KeysManager, NetworkGraph, @@ -130,7 +133,10 @@ use logger::{log_error, log_info, log_trace, FilesystemLogger, Logger}; use lightning::chain::Confirm; use lightning::ln::channelmanager::{self, PaymentId, RecipientOnionFields, Retry}; use lightning::ln::msgs::SocketAddress; -use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage}; +use lightning::ln::{PaymentHash, PaymentPreimage}; + +#[cfg(feature = "uniffi")] +use lightning::ln::ChannelId; use lightning::sign::EntropySource; use lightning::util::persist::KVStore; @@ -308,6 +314,7 @@ pub struct Node { keys_manager: Arc, network_graph: Arc, gossip_source: Arc, + liquidity_source: Option>>>, kv_store: Arc, logger: Arc, _router: Arc, @@ -761,6 +768,21 @@ impl Node { }); }); + if let Some(liquidity_source) = self.liquidity_source.as_ref() { + let mut stop_liquidity_handler = self.stop_receiver.clone(); + let liquidity_handler = Arc::clone(&liquidity_source); + runtime.spawn(async move { + loop { + tokio::select! { + _ = stop_liquidity_handler.changed() => { + return; + } + _ = liquidity_handler.handle_next_event() => {} + } + } + }); + } + *runtime_lock = Some(runtime); log_info!(self.logger, "Startup complete."); @@ -969,12 +991,12 @@ impl Node { /// channel counterparty on channel open. This can be useful to start out with the balance not /// entirely shifted to one side, therefore allowing to receive payments from the getgo. /// - /// Returns a temporary channel id. + /// Returns a [`UserChannelId`] allowing to locally keep track of the channel. pub fn connect_open_channel( &self, node_id: PublicKey, address: SocketAddress, channel_amount_sats: u64, push_to_counterparty_msat: Option, channel_config: Option>, announce_channel: bool, - ) -> Result<(), Error> { + ) -> Result { let rt_lock = self.runtime.read().unwrap(); if rt_lock.is_none() { return Err(Error::NotRunning); @@ -1031,7 +1053,7 @@ impl Node { peer_info.node_id ); self.peer_store.add_peer(peer_info)?; - Ok(()) + Ok(UserChannelId(user_channel_id)) } Err(e) => { log_error!(self.logger, "Failed to initiate channel creation: {:?}", e); @@ -1104,27 +1126,51 @@ impl Node { /// Close a previously opened channel. pub fn close_channel( - &self, channel_id: &ChannelId, counterparty_node_id: PublicKey, + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, ) -> Result<(), Error> { - self.peer_store.remove_peer(&counterparty_node_id)?; - match self.channel_manager.close_channel(&channel_id, &counterparty_node_id) { - Ok(_) => Ok(()), - Err(_) => Err(Error::ChannelClosingFailed), + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + match self + .channel_manager + .close_channel(&channel_details.channel_id, &counterparty_node_id) + { + Ok(_) => { + // Check if this was the last open channel, if so, forget the peer. + if open_channels.len() == 1 { + self.peer_store.remove_peer(&counterparty_node_id)?; + } + Ok(()) + } + Err(_) => Err(Error::ChannelClosingFailed), + } + } else { + Ok(()) } } /// Update the config for a previously opened channel. pub fn update_channel_config( - &self, channel_id: &ChannelId, counterparty_node_id: PublicKey, + &self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey, channel_config: Arc, ) -> Result<(), Error> { - self.channel_manager - .update_channel_config( - &counterparty_node_id, - &[*channel_id], - &(*channel_config).clone().into(), - ) - .map_err(|_| Error::ChannelConfigUpdateFailed) + let open_channels = + self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); + if let Some(channel_details) = + open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0) + { + self.channel_manager + .update_channel_config( + &counterparty_node_id, + &[channel_details.channel_id], + &(*channel_config).clone().into(), + ) + .map_err(|_| Error::ChannelConfigUpdateFailed) + } else { + Err(Error::ChannelConfigUpdateFailed) + } } /// Send a payment given an invoice. @@ -1171,6 +1217,7 @@ impl Node { amount_msat: invoice.amount_milli_satoshis(), direction: PaymentDirection::Outbound, status: PaymentStatus::Pending, + lsp_fee_limits: None, }; self.payment_store.insert(payment)?; @@ -1190,6 +1237,7 @@ impl Node { amount_msat: invoice.amount_milli_satoshis(), direction: PaymentDirection::Outbound, status: PaymentStatus::Failed, + lsp_fee_limits: None, }; self.payment_store.insert(payment)?; @@ -1277,6 +1325,7 @@ impl Node { amount_msat: Some(amount_msat), direction: PaymentDirection::Outbound, status: PaymentStatus::Pending, + lsp_fee_limits: None, }; self.payment_store.insert(payment)?; @@ -1297,6 +1346,7 @@ impl Node { amount_msat: Some(amount_msat), direction: PaymentDirection::Outbound, status: PaymentStatus::Failed, + lsp_fee_limits: None, }; self.payment_store.insert(payment)?; @@ -1351,6 +1401,7 @@ impl Node { status: PaymentStatus::Pending, direction: PaymentDirection::Outbound, amount_msat: Some(amount_msat), + lsp_fee_limits: None, }; self.payment_store.insert(payment)?; @@ -1371,6 +1422,7 @@ impl Node { status: PaymentStatus::Failed, direction: PaymentDirection::Outbound, amount_msat: Some(amount_msat), + lsp_fee_limits: None, }; self.payment_store.insert(payment)?; @@ -1544,6 +1596,7 @@ impl Node { amount_msat, direction: PaymentDirection::Inbound, status: PaymentStatus::Pending, + lsp_fee_limits: None, }; self.payment_store.insert(payment)?; @@ -1551,6 +1604,136 @@ impl Node { Ok(invoice) } + /// Returns a payable invoice that can be used to request a payment of the amount given and + /// receive it via a newly created just-in-time (JIT) channel. + /// + /// When the returned invoice is paid, the configured [LSPS2]-compliant LSP will open a channel + /// to us, supplying just-in-time inbound liquidity. + /// + /// If set, `max_total_lsp_fee_limit_msat` will limit how much fee we allow the LSP to take for opening the + /// channel to us. We'll use its cheapest offer otherwise. + /// + /// [LSPS2]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md + pub fn receive_payment_via_jit_channel( + &self, amount_msat: u64, description: &str, expiry_secs: u32, + max_total_lsp_fee_limit_msat: Option, + ) -> Result { + self.receive_payment_via_jit_channel_inner( + Some(amount_msat), + description, + expiry_secs, + max_total_lsp_fee_limit_msat, + None, + ) + } + + /// Returns a payable invoice that can be used to request a variable amount payment (also known + /// as "zero-amount" invoice) and receive it via a newly created just-in-time (JIT) channel. + /// + /// When the returned invoice is paid, the configured [LSPS2]-compliant LSP will open a channel + /// to us, supplying just-in-time inbound liquidity. + /// + /// If set, `max_proportional_lsp_fee_limit_ppm_msat` will limit how much proportional fee, in + /// parts-per-million millisatoshis, we allow the LSP to take for opening the channel to us. + /// We'll use its cheapest offer otherwise. + /// + /// [LSPS2]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md + pub fn receive_variable_amount_payment_via_jit_channel( + &self, description: &str, expiry_secs: u32, + max_proportional_lsp_fee_limit_ppm_msat: Option, + ) -> Result { + self.receive_payment_via_jit_channel_inner( + None, + description, + expiry_secs, + None, + max_proportional_lsp_fee_limit_ppm_msat, + ) + } + + fn receive_payment_via_jit_channel_inner( + &self, amount_msat: Option, description: &str, expiry_secs: u32, + max_total_lsp_fee_limit_msat: Option, + max_proportional_lsp_fee_limit_ppm_msat: Option, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let (node_id, address) = liquidity_source + .get_liquidity_source_details() + .ok_or(Error::LiquiditySourceUnavailable)?; + + let rt_lock = self.runtime.read().unwrap(); + let runtime = rt_lock.as_ref().unwrap(); + + let peer_info = PeerInfo { node_id, address }; + + let con_node_id = peer_info.node_id; + let con_addr = peer_info.address.clone(); + let con_logger = Arc::clone(&self.logger); + let con_pm = Arc::clone(&self.peer_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + tokio::task::block_in_place(move || { + runtime.block_on(async move { + connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await + }) + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", peer_info.node_id, peer_info.address); + + let liquidity_source = Arc::clone(&liquidity_source); + let (invoice, lsp_total_opening_fee, lsp_prop_opening_fee) = + tokio::task::block_in_place(move || { + runtime.block_on(async move { + if let Some(amount_msat) = amount_msat { + liquidity_source + .lsps2_receive_to_jit_channel( + amount_msat, + description, + expiry_secs, + max_total_lsp_fee_limit_msat, + ) + .await + .map(|(invoice, total_fee)| (invoice, Some(total_fee), None)) + } else { + liquidity_source + .lsps2_receive_variable_amount_to_jit_channel( + description, + expiry_secs, + max_proportional_lsp_fee_limit_ppm_msat, + ) + .await + .map(|(invoice, prop_fee)| (invoice, None, Some(prop_fee))) + } + }) + })?; + + // Register payment in payment store. + let payment_hash = PaymentHash(invoice.payment_hash().to_byte_array()); + let lsp_fee_limits = Some(LSPFeeLimits { + max_total_opening_fee_msat: lsp_total_opening_fee, + max_proportional_opening_fee_ppm_msat: lsp_prop_opening_fee, + }); + let payment = PaymentDetails { + hash: payment_hash, + preimage: None, + secret: Some(invoice.payment_secret().clone()), + amount_msat, + direction: PaymentDirection::Inbound, + status: PaymentStatus::Pending, + lsp_fee_limits, + }; + + self.payment_store.insert(payment)?; + + // Persist LSP peer to make sure we reconnect on restart. + self.peer_store.add_peer(peer_info)?; + + Ok(invoice) + } + /// Retrieve the details of a specific payment with the given hash. /// /// Returns `Some` if the payment was known and `None` otherwise. diff --git a/src/liquidity.rs b/src/liquidity.rs new file mode 100644 index 000000000..965e25b77 --- /dev/null +++ b/src/liquidity.rs @@ -0,0 +1,434 @@ +use crate::logger::{log_debug, log_error, log_info, Logger}; +use crate::types::{ChannelManager, KeysManager, LiquidityManager, PeerManager}; +use crate::{Config, Error}; + +use lightning::ln::channelmanager::MIN_FINAL_CLTV_EXPIRY_DELTA; +use lightning::ln::msgs::SocketAddress; +use lightning::routing::router::{RouteHint, RouteHintHop}; +use lightning::util::persist::KVStore; +use lightning_invoice::{Bolt11Invoice, InvoiceBuilder, RoutingFees}; +use lightning_liquidity::events::Event; +use lightning_liquidity::lsps0::msgs::RequestId; +use lightning_liquidity::lsps2::event::LSPS2ClientEvent; +use lightning_liquidity::lsps2::msgs::OpeningFeeParams; +use lightning_liquidity::lsps2::utils::compute_opening_fee; + +use bitcoin::hashes::{sha256, Hash}; +use bitcoin::secp256k1::{PublicKey, Secp256k1}; + +use tokio::sync::oneshot; + +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; + +struct LSPS2Service { + address: SocketAddress, + node_id: PublicKey, + token: Option, + pending_fee_requests: Mutex>>, + pending_buy_requests: Mutex>>, +} + +pub(crate) struct LiquiditySource +where + L::Target: Logger, +{ + lsps2_service: Option, + channel_manager: Arc>, + keys_manager: Arc, + liquidity_manager: Arc>, + config: Arc, + logger: L, +} + +impl LiquiditySource +where + L::Target: Logger, +{ + pub(crate) fn new_lsps2( + address: SocketAddress, node_id: PublicKey, token: Option, + channel_manager: Arc>, keys_manager: Arc, + liquidity_manager: Arc>, config: Arc, logger: L, + ) -> Self { + let pending_fee_requests = Mutex::new(HashMap::new()); + let pending_buy_requests = Mutex::new(HashMap::new()); + let lsps2_service = Some(LSPS2Service { + address, + node_id, + token, + pending_fee_requests, + pending_buy_requests, + }); + Self { lsps2_service, channel_manager, keys_manager, liquidity_manager, config, logger } + } + + pub(crate) fn set_peer_manager(&self, peer_manager: Arc>) { + let process_msgs_callback = move || peer_manager.process_events(); + self.liquidity_manager.set_process_msgs_callback(process_msgs_callback); + } + + pub(crate) fn liquidity_manager(&self) -> &LiquidityManager { + self.liquidity_manager.as_ref() + } + + pub(crate) fn get_liquidity_source_details(&self) -> Option<(PublicKey, SocketAddress)> { + self.lsps2_service.as_ref().map(|s| (s.node_id, s.address.clone())) + } + + pub(crate) async fn handle_next_event(&self) { + match self.liquidity_manager.next_event_async().await { + Event::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady { + request_id, + counterparty_node_id, + opening_fee_params_menu, + }) => { + if let Some(lsps2_service) = self.lsps2_service.as_ref() { + if counterparty_node_id != lsps2_service.node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + if let Some(sender) = + lsps2_service.pending_fee_requests.lock().unwrap().remove(&request_id) + { + let response = LSPS2FeeResponse { opening_fee_params_menu }; + + match sender.send(response) { + Ok(()) => (), + Err(e) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + } + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS2Client::OpeningParametersReady event!" + ); + } + } + Event::LSPS2Client(LSPS2ClientEvent::InvoiceParametersReady { + request_id, + counterparty_node_id, + intercept_scid, + cltv_expiry_delta, + .. + }) => { + if let Some(lsps2_service) = self.lsps2_service.as_ref() { + if counterparty_node_id != lsps2_service.node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + if let Some(sender) = + lsps2_service.pending_buy_requests.lock().unwrap().remove(&request_id) + { + let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta }; + + match sender.send(response) { + Ok(()) => (), + Err(e) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + } + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS2Client::InvoiceParametersReady event!" + ); + } + } + e => { + log_error!(self.logger, "Received unexpected liquidity event: {:?}", e); + } + } + } + + pub(crate) async fn lsps2_receive_to_jit_channel( + &self, amount_msat: u64, description: &str, expiry_secs: u32, + max_total_lsp_fee_limit_msat: Option, + ) -> Result<(Bolt11Invoice, u64), Error> { + let fee_response = self.lsps2_request_opening_fee_params().await?; + + let (min_total_fee_msat, min_opening_params) = fee_response + .opening_fee_params_menu + .into_iter() + .filter_map(|params| { + if amount_msat < params.min_payment_size_msat + || amount_msat > params.max_payment_size_msat + { + log_debug!(self.logger, + "Skipping LSP-offered JIT parameters as the payment of {}msat doesn't meet LSP limits (min: {}msat, max: {}msat)", + amount_msat, + params.min_payment_size_msat, + params.max_payment_size_msat + ); + None + } else { + compute_opening_fee(amount_msat, params.min_fee_msat, params.proportional as u64) + .map(|fee| (fee, params)) + } + }) + .min_by_key(|p| p.0) + .ok_or_else(|| { + log_error!(self.logger, "Failed to handle response from liquidity service",); + Error::LiquidityRequestFailed + })?; + + if let Some(max_total_lsp_fee_limit_msat) = max_total_lsp_fee_limit_msat { + if min_total_fee_msat > max_total_lsp_fee_limit_msat { + log_error!(self.logger, + "Failed to request inbound JIT channel as LSP's requested total opening fee of {}msat exceeds our fee limit of {}msat", + min_total_fee_msat, max_total_lsp_fee_limit_msat + ); + return Err(Error::LiquidityFeeTooHigh); + } + } + + log_debug!( + self.logger, + "Choosing cheapest liquidity offer, will pay {}msat in total LSP fees", + min_total_fee_msat + ); + + let buy_response = + self.lsps2_send_buy_request(Some(amount_msat), min_opening_params).await?; + let invoice = self.lsps2_create_jit_invoice( + buy_response, + Some(amount_msat), + description, + expiry_secs, + )?; + + log_info!(self.logger, "JIT-channel invoice created: {}", invoice); + Ok((invoice, min_total_fee_msat)) + } + + pub(crate) async fn lsps2_receive_variable_amount_to_jit_channel( + &self, description: &str, expiry_secs: u32, + max_proportional_lsp_fee_limit_ppm_msat: Option, + ) -> Result<(Bolt11Invoice, u64), Error> { + let fee_response = self.lsps2_request_opening_fee_params().await?; + + let (min_prop_fee_ppm_msat, min_opening_params) = fee_response + .opening_fee_params_menu + .into_iter() + .map(|params| (params.proportional as u64, params)) + .min_by_key(|p| p.0) + .ok_or_else(|| { + log_error!(self.logger, "Failed to handle response from liquidity service",); + Error::LiquidityRequestFailed + })?; + + if let Some(max_proportional_lsp_fee_limit_ppm_msat) = + max_proportional_lsp_fee_limit_ppm_msat + { + if min_prop_fee_ppm_msat > max_proportional_lsp_fee_limit_ppm_msat { + log_error!(self.logger, + "Failed to request inbound JIT channel as LSP's requested proportional opening fee of {} ppm msat exceeds our fee limit of {} ppm msat", + min_prop_fee_ppm_msat, + max_proportional_lsp_fee_limit_ppm_msat + ); + return Err(Error::LiquidityFeeTooHigh); + } + } + + log_debug!( + self.logger, + "Choosing cheapest liquidity offer, will pay {}ppm msat in proportional LSP fees", + min_prop_fee_ppm_msat + ); + + let buy_response = self.lsps2_send_buy_request(None, min_opening_params).await?; + let invoice = + self.lsps2_create_jit_invoice(buy_response, None, description, expiry_secs)?; + + log_info!(self.logger, "JIT-channel invoice created: {}", invoice); + Ok((invoice, min_prop_fee_ppm_msat)) + } + + async fn lsps2_request_opening_fee_params(&self) -> Result { + let lsps2_service = self.lsps2_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { + log_error!(self.logger, "Liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (fee_request_sender, fee_request_receiver) = oneshot::channel(); + { + let mut pending_fee_requests_lock = lsps2_service.pending_fee_requests.lock().unwrap(); + let request_id = client_handler + .request_opening_params(lsps2_service.node_id, lsps2_service.token.clone()); + pending_fee_requests_lock.insert(request_id, fee_request_sender); + } + + tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + fee_request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + }) + } + + async fn lsps2_send_buy_request( + &self, amount_msat: Option, opening_fee_params: OpeningFeeParams, + ) -> Result { + let lsps2_service = self.lsps2_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { + log_error!(self.logger, "Liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (buy_request_sender, buy_request_receiver) = oneshot::channel(); + { + let mut pending_buy_requests_lock = lsps2_service.pending_buy_requests.lock().unwrap(); + let request_id = client_handler + .select_opening_params(lsps2_service.node_id, amount_msat, opening_fee_params) + .map_err(|e| { + log_error!( + self.logger, + "Failed to send buy request to liquidity service: {:?}", + e + ); + Error::LiquidityRequestFailed + })?; + pending_buy_requests_lock.insert(request_id, buy_request_sender); + } + + let buy_response = tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + buy_request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {:?}", e); + Error::LiquidityRequestFailed + })?; + + Ok(buy_response) + } + + fn lsps2_create_jit_invoice( + &self, buy_response: LSPS2BuyResponse, amount_msat: Option, description: &str, + expiry_secs: u32, + ) -> Result { + let lsps2_service = self.lsps2_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + // LSPS2 requires min_final_cltv_expiry_delta to be at least 2 more than usual. + let min_final_cltv_expiry_delta = MIN_FINAL_CLTV_EXPIRY_DELTA + 2; + let (payment_hash, payment_secret) = self + .channel_manager + .create_inbound_payment(None, expiry_secs, Some(min_final_cltv_expiry_delta)) + .map_err(|e| { + log_error!(self.logger, "Failed to register inbound payment: {:?}", e); + Error::InvoiceCreationFailed + })?; + + let route_hint = RouteHint(vec![RouteHintHop { + src_node_id: lsps2_service.node_id, + short_channel_id: buy_response.intercept_scid, + fees: RoutingFees { base_msat: 0, proportional_millionths: 0 }, + cltv_expiry_delta: buy_response.cltv_expiry_delta as u16, + htlc_minimum_msat: None, + htlc_maximum_msat: None, + }]); + + let payment_hash = sha256::Hash::from_slice(&payment_hash.0).map_err(|e| { + log_error!(self.logger, "Invalid payment hash: {:?}", e); + Error::InvoiceCreationFailed + })?; + + let currency = self.config.network.into(); + let mut invoice_builder = InvoiceBuilder::new(currency) + .description(description.to_string()) + .payment_hash(payment_hash) + .payment_secret(payment_secret) + .current_timestamp() + .min_final_cltv_expiry_delta(min_final_cltv_expiry_delta.into()) + .expiry_time(Duration::from_secs(expiry_secs.into())) + .private_route(route_hint); + + if let Some(amount_msat) = amount_msat { + invoice_builder = invoice_builder.amount_milli_satoshis(amount_msat).basic_mpp(); + } + + invoice_builder + .build_signed(|hash| { + Secp256k1::new() + .sign_ecdsa_recoverable(hash, &self.keys_manager.get_node_secret_key()) + }) + .map_err(|e| { + log_error!(self.logger, "Failed to build and sign invoice: {}", e); + Error::InvoiceCreationFailed + }) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS2FeeResponse { + opening_fee_params_menu: Vec, +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS2BuyResponse { + intercept_scid: u64, + cltv_expiry_delta: u32, +} diff --git a/src/message_handler.rs b/src/message_handler.rs new file mode 100644 index 000000000..7e6cbf722 --- /dev/null +++ b/src/message_handler.rs @@ -0,0 +1,98 @@ +use crate::liquidity::LiquiditySource; + +use lightning::ln::features::{InitFeatures, NodeFeatures}; +use lightning::ln::peer_handler::CustomMessageHandler; +use lightning::ln::wire::CustomMessageReader; +use lightning::util::logger::Logger; +use lightning::util::persist::KVStore; + +use lightning_liquidity::lsps0::msgs::RawLSPSMessage; + +use bitcoin::secp256k1::PublicKey; + +use std::ops::Deref; +use std::sync::Arc; + +pub(crate) enum NodeCustomMessageHandler +where + L::Target: Logger, +{ + Ignoring, + Liquidity { liquidity_source: Arc> }, +} + +impl NodeCustomMessageHandler +where + L::Target: Logger, +{ + pub(crate) fn new_liquidity(liquidity_source: Arc>) -> Self { + Self::Liquidity { liquidity_source } + } + + pub(crate) fn new_ignoring() -> Self { + Self::Ignoring + } +} + +impl CustomMessageReader + for NodeCustomMessageHandler +where + L::Target: Logger, +{ + type CustomMessage = RawLSPSMessage; + + fn read( + &self, message_type: u16, buffer: &mut RD, + ) -> Result, lightning::ln::msgs::DecodeError> { + match self { + Self::Ignoring => Ok(None), + Self::Liquidity { liquidity_source, .. } => { + liquidity_source.liquidity_manager().read(message_type, buffer) + } + } + } +} + +impl CustomMessageHandler + for NodeCustomMessageHandler +where + L::Target: Logger, +{ + fn handle_custom_message( + &self, msg: Self::CustomMessage, sender_node_id: &PublicKey, + ) -> Result<(), lightning::ln::msgs::LightningError> { + match self { + Self::Ignoring => Ok(()), // Should be unreachable!() as the reader will return `None` + Self::Liquidity { liquidity_source, .. } => { + liquidity_source.liquidity_manager().handle_custom_message(msg, sender_node_id) + } + } + } + + fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { + match self { + Self::Ignoring => Vec::new(), + Self::Liquidity { liquidity_source, .. } => { + liquidity_source.liquidity_manager().get_and_clear_pending_msg() + } + } + } + + fn provided_node_features(&self) -> NodeFeatures { + match self { + Self::Ignoring => NodeFeatures::empty(), + Self::Liquidity { liquidity_source, .. } => { + liquidity_source.liquidity_manager().provided_node_features() + } + } + } + + fn provided_init_features(&self, their_node_id: &PublicKey) -> InitFeatures { + match self { + Self::Ignoring => InitFeatures::empty(), + Self::Liquidity { liquidity_source, .. } => { + liquidity_source.liquidity_manager().provided_init_features(their_node_id) + } + } + } +} diff --git a/src/payment_store.rs b/src/payment_store.rs index bc0534188..704966878 100644 --- a/src/payment_store.rs +++ b/src/payment_store.rs @@ -30,10 +30,20 @@ pub struct PaymentDetails { pub direction: PaymentDirection, /// The status of the payment. pub status: PaymentStatus, + /// Limits applying to how much fee we allow an LSP to deduct from the payment amount. + /// + /// This is only `Some` for payments received via a JIT-channel, in which case the first + /// inbound payment will pay for the LSP's channel opening fees. + /// + /// See [`LdkChannelConfig::accept_underpaying_htlcs`] for more information. + /// + /// [`LdkChannelConfig::accept_underpaying_htlcs`]: lightning::util::config::ChannelConfig::accept_underpaying_htlcs + pub lsp_fee_limits: Option, } impl_writeable_tlv_based!(PaymentDetails, { (0, hash, required), + (1, lsp_fee_limits, option), (2, preimage, required), (4, secret, required), (6, amount_msat, required), @@ -72,6 +82,26 @@ impl_writeable_tlv_based_enum!(PaymentStatus, (4, Failed) => {}; ); +/// Limits applying to how much fee we allow an LSP to deduct from the payment amount. +/// +/// See [`LdkChannelConfig::accept_underpaying_htlcs`] for more information. +/// +/// [`LdkChannelConfig::accept_underpaying_htlcs`]: lightning::util::config::ChannelConfig::accept_underpaying_htlcs +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct LSPFeeLimits { + /// The maximal total amount we allow any configured LSP withhold from us when forwarding the + /// payment. + pub max_total_opening_fee_msat: Option, + /// The maximal proportional fee, in parts-per-million millisatoshi, we allow any configured + /// LSP withhold from us when forwarding the payment. + pub max_proportional_opening_fee_ppm_msat: Option, +} + +impl_writeable_tlv_based!(LSPFeeLimits, { + (0, max_total_opening_fee_msat, option), + (2, max_proportional_opening_fee_ppm_msat, option), +}); + #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) struct PaymentDetailsUpdate { pub hash: PaymentHash, @@ -80,6 +110,7 @@ pub(crate) struct PaymentDetailsUpdate { pub amount_msat: Option>, pub direction: Option, pub status: Option, + pub lsp_fee_limits: Option>, } impl PaymentDetailsUpdate { @@ -91,6 +122,7 @@ impl PaymentDetailsUpdate { amount_msat: None, direction: None, status: None, + lsp_fee_limits: None, } } } @@ -171,6 +203,10 @@ where payment.status = status; } + if let Some(lsp_fee_limits) = update.lsp_fee_limits { + payment.lsp_fee_limits = lsp_fee_limits + } + self.persist_info(&update.hash, payment)?; updated = true; } @@ -247,6 +283,7 @@ mod tests { amount_msat: None, direction: PaymentDirection::Inbound, status: PaymentStatus::Pending, + lsp_fee_limits: None, }; assert_eq!(Ok(false), payment_store.insert(payment.clone())); diff --git a/src/types.rs b/src/types.rs index 522dd28fd..7a2485916 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,4 +1,5 @@ use crate::logger::FilesystemLogger; +use crate::message_handler::NodeCustomMessageHandler; use crate::sweep::OutputSweeper; use lightning::blinded_path::BlindedPath; @@ -25,7 +26,7 @@ use std::sync::{Arc, Mutex, RwLock}; pub(crate) type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, - Arc>>, + Arc, Arc, Arc, Arc, @@ -38,8 +39,16 @@ pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager< Arc, Arc, Arc, - IgnoringMessageHandler, + Arc>>, + Arc, +>; + +pub(crate) type ChainSource = EsploraSyncClient>; + +pub(crate) type LiquidityManager = lightning_liquidity::LiquidityManager< Arc, + Arc>, + Arc, >; pub(crate) type ChannelManager = lightning::ln::channelmanager::ChannelManager< @@ -132,7 +141,7 @@ impl lightning::onion_message::messenger::MessageRouter for FakeMessageRouter { pub(crate) type Sweeper = OutputSweeper< Arc, Arc, - Arc>>, + Arc, Arc, Arc, >; diff --git a/src/wallet.rs b/src/wallet.rs index d3e6c0a47..5caa67264 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -23,7 +23,7 @@ use bitcoin::bech32::u5; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; -use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, Signing}; +use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing}; use bitcoin::{ScriptBuf, Transaction, TxOut, Txid}; use std::ops::Deref; @@ -299,6 +299,10 @@ where .or(Err(Error::MessageSigningFailed)) } + pub fn get_node_secret_key(&self) -> SecretKey { + self.inner.get_node_secret_key() + } + pub fn verify_signature(&self, msg: &[u8], sig: &str, pkey: &PublicKey) -> bool { message_signing::verify(msg, sig, pkey) } diff --git a/tests/common.rs b/tests/common.rs index a1e1e993c..3ea38284a 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -65,11 +65,11 @@ pub(crate) use expect_channel_pending_event; macro_rules! expect_channel_ready_event { ($node: expr, $counterparty_node_id: expr) => {{ match $node.wait_next_event() { - ref e @ Event::ChannelReady { channel_id, counterparty_node_id, .. } => { + ref e @ Event::ChannelReady { user_channel_id, counterparty_node_id, .. } => { println!("{} got event {:?}", $node.node_id(), e); assert_eq!(counterparty_node_id, Some($counterparty_node_id)); $node.event_handled(); - channel_id + user_channel_id } ref e => { panic!("{} got unexpected event!: {:?}", std::stringify!($node), e); @@ -377,7 +377,7 @@ pub(crate) fn do_channel_full_cycle( expect_channel_ready_event!(node_a, node_b.node_id()); - let channel_id = expect_channel_ready_event!(node_b, node_a.node_id()); + let user_channel_id = expect_channel_ready_event!(node_b, node_a.node_id()); println!("\nB receive_payment"); let invoice_amount_1_msat = 2500_000; @@ -520,7 +520,7 @@ pub(crate) fn do_channel_full_cycle( ); println!("\nB close_channel"); - node_b.close_channel(&channel_id, node_a.node_id()).unwrap(); + node_b.close_channel(&user_channel_id, node_a.node_id()).unwrap(); expect_event!(node_a, ChannelClosed); expect_event!(node_b, ChannelClosed); diff --git a/tests/integration_tests_cln.rs b/tests/integration_tests_cln.rs index f941c8fdf..b2059281b 100644 --- a/tests/integration_tests_cln.rs +++ b/tests/integration_tests_cln.rs @@ -88,7 +88,7 @@ fn test_cln() { let funding_txo = common::expect_channel_pending_event!(node, cln_node_id); common::wait_for_tx(&electrs_client, funding_txo.txid); common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6); - let channel_id = common::expect_channel_ready_event!(node, cln_node_id); + let user_channel_id = common::expect_channel_ready_event!(node, cln_node_id); // Send a payment to CLN let mut rng = thread_rng(); @@ -110,7 +110,7 @@ fn test_cln() { cln_client.pay(&ldk_invoice.to_string(), Default::default()).unwrap(); common::expect_event!(node, PaymentReceived); - node.close_channel(&channel_id, cln_node_id).unwrap(); + node.close_channel(&user_channel_id, cln_node_id).unwrap(); common::expect_event!(node, ChannelClosed); node.stop().unwrap(); }