Skip to content

Commit

Permalink
H-2837: harpc: wait to acquire address on listen (#5432)
Browse files Browse the repository at this point in the history
  • Loading branch information
indietyp authored Oct 21, 2024
1 parent f55a644 commit df1873a
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 30 deletions.
10 changes: 0 additions & 10 deletions libs/@local/harpc/net/src/session/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,6 @@ where
.await
.expect("should be able to listen on TCP");

// Give the swarm some time to acquire the external address
// This is necessary for CI, as otherwise the tests are a bit flaky.
// TODO: Implement waiting for server to be ready
// see https://linear.app/hash/issue/H-2837
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

let address = server_ipc
.external_addresses()
.await
Expand Down Expand Up @@ -308,10 +302,6 @@ async fn echo_concurrent<T>(
.await
.expect("should be able to listen on TCP");

// Give the swarm some time to acquire the external address
// This is necessary for CI, as otherwise the tests are a bit flaky.
// TODO: `listen_on` should wait until the transport layer has acquired said address.
// see https://linear.app/hash/issue/H-2837
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

let address = server_ipc
Expand Down
4 changes: 2 additions & 2 deletions libs/@local/harpc/net/src/transport/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use error_stack::{Result, ResultExt};
use libp2p::{Multiaddr, PeerId, core::transport::ListenerId};
use libp2p::{Multiaddr, PeerId};
use libp2p_stream::Control;
use tokio::sync::{mpsc, oneshot};

Expand Down Expand Up @@ -39,7 +39,7 @@ impl TransportLayerIpc {
.change_context(IpcError::Swarm)
}

pub(super) async fn listen_on(&self, address: Multiaddr) -> Result<ListenerId, IpcError> {
pub(super) async fn listen_on(&self, address: Multiaddr) -> Result<Multiaddr, IpcError> {
let (tx, rx) = oneshot::channel();

self.tx
Expand Down
5 changes: 2 additions & 3 deletions libs/@local/harpc/net/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use alloc::sync::Arc;
use error_stack::{Result, ResultExt};
use futures::stream::StreamExt;
use libp2p::{
Multiaddr, PeerId, StreamProtocol, core::transport::ListenerId, metrics,
tcp::tokio::Transport as TokioTcpTransport,
Multiaddr, PeerId, StreamProtocol, metrics, tcp::tokio::Transport as TokioTcpTransport,
};
use libp2p_core::transport::MemoryTransport;
use tokio::io::BufStream;
Expand Down Expand Up @@ -183,7 +182,7 @@ impl TransportLayer {
///
/// If the background task cannot be reached, crashes while processing the request, or the
/// multiaddr is not supported by the transport.
pub async fn listen_on(&self, address: Multiaddr) -> Result<ListenerId, TransportError> {
pub async fn listen_on(&self, address: Multiaddr) -> Result<Multiaddr, TransportError> {
self.ipc
.listen_on(address)
.await
Expand Down
81 changes: 66 additions & 15 deletions libs/@local/harpc/net/src/transport/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use super::{
};

type SenderPeerId = oneshot::Sender<core::result::Result<PeerId, DialError>>;
type SenderListenerId =
oneshot::Sender<core::result::Result<ListenerId, libp2p::TransportError<io::Error>>>;
type SenderListenOn =
oneshot::Sender<core::result::Result<Multiaddr, libp2p::TransportError<io::Error>>>;

pub(crate) enum Command {
IssueControl {
Expand All @@ -45,7 +45,7 @@ pub(crate) enum Command {
},
ListenOn {
address: Multiaddr,
tx: SenderListenerId,
tx: SenderListenOn,
},
ExternalAddresses {
tx: oneshot::Sender<Vec<Multiaddr>>,
Expand All @@ -63,9 +63,11 @@ pub(crate) struct TransportTask {
ipc: TransportLayerIpc,

peers: HashMap<Multiaddr, PeerId>,

peers_waiting: HashMap<Multiaddr, Vec<SenderPeerId>>,
peers_address_lookup: HashMap<ConnectionId, Multiaddr>,

listeners: HashMap<ListenerId, Vec<Multiaddr>>,
listeners_waiting: HashMap<ListenerId, Vec<SenderListenOn>>,
}

impl TransportTask {
Expand Down Expand Up @@ -108,10 +110,10 @@ impl TransportTask {
// 3-10% slower than using yamux.
//
// Another alternative would be using QUIC, this has a massive performance penalty
// of ~50% as well as is unable to be used with js as `nodejs` does not support QUIC
// yet.
// of ~50% as well as is unable to be used with JavaScript as `nodejs` does not
// support QUIC yet.
//
// As a compromise we're using `yamux 0.12` in `WindowUpdateMode::OnReceive` mode
// As a compromise we're using `yamux 0.12` in `WindowUpdateMode::OnReceive` mode
// with a buffer that is 16x higher than the default (as default) with a value of
// 16MiB.
let yamux: yamux::Config = config.yamux.into();
Expand Down Expand Up @@ -158,6 +160,9 @@ impl TransportTask {
peers: HashMap::new(),
peers_waiting: HashMap::new(),
peers_address_lookup: HashMap::new(),

listeners: HashMap::new(),
listeners_waiting: HashMap::new(),
})
}

Expand Down Expand Up @@ -206,19 +211,43 @@ impl TransportTask {
}
}

fn handle_listen_on(&mut self, address: Multiaddr, tx: SenderListenOn) {
tracing::debug!(%address, "starting to listening on address");

match self.swarm.listen_on(address) {
Ok(id) => {
if let Some(addresses) = self.listeners.get(&id) {
let address = addresses[0].clone();

Self::send_ipc_response(tx, Ok(address));
} else {
let entry = self.listeners_waiting.entry(id);

match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().push(tx);
}
Entry::Vacant(entry) => {
entry.insert(vec![tx]);
}
}
}
}
Err(error) => {
Self::send_ipc_response(tx, Err(error));
}
}
}

fn handle_command(&mut self, command: Command) {
match command {
Command::IssueControl { tx } => {
let control = self.swarm.behaviour().stream.new_control();

Self::send_ipc_response(tx, control);
}
Command::LookupPeer { address: addr, tx } => self.handle_dial(addr, tx),
Command::ListenOn { address, tx } => {
let result = self.swarm.listen_on(address);

Self::send_ipc_response(tx, result);
}
Command::LookupPeer { address, tx } => self.handle_dial(address, tx),
Command::ListenOn { address, tx } => self.handle_listen_on(address, tx),
Command::ExternalAddresses { tx } => {
let addresses = self.swarm.listeners().cloned().collect();

Expand Down Expand Up @@ -269,6 +298,25 @@ impl TransportTask {
}
}

fn handle_new_listen_addr(&mut self, address: Multiaddr, listener_id: ListenerId) {
tracing::info!(%address, "listening on address");

if let Some(senders) = self.listeners_waiting.remove(&listener_id) {
for tx in senders {
Self::send_ipc_response(tx, Ok(address.clone()));
}
}

match self.listeners.entry(listener_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(address);
}
Entry::Vacant(entry) => {
entry.insert(vec![address]);
}
}
}

fn handle_event(&mut self, event: SwarmEvent<TransportBehaviourEvent>) {
tracing::debug!(?event, "received swarm event");

Expand All @@ -283,8 +331,11 @@ impl TransportTask {
}

match event {
SwarmEvent::NewListenAddr { address, .. } => {
tracing::info!(%address, "listening on address");
SwarmEvent::NewListenAddr {
address,
listener_id,
} => {
self.handle_new_listen_addr(address, listener_id);
}
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
self.handle_new_external_address_of_peer(peer_id, address);
Expand Down
46 changes: 46 additions & 0 deletions libs/@local/harpc/net/src/transport/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ use harpc_wire_protocol::{
};
use libp2p::{
Multiaddr, TransportError, core::transport::MemoryTransport, multiaddr, swarm::DialError,
tcp::tokio::Transport,
};
use libp2p_stream::OpenStreamError;
use multiaddr::multiaddr;
use tokio_util::sync::CancellationToken;

use super::{TransportConfig, TransportLayer};
Expand Down Expand Up @@ -80,6 +82,17 @@ pub(crate) fn layer() -> (TransportLayer, impl Drop) {
(layer, cancel.drop_guard())
}

pub(crate) fn layer_tcp() -> (TransportLayer, impl Drop) {
let transport = Transport::default();
let config = TransportConfig::default();
let cancel = CancellationToken::new();

let layer = TransportLayer::start(config, transport, cancel.clone())
.expect("should be able to create swarm");

(layer, cancel.drop_guard())
}

#[tokio::test]
async fn lookup_peer() {
let (server, _guard_server) = layer();
Expand Down Expand Up @@ -479,3 +492,36 @@ async fn listen_on() {
.await
.expect("memory transport should be able to listen on memory address");
}

#[tokio::test]
async fn listen_on_duplicate_address() {
let (layer, _guard) = layer();

let address = memory_address();

layer
.listen_on(address.clone())
.await
.expect("memory transport should be able to listen on memory address");

let _error = layer
.listen_on(address)
.await
.expect_err("should not be able to listen on the same address twice");
}

#[tokio::test]
async fn listen_on_tcp_unspecified() {
let (layer, _guard) = layer_tcp();

let address = multiaddr![Ip4(Ipv4Addr::UNSPECIFIED), Tcp(0_u16)];

let chosen = layer
.listen_on(address)
.await
.expect("memory transport should be able to listen on memory address");

let protocol: Vec<_> = chosen.iter().collect();
assert_matches!(protocol[0], multiaddr::Protocol::Ip4(addr) if addr != Ipv4Addr::UNSPECIFIED);
assert_matches!(protocol[1], multiaddr::Protocol::Tcp(port) if port != 0);
}

0 comments on commit df1873a

Please sign in to comment.