From 5d68a6b681d1cb19b9ba95299489b9fc46f10847 Mon Sep 17 00:00:00 2001 From: Filippo Costa Date: Wed, 20 Dec 2023 14:53:36 +0100 Subject: [PATCH] WIP Signed-off-by: Filippo Costa --- Cargo.lock | 109 +++++++++++++- full-node/sov-sequencer/Cargo.toml | 3 +- full-node/sov-sequencer/src/batch_builder.rs | 120 ++++++++++++--- full-node/sov-sequencer/src/lib.rs | 140 ++++++++++++++++-- module-system/sov-modules-macros/target-path | 1 + .../sov-modules-macros/target-path-trybuild | 1 + .../src/node/services/batch_builder.rs | 23 ++- 7 files changed, 357 insertions(+), 40 deletions(-) create mode 100644 module-system/sov-modules-macros/target-path create mode 100644 module-system/sov-modules-macros/target-path-trybuild diff --git a/Cargo.lock b/Cargo.lock index 7d437d530..89829ac63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,6 +1161,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" +[[package]] +name = "bytecount" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205" + [[package]] name = "bytemuck" version = "1.14.0" @@ -1262,6 +1268,19 @@ dependencies = [ "serde", ] +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver 1.0.20", + "serde", + "serde_json", +] + [[package]] name = "cargo_metadata" version = "0.17.0" @@ -1796,6 +1815,16 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7059fff8937831a9ae6f0fe4d658ffabf58f2ca96aa9dec1c889f936f705f216" +[[package]] +name = "crossbeam-channel" +version = "0.5.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c3242926edf34aec4ac3a77108ad4854bffaa2e4ddc1824124ce59231302d5" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.3" @@ -1822,9 +1851,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.16" +version = "0.8.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +checksum = "c06d96137f14f244c37f989d9fff8f95e6c18b918e71f36638f8c49112e4c78f" dependencies = [ "cfg-if", ] @@ -2759,6 +2788,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "eth-keystore" version = "0.5.0" @@ -2924,7 +2962,7 @@ checksum = "c0a17f0708692024db9956b31d7a20163607d2745953f5ae8125ab368ba280ad" dependencies = [ "arrayvec 0.7.4", "bytes", - "cargo_metadata", + "cargo_metadata 0.17.0", "chrono", "const-hex", "elliptic-curve", @@ -5138,6 +5176,21 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mini-moka" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23e0b72e7c9042467008b10279fc732326bd605459ae03bda88825909dd19b56" +dependencies = [ + "crossbeam-channel", + 
"crossbeam-utils", + "dashmap", + "skeptic", + "smallvec 1.11.2", + "tagptr", + "triomphe", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -6429,6 +6482,17 @@ dependencies = [ "trust-dns-proto", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a1a2f1f0a7ecff9c31abbe177637be0e97a0aef46cf8738ece09327985d998" +dependencies = [ + "bitflags 1.3.2", + "memchr", + "unicase", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -7280,7 +7344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e58d4cc25e243e52d1ccd75d357b0aa55081736bf3052c65a823fdf169586843" dependencies = [ "anyhow", - "cargo_metadata", + "cargo_metadata 0.17.0", "docker-generate", "risc0-binfmt", "risc0-zkp", @@ -8343,6 +8407,21 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata 0.14.2", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.9" @@ -9130,6 +9209,7 @@ dependencies = [ "borsh", "hex", "jsonrpsee 0.20.3", + "mini-moka", "rand 0.8.5", "serde", "sov-db", @@ -10198,6 +10278,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -10889,6 +10975,12 @@ dependencies = [ "rlp", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "trust-dns-client" version = "0.20.4" @@ -11050,6 +11142,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" diff --git a/full-node/sov-sequencer/Cargo.toml b/full-node/sov-sequencer/Cargo.toml index bf2b71932..775220505 100644 --- a/full-node/sov-sequencer/Cargo.toml +++ b/full-node/sov-sequencer/Cargo.toml @@ -17,13 +17,14 @@ anyhow = { workspace = true } borsh = { workspace = true } hex = { workspace = true } jsonrpsee = { workspace = true, features = ["client", "server"] } +mini-moka = "0.10" serde = { workspace = true, features = ["derive"] } tracing = { workspace = true } +tokio = { workspace = true } sov-rollup-interface = { path = "../../rollup-interface", version = "0.3" } sov-modules-api = { path = "../../module-system/sov-modules-api", version = "0.3", features = ["native"] } sov-state = { path = "../../module-system/sov-state", version = "0.3" } - [dev-dependencies] tempfile = { workspace = true } rand = { workspace = true } diff --git a/full-node/sov-sequencer/src/batch_builder.rs b/full-node/sov-sequencer/src/batch_builder.rs index 
cef44904a..6f657123e 100644
--- a/full-node/sov-sequencer/src/batch_builder.rs
+++ b/full-node/sov-sequencer/src/batch_builder.rs
@@ -1,4 +1,4 @@
-use std::collections::VecDeque;
+use std::collections::{HashSet, VecDeque};
 use std::io::Cursor;
 
 use anyhow::{bail, Context as ErrorContext};
@@ -6,9 +6,11 @@ use borsh::BorshDeserialize;
 use sov_modules_api::digest::Digest;
 use sov_modules_api::transaction::Transaction;
 use sov_modules_api::{Context, DispatchCall, PublicKey, Spec, WorkingSet};
-use sov_rollup_interface::services::batch_builder::BatchBuilder;
+use sov_rollup_interface::services::batch_builder::{BatchBuilder, TxWithHash};
 use tracing::{info, warn};
 
+use crate::{HexString, TxHash};
+
 /// Transaction stored in the mempool.
 pub struct PooledTransaction<C: Context, R: DispatchCall<Context = C>> {
     /// Raw transaction bytes.
@@ -37,15 +39,76 @@ where
     C: Context,
     R: DispatchCall<Context = C>,
 {
-    fn calculate_hash(&self) -> [u8; 32] {
+    fn calculate_hash(&self) -> TxHash {
         <C as Spec>::Hasher::digest(&self.raw[..]).into()
     }
 }
 
+struct Mempool<C, R>
+where
+    C: Context,
+    R: DispatchCall<Context = C>,
+{
+    txs: VecDeque<PooledTransaction<C, R>>,
+    tx_hashes: HashSet<TxHash>,
+}
+
+impl<C, R> Mempool<C, R>
+where
+    C: Context,
+    R: DispatchCall<Context = C>,
+{
+    pub fn new() -> Self {
+        Self {
+            txs: VecDeque::new(),
+            tx_hashes: HashSet::new(),
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.txs.len()
+    }
+
+    pub fn push_back(&mut self, tx: PooledTransaction<C, R>) {
+        debug_assert_eq!(
+            self.txs.len(),
+            self.tx_hashes.len(),
+            "Mempool invariant violated: mempool txs and tx_hashes have different lengths. This is a bug!"
+        );
+
+        let tx_hash = tx.calculate_hash();
+
+        self.tx_hashes.insert(tx_hash);
+        self.txs.push_back(tx);
+    }
+
+    pub fn push_front(&mut self, tx: PooledTransaction<C, R>) {
+        debug_assert_eq!(
+            self.txs.len(),
+            self.tx_hashes.len(),
+            "Mempool invariant violated: mempool txs and tx_hashes have different lengths. This is a bug!"
+        );
+
+        let tx_hash = tx.calculate_hash();
+
+        self.tx_hashes.insert(tx_hash);
+        self.txs.push_front(tx);
+    }
+
+    pub fn pop(&mut self) -> Option<PooledTransaction<C, R>> {
+        let tx = self.txs.pop_front()?;
+
+        let tx_hash = tx.calculate_hash();
+        self.tx_hashes.remove(&tx_hash);
+
+        Some(tx)
+    }
+}
+
 /// BatchBuilder that creates batches of transactions in the order they were submitted
 /// Only transactions that were successfully dispatched are included.
 pub struct FiFoStrictBatchBuilder<C: Context, R: DispatchCall<Context = C>> {
-    mempool: VecDeque<PooledTransaction<C, R>>,
+    mempool: Mempool<C, R>,
     mempool_max_txs_count: usize,
     runtime: R,
     max_batch_size_bytes: usize,
@@ -67,7 +130,7 @@ where
         sequencer: C::Address,
     ) -> Self {
         Self {
-            mempool: VecDeque::new(),
+            mempool: Mempool::new(),
             mempool_max_txs_count,
             max_batch_size_bytes,
             runtime,
@@ -82,12 +145,14 @@ where
     C: Context,
     R: DispatchCall<Context = C>,
 {
+    type TxHash = HexString;
+
     /// Attempt to add transaction to the mempool.
     ///
     /// The transaction is discarded if:
     /// - mempool is full
     /// - transaction is invalid (deserialization, verification or decoding of the runtime message failed)
-    fn accept_tx(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
+    fn accept_tx(&mut self, raw: Vec<u8>) -> anyhow::Result<Self::TxHash> {
         if self.mempool.len() >= self.mempool_max_txs_count {
             bail!("Mempool is full")
         }
@@ -112,22 +177,34 @@ where
             .map_err(anyhow::Error::new)
             .context("Failed to decode message in transaction")?;
 
-        self.mempool.push_back(PooledTransaction {
+        let tx = PooledTransaction {
             raw,
             tx,
             msg: Some(msg),
-        });
-        Ok(())
+        };
+        // Compute the hash before handing the transaction to the mempool, so
+        // `tx` is not used after it has been moved.
+        let hash = HexString(tx.calculate_hash());
+        self.mempool.push_back(tx);
+        Ok(hash)
+    }
+
+    fn contains(&self, hash: &Self::TxHash) -> bool {
+        self.mempool.tx_hashes.contains(&hash.0)
     }
 
     /// Builds a new batch of valid transactions in order they were added to mempool
     /// Only transactions, which are dispatched successfully are included in the batch
-    fn get_next_blob(&mut self) -> anyhow::Result<Vec<Vec<u8>>> {
+    fn get_next_blob(&mut self) -> anyhow::Result<Vec<TxWithHash<Self::TxHash>>> {
         let mut working_set = WorkingSet::new(self.current_storage.clone());
         let mut txs = Vec::new();
         let mut current_batch_size = 0;
 
-        while let Some(mut pooled) = self.mempool.pop_front() {
+        while let Some(mut pooled) = self.mempool.pop() {
+            // In order to fill batch as big as possible, we only check if valid tx can fit in the batch.
+            let tx_len = pooled.raw.len();
+            if current_batch_size + tx_len > self.max_batch_size_bytes {
+                self.mempool.push_front(pooled);
+                break;
+            }
+
             // Take the decoded runtime message cached upon accepting transaction
             // into the pool or attempt to decode the message again if
            // the transaction was previously executed,
@@ -150,22 +227,18 @@ where
                 }
             }
 
-            // In order to fill batch as big as possible, we only check if valid tx can fit in the batch.
-            let tx_len = pooled.raw.len();
-            if current_batch_size + tx_len > self.max_batch_size_bytes {
-                self.mempool.push_front(pooled);
-                break;
-            }
-
             // Update size of current batch
             current_batch_size += tx_len;
 
-            let tx_hash: [u8; 32] = pooled.calculate_hash();
+            let tx_hash = pooled.calculate_hash();
             info!(
                 hash = hex::encode(tx_hash),
                 "Transaction has been included in the batch",
             );
-            txs.push(pooled.raw);
+            txs.push(TxWithHash {
+                raw_tx: pooled.raw,
+                hash: HexString(tx_hash),
+            });
         }
 
         if txs.is_empty() {
@@ -446,7 +519,12 @@ mod tests {
         let build_result = batch_builder.get_next_blob();
         assert!(build_result.is_ok());
-        let blob = build_result.unwrap();
+        let blob = build_result
+            .unwrap()
+            .into_iter()
+            // We discard hashes for the sake of comparison
+            .map(|t| t.raw_tx)
+            .collect::<Vec<_>>();
         assert_eq!(2, blob.len());
         assert!(blob.contains(&txs[0]));
         assert!(blob.contains(&txs[2]));
diff --git a/full-node/sov-sequencer/src/lib.rs b/full-node/sov-sequencer/src/lib.rs
index df422ecf6..bea67eeff 100644
--- a/full-node/sov-sequencer/src/lib.rs
+++ b/full-node/sov-sequencer/src/lib.rs
@@ -1,5 +1,6 @@
 #![deny(missing_docs)]
 #![doc = include_str!("../README.md")]
+use std::str::FromStr;
 use std::sync::Mutex;
 
 /// Concrete implementations of `[BatchBuilder]`
@@ -10,22 +11,48 @@ pub mod utils;
 use anyhow::anyhow;
 use jsonrpsee::types::ErrorObjectOwned;
 use jsonrpsee::RpcModule;
+use mini_moka::sync::Cache as MokaCache;
 use sov_modules_api::utils::to_jsonrpsee_error_object;
 use sov_rollup_interface::services::batch_builder::BatchBuilder;
 use sov_rollup_interface::services::da::DaService;
+use tokio::sync::broadcast;
+
+type TxHash = [u8; 32];
 
 const SEQUENCER_RPC_ERROR: &str = "SEQUENCER_RPC_ERROR";
+const MOKA_DEFAULT_MAX_CAPACITY: u64 = 100;
 
 /// Single data structure that manages mempool and batch producing.
 pub struct Sequencer<B: BatchBuilder, T: DaService> {
+    tx_status_updates_sender: broadcast::Sender<TxStatusUpdate<B::TxHash>>,
+    tx_status_updates_receiver: broadcast::Receiver<TxStatusUpdate<B::TxHash>>,
+    tx_statuses_cache: MokaCache<B::TxHash, TxStatus>,
     batch_builder: Mutex<B>,
     da_service: T,
 }
 
-impl<B: BatchBuilder + Send + Sync, T: DaService> Sequencer<B, T> {
+impl<B, T> Sequencer<B, T>
+where
+    B: BatchBuilder + Send + Sync,
+    T: DaService + Send + Sync,
+{
     /// Creates new Sequencer from BatchBuilder and DaService
     pub fn new(batch_builder: B, da_service: T) -> Self {
+        let tx_statuses_cache = MokaCache::new(MOKA_DEFAULT_MAX_CAPACITY);
+        let tx_statuses_cache_clone = tx_statuses_cache.clone();
+
+        let (tx_status_updates_sender, tx_status_updates_receiver) = broadcast::channel(100);
+        let mut recv = tx_status_updates_receiver.resubscribe();
+        tokio::spawn(async move {
+            while let Ok(TxStatusUpdate { tx_hash, status }) = recv.recv().await {
+                tx_statuses_cache_clone.insert(tx_hash, status);
+            }
+        });
+
         Self {
+            tx_status_updates_sender,
+            tx_status_updates_receiver,
+            // Store the cache that the task above populates (via its clone),
+            // not a second, empty cache.
+            tx_statuses_cache,
             batch_builder: Mutex::new(batch_builder),
             da_service,
         }
@@ -37,15 +64,25 @@ impl<B: BatchBuilder + Send + Sync, T: DaService> Sequencer<B, T> {
         // It can be improved with atomics,
         // so a new batch is only created after previous was submitted.
         tracing::info!("Submit batch request has been received!");
-        let blob = {
-            let mut batch_builder = self
-                .batch_builder
-                .lock()
-                .map_err(|e| anyhow!("failed to lock mempool: {}", e.to_string()))?;
-            batch_builder.get_next_blob()?
-        };
+        let mut batch_builder = self
+            .batch_builder
+            .lock()
+            .map_err(|e| anyhow!("failed to lock mempool: {}", e.to_string()))?;
+        let blob = batch_builder.get_next_blob()?;
+        // Release the mempool lock before the `await` below, so the future stays `Send`.
+        drop(batch_builder);
         let num_txs = blob.len();
-        let blob: Vec<u8> = borsh::to_vec(&blob)?;
+        let (blob, tx_hashes) = blob
+            .into_iter()
+            .map(|tx| (tx.raw_tx, tx.hash))
+            .unzip::<_, _, Vec<_>, Vec<_>>();
+        let blob = borsh::to_vec(&blob)?;
+        for tx_hash in tx_hashes {
+            self.tx_status_updates_sender
+                .send(TxStatusUpdate {
+                    tx_hash,
+                    status: TxStatus::Submitted,
+                })
+                .map_err(|e| anyhow!("failed to send tx status update: {}", e.to_string()))?;
+        }
 
         match self.da_service.send_transaction(&blob).await {
             Ok(_) => Ok(num_txs),
@@ -88,6 +125,41 @@ where
            Ok::<String, ErrorObjectOwned>(format!("Submitted {} transactions", num_txs))
        },
    )?;
+    rpc.register_async_method("sequencer_txStatus", |params, sequencer| async move {
+        let tx_hash_str: String = params.one()?;
+        let tx_hash = B::TxHash::from_str(&tx_hash_str)
+            .map_err(|_| to_jsonrpsee_error_object("bad hash", SEQUENCER_RPC_ERROR))?;
+        let status;
+        let is_in_mempool = sequencer.batch_builder.lock().unwrap().contains(&tx_hash);
+        if is_in_mempool {
+            status = TxStatus::Submitted;
+        } else {
+            status = sequencer
+                .tx_statuses_cache
+                .get(&tx_hash)
+                .unwrap_or(TxStatus::Unavailable);
+        }
+        Ok::<_, ErrorObjectOwned>(status)
+    })?;
+    rpc.register_subscription(
+        "sequencer_subscribeToTxStatusUpdates",
+        "sequencer_newTxStatus",
+        "sequencer_unsubscribeToTxStatusUpdates",
+        |_params, pending, sequencer| async move {
+            let mut receiver = sequencer.tx_status_updates_receiver.resubscribe();
+            let subscription = pending.accept().await.unwrap();
+            while let Ok(TxStatusUpdate { tx_hash, status }) = receiver.recv().await {
+                let tx_hash_str = tx_hash.to_string();
+                let is_in_mempool = sequencer.batch_builder.lock().unwrap().contains(&tx_hash);
+                if is_in_mempool {
+                    subscription.send(&tx_hash_str, &status);
+                } else {
+                    sequencer.tx_statuses_cache.insert(tx_hash, status.clone());
+                    subscription.send(&tx_hash_str, &status);
+                }
+            }
+        },
+    )?;
    rpc.register_method("sequencer_acceptTx", move |params, sequencer| {
        let tx: SubmitTransaction = params.one()?;
        let response = match sequencer.accept_tx(tx.body) {
@@ -100,6 +172,36 @@ where
     Ok(())
 }
 
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct TxStatusUpdate<Hash> {
+    tx_hash: Hash,
+    status: TxStatus,
+}
+
+/// A 32-byte transaction hash, rendered as a "0x"-prefixed hex string over RPC.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct HexString([u8; 32]);
+
+impl FromStr for HexString {
+    type Err = hex::FromHexError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let s = s.strip_prefix("0x").unwrap_or(s);
+        let bytes = hex::decode(s)?;
+        if bytes.len() != 32 {
+            return Err(hex::FromHexError::InvalidStringLength);
+        }
+        let mut arr = [0u8; 32];
+        arr.copy_from_slice(&bytes);
+        Ok(HexString(arr))
+    }
+}
+
+impl ToString for HexString {
+    fn to_string(&self) -> String {
+        format!("0x{}", hex::encode(&self.0))
+    }
+}
+
 /// Creates an RPC module with the sequencer's methods
 pub fn get_sequencer_rpc<B, D>(batch_builder: B, da_service: D) -> RpcModule<Sequencer<B, D>>
 where
@@ -134,6 +236,14 @@ pub enum SubmitTransactionResponse {
     Failed(String),
 }
 
+/// The current status of a transaction, as tracked by the sequencer.
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
+pub enum TxStatus {
+    /// The sequencer does not know about this transaction.
+    Unavailable,
+    /// The transaction is in the sequencer's mempool or was submitted to the DA layer.
+    Submitted,
+    /// The transaction was published to the DA layer.
+    Published,
+    /// The transaction was finalized on the DA layer.
+    Finalized,
+}
+
 #[cfg(test)]
 mod tests {
     // It only takes the first byte of the tx, when submits it.
     // This allows to show effect of batch builder
     impl BatchBuilder for MockBatchBuilder {
-        fn accept_tx(&mut self, tx: Vec<u8>) -> anyhow::Result<()> {
-            self.mempool.push(tx);
-            Ok(())
+        type TxHash = Vec<u8>;
+
+        fn accept_tx(&mut self, tx: Vec<u8>) -> anyhow::Result<Self::TxHash> {
+            self.mempool.push(tx.clone());
+            Ok(tx)
+        }
+
+        fn contains(&self, tx_hash: &Self::TxHash) -> bool {
+            self.mempool.iter().any(|tx| tx == tx_hash)
         }
 
         fn get_next_blob(&mut self) -> anyhow::Result<Vec<Vec<u8>>> {
diff --git a/module-system/sov-modules-macros/target-path b/module-system/sov-modules-macros/target-path
new file mode 100644
index 000000000..f55efdf74
--- /dev/null
+++ b/module-system/sov-modules-macros/target-path
@@ -0,0 +1 @@
+/home/neysofu/repos/sovereign/sovereign-sdk/target/riscv-guest/release/build/sov-modules-macros-c0709c2f52ba3808/out
\ No newline at end of file
diff --git a/module-system/sov-modules-macros/target-path-trybuild b/module-system/sov-modules-macros/target-path-trybuild
new file mode 100644
index 000000000..b535a2b39
--- /dev/null
+++ b/module-system/sov-modules-macros/target-path-trybuild
@@ -0,0 +1 @@
+/home/neysofu/repos/sovereign/sovereign-sdk/target/tests/trybuild/debug/build/sov-modules-macros-768e0985f61e9070/out
\ No newline at end of file
diff --git a/rollup-interface/src/node/services/batch_builder.rs b/rollup-interface/src/node/services/batch_builder.rs
index 90037aada..393704a36 100644
--- a/rollup-interface/src/node/services/batch_builder.rs
+++ b/rollup-interface/src/node/services/batch_builder.rs
@@ -1,14 +1,33 @@
 //! This module defines the trait that is used to build batches of transactions.
 
+use core::{hash::Hash, str::FromStr};
+
 use crate::maybestd::vec::Vec;
 
+/// An encoded transaction with its hash as returned by
+/// [`BatchBuilder::get_next_blob`].
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct TxWithHash<T> {
+    /// Encoded transaction.
+    pub raw_tx: Vec<u8>,
+    /// Transaction hash.
+    pub hash: T,
+}
+
 /// BlockBuilder trait is responsible for managing mempool and building batches.
 pub trait BatchBuilder {
+    /// Uniquely identifies a transaction once it's in the mempool.
+    type TxHash: Hash + Clone + Eq + FromStr + ToString + Send + Sync;
+
     /// Accept a new transaction.
     /// Can return error if transaction is invalid or mempool is full.
-    fn accept_tx(&mut self, tx: Vec<u8>) -> anyhow::Result<()>;
+    fn accept_tx(&mut self, tx: Vec<u8>) -> anyhow::Result<Self::TxHash>;
+
+    /// Checks whether a transaction with the given `hash` is already in the
+    /// mempool.
+    fn contains(&self, hash: &Self::TxHash) -> bool;
 
     /// Builds a new batch out of transactions in mempool.
     /// Logic of which transactions and how many of them is included in batch is up to implementation.
-    fn get_next_blob(&mut self) -> anyhow::Result<Vec<Vec<u8>>>;
+    fn get_next_blob(&mut self) -> anyhow::Result<Vec<TxWithHash<Self::TxHash>>>;
 }
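
Note for reviewers: below is a minimal sketch, not part of the diff above, of how a downstream crate might implement the reworked BatchBuilder trait. It assumes only the trait surface introduced by this patch plus the `hex` crate; the `EchoBatchBuilder` type and its hex-string hashes are hypothetical stand-ins, chosen because `String` already satisfies the `Hash + Clone + Eq + FromStr + ToString + Send + Sync` bounds on `TxHash`.

use std::collections::VecDeque;

use sov_rollup_interface::services::batch_builder::{BatchBuilder, TxWithHash};

/// Toy builder that uses the hex encoding of the raw bytes as the tx hash.
struct EchoBatchBuilder {
    mempool: VecDeque<Vec<u8>>,
}

impl BatchBuilder for EchoBatchBuilder {
    type TxHash = String;

    fn accept_tx(&mut self, tx: Vec<u8>) -> anyhow::Result<Self::TxHash> {
        // Hash first, then hand the bytes over to the mempool.
        let hash = hex::encode(&tx);
        self.mempool.push_back(tx);
        Ok(hash)
    }

    fn contains(&self, hash: &Self::TxHash) -> bool {
        self.mempool.iter().any(|tx| &hex::encode(tx) == hash)
    }

    fn get_next_blob(&mut self) -> anyhow::Result<Vec<TxWithHash<Self::TxHash>>> {
        // Drain the whole mempool and pair every transaction with its hash.
        Ok(self
            .mempool
            .drain(..)
            .map(|raw_tx| TxWithHash {
                hash: hex::encode(&raw_tx),
                raw_tx,
            })
            .collect())
    }
}

FiFoStrictBatchBuilder above follows the same pattern, except that it uses the `HexString` newtype instead of a plain `String` so that hashes parse from and render to "0x"-prefixed strings.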
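A second sketch, also hypothetical and not part of the patch, of how a client could poll the new `sequencer_txStatus` endpoint. It assumes the `jsonrpsee` HTTP client feature, `tokio`, and `serde_json` for the untyped response; the endpoint address and the all-zero hash are placeholders.

use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder endpoint; replace with the sequencer's actual RPC address.
    let client = HttpClientBuilder::default().build("http://127.0.0.1:12345")?;

    // `accept_tx` / `sequencer_acceptTx` now return the tx hash; here we query
    // the status of a placeholder all-zero hash.
    let tx_hash = format!("0x{}", "00".repeat(32));
    let status: serde_json::Value = client
        .request("sequencer_txStatus", rpc_params![tx_hash])
        .await?;
    println!("transaction status: {status}");
    Ok(())
}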