diff --git a/roles/jd-server/config-examples/jds-config-local-example.toml b/roles/jd-server/config-examples/jds-config-local-example.toml index dc8ce0555..084089274 100644 --- a/roles/jd-server/config-examples/jds-config-local-example.toml +++ b/roles/jd-server/config-examples/jds-config-local-example.toml @@ -28,4 +28,4 @@ core_rpc_pass = "password" # Time interval used for JDS mempool update [mempool_update_interval] unit = "secs" -value = 1 +value = 0.1 diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index 29bbd7bf3..cd75e86af 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -7,14 +7,17 @@ use roles_logic_sv2::{ ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd, }, parsers::JobDeclaration, + utils::Mutex, }; -use std::{convert::TryInto, io::Cursor}; +use std::{convert::TryInto, io::Cursor, sync::Arc}; use stratum_common::bitcoin::{Transaction, Txid}; pub type SendTo = SendTo_, ()>; +use crate::mempool::JDsMempool; + use super::{signed_token, TransactionState}; use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages}; use stratum_common::bitcoin::consensus::Decodable; -use tracing::info; +use tracing::{debug, info}; use super::JobDeclaratorDownstream; @@ -61,6 +64,10 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { } fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result { + if let Some(old_mining_job) = self.declared_mining_job.0.take() { + clear_declared_mining_job(old_mining_job, &message, self.mempool.clone())?; + } + // the transactions that are present in the mempool are stored here, that is sent to the // mempool which use the rpc client to retrieve the whole data for each transaction. // The unknown transactions is a vector that contains the transactions that are not in the @@ -220,3 +227,66 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { Ok(SendTo::None(Some(m))) } } + +fn clear_declared_mining_job( + old_mining_job: DeclareMiningJob, + new_mining_job: &DeclareMiningJob, + mempool: Arc>, +) -> Result<(), Error> { + let old_transactions = old_mining_job.tx_short_hash_list.inner_as_ref(); + let new_transactions = new_mining_job.tx_short_hash_list.inner_as_ref(); + + if old_transactions.is_empty() { + info!("No transactions to remove from mempool"); + return Ok(()); + } + + let nonce = old_mining_job.tx_short_hash_nonce; + + let result = mempool + .safe_lock(|mempool_| -> Result<(), Error> { + let short_ids_map = mempool_ + .to_short_ids(nonce) + .ok_or(Error::JDSMissingTransactions)?; + + for short_id in old_transactions + .iter() + .filter(|&id| !new_transactions.contains(id)) + { + if let Some(transaction_with_hash) = short_ids_map.get(*short_id) { + let txid = transaction_with_hash.id; + match mempool_.mempool.get_mut(&txid) { + Some(Some((_transaction, counter))) => { + if *counter > 1 { + *counter -= 1; + debug!( + "Fat transaction {:?} counter decremented; job id {:?} dropped", + txid, old_mining_job.request_id + ); + } else { + mempool_.mempool.remove(&txid); + debug!( + "Fat transaction {:?} with job id {:?} removed from mempool", + txid, old_mining_job.request_id + ); + } + } + Some(None) => debug!( + "Thin transaction {:?} with job id {:?} removed from mempool", + txid, old_mining_job.request_id + ), + None => {} + } + } else { + debug!( + "Transaction with short id {:?} not found in mempool for old jobs", + short_id + ); + } + } + Ok(()) + }) + .map_err(|e| Error::PoisonLock(e.to_string()))?; + + result.map_err(|err| Error::PoisonLock(err.to_string())) +} diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index 9cf0d5ab9..9e775bc92 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -145,7 +145,7 @@ impl JobDeclaratorDownstream { .ok_or(Box::new(JdsError::ImpossibleToReconstructBlock( "Txid found in jds mempool but transactions not present".to_string(), )))?; - transactions_list.push(tx); + transactions_list.push(tx.0); } else { return Err(Box::new(JdsError::ImpossibleToReconstructBlock( "Unknown transaction".to_string(), diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index a9aa8ecc9..0541593b4 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -12,12 +12,12 @@ use stratum_common::{bitcoin, bitcoin::hash_types::Txid}; #[derive(Clone, Debug)] pub struct TransactionWithHash { pub id: Txid, - pub tx: Option, + pub tx: Option<(Transaction, u32)>, } #[derive(Clone, Debug)] pub struct JDsMempool { - pub mempool: HashMap>, + pub mempool: HashMap>, auth: mini_rpc_client::Auth, url: String, new_block_receiver: Receiver, @@ -50,7 +50,7 @@ impl JDsMempool { new_block_receiver: Receiver, ) -> Self { let auth = mini_rpc_client::Auth::new(username, password); - let empty_mempool: HashMap> = HashMap::new(); + let empty_mempool: HashMap> = HashMap::new(); JDsMempool { mempool: empty_mempool, auth, @@ -82,42 +82,67 @@ impl JDsMempool { .get_raw_transaction(&txid.to_string(), None) .await .map_err(JdsMempoolError::Rpc)?; - let _ = - self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + let _ = self_.safe_lock(|a| { + a.mempool + .entry(transaction.txid()) + .and_modify(|entry| { + if let Some((_, count)) = entry { + *count += 1; + } else { + *entry = Some((transaction.clone(), 1)); + } + }) + .or_insert(Some((transaction, 1))); + }); } } // fill in the mempool the transactions given in input for transaction in transactions { - let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + let _ = self_.safe_lock(|a| { + a.mempool + .entry(transaction.txid()) + .and_modify(|entry| { + if let Some((_, count)) = entry { + *count += 1; + } else { + *entry = Some((transaction.clone(), 1)); + } + }) + .or_insert(Some((transaction, 1))); + }); } Ok(()) } pub async fn update_mempool(self_: Arc>) -> Result<(), JdsMempoolError> { - let mut mempool_ordered: HashMap> = HashMap::new(); - let client = self_ .safe_lock(|x| x.get_client())? .ok_or(JdsMempoolError::NoClient)?; - let mempool: Vec = client.get_raw_mempool().await?; - for id in &mempool { - let key_id = Txid::from_str(id) - .map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))?; + let mempool = client.get_raw_mempool().await?; - let tx = self_.safe_lock(|x| match x.mempool.get(&key_id) { - Some(entry) => entry.clone(), - None => None, - })?; + let raw_mempool_txids: Result, _> = mempool + .into_iter() + .map(|id| { + Txid::from_str(&id) + .map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string()))) + }) + .collect(); - mempool_ordered.insert(key_id, tx); - } + let raw_mempool_txids = raw_mempool_txids?; + + // Holding the lock till the light mempool updation is complete. + let is_mempool_empty = self_.safe_lock(|x| { + raw_mempool_txids.iter().for_each(|txid| { + x.mempool.entry(*txid).or_insert(None); + }); + x.mempool.is_empty() + })?; - if mempool_ordered.is_empty() { + if is_mempool_empty { Err(JdsMempoolError::EmptyMempool) } else { - let _ = self_.safe_lock(|x| x.mempool = mempool_ordered); Ok(()) } }