Skip to content

Commit

Permalink
refactor: add bundle module
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobkaufmann committed Aug 29, 2023
1 parent 0983198 commit 328fca5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 60 deletions.
62 changes: 2 additions & 60 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::future::Future;
use std::matches;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crate::bundle::{pool::BundlePool, Bundle, BundleCompact, BundleId};

use ethers::{
signers::{LocalWallet, Signer},
types::{
Expand Down Expand Up @@ -52,65 +53,6 @@ use tokio::{
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tokio_util::time::DelayQueue;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct BundleCompact(Vec<TransactionSignedEcRecovered>);

impl BundleCompact {
/// returns whether `self` conflicts with `other` in the sense that both cannot be executed
pub fn conflicts(&self, other: &Self) -> bool {
let hashes = self
.0
.iter()
.map(|tx| tx.hash_ref())
.collect::<HashSet<_>>();
let other_hashes = other
.0
.iter()
.map(|tx| tx.hash_ref())
.collect::<HashSet<_>>();
!hashes.is_disjoint(&other_hashes)
}
}

type BundleId = u64;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Bundle {
pub id: BundleId,
pub txs: Vec<TransactionSignedEcRecovered>,
pub block_num: BlockNumber,
pub eligibility: RangeInclusive<u64>,
}

#[derive(Default)]
pub struct BundlePool(HashSet<Bundle>);

impl BundlePool {
/// returns all bundles eligible w.r.t. time `now` and canonical chain tip `block`
pub fn eligible(&self, block: BlockNumber, now: SystemTime) -> Vec<Bundle> {
let now = now.duration_since(UNIX_EPOCH).unwrap().as_secs();
self.0
.iter()
.filter(|bundle| bundle.eligibility.contains(&now) && bundle.block_num == block)
.cloned()
.collect()
}

/// removes all bundles whose eligibility expires w.r.t. time `now`
pub fn tick(&mut self, now: SystemTime) {
let now = now.duration_since(UNIX_EPOCH).unwrap().as_secs();
self.0.retain(|bundle| *bundle.eligibility.end() >= now);
}

/// maintains the pool based on updates to the canonical state.
///
/// returns the IDs of the bundles removed from the pool.
pub fn maintain(&mut self, _event: CanonStateNotification) -> Vec<BundleId> {
// remove all bundles
self.0.drain().map(|bundle| bundle.id).collect()
}
}

struct UnpackagedPayload<S: StateProvider> {
attributes: PayloadBuilderAttributes,
block_env: BlockEnv,
Expand Down
36 changes: 36 additions & 0 deletions src/bundle/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::collections::HashSet;
use std::ops::RangeInclusive;

use reth_primitives::{BlockNumber, TransactionSignedEcRecovered};

pub mod pool;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub(crate) struct BundleCompact(pub Vec<TransactionSignedEcRecovered>);

impl BundleCompact {
/// returns whether `self` conflicts with `other` in the sense that both cannot be executed
pub fn conflicts(&self, other: &Self) -> bool {
let hashes = self
.0
.iter()
.map(|tx| tx.hash_ref())
.collect::<HashSet<_>>();
let other_hashes = other
.0
.iter()
.map(|tx| tx.hash_ref())
.collect::<HashSet<_>>();
!hashes.is_disjoint(&other_hashes)
}
}

pub type BundleId = u64;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct Bundle {
pub id: BundleId,
pub txs: Vec<TransactionSignedEcRecovered>,
pub block_num: BlockNumber,
pub eligibility: RangeInclusive<u64>,
}
36 changes: 36 additions & 0 deletions src/bundle/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};

use super::{Bundle, BundleId};

use reth_primitives::BlockNumber;
use reth_provider::CanonStateNotification;

#[derive(Default)]
pub struct BundlePool(pub(crate) HashSet<Bundle>);

impl BundlePool {
/// returns all bundles eligible w.r.t. time `now` and canonical chain tip `block`
pub fn eligible(&self, block: BlockNumber, now: SystemTime) -> Vec<Bundle> {
let now = now.duration_since(UNIX_EPOCH).unwrap().as_secs();
self.0
.iter()
.filter(|bundle| bundle.eligibility.contains(&now) && bundle.block_num == block)
.cloned()
.collect()
}

/// removes all bundles whose eligibility expires w.r.t. time `now`
pub fn tick(&mut self, now: SystemTime) {
let now = now.duration_since(UNIX_EPOCH).unwrap().as_secs();
self.0.retain(|bundle| *bundle.eligibility.end() >= now);
}

/// maintains the pool based on updates to the canonical state.
///
/// returns the IDs of the bundles removed from the pool.
pub fn maintain(&mut self, _event: CanonStateNotification) -> Vec<BundleId> {
// remove all bundles
self.0.drain().map(|bundle| bundle.id).collect()
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod builder;
pub mod bundle;

0 comments on commit 328fca5

Please sign in to comment.