Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose rollup transaction statuses over sequencer RPC #1243

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 108 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions full-node/sov-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,27 @@ resolver = "2"
[dependencies]
anyhow = { workspace = true }
borsh = { workspace = true }
dashmap = "5"
hex = { workspace = true }
jsonrpsee = { workspace = true, features = ["client", "server"] }
mini-moka = "0.10"
serde = { workspace = true, features = ["derive"] }
tracing = { workspace = true }
tokio = { workspace = true }
sov-rollup-interface = { path = "../../rollup-interface", version = "0.3" }
sov-modules-api = { path = "../../module-system/sov-modules-api", version = "0.3", features = ["native"] }
sov-state = { path = "../../module-system/sov-state", version = "0.3" }


[dev-dependencies]
tempfile = { workspace = true }
demo-stf = { path = "../../examples/demo-rollup/stf", features = ["native"] }
rand = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }
sov-data-generators = { path = "../../module-system/utils/sov-data-generators" }
sov-value-setter = { path = "../../module-system/module-implementations/examples/sov-value-setter", features = ["native"] }
sov-rollup-interface = { path = "../../rollup-interface", version = "0.3", features = ["native"] }
sov-mock-da = { path = "../../adapters/mock-da", features = ["native"] }
sov-prover-storage-manager = { path = "../../full-node/sov-prover-storage-manager", features = ["test-utils"] }
sov-schema-db = { path = "../db/sov-schema-db" }
sov-db = { path = "../db/sov-db" }
tempfile = { workspace = true }
69 changes: 51 additions & 18 deletions full-node/sov-sequencer/src/batch_builder.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::collections::VecDeque;
//! Concrete implementation(s) of [`BatchBuilder`].

use std::collections::{HashSet, VecDeque};
use std::io::Cursor;

use anyhow::{bail, Context as ErrorContext};
use borsh::BorshDeserialize;
use sov_modules_api::digest::Digest;
use sov_modules_api::transaction::Transaction;
use sov_modules_api::{Context, DispatchCall, PublicKey, Spec, WorkingSet};
use sov_rollup_interface::services::batch_builder::BatchBuilder;
use sov_rollup_interface::services::batch_builder::{BatchBuilder, TxWithHash};
use tracing::{info, warn};

use crate::TxHash;

/// Transaction stored in the mempool.
pub struct PooledTransaction<C: Context, R: DispatchCall<Context = C>> {
/// Raw transaction bytes.
Expand Down Expand Up @@ -37,7 +41,7 @@ where
C: Context,
R: DispatchCall<Context = C>,
{
fn calculate_hash(&self) -> [u8; 32] {
fn calculate_hash(&self) -> TxHash {
<C as Spec>::Hasher::digest(&self.raw[..]).into()
}
}
Expand All @@ -46,6 +50,8 @@ where
/// Only transactions that were successfully dispatched are included.
pub struct FiFoStrictBatchBuilder<C: Context, R: DispatchCall<Context = C>> {
mempool: VecDeque<PooledTransaction<C, R>>,
// Makes it cheap to check whether a transaction is already in the mempool.
mempool_hashes: HashSet<TxHash>,
mempool_max_txs_count: usize,
runtime: R,
max_batch_size_bytes: usize,
Expand All @@ -68,6 +74,7 @@ where
) -> Self {
Self {
mempool: VecDeque::new(),
mempool_hashes: HashSet::new(),
mempool_max_txs_count,
max_batch_size_bytes,
runtime,
Expand All @@ -87,7 +94,13 @@ where
/// The transaction is discarded if:
/// - mempool is full
/// - transaction is invalid (deserialization, verification or decoding of the runtime message failed)
fn accept_tx(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
fn accept_tx(&mut self, raw: Vec<u8>) -> anyhow::Result<TxHash> {
assert_eq!(
self.mempool.len(),
self.mempool_hashes.len(),
"Mempool invariant violated, the variables got out of sync. This is a bug!"
);

if self.mempool.len() >= self.mempool_max_txs_count {
bail!("Mempool is full")
}
Expand All @@ -112,22 +125,41 @@ where
.map_err(anyhow::Error::new)
.context("Failed to decode message in transaction")?;

self.mempool.push_back(PooledTransaction {
let tx = PooledTransaction {
raw,
tx,
msg: Some(msg),
});
Ok(())
};

let hash = tx.calculate_hash();
self.mempool.push_back(tx);
self.mempool_hashes.insert(hash);

Ok(hash)
}

/// Reports whether a transaction with the given hash is currently queued
/// in the mempool.
fn contains(&self, hash: &TxHash) -> bool {
    // Constant-time set lookup; avoids scanning the mempool deque.
    self.mempool_hashes.get(hash).is_some()
}

/// Builds a new batch of valid transactions in the order they were added to the mempool.
/// Only transactions that are dispatched successfully are included in the batch.
fn get_next_blob(&mut self) -> anyhow::Result<Vec<Vec<u8>>> {
fn get_next_blob(&mut self) -> anyhow::Result<Vec<TxWithHash>> {
let mut working_set = WorkingSet::new(self.current_storage.clone());
let mut txs = Vec::new();
let mut current_batch_size = 0;

while let Some(mut pooled) = self.mempool.pop_front() {
// In order to fill batch as big as possible, we only check if valid
// tx can fit in the batch.
let tx_len = pooled.raw.len();
if current_batch_size + tx_len > self.max_batch_size_bytes {
self.mempool.push_front(pooled);
break;
}

self.mempool_hashes.remove(&pooled.calculate_hash());

// Take the decoded runtime message cached upon accepting transaction
// into the pool or attempt to decode the message again if
// the transaction was previously executed,
Expand All @@ -150,22 +182,18 @@ where
}
}

// In order to fill batch as big as possible, we only check if valid tx can fit in the batch.
let tx_len = pooled.raw.len();
if current_batch_size + tx_len > self.max_batch_size_bytes {
self.mempool.push_front(pooled);
break;
}

// Update size of current batch
current_batch_size += tx_len;

let tx_hash: [u8; 32] = pooled.calculate_hash();
let tx_hash = pooled.calculate_hash();
info!(
hash = hex::encode(tx_hash),
"Transaction has been included in the batch",
);
txs.push(pooled.raw);
txs.push(TxWithHash {
raw_tx: pooled.raw,
hash: tx_hash,
});
}

if txs.is_empty() {
Expand Down Expand Up @@ -462,7 +490,12 @@ mod tests {

let build_result = batch_builder.get_next_blob();
assert!(build_result.is_ok());
let blob = build_result.unwrap();
let blob = build_result
.unwrap()
.iter()
// We discard hashes for the sake of comparison
.map(|t| t.raw_tx.clone())
.collect::<Vec<_>>();
assert_eq!(2, blob.len());
assert!(blob.contains(&txs[0]));
assert!(blob.contains(&txs[2]));
Expand Down
Loading
Loading