Skip to content

Commit

Permalink
refactor: convert broadcast receiver into stream (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobkaufmann authored Aug 1, 2023
1 parent 75a29c2 commit d8f0ebf
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 25 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ reth-provider = { git = "https://github.com/paradigmxyz/reth.git", package = "re
reth-revm = { git = "https://github.com/paradigmxyz/reth.git", package = "reth-revm" }
reth-revm-primitives = { git = "https://github.com/paradigmxyz/reth.git", package = "reth-revm-primitives" }
tokio = "1.29.1"
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio-util = { version = "0.7.8", features = ["time"] }

[patch.crates-io]
Expand Down
57 changes: 32 additions & 25 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use futures_util::{FutureExt, StreamExt};
use futures_util::{stream::Fuse, FutureExt, Stream, StreamExt};
use reth_interfaces::Error as RethError;
use reth_payload_builder::{
database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive,
Expand Down Expand Up @@ -34,6 +34,7 @@ use tokio::{
sync::{broadcast, mpsc, oneshot},
task,
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
use tokio_util::time::DelayQueue;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -111,8 +112,8 @@ pub struct Job<Client> {
config: JobConfig,
client: Arc<Client>,
bundles: HashMap<BundleId, BundleCompact>,
incoming: broadcast::Receiver<(BundleId, BlockNumber, BundleCompact)>,
invalidated: broadcast::Receiver<BundleId>,
incoming: Fuse<BroadcastStream<(BundleId, BlockNumber, BundleCompact)>>,
invalidated: Fuse<BroadcastStream<BundleId>>,
built_payloads: Vec<Payload>,
pending_payloads: VecDeque<oneshot::Receiver<Result<Payload, PayloadBuilderError>>>,
}
Expand All @@ -122,8 +123,8 @@ impl<Client> Job<Client> {
config: JobConfig,
client: Arc<Client>,
bundles: I,
incoming: broadcast::Receiver<(BundleId, BlockNumber, BundleCompact)>,
invalidated: broadcast::Receiver<BundleId>,
incoming: Fuse<BroadcastStream<(BundleId, BlockNumber, BundleCompact)>>,
invalidated: Fuse<BroadcastStream<BundleId>>,
) -> Self {
let bundles = bundles
.map(|bundle| (bundle.id, BundleCompact(bundle.txs)))
Expand Down Expand Up @@ -301,30 +302,36 @@ where
let this = self.get_mut();

// incorporate new incoming bundles
//
// TODO: handle `TryRecvError::Lagged`
let mut num_incoming_bundles = 0;
let incoming = this.incoming.recv();
tokio::pin!(incoming);
while let Poll::Ready(Ok((id, block_num, bundle))) = incoming.as_mut().poll(cx) {
// if the bundle is not eligible for the job, then skip the bundle
if block_num != this.config.parent.number + 1 {
continue;
}
let mut incoming = Pin::new(&mut this.incoming);
loop {
match incoming.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok((id, block_num, bundle)))) => {
// if the bundle is not eligible for the job, then skip the bundle
if block_num != this.config.parent.number + 1 {
continue;
}

this.bundles.insert(id, bundle);
num_incoming_bundles += 1;
this.bundles.insert(id, bundle);
num_incoming_bundles += 1;
}
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_skipped)))) => continue,
Poll::Ready(None) | Poll::Pending => break,
}
}

// remove any invalidated bundles
//
// TODO: handle `TryRecvError::Lagged`
let mut expired_bundles = HashSet::new();
let invalidated = this.invalidated.recv();
tokio::pin!(invalidated);
while let Poll::Ready(Ok(exp)) = invalidated.as_mut().poll(cx) {
this.bundles.remove(&exp);
expired_bundles.insert(exp);
let mut invalidated = Pin::new(&mut this.invalidated);
loop {
match invalidated.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(exp))) => {
this.bundles.remove(&exp);
expired_bundles.insert(exp);
}
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(_skipped)))) => continue,
Poll::Ready(None) | Poll::Pending => break,
}
}

// remove all payloads that contain an expired bundle
Expand Down Expand Up @@ -590,8 +597,8 @@ where
.unwrap()
.eligible(config.parent.number, SystemTime::now());

let incoming = self.incoming.subscribe();
let invalidated = self.invalidated.subscribe();
let incoming = BroadcastStream::new(self.incoming.subscribe()).fuse();
let invalidated = BroadcastStream::new(self.invalidated.subscribe()).fuse();

Ok(Job::new(
config,
Expand Down

0 comments on commit d8f0ebf

Please sign in to comment.