From 8e48117cd3918cc8866ca71fa092424968d8ed63 Mon Sep 17 00:00:00 2001 From: Filippo Costa Date: Thu, 21 Dec 2023 19:57:30 +0100 Subject: [PATCH] WIP --- full-node/sov-sequencer/src/batch_builder.rs | 111 +++----- full-node/sov-sequencer/src/lib.rs | 250 +++++++++++------- full-node/sov-sequencer/src/rpc.rs | 91 ------- full-node/sov-sequencer/src/utils.rs | 2 + .../src/runtime_rpc.rs | 4 +- .../src/node/services/batch_builder.rs | 16 +- 6 files changed, 196 insertions(+), 278 deletions(-) delete mode 100644 full-node/sov-sequencer/src/rpc.rs diff --git a/full-node/sov-sequencer/src/batch_builder.rs b/full-node/sov-sequencer/src/batch_builder.rs index 81b184f5e..f9717b744 100644 --- a/full-node/sov-sequencer/src/batch_builder.rs +++ b/full-node/sov-sequencer/src/batch_builder.rs @@ -1,3 +1,5 @@ +//! Concrete implementations of [`BatchBuilder`]. + use std::collections::{HashSet, VecDeque}; use std::io::Cursor; @@ -9,7 +11,7 @@ use sov_modules_api::{Context, DispatchCall, PublicKey, Spec, WorkingSet}; use sov_rollup_interface::services::batch_builder::{BatchBuilder, TxWithHash}; use tracing::{info, warn}; -use crate::{HexString, TxHash}; +use crate::TxHash; /// Transaction stored in the mempool. pub struct PooledTransaction> { @@ -44,71 +46,12 @@ where } } -struct Mempool -where - C: Context, - R: DispatchCall, -{ - txs: VecDeque>, - tx_hashes: HashSet, -} - -impl Mempool -where - C: Context, - R: DispatchCall, -{ - pub fn new() -> Self { - Self { - txs: VecDeque::new(), - tx_hashes: HashSet::new(), - } - } - - pub fn len(&self) -> usize { - self.txs.len() - } - - pub fn push_back(&mut self, tx: PooledTransaction) { - debug_assert_eq!( - self.txs.len(), - self.tx_hashes.len(), - "Mempool invariant violated: mempool txs and tx_hashes have different lengths. This is a bug!" - ); - - let tx_hash = tx.calculate_hash(); - - self.tx_hashes.insert(HexString(tx_hash)); - self.txs.push_back(tx); - } - - pub fn push_front(&mut self, tx: PooledTransaction) { - debug_assert_eq!( - self.txs.len(), - self.tx_hashes.len(), - "Mempool invariant violated: mempool txs and tx_hashes have different lengths. This is a bug!" - ); - - let tx_hash = tx.calculate_hash(); - - self.tx_hashes.insert(HexString(tx_hash)); - self.txs.push_front(tx); - } - - pub fn pop(&mut self) -> Option> { - let tx = self.txs.pop_front()?; - - let tx_hash = tx.calculate_hash(); - self.tx_hashes.remove(&HexString(tx_hash)); - - Some(tx) - } -} - /// BatchBuilder that creates batches of transactions in the order they were submitted /// Only transactions that were successfully dispatched are included. pub struct FiFoStrictBatchBuilder> { - mempool: Mempool, + txs: VecDeque>, + // Makes it cheap to check if transaction is already in the mempool. + tx_hashes: HashSet, mempool_max_txs_count: usize, runtime: R, max_batch_size_bytes: usize, @@ -130,7 +73,8 @@ where sequencer: C::Address, ) -> Self { Self { - mempool: Mempool::new(), + txs: VecDeque::new(), + tx_hashes: HashSet::new(), mempool_max_txs_count, max_batch_size_bytes, runtime, @@ -145,15 +89,19 @@ where C: Context, R: DispatchCall, { - type TxHash = HexString; - /// Attempt to add transaction to the mempool. /// /// The transaction is discarded if: /// - mempool is full /// - transaction is invalid (deserialization, verification or decoding of the runtime message failed) - fn accept_tx(&mut self, raw: Vec) -> anyhow::Result { - if self.mempool.len() >= self.mempool_max_txs_count { + fn accept_tx(&mut self, raw: Vec) -> anyhow::Result { + debug_assert_eq!( + self.txs.len(), + self.tx_hashes.len(), + "Mempool invariant violated: txs and tx_hashes got out of sync because they have different lengths. This is a bug!" + ); + + if self.txs.len() >= self.mempool_max_txs_count { bail!("Mempool is full") } @@ -182,30 +130,35 @@ where tx, msg: Some(msg), }; + let hash = tx.calculate_hash(); - self.mempool.push_back(tx); - Ok(HexString(hash)) + self.txs.push_back(tx); + self.tx_hashes.insert(hash); + + Ok(hash) } - fn contains(&self, hash: &Self::TxHash) -> bool { - self.mempool.tx_hashes.contains(hash) + fn contains(&self, hash: &TxHash) -> bool { + self.tx_hashes.contains(hash) } /// Builds a new batch of valid transactions in order they were added to mempool /// Only transactions, which are dispatched successfully are included in the batch - fn get_next_blob(&mut self) -> anyhow::Result>> { + fn get_next_blob(&mut self) -> anyhow::Result> { let mut working_set = WorkingSet::new(self.current_storage.clone()); let mut txs = Vec::new(); let mut current_batch_size = 0; - while let Some(mut pooled) = self.mempool.pop() { + while let Some(mut pooled) = self.txs.pop_front() { // In order to fill batch as big as possible, we only check if valid tx can fit in the batch. let tx_len = pooled.raw.len(); if current_batch_size + tx_len > self.max_batch_size_bytes { - self.mempool.push_front(pooled); + self.txs.push_front(pooled); break; } + self.tx_hashes.remove(&pooled.calculate_hash()); + // Take the decoded runtime message cached upon accepting transaction // into the pool or attempt to decode the message again if // the transaction was previously executed, @@ -238,7 +191,7 @@ where ); txs.push(TxWithHash { raw_tx: pooled.raw, - hash: HexString(tx_hash), + hash: tx_hash, }); } @@ -483,7 +436,7 @@ mod tests { batch_builder.accept_tx(tx.clone()).unwrap(); } - assert_eq!(txs.len(), batch_builder.mempool.len()); + assert_eq!(txs.len(), batch_builder.txs.len()); let build_result = batch_builder.get_next_blob(); assert!(build_result.is_err()); @@ -516,7 +469,7 @@ mod tests { batch_builder.accept_tx(tx.clone()).unwrap(); } - assert_eq!(txs.len(), batch_builder.mempool.len()); + assert_eq!(txs.len(), batch_builder.txs.len()); let build_result = batch_builder.get_next_blob(); assert!(build_result.is_ok()); @@ -530,7 +483,7 @@ mod tests { assert!(blob.contains(&txs[0])); assert!(blob.contains(&txs[2])); assert!(!blob.contains(&txs[3])); - assert_eq!(1, batch_builder.mempool.len()); + assert_eq!(1, batch_builder.txs.len()); } } } diff --git a/full-node/sov-sequencer/src/lib.rs b/full-node/sov-sequencer/src/lib.rs index e533571d8..1e2407ddb 100644 --- a/full-node/sov-sequencer/src/lib.rs +++ b/full-node/sov-sequencer/src/lib.rs @@ -1,66 +1,74 @@ #![deny(missing_docs)] #![doc = include_str!("../README.md")] -use std::fmt::Display; use std::hash::Hash; -use std::str::FromStr; +use std::sync::Arc; -/// Concrete implementations of `[BatchBuilder]` pub mod batch_builder; -mod rpc; -/// Utilities for the sequencer rpc pub mod utils; use anyhow::anyhow; use jsonrpsee::core::StringError; -use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage}; +use jsonrpsee::types::ErrorObjectOwned; +use jsonrpsee::{PendingSubscriptionSink, RpcModule, SubscriptionMessage}; use mini_moka::sync::Cache as MokaCache; -use sov_rollup_interface::services::batch_builder::BatchBuilder; +use sov_modules_api::utils::to_jsonrpsee_error_object; +use sov_rollup_interface::services::batch_builder::{BatchBuilder, TxHash}; use sov_rollup_interface::services::da::DaService; -use tokio::sync::broadcast; -use tokio::sync::Mutex; +use tokio::sync::{broadcast, Mutex}; -type TxHash = [u8; 32]; - -const MOKA_DEFAULT_MAX_CAPACITY: u64 = 100; +const SEQUENCER_RPC_ERROR: &str = "SEQUENCER_RPC_ERROR"; /// Single data structure that manages mempool and batch producing. pub struct Sequencer { - tx_status_updates_sender: broadcast::Sender>, - tx_status_updates_receiver: broadcast::Receiver>, - tx_statuses_cache: MokaCache>, batch_builder: Mutex, da_service: Da, + tx_statuses_cache: MokaCache>, + tx_statuses_sender: broadcast::Sender>, } impl Sequencer where - B: BatchBuilder + Send + Sync, - B::TxHash: Hash + Eq + Clone + FromStr + Send + Sync + 'static, - ::Err: Display, - Da: DaService + Send + Sync, + B: BatchBuilder + Send + Sync + 'static, + Da: DaService + Send + Sync + 'static, + Da::TransactionId: Clone + Send + Sync + serde::Serialize, { + // The cache capacity is kind of arbitrary, as long as it's big enough to + // fit a handful of typical batches worth of transactions it won't make much + // of a difference. + const TX_STATUSES_CACHE_CAPACITY: u64 = 300; + // As long as we're reasonably fast at processing transaction status updates + // (which we are!), the channel size won't matter significantly. + const TX_STATUSES_UPDATES_CHANNEL_CAPACITY: usize = 100; + /// Creates new Sequencer from BatchBuilder and DaService pub fn new(batch_builder: B, da_service: Da) -> Self { - let tx_statuses_cache = MokaCache::new(MOKA_DEFAULT_MAX_CAPACITY); - let tx_statuses_cache_clone = tx_statuses_cache.clone(); + let tx_statuses_cache = MokaCache::new(Self::TX_STATUSES_CACHE_CAPACITY); + let (tx_statuses_sender, mut receiver) = + broadcast::channel(Self::TX_STATUSES_UPDATES_CHANNEL_CAPACITY); - let (tx_status_updates_sender, tx_status_updates_receiver) = broadcast::channel(100); - let mut recv = tx_status_updates_receiver.resubscribe(); + let tx_statuses_cache_clone = tx_statuses_cache.clone(); tokio::spawn(async move { - while let Ok(TxStatusUpdate { tx_hash, status }) = recv.recv().await { + while let Ok(TxStatusUpdate { tx_hash, status }) = receiver.recv().await { tx_statuses_cache_clone.insert(tx_hash, status); } }); Self { - tx_status_updates_sender, - tx_status_updates_receiver, - tx_statuses_cache: MokaCache::new(MOKA_DEFAULT_MAX_CAPACITY), batch_builder: Mutex::new(batch_builder), da_service, + tx_statuses_cache, + tx_statuses_sender, } } + /// Returns the [`jsonrpsee::RpcModule`] for the sequencer-related RPC + /// methods. + pub fn rpc(self) -> RpcModule { + let mut rpc = RpcModule::new(self); + Self::register_txs_rpc_methods(&mut rpc).expect("Failed to register sequencer RPC methods"); + rpc + } + async fn submit_batch(&self) -> anyhow::Result { // Need to release lock before await, so the Future is `Send`. // But potentially it can create blobs that are sent out of order. @@ -78,49 +86,111 @@ where .map(|tx| (tx.raw_tx, tx.hash)) .unzip::<_, _, Vec<_>, Vec<_>>(); let blob = borsh::to_vec(&blob)?; + + let da_tx_id = match self.da_service.send_transaction(&blob).await { + Ok(id) => id, + Err(e) => return Err(anyhow!("failed to submit batch: {:?}", e)), + }; + for tx_hash in tx_hashes { - self.tx_status_updates_sender + self.tx_statuses_sender .send(TxStatusUpdate { tx_hash, - status: TxStatus::Submitted, + status: TxStatus::Published { + da_transaction_id: da_tx_id.clone(), + }, }) .map_err(|e| anyhow!("failed to send tx status update: {}", e.to_string()))?; } - match self.da_service.send_transaction(&blob).await { - Ok(_) => Ok(num_txs), - Err(e) => Err(anyhow!("failed to submit batch: {:?}", e)), - } + Ok(num_txs) } async fn accept_tx(&self, tx: Vec) -> anyhow::Result<()> { tracing::info!("Accepting tx: 0x{}", hex::encode(&tx)); let mut batch_builder = self.batch_builder.lock().await; - batch_builder.accept_tx(tx)?; + let tx_hash = batch_builder.accept_tx(tx)?; + self.tx_statuses_sender + .send(TxStatusUpdate { + tx_hash, + status: TxStatus::Submitted, + }) + .map_err(|e| anyhow!("failed to send tx status update: {}", e.to_string()))?; + Ok(()) + } + + async fn tx_status(&self, tx_hash: &TxHash) -> Option> { + let is_in_mempool = self.batch_builder.lock().await.contains(&tx_hash); + + if is_in_mempool { + Some(TxStatus::Submitted) + } else { + self.tx_statuses_cache.get(tx_hash) + } + } + + fn register_txs_rpc_methods(rpc: &mut RpcModule) -> Result<(), jsonrpsee::core::Error> { + rpc.register_async_method( + "sequencer_publishBatch", + |params, batch_builder| async move { + let mut params_iter = params.sequence(); + while let Some(tx) = params_iter.optional_next::>()? { + batch_builder + .accept_tx(tx) + .await + .map_err(|e| to_jsonrpsee_error_object(e, SEQUENCER_RPC_ERROR))?; + } + let num_txs = batch_builder + .submit_batch() + .await + .map_err(|e| to_jsonrpsee_error_object(e, SEQUENCER_RPC_ERROR))?; + + Ok::(format!("Submitted {} transactions", num_txs)) + }, + )?; + rpc.register_async_method("sequencer_acceptTx", |params, sequencer| async move { + let tx: SubmitTransaction = params.one()?; + let response = match sequencer.accept_tx(tx.body).await { + Ok(()) => SubmitTransactionResponse::Registered, + Err(e) => SubmitTransactionResponse::Failed(e.to_string()), + }; + Ok::<_, ErrorObjectOwned>(response) + })?; + + rpc.register_async_method("sequencer_txStatus", |params, sequencer| async move { + let tx_hash: HexHash = params.one()?; + + let status = sequencer.tx_status(&tx_hash.0).await; + Ok::<_, ErrorObjectOwned>(status) + })?; + rpc.register_subscription( + "sequencer_subscribeToTxStatusUpdates", + "sequencer_newTxStatus", + "sequencer_unsubscribeToTxStatusUpdates", + |params, pending, sequencer| async move { + Self::handle_tx_status_update_subscription(sequencer, params, pending).await + }, + )?; + Ok(()) } async fn handle_tx_status_update_subscription( - &self, + sequencer: Arc, params: jsonrpsee::types::Params<'_>, sink: PendingSubscriptionSink, ) -> Result<(), StringError> { - let tx_hash_str: String = params.one()?; - let tx_hash = B::TxHash::from_str(&tx_hash_str)?; + let tx_hash: HexHash = params.one()?; - let mut receiver = self.tx_status_updates_receiver.resubscribe(); + let mut receiver = sequencer.tx_statuses_sender.subscribe(); let subscription = sink.accept().await?; - while let Ok(TxStatusUpdate { - tx_hash: txh, - status, - }) = receiver.recv().await - { + while let Ok(update) = receiver.recv().await { // We're only interested in updates for the requested transaction hash. - if tx_hash != txh { + if tx_hash.0 != update.tx_hash { continue; } - let notification = SubscriptionMessage::from_json(&status)?; + let notification = SubscriptionMessage::from_json(&update.status)?; subscription.send(notification).await?; } @@ -128,45 +198,16 @@ where } } -#[derive(Debug, PartialEq, Eq)] -struct TxStatusUpdate { - tx_hash: Hash, - status: TxStatus, +#[derive(Debug, Clone, PartialEq, Eq)] +struct TxStatusUpdate { + tx_hash: TxHash, + status: TxStatus, } -impl Clone for TxStatusUpdate { - fn clone(&self) -> Self { - Self { - tx_hash: self.tx_hash.clone(), - status: self.status.clone(), - } - } -} - -/// Hex string -#[derive(Debug, Clone, Hash, PartialEq, Eq)] -pub struct HexString(pub [u8; 32]); - -impl FromStr for HexString { - type Err = hex::FromHexError; - - fn from_str(s: &str) -> Result { - let s = s.strip_prefix("0x").unwrap_or(s); - let bytes = hex::decode(s)?; - if bytes.len() != 32 { - return Err(hex::FromHexError::InvalidStringLength); - } - let mut arr = [0u8; 32]; - arr.copy_from_slice(&bytes); - Ok(HexString(arr)) - } -} - -impl ToString for HexString { - fn to_string(&self) -> String { - format!("0x{}", hex::encode(&self.0)) - } -} +/// A 32-byte hash [`serde`]-encoded as a hex string optionally prefixed with +/// `0x`. See [`sov_rollup_interface::rpc::utils::rpc_hex`]. +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub struct HexHash(#[serde(with = "sov_rollup_interface::rpc::utils::rpc_hex")] pub TxHash); /// A transaction to be submitted to the rollup #[derive(serde::Serialize, serde::Deserialize)] @@ -192,28 +233,41 @@ pub enum SubmitTransactionResponse { /// A rollup transaction status. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub enum TxStatus { +pub enum TxStatus { /// The transaction was successfully submitted to a sequencer and it's /// sitting in the mempool waiting to be included in a batch. Submitted, /// The transaction was published to the DA as part of a batch, but it may /// not be finalized yet. - Published { da_transaction_id: D::TransactionId }, + Published { + /// The ID of the DA transaction that included the rollup transaction to + /// which this [`TxStatus`] refers. + da_transaction_id: DaTxId, + }, /// The transaction was published to the DA as part of a batch that is /// considered finalized - Finalized, + Finalized { + /// The ID of the DA transaction that included the rollup transaction to + /// which this [`TxStatus`] refers. + da_transaction_id: DaTxId, + }, } #[cfg(test)] mod tests { - use sov_mock_da::{MockAddress, MockDaService}; - use sov_rollup_interface::{da::BlobReaderTrait, services::batch_builder::TxWithHash}; - - use crate::rpc::get_sequencer_rpc; + use sov_rollup_interface::da::BlobReaderTrait; + use sov_rollup_interface::services::batch_builder::TxWithHash; use super::*; + fn sequencer_rpc( + batch_builder: MockBatchBuilder, + da_service: MockDaService, + ) -> RpcModule> { + Sequencer::new(batch_builder, da_service).rpc() + } + /// BatchBuilder used in tests. pub struct MockBatchBuilder { /// Mempool with transactions. @@ -223,18 +277,16 @@ mod tests { // It only takes the first byte of the tx, when submits it. // This allows to show effect of batch builder impl BatchBuilder for MockBatchBuilder { - type TxHash = HexString; - - fn accept_tx(&mut self, tx: Vec) -> anyhow::Result { + fn accept_tx(&mut self, tx: Vec) -> anyhow::Result { self.mempool.push(tx); - Ok(HexString([0; 32])) + Ok([0; 32]) } - fn contains(&self, _tx_hash: &Self::TxHash) -> bool { + fn contains(&self, _tx_hash: &TxHash) -> bool { unimplemented!("MockBatchBuilder::contains is not implemented") } - fn get_next_blob(&mut self) -> anyhow::Result>> { + fn get_next_blob(&mut self) -> anyhow::Result> { if self.mempool.is_empty() { anyhow::bail!("Mock mempool is empty"); } @@ -244,7 +296,7 @@ mod tests { let first_byte = *tx.get(0)?; Some(TxWithHash { raw_tx: vec![first_byte], - hash: HexString([0; 32]), + hash: [0; 32], }) }) .collect(); @@ -256,7 +308,7 @@ mod tests { async fn test_submit_on_empty_mempool() { let batch_builder = MockBatchBuilder { mempool: vec![] }; let da_service = MockDaService::new(MockAddress::default()); - let rpc = get_sequencer_rpc(batch_builder, da_service.clone()); + let rpc = sequencer_rpc(batch_builder, da_service); let arg: &[u8] = &[]; let result: Result = @@ -278,7 +330,7 @@ mod tests { mempool: vec![tx1.clone(), tx2.clone()], }; let da_service = MockDaService::new(MockAddress::default()); - let rpc = get_sequencer_rpc(batch_builder, da_service.clone()); + let rpc = sequencer_rpc(batch_builder, da_service.clone()); let arg: &[u8] = &[]; let _: String = rpc.call("sequencer_publishBatch", arg).await.unwrap(); @@ -297,7 +349,7 @@ mod tests { let batch_builder = MockBatchBuilder { mempool: vec![] }; let da_service = MockDaService::new(MockAddress::default()); - let rpc = get_sequencer_rpc(batch_builder, da_service.clone()); + let rpc = sequencer_rpc(batch_builder, da_service.clone()); let tx: Vec = vec![1, 2, 3, 4, 5]; let request = SubmitTransaction { body: tx.clone() }; diff --git a/full-node/sov-sequencer/src/rpc.rs b/full-node/sov-sequencer/src/rpc.rs deleted file mode 100644 index 2eb798203..000000000 --- a/full-node/sov-sequencer/src/rpc.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::{fmt::Display, hash::Hash, str::FromStr}; - -use jsonrpsee::{types::ErrorObjectOwned, RpcModule}; -use sov_modules_api::utils::to_jsonrpsee_error_object; -use sov_rollup_interface::services::{batch_builder::BatchBuilder, da::DaService}; - -use crate::{Sequencer, SubmitTransaction, SubmitTransactionResponse, TxStatus}; - -const SEQUENCER_RPC_ERROR: &str = "SEQUENCER_RPC_ERROR"; - -/// Creates an RPC module with the sequencer's methods -pub fn get_sequencer_rpc(batch_builder: B, da_service: D) -> RpcModule> -where - B: BatchBuilder + Send + Sync + 'static, - B::TxHash: Hash + Eq + Clone + FromStr + ToString + Send + Sync, - ::Err: Display, - D: DaService, -{ - let sequencer = Sequencer::new(batch_builder, da_service); - let mut rpc = RpcModule::new(sequencer); - register_txs_rpc_methods::(&mut rpc).expect("Failed to register sequencer RPC methods"); - rpc -} - -fn register_txs_rpc_methods( - rpc: &mut RpcModule>, -) -> Result<(), jsonrpsee::core::Error> -where - B: BatchBuilder + Send + Sync + 'static, - B::TxHash: Hash + Eq + Clone + FromStr + ToString + Send + Sync, - ::Err: Display, - D: DaService, -{ - rpc.register_async_method( - "sequencer_publishBatch", - |params, batch_builder| async move { - let mut params_iter = params.sequence(); - while let Some(tx) = params_iter.optional_next::>()? { - batch_builder - .accept_tx(tx) - .await - .map_err(|e| to_jsonrpsee_error_object(e, SEQUENCER_RPC_ERROR))?; - } - let num_txs = batch_builder - .submit_batch() - .await - .map_err(|e| to_jsonrpsee_error_object(e, SEQUENCER_RPC_ERROR))?; - - Ok::(format!("Submitted {} transactions", num_txs)) - }, - )?; - rpc.register_async_method("sequencer_acceptTx", |params, sequencer| async move { - let tx: SubmitTransaction = params.one()?; - let response = match sequencer.accept_tx(tx.body).await { - Ok(()) => SubmitTransactionResponse::Registered, - Err(e) => SubmitTransactionResponse::Failed(e.to_string()), - }; - Ok::<_, ErrorObjectOwned>(response) - })?; - - rpc.register_async_method("sequencer_txStatus", |params, sequencer| async move { - let tx_hash_str: String = params.one()?; - let tx_hash = B::TxHash::from_str(&tx_hash_str).map_err(|err| { - to_jsonrpsee_error_object( - format!("invalid tx hash value: {}", err), - SEQUENCER_RPC_ERROR, - ) - })?; - - let is_in_mempool = sequencer.batch_builder.lock().await.contains(&tx_hash); - let status = if is_in_mempool { - Some(TxStatus::Submitted) - } else { - sequencer.tx_statuses_cache.get(&tx_hash) - }; - - Ok::<_, ErrorObjectOwned>(status) - })?; - rpc.register_subscription( - "sequencer_subscribeToTxStatusUpdates", - "sequencer_newTxStatus", - "sequencer_unsubscribeToTxStatusUpdates", - |params, pending, sequencer| async move { - sequencer - .handle_tx_status_update_subscription(params, pending) - .await - }, - )?; - - Ok(()) -} diff --git a/full-node/sov-sequencer/src/utils.rs b/full-node/sov-sequencer/src/utils.rs index befa37859..b6591773c 100644 --- a/full-node/sov-sequencer/src/utils.rs +++ b/full-node/sov-sequencer/src/utils.rs @@ -1,3 +1,5 @@ +//! Utilities for the sequencer RPC. + use borsh::BorshSerialize; use jsonrpsee::core::client::ClientT; use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; diff --git a/module-system/sov-modules-rollup-blueprint/src/runtime_rpc.rs b/module-system/sov-modules-rollup-blueprint/src/runtime_rpc.rs index d5bb230d3..1179b146b 100644 --- a/module-system/sov-modules-rollup-blueprint/src/runtime_rpc.rs +++ b/module-system/sov-modules-rollup-blueprint/src/runtime_rpc.rs @@ -4,6 +4,7 @@ use sov_modules_api::{Context, Spec}; use sov_modules_stf_blueprint::{Runtime as RuntimeTrait, SequencerOutcome, TxEffect}; use sov_rollup_interface::services::da::DaService; use sov_sequencer::batch_builder::FiFoStrictBatchBuilder; +use sov_sequencer::Sequencer; /// Register rollup's default rpc methods. pub fn register_rpc( @@ -16,6 +17,7 @@ where RT: RuntimeTrait::Spec> + Send + Sync + 'static, C: Context, Da: DaService + Clone, + Da::TransactionId: Clone + serde::Serialize + Send + Sync, { // runtime rpc. let mut rpc_methods = RT::rpc_methods(storage.clone()); @@ -39,7 +41,7 @@ where sequencer, ); - let sequencer_rpc = sov_sequencer::get_sequencer_rpc(batch_builder, da_service.clone()); + let sequencer_rpc = Sequencer::new(batch_builder, da_service.clone()).rpc(); rpc_methods .merge(sequencer_rpc) .context("Failed to merge Txs RPC modules")?; diff --git a/rollup-interface/src/node/services/batch_builder.rs b/rollup-interface/src/node/services/batch_builder.rs index 39f49b6a7..2e30e5064 100644 --- a/rollup-interface/src/node/services/batch_builder.rs +++ b/rollup-interface/src/node/services/batch_builder.rs @@ -2,30 +2,30 @@ use crate::maybestd::vec::Vec; +/// A rollup transaction hash. Used by [`BatchBuilder`]. +pub type TxHash = [u8; 32]; + /// BlockBuilder trait is responsible for managing mempool and building batches. pub trait BatchBuilder { - /// Uniquely identifies a transaction once it's in the mempool. - type TxHash; - /// Accept a new transaction. /// Can return error if transaction is invalid or mempool is full. - fn accept_tx(&mut self, tx: Vec) -> anyhow::Result; + fn accept_tx(&mut self, tx: Vec) -> anyhow::Result; /// Checks whether a transaction with the given `hash` is already in the /// mempool. - fn contains(&self, hash: &Self::TxHash) -> bool; + fn contains(&self, hash: &TxHash) -> bool; /// Builds a new batch out of transactions in mempool. /// Logic of which transactions and how many of them is included in batch is up to implementation. - fn get_next_blob(&mut self) -> anyhow::Result>>; + fn get_next_blob(&mut self) -> anyhow::Result>; } /// An encoded transaction with its hash as returned by /// [`BatchBuilder::get_next_blob`]. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct TxWithHash { +pub struct TxWithHash { /// Encoded transaction. pub raw_tx: Vec, /// Transaction hash. - pub hash: T, + pub hash: TxHash, }