Skip to content

Commit

Permalink
Merge pull request #1239 from Shourya742/2024-10-missing-transaction-…
Browse files Browse the repository at this point in the history
…mempool

Fix missing transaction in light mempool bug
  • Loading branch information
rrybarczyk authored Nov 13, 2024
2 parents 66b1807 + 7cc0424 commit dee3cae
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ core_rpc_pass = "password"
# Time interval used for JDS mempool update
[mempool_update_interval]
unit = "secs"
value = 1
value = 0.1
74 changes: 72 additions & 2 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use roles_logic_sv2::{
ProvideMissingTransactions, ProvideMissingTransactionsSuccess, SubmitSolutionJd,
},
parsers::JobDeclaration,
utils::Mutex,
};
use std::{convert::TryInto, io::Cursor};
use std::{convert::TryInto, io::Cursor, sync::Arc};
use stratum_common::bitcoin::{Transaction, Txid};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use crate::mempool::JDsMempool;

use super::{signed_token, TransactionState};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;
use tracing::info;
use tracing::{debug, info};

use super::JobDeclaratorDownstream;

Expand Down Expand Up @@ -61,6 +64,10 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
}

fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result<SendTo, Error> {
if let Some(old_mining_job) = self.declared_mining_job.0.take() {
clear_declared_mining_job(old_mining_job, &message, self.mempool.clone())?;
}

// the transactions that are present in the mempool are stored here, that is sent to the
// mempool which use the rpc client to retrieve the whole data for each transaction.
// The unknown transactions is a vector that contains the transactions that are not in the
Expand Down Expand Up @@ -220,3 +227,66 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
Ok(SendTo::None(Some(m)))
}
}

fn clear_declared_mining_job(
old_mining_job: DeclareMiningJob,
new_mining_job: &DeclareMiningJob,
mempool: Arc<Mutex<JDsMempool>>,
) -> Result<(), Error> {
let old_transactions = old_mining_job.tx_short_hash_list.inner_as_ref();
let new_transactions = new_mining_job.tx_short_hash_list.inner_as_ref();

if old_transactions.is_empty() {
info!("No transactions to remove from mempool");
return Ok(());
}

let nonce = old_mining_job.tx_short_hash_nonce;

let result = mempool
.safe_lock(|mempool_| -> Result<(), Error> {
let short_ids_map = mempool_
.to_short_ids(nonce)
.ok_or(Error::JDSMissingTransactions)?;

for short_id in old_transactions
.iter()
.filter(|&id| !new_transactions.contains(id))
{
if let Some(transaction_with_hash) = short_ids_map.get(*short_id) {
let txid = transaction_with_hash.id;
match mempool_.mempool.get_mut(&txid) {
Some(Some((_transaction, counter))) => {
if *counter > 1 {
*counter -= 1;
debug!(
"Fat transaction {:?} counter decremented; job id {:?} dropped",
txid, old_mining_job.request_id
);
} else {
mempool_.mempool.remove(&txid);
debug!(
"Fat transaction {:?} with job id {:?} removed from mempool",
txid, old_mining_job.request_id
);
}
}
Some(None) => debug!(
"Thin transaction {:?} with job id {:?} removed from mempool",
txid, old_mining_job.request_id
),
None => {}
}
} else {
debug!(
"Transaction with short id {:?} not found in mempool for old jobs",
short_id
);
}
}
Ok(())
})
.map_err(|e| Error::PoisonLock(e.to_string()))?;

result.map_err(|err| Error::PoisonLock(err.to_string()))
}
2 changes: 1 addition & 1 deletion roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl JobDeclaratorDownstream {
.ok_or(Box::new(JdsError::ImpossibleToReconstructBlock(
"Txid found in jds mempool but transactions not present".to_string(),
)))?;
transactions_list.push(tx);
transactions_list.push(tx.0);
} else {
return Err(Box::new(JdsError::ImpossibleToReconstructBlock(
"Unknown transaction".to_string(),
Expand Down
65 changes: 45 additions & 20 deletions roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use stratum_common::{bitcoin, bitcoin::hash_types::Txid};
#[derive(Clone, Debug)]
pub struct TransactionWithHash {
pub id: Txid,
pub tx: Option<Transaction>,
pub tx: Option<(Transaction, u32)>,
}

#[derive(Clone, Debug)]
pub struct JDsMempool {
pub mempool: HashMap<Txid, Option<Transaction>>,
pub mempool: HashMap<Txid, Option<(Transaction, u32)>>,
auth: mini_rpc_client::Auth,
url: String,
new_block_receiver: Receiver<String>,
Expand Down Expand Up @@ -50,7 +50,7 @@ impl JDsMempool {
new_block_receiver: Receiver<String>,
) -> Self {
let auth = mini_rpc_client::Auth::new(username, password);
let empty_mempool: HashMap<Txid, Option<Transaction>> = HashMap::new();
let empty_mempool: HashMap<Txid, Option<(Transaction, u32)>> = HashMap::new();
JDsMempool {
mempool: empty_mempool,
auth,
Expand Down Expand Up @@ -82,42 +82,67 @@ impl JDsMempool {
.get_raw_transaction(&txid.to_string(), None)
.await
.map_err(JdsMempoolError::Rpc)?;
let _ =
self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction)));
let _ = self_.safe_lock(|a| {
a.mempool
.entry(transaction.txid())
.and_modify(|entry| {
if let Some((_, count)) = entry {
*count += 1;
} else {
*entry = Some((transaction.clone(), 1));
}
})
.or_insert(Some((transaction, 1)));
});
}
}

// fill in the mempool the transactions given in input
for transaction in transactions {
let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction)));
let _ = self_.safe_lock(|a| {
a.mempool
.entry(transaction.txid())
.and_modify(|entry| {
if let Some((_, count)) = entry {
*count += 1;
} else {
*entry = Some((transaction.clone(), 1));
}
})
.or_insert(Some((transaction, 1)));
});
}
Ok(())
}

pub async fn update_mempool(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let mut mempool_ordered: HashMap<Txid, Option<Transaction>> = HashMap::new();

let client = self_
.safe_lock(|x| x.get_client())?
.ok_or(JdsMempoolError::NoClient)?;

let mempool: Vec<String> = client.get_raw_mempool().await?;
for id in &mempool {
let key_id = Txid::from_str(id)
.map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))?;
let mempool = client.get_raw_mempool().await?;

let tx = self_.safe_lock(|x| match x.mempool.get(&key_id) {
Some(entry) => entry.clone(),
None => None,
})?;
let raw_mempool_txids: Result<Vec<Txid>, _> = mempool
.into_iter()
.map(|id| {
Txid::from_str(&id)
.map_err(|err| JdsMempoolError::Rpc(RpcError::Deserialization(err.to_string())))
})
.collect();

mempool_ordered.insert(key_id, tx);
}
let raw_mempool_txids = raw_mempool_txids?;

// Holding the lock till the light mempool updation is complete.
let is_mempool_empty = self_.safe_lock(|x| {
raw_mempool_txids.iter().for_each(|txid| {
x.mempool.entry(*txid).or_insert(None);
});
x.mempool.is_empty()
})?;

if mempool_ordered.is_empty() {
if is_mempool_empty {
Err(JdsMempoolError::EmptyMempool)
} else {
let _ = self_.safe_lock(|x| x.mempool = mempool_ordered);
Ok(())
}
}
Expand Down

0 comments on commit dee3cae

Please sign in to comment.