Skip to content

Commit

Permalink
Remove data request-response code. (#3747)
Browse files Browse the repository at this point in the history
Follow-up to #3740 which removes the remaining bits of data
request-response code.
  • Loading branch information
twittner authored Oct 11, 2024
1 parent 8632855 commit df63ec3
Show file tree
Hide file tree
Showing 9 changed files with 7 additions and 214 deletions.
9 changes: 1 addition & 8 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use hotshot_types::{
constants::LOOK_AHEAD,
data::ViewNumber,
network::NetworkConfig,
request_response::Request,
traits::{
election::Membership,
metrics::{Counter, Gauge, Metrics, NoMetrics},
Expand Down Expand Up @@ -738,9 +737,6 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
NetworkEvent::IsBootstrapped => {
error!("handle_recvd_events received `NetworkEvent::IsBootstrapped`, which should be impossible.");
}
NetworkEvent::ResponseRequested(..) => {
error!("received unexpected `NetworkEvent::ResponseRequested`");
}
NetworkEvent::ConnectedPeersUpdate(_) => {}
}
Ok::<(), NetworkError>(())
Expand Down Expand Up @@ -773,10 +769,7 @@ impl<K: SignatureKey + 'static> Libp2pNetwork<K> {
NetworkEvent::IsBootstrapped => {
is_bootstrapped.store(true, Ordering::Relaxed);
}
GossipMsg(_)
| DirectRequest(_, _, _)
| DirectResponse(_, _)
| NetworkEvent::ResponseRequested(Request(_), _) => {
GossipMsg(_) | DirectRequest(_, _, _) | DirectResponse(_, _) => {
let _ = handle.handle_recvd_events(message, &sender).await;
}
NetworkEvent::ConnectedPeersUpdate(num_peers) => {
Expand Down
3 changes: 0 additions & 3 deletions crates/libp2p-networking/src/network/behaviours/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,3 @@ pub mod exponential_backoff;

/// Wrapper around Kademlia
pub mod dht;

/// Request Response Handling for data requests
pub mod request_response;

This file was deleted.

17 changes: 1 addition & 16 deletions crates/libp2p-networking/src/network/def.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
// You should have received a copy of the MIT License
// along with the HotShot repository. If not, see <https://mit-license.org/>.

use hotshot_types::{
request_response::{Request, Response},
traits::signature_key::SignatureKey,
};
use hotshot_types::traits::signature_key::SignatureKey;
use libp2p::{
autonat,
gossipsub::{Behaviour as GossipBehaviour, Event as GossipEvent, IdentTopic},
Expand Down Expand Up @@ -50,10 +47,6 @@ pub struct NetworkDef<K: SignatureKey + 'static> {
#[debug(skip)]
pub direct_message: libp2p::request_response::cbor::Behaviour<Vec<u8>, Vec<u8>>,

/// Behaviour for requesting and receiving data
#[debug(skip)]
pub request_response: libp2p::request_response::cbor::Behaviour<Request, Response>,

/// Auto NAT behaviour to determine if we are publically reachable and
/// by which address
#[debug(skip)]
Expand All @@ -68,15 +61,13 @@ impl<K: SignatureKey + 'static> NetworkDef<K> {
dht: libp2p::kad::Behaviour<ValidatedStore<MemoryStore, K>>,
identify: IdentifyBehaviour,
direct_message: cbor::Behaviour<Vec<u8>, Vec<u8>>,
request_response: cbor::Behaviour<Request, Response>,
autonat: autonat::Behaviour,
) -> NetworkDef<K> {
Self {
gossipsub,
dht,
identify,
direct_message,
request_response,
autonat,
}
}
Expand Down Expand Up @@ -155,12 +146,6 @@ impl From<libp2p::request_response::Event<Vec<u8>, Vec<u8>>> for NetworkEventInt
}
}

impl From<libp2p::request_response::Event<Request, Response>> for NetworkEventInternal {
fn from(event: libp2p::request_response::Event<Request, Response>) -> Self {
Self::RequestResponseEvent(event)
}
}

impl From<libp2p::autonat::Event> for NetworkEventInternal {
fn from(event: libp2p::autonat::Event) -> Self {
Self::AutonatEvent(event)
Expand Down
27 changes: 2 additions & 25 deletions crates/libp2p-networking/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ pub mod transport;

use std::{collections::HashSet, fmt::Debug};

use futures::channel::oneshot::{self, Sender};
use hotshot_types::{
request_response::{Request, Response},
traits::{network::NetworkError, signature_key::SignatureKey},
};
use futures::channel::oneshot::Sender;
use hotshot_types::traits::{network::NetworkError, signature_key::SignatureKey};
#[cfg(async_executor_impl = "async-std")]
use libp2p::dns::async_std::Transport as DnsTransport;
#[cfg(async_executor_impl = "tokio")]
Expand Down Expand Up @@ -77,22 +74,6 @@ pub enum ClientRequest {
},
/// client request to send a direct reply to a message
DirectResponse(ResponseChannel<Vec<u8>>, Vec<u8>),
/// request for data from another peer
DataRequest {
/// request sent on wire
request: Request,
/// Peer to try sending the request to
peer: PeerId,
/// Send back request ID to client
chan: oneshot::Sender<Option<Response>>,
},
/// Respond with some data to another peer
DataResponse {
/// Data
response: Response,
/// Send back channel
chan: ResponseChannel<Response>,
},
/// prune a peer
Prune(PeerId),
/// add vec of known peers or addresses
Expand Down Expand Up @@ -139,8 +120,6 @@ pub enum NetworkEvent {
DirectRequest(Vec<u8>, PeerId, ResponseChannel<Vec<u8>>),
/// Recv-ed a direct response from a node (that hopefully was initiated by this node)
DirectResponse(Vec<u8>, PeerId),
/// A peer is asking us for data
ResponseRequested(Request, ResponseChannel<Response>),
/// Report that kademlia has successfully bootstrapped into the network
IsBootstrapped,
/// The number of connected peers has possibly changed
Expand All @@ -160,8 +139,6 @@ pub enum NetworkEventInternal {
GossipEvent(Box<GossipEvent>),
/// a direct message event
DMEvent(libp2p::request_response::Event<Vec<u8>, Vec<u8>>),
/// a request response event
RequestResponseEvent(libp2p::request_response::Event<Request, Response>),
/// a autonat event
AutonatEvent(libp2p::autonat::Event),
}
Expand Down
38 changes: 1 addition & 37 deletions crates/libp2p-networking/src/network/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use async_compatibility_layer::{
};
use futures::{channel::mpsc, select, FutureExt, SinkExt, StreamExt};
use hotshot_types::{
constants::KAD_DEFAULT_REPUB_INTERVAL_SEC,
request_response::{Request, Response},
traits::signature_key::SignatureKey,
constants::KAD_DEFAULT_REPUB_INTERVAL_SEC, traits::signature_key::SignatureKey,
};
use libp2p::{
autonat,
Expand Down Expand Up @@ -71,7 +69,6 @@ use crate::network::behaviours::{
dht::{DHTBehaviour, DHTProgress, KadPutQuery, NUM_REPLICATED_TO_TRUST},
direct_message::{DMBehaviour, DMRequest},
exponential_backoff::ExponentialBackoff,
request_response::RequestResponseState,
};

/// Maximum size of a message
Expand All @@ -97,8 +94,6 @@ pub struct NetworkNode<K: SignatureKey + 'static> {
config: NetworkNodeConfig<K>,
/// the listener id we are listening on, if it exists
listener_id: Option<ListenerId>,
/// Handler for requests and response behavior events.
request_response_state: RequestResponseState,
/// Handler for direct messages
direct_message_state: DMBehaviour,
/// Handler for DHT Events
Expand Down Expand Up @@ -271,15 +266,6 @@ impl<K: SignatureKey + 'static> NetworkNode<K> {
.into_iter(),
rrconfig.clone(),
);
let request_response: libp2p::request_response::cbor::Behaviour<Request, Response> =
RequestResponse::new(
[(
StreamProtocol::new("/HotShot/request_response/1.0"),
ProtocolSupport::Full,
)]
.into_iter(),
rrconfig.clone(),
);

let autonat_config = autonat::Config {
only_global_ips: false,
Expand All @@ -291,7 +277,6 @@ impl<K: SignatureKey + 'static> NetworkNode<K> {
kadem,
identify,
direct_message,
request_response,
autonat::Behaviour::new(peer_id, autonat_config),
);

Expand Down Expand Up @@ -321,7 +306,6 @@ impl<K: SignatureKey + 'static> NetworkNode<K> {
swarm,
config: config.clone(),
listener_id: None,
request_response_state: RequestResponseState::default(),
direct_message_state: DMBehaviour::default(),
dht_handler: DHTBehaviour::new(
peer_id,
Expand Down Expand Up @@ -480,23 +464,6 @@ impl<K: SignatureKey + 'static> NetworkNode<K> {
ClientRequest::DirectResponse(chan, msg) => {
behaviour.add_direct_response(chan, msg);
}
ClientRequest::DataRequest {
request,
peer,
chan,
} => {
let id = behaviour.request_response.send_request(&peer, request);
self.request_response_state.add_request(id, chan);
}
ClientRequest::DataResponse { response, chan } => {
if behaviour
.request_response
.send_response(chan, response)
.is_err()
{
debug!("Data response dropped because client is no longer connected");
}
}
ClientRequest::AddKnownPeers(peers) => {
self.add_known_peers(&peers);
}
Expand Down Expand Up @@ -655,9 +622,6 @@ impl<K: SignatureKey + 'static> NetworkNode<K> {
NetworkEventInternal::DMEvent(e) => self
.direct_message_state
.handle_dm_event(e, self.resend_tx.clone()),
NetworkEventInternal::RequestResponseEvent(e) => {
self.request_response_state.handle_request_response(e)
}
NetworkEventInternal::AutonatEvent(e) => {
match e {
autonat::Event::InboundProbe(_) => {}
Expand Down
45 changes: 1 addition & 44 deletions crates/libp2p-networking/src/network/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ use async_compatibility_layer::{
art::{async_sleep, async_timeout},
channel::{Receiver, UnboundedReceiver, UnboundedSender},
};
use futures::channel::oneshot;
use hotshot_types::{
request_response::{Request, Response},
traits::{network::NetworkError, signature_key::SignatureKey},
};
use hotshot_types::traits::{network::NetworkError, signature_key::SignatureKey};
use libp2p::{request_response::ResponseChannel, Multiaddr};
use libp2p_identity::PeerId;
use tracing::{debug, info, instrument};
Expand Down Expand Up @@ -184,45 +180,6 @@ impl<K: SignatureKey + 'static> NetworkNodeHandle<K> {
Ok(())
}

/// Request another peer for some data we want. Returns the id of the request
///
/// # Errors
///
/// Will return a networking error if the channel closes before the result
/// can be sent back
pub async fn request_data(
&self,
request: &[u8],
peer: PeerId,
) -> Result<Option<Response>, NetworkError> {
let (tx, rx) = oneshot::channel();
let req = ClientRequest::DataRequest {
request: Request(request.to_vec()),
peer,
chan: tx,
};

self.send_request(req).await?;

rx.await
.map_err(|e| NetworkError::ChannelReceiveError(e.to_string()))
}

/// Send a response to a request with the response channel
/// # Errors
/// Will error if the client request channel is closed, or serialization fails.
pub async fn respond_data(
&self,
response: Vec<u8>,
chan: ResponseChannel<Response>,
) -> Result<(), NetworkError> {
let req = ClientRequest::DataResponse {
response: Response(response),
chan,
};
self.send_request(req).await
}

/// Look up a peer's addresses in kademlia
/// NOTE: this should always be called before any `request_response` is initiated
/// # Errors
Expand Down
4 changes: 1 addition & 3 deletions crates/libp2p-networking/tests/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ pub async fn counter_handle_network_event<K: SignatureKey + 'static>(
use CounterMessage::*;
use NetworkEvent::*;
match event {
IsBootstrapped
| NetworkEvent::ResponseRequested(..)
| NetworkEvent::ConnectedPeersUpdate(..) => {}
IsBootstrapped | NetworkEvent::ConnectedPeersUpdate(..) => {}
GossipMsg(m) | DirectResponse(m, _) => {
if let Ok(msg) = bincode::deserialize::<CounterMessage>(&m) {
match msg {
Expand Down
12 changes: 0 additions & 12 deletions crates/types/src/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,6 @@ use serde::{Deserialize, Serialize};

use crate::traits::{node_implementation::NodeType, signature_key::SignatureKey};

/// Request for Consenus data
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request(#[serde(with = "serde_bytes")] pub Vec<u8>);

/// Response for some VID data that we already collected
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Response(
/// Data
#[serde(with = "serde_bytes")]
pub Vec<u8>,
);

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
/// A signed request for a proposal.
pub struct ProposalRequestPayload<TYPES: NodeType> {
Expand Down

0 comments on commit df63ec3

Please sign in to comment.