Skip to content

Commit

Permalink
fix: cache block requests in gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
niklaslong committed Jul 25, 2024
1 parent 878624d commit 59777bd
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
20 changes: 18 additions & 2 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,11 @@ impl<N: Network> Gateway<N> {
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
} else if matches!(&event, &Event::BlockRequest(_)) {
let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(());
}
}
trace!("{CONTEXT} Received '{}' from '{peer_ip}'", event.name());

Expand Down Expand Up @@ -631,6 +636,10 @@ impl<N: Network> Gateway<N> {
if let Some(sync_sender) = self.sync_sender.get() {
// Retrieve the block response.
let BlockResponse { request, blocks } = block_response;
// Check the response corresponds to a request.
if !self.cache.remove_outbound_block_request(peer_ip, &request) {
bail!("Unsolicited block response from '{peer_ip}'")
}
// Perform the deferred non-blocking deserialization of the blocks.
let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?;
// Ensure the block response is well-formed.
Expand Down Expand Up @@ -980,18 +989,24 @@ impl<N: Network> Transport<N> for Gateway<N> {
}

// If the event type is a certificate request, increment the cache.
if matches!(event, Event::CertificateRequest(_)) | matches!(event, Event::CertificateResponse(_)) {

if matches!(event, Event::CertificateRequest(_) | Event::CertificateResponse(_)) {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_certificate, CACHE_REQUESTS_INTERVAL, max_cache_certificates)
}
// If the event type is a transmission request, increment the cache.
else if matches!(event, Event::TransmissionRequest(_)) | matches!(event, Event::TransmissionResponse(_)) {
else if matches!(event, Event::TransmissionRequest(_) | Event::TransmissionResponse(_)) {
// Update the outbound event cache. This is necessary to ensure we don't under count the outbound events.
self.cache.insert_outbound_event(peer_ip, CACHE_EVENTS_INTERVAL);
// Send the event to the peer.
send!(self, insert_outbound_transmission, CACHE_REQUESTS_INTERVAL, max_cache_transmissions)
} else if let Event::BlockRequest(request) = event {
// Insert the outbound request so we can match it to responses.
self.cache.insert_outbound_block_request(peer_ip, request);
// Send the event to the peer and updatet the outbound event cache, use the general rate limit.
send!(self, insert_outbound_event, CACHE_EVENTS_INTERVAL, max_cache_events)
}
// Otherwise, employ a general rate limit.
else {
Expand Down Expand Up @@ -1090,6 +1105,7 @@ impl<N: Network> Disconnect for Gateway<N> {
// This is sufficient to avoid infinite growth as the committee has a fixed number
// of members.
self.cache.clear_outbound_validators_requests(peer_ip);
self.cache.clear_outbound_block_requests(peer_ip);
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion node/bft/src/helpers/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::events::BlockRequest;
use snarkvm::{console::types::Field, ledger::narwhal::TransmissionID, prelude::Network};

use core::hash::Hash;
use parking_lot::RwLock;
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
net::{IpAddr, SocketAddr},
};
use time::OffsetDateTime;
Expand All @@ -32,6 +33,8 @@ pub struct Cache<N: Network> {
seen_inbound_certificates: RwLock<BTreeMap<i64, HashMap<Field<N>, u32>>>,
/// The ordered timestamp map of transmission IDs and cache hits.
seen_inbound_transmissions: RwLock<BTreeMap<i64, HashMap<TransmissionID<N>, u32>>>,
/// The ordered timestamp map of inbound block requests and cache hits.
seen_inbound_block_requests: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The ordered timestamp map of peer IPs and their cache hits on outbound events.
seen_outbound_events: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The ordered timestamp map of peer IPs and their cache hits on certificate requests.
Expand All @@ -40,6 +43,8 @@ pub struct Cache<N: Network> {
seen_outbound_transmissions: RwLock<BTreeMap<i64, HashMap<SocketAddr, u32>>>,
/// The map of IPs to the number of validators requests.
seen_outbound_validators_requests: RwLock<HashMap<SocketAddr, u32>>,
/// The ordered timestamp map of outbound block requests and cache hits.
seen_outbound_block_requests: RwLock<HashMap<SocketAddr, HashSet<BlockRequest>>>,
}

impl<N: Network> Default for Cache<N> {
Expand All @@ -57,10 +62,12 @@ impl<N: Network> Cache<N> {
seen_inbound_events: Default::default(),
seen_inbound_certificates: Default::default(),
seen_inbound_transmissions: Default::default(),
seen_inbound_block_requests: Default::default(),
seen_outbound_events: Default::default(),
seen_outbound_certificates: Default::default(),
seen_outbound_transmissions: Default::default(),
seen_outbound_validators_requests: Default::default(),
seen_outbound_block_requests: Default::default(),
}
}
}
Expand All @@ -85,6 +92,11 @@ impl<N: Network> Cache<N> {
pub fn insert_inbound_transmission(&self, key: TransmissionID<N>, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_transmissions, key, interval_in_secs)
}

/// Inserts a block request into the cache, returning the number of recent events.
pub fn insert_inbound_block_request(&self, key: SocketAddr, interval_in_secs: i64) -> usize {
Self::retain_and_insert(&self.seen_inbound_block_requests, key, interval_in_secs)
}
}

impl<N: Network> Cache<N> {
Expand Down Expand Up @@ -124,6 +136,25 @@ impl<N: Network> Cache<N> {
pub fn clear_outbound_validators_requests(&self, peer_ip: SocketAddr) {
self.seen_outbound_validators_requests.write().remove(&peer_ip);
}

/// Inserts the block request for the given peer.
pub fn insert_outbound_block_request(&self, peer_ip: SocketAddr, request: BlockRequest) {
self.seen_outbound_block_requests.write().entry(peer_ip).or_default().insert(request);
}

// Removes the block request for the given peer. Returns whether the request was present.
pub fn remove_outbound_block_request(&self, peer_ip: SocketAddr, request: &BlockRequest) -> bool {
self.seen_outbound_block_requests
.write()
.get_mut(&peer_ip)
.map(|requests| requests.remove(request))
.unwrap_or(false)
}

/// Clears the IP's number of outbound block requests.
pub fn clear_outbound_block_requests(&self, peer_ip: SocketAddr) {
self.seen_outbound_block_requests.write().remove(&peer_ip);
}
}

impl<N: Network> Cache<N> {
Expand Down

0 comments on commit 59777bd

Please sign in to comment.