Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move transaction broadcasting and fee estimation to dedicated modules #205

Merged
merged 5 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ enum NodeError {
"ChannelClosingFailed",
"ChannelConfigUpdateFailed",
"PersistenceFailed",
"FeerateEstimationUpdateFailed",
"WalletOperationFailed",
"OnchainTxSigningFailed",
"MessageSigningFailed",
Expand Down
43 changes: 32 additions & 11 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::event::EventQueue;
use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::GossipSource;
use crate::io;
use crate::io::sqlite_store::SqliteStore;
use crate::logger::{log_error, FilesystemLogger, Logger};
use crate::payment_store::PaymentStore;
use crate::peer_store::PeerStore;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, FakeMessageRouter, GossipSync, KeysManager, NetworkGraph,
OnionMessenger, PeerManager,
Expand Down Expand Up @@ -464,13 +466,19 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
BuildError::WalletSetupFailed
})?;

let (blockchain, tx_sync) = match chain_data_source_config {
let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
Some(ChainDataSourceConfig::Esplora(server_url)) => {
let tx_sync = Arc::new(EsploraSyncClient::new(server_url.clone(), Arc::clone(&logger)));
let blockchain =
EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY);
(blockchain, tx_sync)
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
));
let fee_estimator =
Arc::new(OnchainFeeEstimator::new(tx_sync.client().clone(), Arc::clone(&logger)));
(blockchain, tx_sync, tx_broadcaster, fee_estimator)
}
None => {
// Default to Esplora client.
Expand All @@ -479,20 +487,31 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
let blockchain =
EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY);
(blockchain, tx_sync)
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
));
let fee_estimator =
Arc::new(OnchainFeeEstimator::new(tx_sync.client().clone(), Arc::clone(&logger)));
(blockchain, tx_sync, tx_broadcaster, fee_estimator)
}
};

let runtime = Arc::new(RwLock::new(None));
let wallet =
Arc::new(Wallet::new(blockchain, bdk_wallet, Arc::clone(&runtime), Arc::clone(&logger)));
let wallet = Arc::new(Wallet::new(
blockchain,
bdk_wallet,
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&logger),
));

// Initialize the ChainMonitor
let chain_monitor: Arc<ChainMonitor<K>> = Arc::new(chainmonitor::ChainMonitor::new(
Some(Arc::clone(&tx_sync)),
Arc::clone(&wallet),
Arc::clone(&tx_broadcaster),
Arc::clone(&logger),
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&kv_store),
));

Expand Down Expand Up @@ -592,9 +611,9 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&chain_monitor),
Arc::clone(&wallet),
Arc::clone(&tx_broadcaster),
Arc::clone(&router),
Arc::clone(&logger),
user_config,
Expand All @@ -616,9 +635,9 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
best_block: BestBlock::new(genesis_block_hash, 0),
};
channelmanager::ChannelManager::new(
Arc::clone(&wallet),
Arc::clone(&fee_estimator),
Arc::clone(&chain_monitor),
Arc::clone(&wallet),
Arc::clone(&tx_broadcaster),
Arc::clone(&router),
Arc::clone(&logger),
Arc::clone(&keys_manager),
Expand Down Expand Up @@ -767,6 +786,8 @@ fn build_with_store_internal<K: KVStore + Sync + Send + 'static>(
config,
wallet,
tx_sync,
tx_broadcaster,
fee_estimator,
event_queue,
channel_manager,
chain_monitor,
Expand Down
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub enum Error {
ChannelConfigUpdateFailed,
/// Persistence failed.
PersistenceFailed,
/// A fee rate estimation update failed.
FeerateEstimationUpdateFailed,
/// A wallet operation failed.
WalletOperationFailed,
/// A signing operation for transaction failed.
Expand Down Expand Up @@ -79,6 +81,9 @@ impl fmt::Display for Error {
Self::ChannelClosingFailed => write!(f, "Failed to close channel."),
Self::ChannelConfigUpdateFailed => write!(f, "Failed to update channel config."),
Self::PersistenceFailed => write!(f, "Failed to persist data."),
Self::FeerateEstimationUpdateFailed => {
write!(f, "Failed to update fee rate estimates.")
}
Self::WalletOperationFailed => write!(f, "Failed to conduct wallet operation."),
Self::OnchainTxSigningFailed => write!(f, "Failed to sign given transaction."),
Self::MessageSigningFailed => write!(f, "Failed to sign given message."),
Expand Down
34 changes: 22 additions & 12 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::peer_store::{PeerInfo, PeerStore};
use crate::types::{Broadcaster, FeeEstimator, Wallet};
use crate::{
hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, UserChannelId, Wallet,
hex_utils, ChannelManager, Config, Error, KeysManager, NetworkGraph, PeerInfo, PeerStore,
UserChannelId,
};

use crate::payment_store::{
Expand All @@ -13,7 +14,9 @@ use crate::io::{
};
use crate::logger::{log_debug, log_error, log_info, Logger};

use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
use lightning::chain::chaininterface::{
BroadcasterInterface, ConfirmationTarget, FeeEstimator as LDKFeeEstimator,
};
use lightning::events::Event as LdkEvent;
use lightning::events::PaymentPurpose;
use lightning::impl_writeable_tlv_based_enum;
Expand Down Expand Up @@ -243,40 +246,45 @@ pub(crate) struct EventHandler<K: KVStore + Sync + Send, L: Deref>
where
L::Target: Logger,
{
wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>,
event_queue: Arc<EventQueue<K, L>>,
wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>,
tx_broadcaster: Arc<Broadcaster>,
fee_estimator: Arc<FeeEstimator>,
network_graph: Arc<NetworkGraph>,
keys_manager: Arc<KeysManager>,
payment_store: Arc<PaymentStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>,
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
logger: L,
config: Arc<Config>,
peer_store: Arc<PeerStore<K, L>>,
}

impl<K: KVStore + Sync + Send + 'static, L: Deref> EventHandler<K, L>
where
L::Target: Logger,
{
pub fn new(
wallet: Arc<Wallet<bdk::database::SqliteDatabase, L>>, event_queue: Arc<EventQueue<K, L>>,
channel_manager: Arc<ChannelManager<K>>, network_graph: Arc<NetworkGraph>,
event_queue: Arc<EventQueue<K, L>>, wallet: Arc<Wallet>,
channel_manager: Arc<ChannelManager<K>>, tx_broadcaster: Arc<Broadcaster>,
fee_estimator: Arc<FeeEstimator>, network_graph: Arc<NetworkGraph>,
keys_manager: Arc<KeysManager>, payment_store: Arc<PaymentStore<K, L>>,
runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>, logger: L, config: Arc<Config>,
peer_store: Arc<PeerStore<K, L>>,
peer_store: Arc<PeerStore<K, L>>, runtime: Arc<RwLock<Option<tokio::runtime::Runtime>>>,
logger: L, config: Arc<Config>,
) -> Self {
Self {
event_queue,
wallet,
channel_manager,
tx_broadcaster,
fee_estimator,
network_graph,
keys_manager,
payment_store,
peer_store,
logger,
runtime,
config,
peer_store,
}
}

Expand Down Expand Up @@ -586,7 +594,7 @@ where

let output_descriptors = &outputs.iter().collect::<Vec<_>>();
let tx_feerate = self
.wallet
.fee_estimator
.get_est_sat_per_1000_weight(ConfirmationTarget::NonAnchorChannelFee);

// We set nLockTime to the current height to discourage fee sniping.
Expand All @@ -603,7 +611,9 @@ where
);

match res {
Ok(Some(spending_tx)) => self.wallet.broadcast_transactions(&[&spending_tx]),
Ok(Some(spending_tx)) => {
self.tx_broadcaster.broadcast_transactions(&[&spending_tx])
}
Ok(None) => {
log_debug!(self.logger, "Omitted spending static outputs: {:?}", outputs);
}
Expand Down
131 changes: 131 additions & 0 deletions src/fee_estimator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use crate::logger::{log_error, log_trace, Logger};
use crate::Error;

use lightning::chain::chaininterface::{
ConfirmationTarget, FeeEstimator, FEERATE_FLOOR_SATS_PER_KW,
};

use bdk::FeeRate;
use esplora_client::AsyncClient as EsploraClient;

use std::collections::HashMap;
use std::ops::Deref;
use std::sync::RwLock;

pub(crate) struct OnchainFeeEstimator<L: Deref>
where
L::Target: Logger,
{
fee_rate_cache: RwLock<HashMap<ConfirmationTarget, FeeRate>>,
esplora_client: EsploraClient,
logger: L,
}

impl<L: Deref> OnchainFeeEstimator<L>
where
L::Target: Logger,
{
pub(crate) fn new(esplora_client: EsploraClient, logger: L) -> Self {
let fee_rate_cache = RwLock::new(HashMap::new());
Self { fee_rate_cache, esplora_client, logger }
}

pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> {
let confirmation_targets = vec![
ConfirmationTarget::OnChainSweep,
ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee,
ConfirmationTarget::MinAllowedAnchorChannelRemoteFee,
ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee,
ConfirmationTarget::AnchorChannelFee,
ConfirmationTarget::NonAnchorChannelFee,
ConfirmationTarget::ChannelCloseMinimum,
];
for target in confirmation_targets {
let num_blocks = match target {
ConfirmationTarget::OnChainSweep => 6,
ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 1,
ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => 1008,
ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => 144,
ConfirmationTarget::AnchorChannelFee => 1008,
ConfirmationTarget::NonAnchorChannelFee => 12,
ConfirmationTarget::ChannelCloseMinimum => 144,
};

let estimates = self.esplora_client.get_fee_estimates().await.map_err(|e| {
log_error!(
self.logger,
"Failed to retrieve fee rate estimates for {:?}: {}",
target,
e
);
Error::FeerateEstimationUpdateFailed
})?;

let converted_estimates = esplora_client::convert_fee_rate(num_blocks, estimates)
.map_err(|e| {
log_error!(
self.logger,
"Failed to convert fee rate estimates for {:?}: {}",
target,
e
);
Error::FeerateEstimationUpdateFailed
})?;

let fee_rate = FeeRate::from_sat_per_vb(converted_estimates);

// LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that
// require some post-estimation adjustments to the fee rates, which we do here.
let adjusted_fee_rate = match target {
ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => {
let really_high_prio = fee_rate.as_sat_per_vb() * 10.0;
FeeRate::from_sat_per_vb(really_high_prio)
}
ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => {
let slightly_less_than_background = fee_rate.fee_wu(1000) - 250;
FeeRate::from_sat_per_kwu(slightly_less_than_background as f32)
}
_ => fee_rate,
};

let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap();
locked_fee_rate_cache.insert(target, adjusted_fee_rate);
log_trace!(
self.logger,
"Fee rate estimation updated for {:?}: {} sats/kwu",
target,
adjusted_fee_rate.fee_wu(1000)
);
}
Ok(())
}

pub(crate) fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate {
let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap();

let fallback_sats_kwu = match confirmation_target {
ConfirmationTarget::OnChainSweep => 5000,
ConfirmationTarget::MaxAllowedNonAnchorChannelRemoteFee => 25 * 250,
ConfirmationTarget::MinAllowedAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW,
ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee => FEERATE_FLOOR_SATS_PER_KW,
ConfirmationTarget::AnchorChannelFee => 500,
ConfirmationTarget::NonAnchorChannelFee => 1000,
ConfirmationTarget::ChannelCloseMinimum => 500,
};

// We'll fall back on this, if we really don't have any other information.
let fallback_rate = FeeRate::from_sat_per_kwu(fallback_sats_kwu as f32);

*locked_fee_rate_cache.get(&confirmation_target).unwrap_or(&fallback_rate)
}
}

impl<L: Deref> FeeEstimator for OnchainFeeEstimator<L>
where
L::Target: Logger,
{
fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 {
(self.estimate_fee_rate(confirmation_target).fee_wu(1000) as u32)
.max(FEERATE_FLOOR_SATS_PER_KW)
}
}
Loading