Skip to content

Commit

Permalink
f BDK: Account for new syncing interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tnull committed Sep 10, 2024
1 parent d614533 commit ca582a9
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 78 deletions.
25 changes: 13 additions & 12 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
// accordance with one or both of these licenses.

use crate::config::{
default_user_config, Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP,
DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL, WALLET_KEYS_SEED_LEN,
default_user_config, Config, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL,
WALLET_KEYS_SEED_LEN,
};
use crate::connection::ConnectionManager;
use crate::event::EventQueue;
Expand Down Expand Up @@ -562,7 +562,7 @@ fn build_with_store_internal(
})?,
};

let (blockchain, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
let (esplora_client, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
Some(ChainDataSourceConfig::Esplora(server_url)) => {
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
Expand All @@ -571,8 +571,6 @@ fn build_with_store_internal(
esplora_client.clone(),
Arc::clone(&logger),
));
let blockchain = EsploraBlockchain::from_client(esplora_client, BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY);
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
Expand All @@ -582,15 +580,18 @@ fn build_with_store_internal(
Arc::clone(&config),
Arc::clone(&logger),
));
(blockchain, tx_sync, tx_broadcaster, fee_estimator)
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
},
None => {
// Default to Esplora client.
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
let tx_sync = Arc::new(EsploraSyncClient::new(server_url, Arc::clone(&logger)));
let blockchain =
EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP)
.with_concurrency(BDK_CLIENT_CONCURRENCY);
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
let esplora_client = client_builder.build_async().unwrap();
let tx_sync = Arc::new(EsploraSyncClient::from_client(
esplora_client.clone(),
Arc::clone(&logger),
));
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
tx_sync.client().clone(),
Arc::clone(&logger),
Expand All @@ -600,14 +601,14 @@ fn build_with_store_internal(
Arc::clone(&config),
Arc::clone(&logger),
));
(blockchain, tx_sync, tx_broadcaster, fee_estimator)
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
},
};

let runtime = Arc::new(RwLock::new(None));
let wallet = Arc::new(Wallet::new(
blockchain,
bdk_wallet,
esplora_client,
Arc::clone(&tx_broadcaster),
Arc::clone(&fee_estimator),
Arc::clone(&logger),
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000;
pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20;

// The number of concurrent requests made against the API provider.
pub(crate) const BDK_CLIENT_CONCURRENCY: u8 = 4;
pub(crate) const BDK_CLIENT_CONCURRENCY: usize = 4;

// The default Esplora server we're using.
pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api";
Expand Down
7 changes: 7 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// accordance with one or both of these licenses.

use bdk_chain::bitcoin::psbt::ExtractTxError as BdkExtractTxError;
use bdk_chain::local_chain::CannotConnectError as BdkChainConnectionError;
use bdk_wallet::error::CreateTxError as BdkCreateTxError;
use bdk_wallet::signer::SignerError as BdkSignerError;

Expand Down Expand Up @@ -203,6 +204,12 @@ impl From<BdkExtractTxError> for Error {
}
}

impl From<BdkChainConnectionError> for Error {
fn from(_: BdkChainConnectionError) -> Self {
Self::WalletOperationFailed
}
}

impl From<lightning_transaction_sync::TxSyncError> for Error {
fn from(_e: lightning_transaction_sync::TxSyncError) -> Self {
Self::TxSyncFailed
Expand Down
71 changes: 33 additions & 38 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,49 +280,44 @@ impl Node {
.config
.onchain_wallet_sync_interval_secs
.max(config::WALLET_SYNC_INTERVAL_MINIMUM_SECS);
std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(
async move {
let mut onchain_wallet_sync_interval = tokio::time::interval(
Duration::from_secs(onchain_wallet_sync_interval_secs),
);
onchain_wallet_sync_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_sync.changed() => {
runtime.spawn(async move {
let mut onchain_wallet_sync_interval =
tokio::time::interval(Duration::from_secs(onchain_wallet_sync_interval_secs));
onchain_wallet_sync_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = stop_sync.changed() => {
log_trace!(
sync_logger,
"Stopping background syncing on-chain wallet.",
);
return;
}
_ = onchain_wallet_sync_interval.tick() => {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => {
log_trace!(
sync_logger,
"Stopping background syncing on-chain wallet.",
);
return;
sync_logger,
"Background sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
_ = onchain_wallet_sync_interval.tick() => {
let now = Instant::now();
match wallet.sync().await {
Ok(()) => {
log_trace!(
sync_logger,
"Background sync of on-chain wallet finished in {}ms.",
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
*sync_onchain_wallet_timestamp.write().unwrap() = unix_time_secs_opt;
}
Err(err) => {
log_error!(
sync_logger,
"Background sync of on-chain wallet failed: {}",
err
)
}
}
Err(err) => {
log_error!(
sync_logger,
"Background sync of on-chain wallet failed: {}",
err
)
}
}
}
},
);
}
}
});

let mut stop_fee_updates = self.stop_sender.subscribe();
Expand Down
62 changes: 35 additions & 27 deletions src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use persist::KVStoreWalletPersister;

use crate::logger::{log_error, log_info, log_trace, Logger};

use crate::config::BDK_WALLET_SYNC_TIMEOUT_SECS;
use crate::config::{BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS};
use crate::fee_estimator::{ConfirmationTarget, FeeEstimator};
use crate::Error;

Expand All @@ -26,8 +26,8 @@ use lightning::sign::{
use lightning::util::message_signing;
use lightning_invoice::RawBolt11Invoice;

use bdk::blockchain::EsploraBlockchain;
use bdk_chain::ChainPosition;
use bdk_esplora::EsploraAsyncExt;
use bdk_wallet::{KeychainKind, PersistedWallet, SignOptions};

use bitcoin::blockdata::constants::WITNESS_SCALE_FACTOR;
Expand All @@ -42,6 +42,8 @@ use bitcoin::{
Amount, ScriptBuf, Transaction, TxOut, Txid, WPubkeyHash, WitnessProgram, WitnessVersion,
};

use esplora_client::AsyncClient as EsploraAsyncClient;

use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand All @@ -60,11 +62,9 @@ where
E::Target: FeeEstimator,
L::Target: Logger,
{
// A BDK blockchain used for wallet sync.
blockchain: EsploraBlockchain,
// A BDK on-chain wallet.
inner: Mutex<PersistedWallet<KVStoreWalletPersister>>,
// A cache storing the most recently retrieved fee rate estimations.
esplora_client: EsploraAsyncClient,
broadcaster: B,
fee_estimator: E,
// A Mutex holding the current sync status.
Expand All @@ -79,12 +79,12 @@ where
L::Target: Logger,
{
pub(crate) fn new(
blockchain: EsploraBlockchain, wallet: bdk_wallet::PersistedWallet<KVStoreWalletPersister>,
broadcaster: B, fee_estimator: E, logger: L,
wallet: bdk_wallet::PersistedWallet<KVStoreWalletPersister>,
esplora_client: EsploraAsyncClient, broadcaster: B, fee_estimator: E, logger: L,
) -> Self {
let inner = Mutex::new(wallet);
let sync_status = Mutex::new(WalletSyncStatus::Completed);
Self { blockchain, inner, broadcaster, fee_estimator, sync_status, logger }
Self { inner, esplora_client, broadcaster, fee_estimator, sync_status, logger }
}

pub(crate) async fn sync(&self) -> Result<(), Error> {
Expand All @@ -98,34 +98,42 @@ where
}

let res = {
let wallet_lock = self.inner.lock().unwrap();
let full_scan_request = self.inner.lock().unwrap().start_full_scan().build();

let wallet_sync_timeout_fut = tokio::time::timeout(
Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS),
wallet_lock.sync(&self.blockchain, SyncOptions { progress: None }),
self.esplora_client.full_scan(
full_scan_request,
BDK_CLIENT_STOP_GAP,
BDK_CLIENT_CONCURRENCY,
),
);

match wallet_sync_timeout_fut.await {
Ok(res) => match res {
Ok(()) => Ok(()),
Err(e) => match e {
bdk::Error::Esplora(ref be) => match **be {
bdk::blockchain::esplora::EsploraError::Reqwest(_) => {
log_error!(
self.logger,
"Sync failed due to HTTP connection error: {}",
e
);
Err(From::from(e))
},
_ => {
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
Err(From::from(e))
},
Ok(update) => match self.inner.lock().unwrap().apply_update(update) {
Ok(()) => Ok(()),
Err(e) => {
log_error!(
self.logger,
"Sync failed due to chain connection error: {}",
e
);
Err(Error::WalletOperationFailed)
},
},
Err(e) => match *e {
esplora_client::Error::Reqwest(he) => {
log_error!(
self.logger,
"Sync failed due to HTTP connection error: {}",
he
);
Err(Error::WalletOperationFailed)
},
_ => {
log_error!(self.logger, "Wallet sync error: {}", e);
Err(From::from(e))
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
Err(Error::WalletOperationFailed)
},
},
},
Expand Down

0 comments on commit ca582a9

Please sign in to comment.