Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
neysofu committed Dec 21, 2023
1 parent 63b8fc8 commit 8e48117
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 278 deletions.
111 changes: 32 additions & 79 deletions full-node/sov-sequencer/src/batch_builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! Concrete implementations of [`BatchBuilder`].

use std::collections::{HashSet, VecDeque};
use std::io::Cursor;

Expand All @@ -9,7 +11,7 @@ use sov_modules_api::{Context, DispatchCall, PublicKey, Spec, WorkingSet};
use sov_rollup_interface::services::batch_builder::{BatchBuilder, TxWithHash};
use tracing::{info, warn};

use crate::{HexString, TxHash};
use crate::TxHash;

/// Transaction stored in the mempool.
pub struct PooledTransaction<C: Context, R: DispatchCall<Context = C>> {
Expand Down Expand Up @@ -44,71 +46,12 @@ where
}
}

struct Mempool<C, R>
where
C: Context,
R: DispatchCall<Context = C>,
{
txs: VecDeque<PooledTransaction<C, R>>,
tx_hashes: HashSet<HexString>,
}

impl<C, R> Mempool<C, R>
where
C: Context,
R: DispatchCall<Context = C>,
{
pub fn new() -> Self {
Self {
txs: VecDeque::new(),
tx_hashes: HashSet::new(),
}
}

pub fn len(&self) -> usize {
self.txs.len()
}

pub fn push_back(&mut self, tx: PooledTransaction<C, R>) {
debug_assert_eq!(
self.txs.len(),
self.tx_hashes.len(),
"Mempool invariant violated: mempool txs and tx_hashes have different lengths. This is a bug!"
);

let tx_hash = tx.calculate_hash();

self.tx_hashes.insert(HexString(tx_hash));
self.txs.push_back(tx);
}

pub fn push_front(&mut self, tx: PooledTransaction<C, R>) {
debug_assert_eq!(
self.txs.len(),
self.tx_hashes.len(),
"Mempool invariant violated: mempool txs and tx_hashes have different lengths. This is a bug!"
);

let tx_hash = tx.calculate_hash();

self.tx_hashes.insert(HexString(tx_hash));
self.txs.push_front(tx);
}

pub fn pop(&mut self) -> Option<PooledTransaction<C, R>> {
let tx = self.txs.pop_front()?;

let tx_hash = tx.calculate_hash();
self.tx_hashes.remove(&HexString(tx_hash));

Some(tx)
}
}

/// BatchBuilder that creates batches of transactions in the order they were submitted
/// Only transactions that were successfully dispatched are included.
pub struct FiFoStrictBatchBuilder<C: Context, R: DispatchCall<Context = C>> {
mempool: Mempool<C, R>,
txs: VecDeque<PooledTransaction<C, R>>,
// Makes it cheap to check if transaction is already in the mempool.
tx_hashes: HashSet<TxHash>,
mempool_max_txs_count: usize,
runtime: R,
max_batch_size_bytes: usize,
Expand All @@ -130,7 +73,8 @@ where
sequencer: C::Address,
) -> Self {
Self {
mempool: Mempool::new(),
txs: VecDeque::new(),
tx_hashes: HashSet::new(),
mempool_max_txs_count,
max_batch_size_bytes,
runtime,
Expand All @@ -145,15 +89,19 @@ where
C: Context,
R: DispatchCall<Context = C>,
{
type TxHash = HexString;

/// Attempt to add transaction to the mempool.
///
/// The transaction is discarded if:
/// - mempool is full
/// - transaction is invalid (deserialization, verification or decoding of the runtime message failed)
fn accept_tx(&mut self, raw: Vec<u8>) -> anyhow::Result<HexString> {
if self.mempool.len() >= self.mempool_max_txs_count {
fn accept_tx(&mut self, raw: Vec<u8>) -> anyhow::Result<TxHash> {
debug_assert_eq!(
self.txs.len(),
self.tx_hashes.len(),
"Mempool invariant violated: txs and tx_hashes got out of sync because they have different lengths. This is a bug!"
);

if self.txs.len() >= self.mempool_max_txs_count {
bail!("Mempool is full")
}

Expand Down Expand Up @@ -182,30 +130,35 @@ where
tx,
msg: Some(msg),
};

let hash = tx.calculate_hash();
self.mempool.push_back(tx);
Ok(HexString(hash))
self.txs.push_back(tx);
self.tx_hashes.insert(hash);

Ok(hash)
}

fn contains(&self, hash: &Self::TxHash) -> bool {
self.mempool.tx_hashes.contains(hash)
fn contains(&self, hash: &TxHash) -> bool {
self.tx_hashes.contains(hash)
}

/// Builds a new batch of valid transactions in order they were added to mempool
/// Only transactions, which are dispatched successfully are included in the batch
fn get_next_blob(&mut self) -> anyhow::Result<Vec<TxWithHash<HexString>>> {
fn get_next_blob(&mut self) -> anyhow::Result<Vec<TxWithHash>> {
let mut working_set = WorkingSet::new(self.current_storage.clone());
let mut txs = Vec::new();
let mut current_batch_size = 0;

while let Some(mut pooled) = self.mempool.pop() {
while let Some(mut pooled) = self.txs.pop_front() {
// In order to fill batch as big as possible, we only check if valid tx can fit in the batch.
let tx_len = pooled.raw.len();
if current_batch_size + tx_len > self.max_batch_size_bytes {
self.mempool.push_front(pooled);
self.txs.push_front(pooled);
break;
}

self.tx_hashes.remove(&pooled.calculate_hash());

// Take the decoded runtime message cached upon accepting transaction
// into the pool or attempt to decode the message again if
// the transaction was previously executed,
Expand Down Expand Up @@ -238,7 +191,7 @@ where
);
txs.push(TxWithHash {
raw_tx: pooled.raw,
hash: HexString(tx_hash),
hash: tx_hash,
});
}

Expand Down Expand Up @@ -483,7 +436,7 @@ mod tests {
batch_builder.accept_tx(tx.clone()).unwrap();
}

assert_eq!(txs.len(), batch_builder.mempool.len());
assert_eq!(txs.len(), batch_builder.txs.len());

let build_result = batch_builder.get_next_blob();
assert!(build_result.is_err());
Expand Down Expand Up @@ -516,7 +469,7 @@ mod tests {
batch_builder.accept_tx(tx.clone()).unwrap();
}

assert_eq!(txs.len(), batch_builder.mempool.len());
assert_eq!(txs.len(), batch_builder.txs.len());

let build_result = batch_builder.get_next_blob();
assert!(build_result.is_ok());
Expand All @@ -530,7 +483,7 @@ mod tests {
assert!(blob.contains(&txs[0]));
assert!(blob.contains(&txs[2]));
assert!(!blob.contains(&txs[3]));
assert_eq!(1, batch_builder.mempool.len());
assert_eq!(1, batch_builder.txs.len());
}
}
}
Loading

0 comments on commit 8e48117

Please sign in to comment.