Skip to content

Commit

Permalink
Complete connection at gateway (#1252)
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez authored Oct 6, 2024
1 parent cf69578 commit 481889a
Show file tree
Hide file tree
Showing 9 changed files with 390 additions and 516 deletions.
393 changes: 209 additions & 184 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ pub(crate) trait InnerMessage: Into<NetMessage> {
fn requested_location(&self) -> Option<Location>;
}

type RemainingChecks = Option<usize>;
type ConnectResult = Result<(PeerId, RemainingChecks), ()>;

/// Internal node events emitted to the event loop.
#[derive(Debug, Clone)]
pub(crate) enum NodeEvent {
Expand All @@ -304,7 +307,7 @@ pub(crate) enum NodeEvent {
ConnectPeer {
peer: PeerId,
tx: Transaction,
callback: tokio::sync::mpsc::Sender<Result<PeerId, ()>>,
callback: tokio::sync::mpsc::Sender<ConnectResult>,
is_gw: bool,
},
Disconnect {
Expand Down
27 changes: 4 additions & 23 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,16 +853,11 @@ async fn subscribe(op_manager: Arc<OpManager>, key: ContractKey, client_id: Opti
}
}

async fn handle_aborted_op<CM>(
async fn handle_aborted_op(
tx: Transaction,
this_peer_pub_key: TransportPublicKey,
op_manager: &OpManager,
conn_manager: &mut CM,
gateways: &[PeerKeyLocation],
) -> Result<(), OpError>
where
CM: NetworkBridge + Send,
{
) -> Result<(), OpError> {
use crate::util::IterExt;
if let TransactionType::Connect = tx.transaction_type() {
// attempt to establish a connection failed, this could be a fatal error since the node
Expand All @@ -880,29 +875,15 @@ where
} = *op;
if let Some(gateway) = gateway {
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
connect::join_ring_request(
backoff,
this_peer_pub_key,
&gateway,
op_manager,
conn_manager,
)
.await?;
connect::join_ring_request(backoff, &gateway, op_manager).await?;
}
}
Ok(Some(OpEnum::Connect(_))) => {
// if no connections were achieved just fail
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
tracing::warn!("Retrying joining the ring with an other gateway");
if let Some(gateway) = gateways.iter().shuffle().next() {
connect::join_ring_request(
None,
this_peer_pub_key,
gateway,
op_manager,
conn_manager,
)
.await?
connect::join_ring_request(None, gateway, op_manager).await?
}
}
}
Expand Down
210 changes: 92 additions & 118 deletions crates/core/src/node/network_bridge/handshake.rs

Large diffs are not rendered by default.

69 changes: 41 additions & 28 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use tracing::Instrument;

use crate::dev_tool::Location;
use crate::node::network_bridge::handshake::{
EstablishConnection, Event as HandshakeEvent, HandshakeError, HandshakeHandler,
InboundGwJoinRequest, OutboundMessage,
EstablishConnection, Event as HandshakeEvent, HandshakeError, HandshakeHandler, OutboundMessage,
};
use crate::node::PeerId;
use crate::transport::{
Expand Down Expand Up @@ -200,9 +199,12 @@ impl P2pConnManager {
tracing::error!(%tx, "Aborted transaction");
}
ConnEvent::OutboundMessage(msg) => {
let target_peer = msg.target().expect(
"Target peer not set, must be set for connection outbound message",
);
let Some(target_peer) = msg.target() else {
let id = *msg.id();
tracing::error!(%id, "Target peer not set, must be set for connection outbound message");
self.bridge.op_manager.completed(id);
continue;
};
tracing::debug!(%target_peer, %msg, "Sending message to peer");
match self.connections.get(&target_peer.peer) {
Some(peer_connection) => {
Expand All @@ -211,7 +213,11 @@ impl P2pConnManager {
}
}
None => {
tracing::error!("No existing outbound connection to forward the message to {}", target_peer.peer);
tracing::error!(
id = %msg.id(),
target = %target_peer.peer,
"No existing outbound connection to forward the message"
);
}
}
}
Expand Down Expand Up @@ -306,17 +312,9 @@ impl P2pConnManager {
executor_listener: &ExecutorToEventLoopChannel<NetworkEventListenerHalve>,
cli_response_sender: &ClientResponsesSender,
) -> anyhow::Result<()> {
let mut cb = self.bridge.clone();
match msg {
NetMessage::V1(NetMessageV1::Aborted(tx)) => {
handle_aborted_op(
tx,
op_manager.ring.get_peer_pub_key(),
op_manager,
&mut cb,
&self.gateways,
)
.await?;
handle_aborted_op(tx, op_manager, &self.gateways).await?;
}
msg => {
if let Some(addr) = state.transient_conn.get(msg.id()) {
Expand Down Expand Up @@ -406,13 +404,12 @@ impl P2pConnManager {
state: &mut EventListenerState,
) -> anyhow::Result<()> {
match event {
HandshakeEvent::InboundConnection(InboundGwJoinRequest {
HandshakeEvent::InboundConnection {
id,
conn,
joiner,
hops_to_live,
..
}) => {
op,
} => {
let (tx, rx) = mpsc::channel(1);
self.connections.insert(joiner.clone(), tx);
let was_reserved = {
Expand All @@ -428,6 +425,12 @@ impl P2pConnManager {
was_reserved,
)
.await;
if let Some(op) = op {
self.bridge
.op_manager
.push(id, crate::operations::OpEnum::Connect(op.into()))
.await?;
}
let task = peer_connection_listener(rx, conn).boxed();
state.peer_connections.push(task);
}
Expand Down Expand Up @@ -458,15 +461,21 @@ impl P2pConnManager {
peer_id,
connection,
} => {
self.handle_successful_connection(peer_id, connection, state)
self.handle_successful_connection(peer_id, connection, state, None)
.await?;
}
HandshakeEvent::OutboundGatewayConnectionSuccessful {
peer_id,
connection,
remaining_checks,
} => {
self.handle_successful_connection(peer_id, connection, state)
.await?;
self.handle_successful_connection(
peer_id,
connection,
state,
Some(remaining_checks),
)
.await?;
}
HandshakeEvent::OutboundConnectionFailed { peer_id, error } => {
tracing::info!(%peer_id, "Connection failed: {:?}", error);
Expand Down Expand Up @@ -495,6 +504,7 @@ impl P2pConnManager {
peer_id: PeerId,
connection: PeerConnection,
state: &mut EventListenerState,
remaining_checks: Option<usize>,
) -> anyhow::Result<()> {
if let Some(mut cb) = state.awaiting_connection.remove(&peer_id.addr) {
let peer_id = if let Some(peer_id) = self
Expand All @@ -512,7 +522,7 @@ impl P2pConnManager {
let key = (&*self.bridge.op_manager.ring.connection_manager.pub_key).clone();
PeerId::new(self_addr, key)
};
let _ = cb.send_result(Ok(peer_id)).await;
let _ = cb.send_result(Ok((peer_id, remaining_checks))).await;
} else {
tracing::warn!(%peer_id, "No callback for connection established");
}
Expand Down Expand Up @@ -625,27 +635,30 @@ impl P2pConnManager {
trait ConnectResultSender {
fn send_result(
&mut self,
result: Result<PeerId, HandshakeError>,
result: Result<(PeerId, Option<usize>), HandshakeError>,
) -> Pin<Box<dyn Future<Output = Result<(), HandshakeError>> + Send + '_>>;
}

impl ConnectResultSender for Option<oneshot::Sender<Result<PeerId, HandshakeError>>> {
fn send_result(
&mut self,
result: Result<PeerId, HandshakeError>,
result: Result<(PeerId, Option<usize>), HandshakeError>,
) -> Pin<Box<dyn Future<Output = Result<(), HandshakeError>> + Send + '_>> {
async move {
let _ = self.take().expect("always set").send(result);
let _ = self
.take()
.expect("always set")
.send(result.map(|(id, _)| id));
Ok(())
}
.boxed()
}
}

impl ConnectResultSender for mpsc::Sender<Result<PeerId, ()>> {
impl ConnectResultSender for mpsc::Sender<Result<(PeerId, Option<usize>), ()>> {
fn send_result(
&mut self,
result: Result<PeerId, HandshakeError>,
result: Result<(PeerId, Option<usize>), HandshakeError>,
) -> Pin<Box<dyn Future<Output = Result<(), HandshakeError>> + Send + '_>> {
async move {
self.send(result.map_err(|_| ()))
Expand Down
15 changes: 3 additions & 12 deletions crates/core/src/node/p2p_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::{
},
NetEventRegister, PeerId,
};
use crate::ring::ConnectionManager;
use crate::{
client_events::{combinator::ClientEventsCombinator, BoxedClient},
config::GlobalExecutor,
Expand All @@ -20,12 +21,10 @@ use crate::{
node::NodeConfig,
operations::connect,
};
use crate::{ring::ConnectionManager, transport::TransportPublicKey};

use super::OpManager;

pub(crate) struct NodeP2P {
pub(crate) peer_pub_key: TransportPublicKey,
pub(crate) op_manager: Arc<OpManager>,
notification_channel: EventLoopNotificationsReceiver,
client_wait_for_transaction: ContractHandlerChannel<WaitingResolution>,
Expand All @@ -41,13 +40,8 @@ pub(crate) struct NodeP2P {
impl NodeP2P {
pub(super) async fn run_node(self) -> anyhow::Result<()> {
if self.should_try_connect {
connect::initial_join_procedure(
self.op_manager.clone(),
self.conn_manager.bridge.clone(),
self.peer_pub_key,
&self.conn_manager.gateways,
)
.await?;
connect::initial_join_procedure(self.op_manager.clone(), &self.conn_manager.gateways)
.await?;
}

// start the p2p event loop
Expand All @@ -73,8 +67,6 @@ impl NodeP2P {
CH: ContractHandler + Send + 'static,
ER: NetEventRegister + Clone,
{
let peer_pub_key = config.key_pair.public().clone();

let (notification_channel, notification_tx) = event_loop_notification_channel();
let (ch_outbound, ch_inbound, wait_for_event) = contract::contract_handler_channel();
let (client_responses, cli_response_sender) = contract::client_responses_channel();
Expand Down Expand Up @@ -113,7 +105,6 @@ impl NodeP2P {
);

Ok(NodeP2P {
peer_pub_key,
conn_manager,
notification_channel,
client_wait_for_transaction: wait_for_event,
Expand Down
17 changes: 2 additions & 15 deletions crates/core/src/node/testing_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,13 +754,7 @@ where
NB: NetworkBridge + NetworkBridgeExt,
UsrEv: ClientEventsProxy + Send + 'static,
{
connect::initial_join_procedure(
config.op_manager.clone(),
config.conn_manager.clone(),
config.peer_key.pub_key.clone(),
&config.gateways,
)
.await?;
connect::initial_join_procedure(config.op_manager.clone(), &config.gateways).await?;
let (client_responses, cli_response_sender) = contract::client_responses_channel();
let span = {
config
Expand Down Expand Up @@ -852,14 +846,7 @@ where
};

if let Ok(Either::Left(NetMessage::V1(NetMessageV1::Aborted(tx)))) = msg {
super::handle_aborted_op(
tx,
peer_key.pub_key.clone(),
&op_manager,
&mut conn_manager,
&gateways,
)
.await?;
super::handle_aborted_op(tx, &op_manager, &gateways).await?;
}

let msg = match msg {
Expand Down
Loading

0 comments on commit 481889a

Please sign in to comment.