From 197b9186a1a25da9d60fe0e07a4bcffc886d9648 Mon Sep 17 00:00:00 2001
From: Artemii Gerasimovich
Date: Thu, 15 Aug 2024 16:22:19 +0200
Subject: [PATCH 1/5] Estimate max block size

---
 Cargo.lock                |  1 +
 Cargo.toml                |  5 +++-
 src/builder_state.rs      | 50 +++++++++++++++++++++++++++++++++------
 src/service.rs            | 41 ++++++++++++++++++++++++++++++--
 src/testing/basic_test.rs |  3 ++-
 5 files changed, 89 insertions(+), 11 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2edc8c7..a597c0e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3031,6 +3031,7 @@ dependencies = [
  "async-lock 2.8.0",
  "async-std",
  "async-trait",
+ "bincode",
  "clap",
  "committable",
  "derivative",
diff --git a/Cargo.toml b/Cargo.toml
index b26048e..b0ef843 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ async-compatibility-layer = { version = "1.1", default-features = false, feature
 async-lock = "2.8"
 async-std = { version = "1.9.0", features = ["unstable", "attributes"] }
 async-trait = "0.1"
+bincode = "1.3"
 clap = { version = "4.5", features = ["derive", "env"] }
 committable = "0.2"
 derivative = "2.2"
@@ -37,4 +38,6 @@ hex = "0.4.3"
 hotshot-example-types = { git = "https://github.com/EspressoSystems/HotShot.git", tag = "0.5.70" }
 
 [lints.rust]
-unexpected_cfgs = { level = "warn", check-cfg = ['cfg(async_executor_impl, values("async-std", "tokio"))'] }
+unexpected_cfgs = { level = "warn", check-cfg = [
+    'cfg(async_executor_impl, values("async-std", "tokio"))',
+] }
diff --git a/src/builder_state.rs b/src/builder_state.rs
index 28756ed..db6a848 100644
--- a/src/builder_state.rs
+++ b/src/builder_state.rs
@@ -32,7 +32,7 @@ use async_std::task::spawn_blocking;
 #[cfg(async_executor_impl = "tokio")]
 use tokio::task::spawn_blocking;
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::fmt::Debug;
 use std::sync::Arc;
 use std::time::Instant;
@@ -87,6 +87,8 @@ pub struct BuildBlockInfo {
     pub metadata: <::BlockPayload as BlockPayload>::Metadata,
     pub vid_trigger: OneShotSender,
     pub vid_receiver: UnboundedReceiver<(VidCommitment, VidPrecomputeData)>,
+    // Could we have included more transactions, but chose not to?
+    pub truncated: bool,
 }
 
 /// Response Message to be put on the response channel
@@ -169,7 +171,7 @@ pub struct BuilderState {
     pub tx_receiver: BroadcastReceiver>>,
 
     /// filtered queue of available transactions, taken from tx_receiver
-    pub tx_queue: Vec>>,
+    pub tx_queue: VecDeque>>,
 
     /// global state handle, defined in the service.rs
     pub global_state: Arc>>,
@@ -490,9 +492,29 @@ impl BuilderState {
             async_sleep(sleep_interval).await
         }
+
+        // Don't build an empty block
+        if self.tx_queue.is_empty() {
+            return None;
+        }
+
+        let max_block_size = self.global_state.read_arc().await.max_block_size;
+        let transactions_to_include = self.tx_queue.iter().scan(0, |total_size, tx| {
+            if *total_size >= max_block_size {
+                None
+            } else {
+                // This way we will include one transaction over our target block length.
+                // This is done on purpose, otherwise we'd have a possible failure state
+                // where a single transaction larger than target block state is stuck in
+                // queue and we just build empty blocks
+                *total_size += tx.len;
+                Some(tx.tx.clone())
+            }
+        });
+
         if let Ok((payload, metadata)) =
             >::from_transactions(
-                self.tx_queue.iter().map(|tx| tx.tx.clone()),
+                transactions_to_include,
                 &self.validated_state,
                 &self.instance_state,
             )
@@ -500,7 +522,20 @@
         {
             let builder_hash = payload.builder_commitment(&metadata);
             // count the number of txns
-            let txn_count = self.tx_queue.len();
+            let actual_txn_count = payload.num_transactions(&metadata);
+
+            // Payload is empty despite us checking that tx_queue isn't empty earlier.
+            // This indicates that the block was truncated due to sequencer block length
+            // limits and we have a transaction too big to ever be included in the head of
+            // our queue. We need to drop it and mark as "included" so that if we receive
+            // it again we don't even bother with it.
+            if actual_txn_count == 0 {
+                if let Some(txn) = self.tx_queue.pop_front() {
+                    self.txns_in_queue.remove(&txn.commit);
+                    self.included_txns.insert(txn.commit);
+                };
+                return None;
+            }
 
             // insert the recently built block into the builder commitments
             self.builder_commitments
@@ -536,7 +571,7 @@
             tracing::info!(
                 "Builder view num {:?}, building block with {:?} txns, with builder hash {:?}",
                 self.built_from_proposed_block.view_number,
-                txn_count,
+                actual_txn_count,
                 builder_hash
             );
 
@@ -551,6 +586,7 @@
                 metadata,
                 vid_trigger: trigger_send,
                 vid_receiver: unbounded_receiver,
+                truncated: actual_txn_count < self.tx_queue.len(),
             })
         } else {
             tracing::warn!("build block, returning None");
@@ -744,7 +780,7 @@ impl BuilderState {
         qc_receiver: BroadcastReceiver>,
         req_receiver: BroadcastReceiver>,
        tx_receiver: BroadcastReceiver>>,
-        tx_queue: Vec>>,
+        tx_queue: VecDeque>>,
         global_state: Arc>>,
         num_nodes: NonZeroUsize,
         maximize_txn_capture_timeout: Duration,
@@ -841,7 +877,7 @@ impl BuilderState {
                         continue;
                     }
                     self.txns_in_queue.insert(tx.commit);
-                    self.tx_queue.push(tx);
+                    self.tx_queue.push_back(tx);
                 }
                 Err(async_broadcast::TryRecvError::Empty)
                 | Err(async_broadcast::TryRecvError::Closed) => {
diff --git a/src/service.rs b/src/service.rs
index 536b050..c2fd25c 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -60,6 +60,11 @@ use std::{fmt::Display, time::Instant};
 use tagged_base64::TaggedBase64;
 use tide_disco::{method::ReadState, Url};
 
+// Start assuming we're fine calculating VID for 100 kilobyte blocks
+const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
+// Never go lower than 10 kilobytes
+const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
+
 // It holds all the necessary information for a block
 #[derive(Debug)]
 pub struct BlockInfo {
@@ -68,6 +73,8 @@ pub struct BlockInfo {
     pub vid_trigger: Arc>>>,
     pub vid_receiver: Arc>>,
     pub offered_fee: u64,
+    // Could we have included more transactions with this block, but chose not to?
+    pub truncated: bool,
 }
 
 // It holds the information for the proposed block
@@ -105,9 +112,11 @@ pub struct BuilderStatesInfo {
 pub struct ReceivedTransaction {
     // the transaction
     pub tx: Types::Transaction,
-    // its hash
+    // transaction's hash
     pub commit: Commitment,
-    // its source
+    // transaction's estimated length
+    pub len: u64,
+    // transaction's source
     pub source: TransactionSource,
     // received time
     pub time_in: Instant,
@@ -135,6 +144,9 @@ pub struct GlobalState {
 
     // highest view running builder task
     pub highest_view_num_builder_id: BuilderStateId,
+
+    // estimated maximum block size we can build in time
+    pub max_block_size: u64,
 }
 
 impl GlobalState {
@@ -160,6 +172,7 @@ impl GlobalState {
             last_garbage_collected_view_num,
             builder_state_to_last_built_block: Default::default(),
             highest_view_num_builder_id: bootstrap_id,
+            max_block_size: INITIAL_MAX_BLOCK_SIZE,
         }
     }
 
@@ -203,6 +216,7 @@ impl GlobalState {
                     build_block_info.vid_receiver,
                 ))),
                 offered_fee: build_block_info.offered_fee,
+                truncated: build_block_info.truncated,
             },
         );
     }
@@ -599,6 +613,15 @@ where
                 Err(_toe) => {
                     if Instant::now() >= timeout_after {
                         tracing::warn!("Couldn't get vid commitment in time for block {id}",);
+                        {
+                            // we can't keep up with this block size, reduce max block size by 10%
+                            let mut global_state = self.global_state.write_arc().await;
+                            global_state.max_block_size = std::cmp::min(
+                                global_state.max_block_size
+                                    - global_state.max_block_size.div_ceil(10),
+                                MAX_BLOCK_SIZE_FLOOR,
+                            );
+                        }
                         break Err(BuildError::Error {
                             message: "Couldn't get vid commitment in time".to_string(),
                         });
@@ -619,6 +642,18 @@ where
         };
 
         tracing::info!("Got vid commitment for block {id}",);
+
+        // This block was truncated, but we got VID in time.
+        // Maybe we can handle bigger blocks?
+        if block_info.truncated {
+            // Increase max block size by 10%
+            let mut global_state = self.global_state.write_arc().await;
+            global_state.max_block_size = std::cmp::min(
+                global_state.max_block_size + global_state.max_block_size.div_ceil(10),
+                MAX_BLOCK_SIZE_FLOOR,
+            );
+        }
+
         if response_received.is_ok() {
             let (vid_commitment, vid_precompute_data) =
                 response_received.map_err(|err| BuildError::Error {
                     message:
@@ -1062,12 +1097,14 @@ pub(crate) async fn handle_received_txns(
     let time_in = Instant::now();
     for tx in txns.into_iter() {
         let commit = tx.commit();
+        let len = bincode::serialized_size(&tx).unwrap_or_default();
         let res = tx_sender
             .try_broadcast(Arc::new(ReceivedTransaction {
                 tx,
                 source: source.clone(),
                 commit,
                 time_in,
+                len,
             }))
             .inspect(|val| {
                 if let Some(evicted_txn) = val {
diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs
index 4781ab9..28f6131 100644
--- a/src/testing/basic_test.rs
+++ b/src/testing/basic_test.rs
@@ -18,6 +18,7 @@ pub use async_broadcast::{
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::collections::VecDeque;
     use std::{hash::Hash, marker::PhantomData, num::NonZeroUsize};
 
     use async_compatibility_layer::channel::unbounded;
@@ -100,7 +101,7 @@ mod tests {
         let (tx_sender, tx_receiver) = broadcast::>>(
             num_test_messages * multiplication_factor,
         );
-        let tx_queue = Vec::new();
+        let tx_queue = VecDeque::new();
         // generate the keys for the buidler
         let seed = [201_u8; 32];
         let (_builder_pub_key, _builder_private_key) =

From e135f11e9b085a6d52679e8dc9c6b5aacaf1533a Mon Sep 17 00:00:00 2001
From: Artemii Gerasimovich
Date: Thu, 15 Aug 2024 20:34:57 +0200
Subject: [PATCH 2/5] Make oscillation less likely

---
 src/service.rs | 27 +++++++++++++++++++++------
 1 file changed, 21 insertions(+), 6 deletions(-)

diff --git a/src/service.rs b/src/service.rs
index c2fd25c..acba0db 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -64,6 +64,13 @@ use tide_disco::{method::ReadState, Url};
 const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
 // Never go lower than 10 kilobytes
 const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
+// When adjusting max block size, it will be decremented or incremented
+// by current value / [`MAX_BLOCK_SIZE_CHANGE_DIVISOR`]
+const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;
+// We will not increment the max block size unless the response was served with a margin
+// of at least [`ProxyGlobalState::max_api_waiting_time`] / [`VID_RESPONSE_TARGET_MARGIN_DIVISOR`]
+// to spare before [`ProxyGlobalState::max_api_waiting_time`] elapsed
+const VID_RESPONSE_TARGET_MARGIN_DIVISOR: u32 = 10;
 
 // It holds all the necessary information for a block
 #[derive(Debug)]
 pub struct BlockInfo {
@@ -614,11 +621,13 @@ where
                     if Instant::now() >= timeout_after {
                         tracing::warn!("Couldn't get vid commitment in time for block {id}",);
                         {
-                            // we can't keep up with this block size, reduce max block size by 10%
+                            // we can't keep up with this block size, reduce max block size
                             let mut global_state = self.global_state.write_arc().await;
                             global_state.max_block_size = std::cmp::min(
                                 global_state.max_block_size
-                                    - global_state.max_block_size.div_ceil(10),
+                                    - global_state
+                                        .max_block_size
+                                        .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
                                 MAX_BLOCK_SIZE_FLOOR,
                             );
@@ -643,13 +652,19 @@ where
 
         tracing::info!("Got vid commitment for block {id}",);
 
-        // This block was truncated, but we got VID in time.
+        // This block was truncated, but we got VID in time with margin left.
         // Maybe we can handle bigger blocks?
-        if block_info.truncated {
-            // Increase max block size by 10%
+        if block_info.truncated
+            && timeout_after.duration_since(Instant::now())
+                > self.max_api_waiting_time / VID_RESPONSE_TARGET_MARGIN_DIVISOR
+        {
+            // Increase max block size
             let mut global_state = self.global_state.write_arc().await;
             global_state.max_block_size = std::cmp::min(
-                global_state.max_block_size + global_state.max_block_size.div_ceil(10),
+                global_state.max_block_size
+                    + global_state
+                        .max_block_size
+                        .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
                 MAX_BLOCK_SIZE_FLOOR,
             );
         }

From 0bb247fc896a9f7512f2fb63bf7147e27a0b8e75 Mon Sep 17 00:00:00 2001
From: Artemii Gerasimovich
Date: Fri, 16 Aug 2024 03:18:39 +0200
Subject: [PATCH 3/5] Comments

---
 src/builder_state.rs      | 13 ++++++--
 src/service.rs            | 65 ++++++++++++++++++++++++++++-----------
 src/testing/basic_test.rs | 10 ++++--
 3 files changed, 65 insertions(+), 23 deletions(-)

diff --git a/src/builder_state.rs b/src/builder_state.rs
index db6a848..23d1287 100644
--- a/src/builder_state.rs
+++ b/src/builder_state.rs
@@ -525,9 +525,16 @@ impl BuilderState {
             let actual_txn_count = payload.num_transactions(&metadata);
 
             // Payload is empty despite us checking that tx_queue isn't empty earlier.
-            // This indicates that the block was truncated due to sequencer block length
-            // limits and we have a transaction too big to ever be included in the head of
-            // our queue. We need to drop it and mark as "included" so that if we receive
+            //
+            // This means that the block was truncated due to *sequencer* block length
+            // limits, which are different from our `max_block_size`. There's no good way
+            // for us to check for this in advance, so we detect transactions too big for
+            // the sequencer indirectly, by observing that we passed some transactions
+            // to `>::from_transactions`, but
+            // it returned an empty block.
+            // Thus we deduce that the first transaction in our queue is too big to *ever*
+            // be included, because it alone goes over sequencer's block size limit.
+            // We need to drop it and mark as "included" so that if we receive
             // it again we don't even bother with it.
             if actual_txn_count == 0 {
                 if let Some(txn) = self.tx_queue.pop_front() {
                     self.txns_in_queue.remove(&txn.commit);
diff --git a/src/service.rs b/src/service.rs
index acba0db..d2ea211 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -255,7 +255,13 @@ impl GlobalState {
         &self,
         txns: Vec<::Transaction>,
     ) -> Vec::Transaction>, BuildError>> {
-        handle_received_txns(&self.tx_sender, txns, TransactionSource::External).await
+        handle_received_txns(
+            &self.tx_sender,
+            txns,
+            TransactionSource::External,
+            self.max_block_size,
+        )
+        .await
     }
 
     pub fn get_channel_for_matching_builder_or_highest_view_buider(
@@ -660,13 +666,10 @@ where
         {
             // Increase max block size
             let mut global_state = self.global_state.write_arc().await;
-            global_state.max_block_size = std::cmp::min(
-                global_state.max_block_size
-                    + global_state
-                        .max_block_size
-                        .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR),
-                MAX_BLOCK_SIZE_FLOOR,
-            );
+            global_state.max_block_size = global_state.max_block_size
+                + global_state
+                    .max_block_size
+                    .div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR);
         }
 
         if response_received.is_ok() {
@@ -835,11 +838,11 @@ pub async fn run_non_permissioned_standalone_builder_service
     >,
 
-    // shared accumulated transactions handle
-    tx_sender: BroadcastSender>>,
-
     // Url to (re)connect to for the events stream
     hotshot_events_api_url: Url,
+
+    // Global state
+    global_state: Arc>>,
 ) -> Result<(), anyhow::Error> {
     // connection to the events stream
     let connected = connect_to_events_service::(hotshot_events_api_url.clone()).await;
@@ -851,6 +854,8 @@ pub async fn run_non_permissioned_standalone_builder_service
         {
-            handle_received_txns(&tx_sender, transactions, TransactionSource::HotShot)
-                .await;
+            let max_block_size = global_state.read_arc().await.max_block_size;
+            handle_received_txns(
+                &tx_sender,
+                transactions,
+                TransactionSource::HotShot,
+                max_block_size,
+            )
+            .await;
         }
         // decide event
         EventType::Decide {
@@ -928,9 +939,6 @@ pub async fn run_permissioned_standalone_builder_service<
     I: NodeImplementation,
     V: Versions,
 >(
-    // sending received transactions
-    tx_sender: BroadcastSender>>,
-
     // sending a DA proposal from the hotshot to the builder states
     da_sender: BroadcastSender>,
 
@@ -942,8 +950,12 @@ pub async fn run_permissioned_standalone_builder_service<
     // hotshot context handle
     hotshot_handle: Arc>,
+
+    // Global state
+    global_state: Arc>>,
 ) {
     let mut event_stream = hotshot_handle.event_stream();
+    let tx_sender = global_state.read_arc().await.tx_sender.clone();
     loop {
         tracing::debug!("Waiting for events from HotShot");
         match event_stream.next().await {
@@ -958,8 +970,14 @@ pub async fn run_permissioned_standalone_builder_service<
         }
         // tx event
         EventType::Transactions { transactions } => {
-            handle_received_txns(&tx_sender, transactions, TransactionSource::HotShot)
-                .await;
+            let max_block_size = global_state.read_arc().await.max_block_size;
+            handle_received_txns(
+                &tx_sender,
+                transactions,
+                TransactionSource::HotShot,
+                max_block_size,
+            )
+            .await;
         }
         // decide event
         EventType::Decide { leaf_chain, .. } => {
@@ -1107,12 +1125,23 @@ pub(crate) async fn handle_received_txns(
     tx_sender: &BroadcastSender>>,
     txns: Vec,
     source: TransactionSource,
+    max_txn_len: u64,
 ) -> Vec::Transaction>, BuildError>> {
     let mut results = Vec::with_capacity(txns.len());
     let time_in = Instant::now();
     for tx in txns.into_iter() {
         let commit = tx.commit();
+        // This is a rough estimate, but we don't have any other way to get the real
+        // encoded transaction length. Luckily, this being roughly proportional
+        // to encoded length is enough, because we only use this value to estimate
+        // our limitations on computing the VID in time.
         let len = bincode::serialized_size(&tx).unwrap_or_default();
+        if len > max_txn_len {
+            results.push(Err(BuildError::Error {
+                message: format!("Transaction too big (estimated length {len}, currently accepting <= {max_txn_len})"),
+            }));
+            continue;
+        }
         let res = tx_sender
             .try_broadcast(Arc::new(ReceivedTransaction {
                 tx,
diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs
index 28f6131..9ed2d7f 100644
--- a/src/testing/basic_test.rs
+++ b/src/testing/basic_test.rs
@@ -19,6 +19,7 @@ pub use async_broadcast::{
 mod tests {
     use super::*;
     use std::collections::VecDeque;
+    use std::u64;
     use std::{hash::Hash, marker::PhantomData, num::NonZeroUsize};
 
     use async_compatibility_layer::channel::unbounded;
@@ -312,8 +313,13 @@ mod tests {
         // validate the signature before pushing the message to the builder_state channels
         // currently this step happens in the service.rs, wheneve we receiver an hotshot event
         tracing::debug!("Sending transaction message: {:?}", tx);
-        for res in
-            handle_received_txns(&tx_sender, vec![tx.clone()], TransactionSource::HotShot).await
+        for res in handle_received_txns(
+            &tx_sender,
+            vec![tx.clone()],
+            TransactionSource::HotShot,
+            u64::MAX,
+        )
+        .await
         {
             res.unwrap();
         }

From e6a5b3dd3c042aa1e1fd694718f859e42da8749d Mon Sep 17 00:00:00 2001
From: Artemii Gerasimovich
Date: Fri, 16 Aug 2024 03:24:28 +0200
Subject: [PATCH 4/5] Comments 2

---
 src/builder_state.rs | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/src/builder_state.rs b/src/builder_state.rs
index 23d1287..3e68c52 100644
--- a/src/builder_state.rs
+++ b/src/builder_state.rs
@@ -500,14 +500,15 @@ impl BuilderState {
         let max_block_size = self.global_state.read_arc().await.max_block_size;
         let transactions_to_include = self.tx_queue.iter().scan(0, |total_size, tx| {
-            if *total_size >= max_block_size {
+            let prev_size = *total_size;
+            *total_size += tx.len;
+            // We will include one transaction over our target block length
+            // if it's the first transaction in queue, otherwise we'd have a possible failure
+            // state where a single transaction larger than target block state is stuck in
+            // queue and we just build empty blocks forever
+            if *total_size >= max_block_size && prev_size != 0 {
                 None
             } else {
-                // This way we will include one transaction over our target block length.
-                // This is done on purpose, otherwise we'd have a possible failure state
-                // where a single transaction larger than target block state is stuck in
-                // queue and we just build empty blocks
-                *total_size += tx.len;
                 Some(tx.tx.clone())
             }
         });

From eb5fa6a61122a104ccf14fc3eac16d668d23e90f Mon Sep 17 00:00:00 2001
From: Artemii Gerasimovich
Date: Fri, 16 Aug 2024 03:29:32 +0200
Subject: [PATCH 5/5] Clippy

---
 src/testing/basic_test.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/testing/basic_test.rs b/src/testing/basic_test.rs
index 9ed2d7f..1e2c955 100644
--- a/src/testing/basic_test.rs
+++ b/src/testing/basic_test.rs
@@ -19,7 +19,6 @@ pub use async_broadcast::{
 mod tests {
     use super::*;
     use std::collections::VecDeque;
-    use std::u64;
     use std::{hash::Hash, marker::PhantomData, num::NonZeroUsize};
 
     use async_compatibility_layer::channel::unbounded;
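Note (illustrative only, not part of the patches above): the series converges on a shrink-on-timeout / grow-on-early-response rule for max_block_size. The sketch below restates that rule in isolation; the constants mirror those introduced in src/service.rs, the function names are hypothetical, and the .max(MAX_BLOCK_SIZE_FLOOR) clamp is this sketch's reading of the "never go lower than 10 kilobytes" comment rather than a line copied from the diffs.

// Standalone Rust sketch of the adaptive max block size rule.
const INITIAL_MAX_BLOCK_SIZE: u64 = 100_000;
const MAX_BLOCK_SIZE_FLOOR: u64 = 10_000;
const MAX_BLOCK_SIZE_CHANGE_DIVISOR: u64 = 10;

// Shrink by roughly 10% when VID computation misses the response deadline,
// clamping so the limit never drops below the floor.
fn on_vid_timeout(max_block_size: u64) -> u64 {
    (max_block_size - max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR))
        .max(MAX_BLOCK_SIZE_FLOOR)
}

// Grow by roughly 10% when a truncated block was served with time to spare.
fn on_truncated_block_served_with_margin(max_block_size: u64) -> u64 {
    max_block_size + max_block_size.div_ceil(MAX_BLOCK_SIZE_CHANGE_DIVISOR)
}

fn main() {
    let mut size = INITIAL_MAX_BLOCK_SIZE;
    size = on_vid_timeout(size); // 100_000 -> 90_000
    size = on_truncated_block_served_with_margin(size); // 90_000 -> 99_000
    assert_eq!(size, 99_000);
}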