Skip to content

Commit

Permalink
feat(rpc): implement Filecoin.NetFindPeer (#4569)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 authored Jul 19, 2024
1 parent 531d928 commit 1782e76
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 163 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
- [#4474](https://github.com/ChainSafe/forest/pull/4474) Add new subcommand
`forest-cli healthcheck ready`.

- [#4569](https://github.com/ChainSafe/forest/pull/4569) Add support for the
`Filecoin.NetFindPeer` RPC method.

- [#4565](https://github.com/ChainSafe/forest/pull/4565) Add support for the
`Filecoin.StateGetRandomnessDigestFromBeacon` RPC method.

Expand Down
2 changes: 0 additions & 2 deletions scripts/tests/api_compare/filter-list
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,5 @@
!Filecoin.StateReplay
# CustomCheckFailed in Forest: https://github.com/ChainSafe/forest/actions/runs/9593268587/job/26453560366
!Filecoin.StateCall
# Rejected("item not found") in Forest: https://github.com/ChainSafe/forest/actions/runs/9581179496/job/26417600610
!Filecoin.NetAgentVersion
# CustomCheckFailed in Forest: https://github.com/ChainSafe/forest/issues/4446
!Filecoin.StateCirculatingSupply
1 change: 1 addition & 0 deletions scripts/tests/api_compare/filter-list-offline
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
!Filecoin.NetAgentVersion
!Filecoin.NetAutoNatStatus
!Filecoin.NetPeers
!Filecoin.NetFindPeer
# CustomCheckFailed in Forest: https://github.com/ChainSafe/forest/actions/runs/9593268587/job/26453560366
!Filecoin.StateReplay
# CustomCheckFailed in Forest: https://github.com/ChainSafe/forest/actions/runs/9593268587/job/26453560366
Expand Down
62 changes: 27 additions & 35 deletions src/libp2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use crate::libp2p_bitswap::{
request_manager::{BitswapRequestManager, ValidatePeerCallback},
BitswapStoreRead, BitswapStoreReadWrite,
};
use crate::message::SignedMessage;
use crate::{blocks::GossipBlock, rpc::net::NetInfoResult};
use crate::{chain::ChainStore, utils::encoding::from_slice_with_fallback};
use crate::{
libp2p_bitswap::{
request_manager::{BitswapRequestManager, ValidatePeerCallback},
BitswapStoreRead, BitswapStoreReadWrite,
},
utils::flume::FlumeSenderExt as _,
};
use ahash::{HashMap, HashSet};
use cid::Cid;
use flume::Sender;
use futures::{channel::oneshot, select, stream::StreamExt as _};
use futures::{select, stream::StreamExt as _};
use fvm_ipld_blockstore::Blockstore;
pub use libp2p::gossipsub::{IdentTopic, Topic};
use libp2p::{
Expand Down Expand Up @@ -145,13 +148,14 @@ pub enum NetworkMessage {
/// Network RPC API methods used to gather data from libp2p node.
#[derive(Debug)]
pub enum NetRPCMethods {
AddrsListen(oneshot::Sender<(PeerId, HashSet<Multiaddr>)>),
Peers(oneshot::Sender<HashMap<PeerId, HashSet<Multiaddr>>>),
Info(oneshot::Sender<NetInfoResult>),
Connect(oneshot::Sender<bool>, PeerId, HashSet<Multiaddr>),
Disconnect(oneshot::Sender<()>, PeerId),
AgentVersion(oneshot::Sender<Option<String>>, PeerId),
AutoNATStatus(oneshot::Sender<NatStatus>),
AddrsListen(flume::Sender<(PeerId, HashSet<Multiaddr>)>),
Peer(flume::Sender<Option<HashSet<Multiaddr>>>, PeerId),
Peers(flume::Sender<HashMap<PeerId, HashSet<Multiaddr>>>),
Info(flume::Sender<NetInfoResult>),
Connect(flume::Sender<bool>, PeerId, HashSet<Multiaddr>),
Disconnect(flume::Sender<()>, PeerId),
AgentVersion(flume::Sender<Option<String>>, PeerId),
AutoNATStatus(flume::Sender<NatStatus>),
}

/// The `Libp2pService` listens to events from the libp2p swarm.
Expand Down Expand Up @@ -479,25 +483,21 @@ async fn handle_network_message(
NetRPCMethods::AddrsListen(response_channel) => {
let listeners = Swarm::listeners(swarm).cloned().collect();
let peer_id = Swarm::local_peer_id(swarm);

if response_channel.send((*peer_id, listeners)).is_err() {
warn!("Failed to get Libp2p listeners");
}
response_channel.send_or_warn((*peer_id, listeners));
}
NetRPCMethods::Peer(response_channel, peer) => {
let addresses = swarm.behaviour().peer_addresses().get(&peer).cloned();
response_channel.send_or_warn(addresses);
}
NetRPCMethods::Peers(response_channel) => {
let peer_addresses = swarm.behaviour().peer_addresses();
if response_channel.send(peer_addresses).is_err() {
warn!("Failed to get Libp2p peers");
}
response_channel.send_or_warn(peer_addresses);
}
NetRPCMethods::Info(response_channel) => {
if response_channel.send(swarm.network_info().into()).is_err() {
warn!("Failed to get Libp2p peers");
}
response_channel.send_or_warn(swarm.network_info().into());
}
NetRPCMethods::Connect(response_channel, peer_id, addresses) => {
let mut success = false;

for mut multiaddr in addresses {
multiaddr.push(Protocol::P2p(peer_id));

Expand Down Expand Up @@ -525,31 +525,23 @@ async fn handle_network_message(
};
}

if response_channel.send(success).is_err() {
warn!("Failed to connect to a peer");
}
response_channel.send_or_warn(success);
}
NetRPCMethods::Disconnect(response_channel, peer_id) => {
let _ = Swarm::disconnect_peer_id(swarm, peer_id);
if response_channel.send(()).is_err() {
warn!("Failed to disconnect from a peer");
}
response_channel.send_or_warn(());
}
NetRPCMethods::AgentVersion(response_channel, peer_id) => {
let agent_version = swarm.behaviour().peer_info(&peer_id).and_then(|info| {
info.identify_info
.as_ref()
.map(|id| id.agent_version.clone())
});
if response_channel.send(agent_version).is_err() {
warn!("Failed to get agent version");
}
response_channel.send_or_warn(agent_version);
}
NetRPCMethods::AutoNATStatus(response_channel) => {
let nat_status = swarm.behaviour().discovery.nat_status();
if response_channel.send(nat_status).is_err() {
warn!("Failed to get nat status");
}
response_channel.send_or_warn(nat_status);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ from2internal! {
std::time::SystemTimeError,
tokio::task::JoinError,
fil_actors_shared::fvm_ipld_hamt::Error,
flume::RecvError,
}

impl From<ServerError> for ClientError {
Expand Down
Loading

0 comments on commit 1782e76

Please sign in to comment.