diff --git a/src/builder.rs b/src/builder.rs index 06e8ae035..a8b0749c0 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -6,8 +6,8 @@ // accordance with one or both of these licenses. use crate::config::{ - default_user_config, Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, - DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL, WALLET_KEYS_SEED_LEN, + default_user_config, Config, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL, + WALLET_KEYS_SEED_LEN, }; use crate::connection::ConnectionManager; use crate::event::EventQueue; @@ -562,7 +562,7 @@ fn build_with_store_internal( })?, }; - let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { + let (esplora_client, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora(server_url)) => { let mut client_builder = esplora_client::Builder::new(&server_url.clone()); client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); @@ -571,8 +571,6 @@ fn build_with_store_internal( esplora_client.clone(), Arc::clone(&logger), )); - let blockchain = EsploraBlockchain::from_client(esplora_client, BDK_CLIENT_STOP_GAP) - .with_concurrency(BDK_CLIENT_CONCURRENCY); let tx_broadcaster = Arc::new(TransactionBroadcaster::new( tx_sync.client().clone(), Arc::clone(&logger), @@ -582,15 +580,18 @@ fn build_with_store_internal( Arc::clone(&config), Arc::clone(&logger), )); - (blockchain, tx_sync, tx_broadcaster, fee_estimator) + (esplora_client, tx_sync, tx_broadcaster, fee_estimator) }, None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); - let tx_sync = Arc::new(EsploraSyncClient::new(server_url, Arc::clone(&logger))); - let blockchain = - EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) - .with_concurrency(BDK_CLIENT_CONCURRENCY); + let mut client_builder = esplora_client::Builder::new(&server_url.clone()); + client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); + let esplora_client = client_builder.build_async().unwrap(); + let tx_sync = Arc::new(EsploraSyncClient::from_client( + esplora_client.clone(), + Arc::clone(&logger), + )); let tx_broadcaster = Arc::new(TransactionBroadcaster::new( tx_sync.client().clone(), Arc::clone(&logger), @@ -600,14 +601,14 @@ fn build_with_store_internal( Arc::clone(&config), Arc::clone(&logger), )); - (blockchain, tx_sync, tx_broadcaster, fee_estimator) + (esplora_client, tx_sync, tx_broadcaster, fee_estimator) }, }; let runtime = Arc::new(RwLock::new(None)); let wallet = Arc::new(Wallet::new( - blockchain, bdk_wallet, + esplora_client, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator), Arc::clone(&logger), diff --git a/src/config.rs b/src/config.rs index fac25b562..008771ab6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -31,7 +31,7 @@ const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20; // The number of concurrent requests made against the API provider. -pub(crate) const BDK_CLIENT_CONCURRENCY: u8 = 4; +pub(crate) const BDK_CLIENT_CONCURRENCY: usize = 4; // The default Esplora server we're using. pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api"; diff --git a/src/error.rs b/src/error.rs index 9e0d7b5e6..07cbd9daa 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,6 +6,7 @@ // accordance with one or both of these licenses. use bdk_chain::bitcoin::psbt::ExtractTxError as BdkExtractTxError; +use bdk_chain::local_chain::CannotConnectError as BdkChainConnectionError; use bdk_wallet::error::CreateTxError as BdkCreateTxError; use bdk_wallet::signer::SignerError as BdkSignerError; @@ -203,6 +204,12 @@ impl From for Error { } } +impl From for Error { + fn from(_: BdkChainConnectionError) -> Self { + Self::WalletOperationFailed + } +} + impl From for Error { fn from(_e: lightning_transaction_sync::TxSyncError) -> Self { Self::TxSyncFailed diff --git a/src/lib.rs b/src/lib.rs index 01b1992f1..a7b652fa1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -280,49 +280,44 @@ impl Node { .config .onchain_wallet_sync_interval_secs .max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS); - std::thread::spawn(move || { - tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( - async move { - let mut onchain_wallet_sync_interval = tokio::time::interval( - Duration::from_secs(onchain_wallet_sync_interval_secs), - ); - onchain_wallet_sync_interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = stop_sync.changed() => { + runtime.spawn(async move { + let mut onchain_wallet_sync_interval = + tokio::time::interval(Duration::from_secs(onchain_wallet_sync_interval_secs)); + onchain_wallet_sync_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_sync.changed() => { + log_trace!( + sync_logger, + "Stopping background syncing on-chain wallet.", + ); + return; + } + _ = onchain_wallet_sync_interval.tick() => { + let now = Instant::now(); + match wallet.sync().await { + Ok(()) => { log_trace!( - sync_logger, - "Stopping background syncing on-chain wallet.", - ); - return; + sync_logger, + "Background sync of on-chain wallet finished in {}ms.", + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; } - _ = onchain_wallet_sync_interval.tick() => { - let now = Instant::now(); - match wallet.sync().await { - Ok(()) => { - log_trace!( - sync_logger, - "Background sync of on-chain wallet finished in {}ms.", - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - *sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt; - } - Err(err) => { - log_error!( - sync_logger, - "Background sync of on-chain wallet failed: {}", - err - ) - } - } + Err(err) => { + log_error!( + sync_logger, + "Background sync of on-chain wallet failed: {}", + err + ) } } } - }, - ); + } + } }); let mut stop_fee_updates = self.stop_sender.subscribe(); diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index a969b5fa5..7939edd53 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -9,7 +9,7 @@ use persist::KVStoreWalletPersister; use crate::logger::{log_error, log_info, log_trace, Logger}; -use crate::config::BDK_WALLET_SYNC_TIMEOUT_SECS; +use crate::config::{BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS}; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator}; use crate::Error; @@ -26,8 +26,8 @@ use lightning::sign::{ use lightning::util::message_signing; use lightning_invoice::RawBolt11Invoice; -use bdk::blockchain::EsploraBlockchain; use bdk_chain::ChainPosition; +use bdk_esplora::EsploraAsyncExt; use bdk_wallet::{KeychainKind, PersistedWallet, SignOptions}; use bitcoin::blockdata::constants::WITNESS_SCALE_FACTOR; @@ -42,6 +42,8 @@ use bitcoin::{ Amount, ScriptBuf, Transaction, TxOut, Txid, WPubkeyHash, WitnessProgram, WitnessVersion, }; +use esplora_client::AsyncClient as EsploraAsyncClient; + use std::ops::{Deref, DerefMut}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -60,11 +62,9 @@ where E::Target: FeeEstimator, L::Target: Logger, { - // A BDK blockchain used for wallet sync. - blockchain: EsploraBlockchain, // A BDK on-chain wallet. inner: Mutex>, - // A cache storing the most recently retrieved fee rate estimations. + esplora_client: EsploraAsyncClient, broadcaster: B, fee_estimator: E, // A Mutex holding the current sync status. @@ -79,12 +79,12 @@ where L::Target: Logger, { pub(crate) fn new( - blockchain: EsploraBlockchain, wallet: bdk_wallet::PersistedWallet, - broadcaster: B, fee_estimator: E, logger: L, + wallet: bdk_wallet::PersistedWallet, + esplora_client: EsploraAsyncClient, broadcaster: B, fee_estimator: E, logger: L, ) -> Self { let inner = Mutex::new(wallet); let sync_status = Mutex::new(WalletSyncStatus::Completed); - Self { blockchain, inner, broadcaster, fee_estimator, sync_status, logger } + Self { inner, esplora_client, broadcaster, fee_estimator, sync_status, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { @@ -98,34 +98,42 @@ where } let res = { - let wallet_lock = self.inner.lock().unwrap(); + let full_scan_request = self.inner.lock().unwrap().start_full_scan().build(); let wallet_sync_timeout_fut = tokio::time::timeout( Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - wallet_lock.sync(&self.blockchain, SyncOptions { progress: None }), + self.esplora_client.full_scan( + full_scan_request, + BDK_CLIENT_STOP_GAP, + BDK_CLIENT_CONCURRENCY, + ), ); match wallet_sync_timeout_fut.await { Ok(res) => match res { - Ok(()) => Ok(()), - Err(e) => match e { - bdk::Error::Esplora(ref be) => match **be { - bdk::blockchain::esplora::EsploraError::Reqwest(_) => { - log_error!( - self.logger, - "Sync failed due to HTTP connection error: {}", - e - ); - Err(From::from(e)) - }, - _ => { - log_error!(self.logger, "Sync failed due to Esplora error: {}", e); - Err(From::from(e)) - }, + Ok(update) => match self.inner.lock().unwrap().apply_update(update) { + Ok(()) => Ok(()), + Err(e) => { + log_error!( + self.logger, + "Sync failed due to chain connection error: {}", + e + ); + Err(Error::WalletOperationFailed) + }, + }, + Err(e) => match *e { + esplora_client::Error::Reqwest(he) => { + log_error!( + self.logger, + "Sync failed due to HTTP connection error: {}", + he + ); + Err(Error::WalletOperationFailed) }, _ => { - log_error!(self.logger, "Wallet sync error: {}", e); - Err(From::from(e)) + log_error!(self.logger, "Sync failed due to Esplora error: {}", e); + Err(Error::WalletOperationFailed) }, }, },