Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deserialize big block on rayon thread #3316

Open
wants to merge 5 commits into
base: staging
Choose a base branch
from
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,21 @@ impl<N: Network> Gateway<N> {
if let Some(sync_sender) = self.sync_sender.get() {
// Retrieve the block response.
let BlockResponse { request, blocks } = block_response;

// Perform the deferred non-blocking deserialization of the blocks.
let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?;
// The deserialization can take a long time (minutes). We should not be running
// this on a blocking task, but on a rayon thread pool.
let (send, recv) = tokio::sync::oneshot::channel();
rayon::spawn_fifo(move || {
let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}"));
let _ = send.send(blocks);
});
let blocks = match recv.await {
Ok(Ok(blocks)) => blocks,
Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"),
};

// Ensure the block response is well-formed.
blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?;
// Send the blocks to the sync module.
Expand Down Expand Up @@ -955,6 +968,22 @@ impl<N: Network> Gateway<N> {
}
}
}

/// Processes a message received from the network.
async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event<N>) {
// Process the message. Disconnect if the peer violated the protocol.
if let Err(error) = self.inbound(peer_addr, message).await {
if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
let self_ = self.clone();
tokio::spawn(async move {
Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
// Disconnect from this peer.
self_.disconnect(peer_ip);
});
}
}
}
}

#[async_trait]
Expand Down Expand Up @@ -1045,17 +1074,15 @@ impl<N: Network> Reading for Gateway<N> {

/// Processes a message received from the network.
async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> {
// Process the message. Disconnect if the peer violated the protocol.
if let Err(error) = self.inbound(peer_addr, message).await {
if let Some(peer_ip) = self.resolver.get_listener(peer_addr) {
warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}");
let self_ = self.clone();
tokio::spawn(async move {
Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await;
// Disconnect from this peer.
self_.disconnect(peer_ip);
});
}
if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) {
let self_ = self.clone();
// Handle BlockRequest and BlockResponse messages in a separate task to not block the
// inbound queue.
tokio::spawn(async move {
let _ = self_.process_message_inner(peer_addr, message).await;
elderhammer marked this conversation as resolved.
Show resolved Hide resolved
});
} else {
self.process_message_inner(peer_addr, message).await;
}
Ok(())
}
Expand Down