Skip to content

Commit

Permalink
handle messages better
Browse files Browse the repository at this point in the history
  • Loading branch information
Rigidity committed Jul 17, 2024
1 parent 8a889d8 commit 39b059a
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 24 deletions.
6 changes: 3 additions & 3 deletions crates/chia-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl Client {
let (peer, mut receiver) = match result {
Ok(result) => result,
Err((ip, port, error)) => {
log::debug!(
log::warn!(
"{error} for peer {}",
if ip.is_ipv4() {
format!("{ip}:{port}")
Expand Down Expand Up @@ -303,7 +303,7 @@ impl Client {
.send(Event::Message(peer_id, message))
.await
{
log::debug!("Failed to send client message event: {error}");
log::warn!("Failed to send client message event: {error}");
break;
}
}
Expand All @@ -316,7 +316,7 @@ impl Client {
.send(Event::ConnectionClosed(peer_id))
.await
{
log::debug!("Failed to send client connection closed event: {error}");
log::warn!("Failed to send client connection closed event: {error}");
}

log::info!("Peer {ip} disconnected");
Expand Down
66 changes: 45 additions & 21 deletions crates/chia-client/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Peer(Arc<PeerInner>);
#[derive(Debug)]
struct PeerInner {
sink: Mutex<Sink>,
inbound_handle: JoinHandle<Result<()>>,
inbound_handle: JoinHandle<()>,
requests: Arc<RequestMap>,
peer_id: PeerId,
ip_addr: IpAddr,
Expand Down Expand Up @@ -90,10 +90,15 @@ impl Peer {
let peer_id = PeerId(hasher.finalize_fixed().into());
let (sink, stream) = ws.split();
let (sender, receiver) = mpsc::channel(32);

let requests = Arc::new(RequestMap::new());
let requests_clone = requests.clone();

let inbound_handle =
tokio::spawn(handle_inbound_messages(stream, sender, requests.clone()));
let inbound_handle = tokio::spawn(async move {
if let Err(error) = handle_inbound_messages(stream, sender, requests_clone).await {
log::warn!("Error handling message: {error}");
}
});

let peer = Self(Arc::new(PeerInner {
sink: Mutex::new(sink),
Expand Down Expand Up @@ -192,27 +197,46 @@ async fn handle_inbound_messages(
sender: mpsc::Sender<Message>,
requests: Arc<RequestMap>,
) -> Result<()> {
use tungstenite::Message::{Binary, Close, Frame, Ping, Pong, Text};

while let Some(message) = stream.next().await {
let message = message?;
let message = Message::from_bytes(&message.into_data())?;

let Some(id) = message.id else {
sender.send(message).await.map_err(|error| {
log::debug!("Failed to send peer message event: {error}");
Error::EventNotSent
})?;
continue;
};

let Some(request) = requests.remove(id).await else {
log::warn!(
"Received {:?} message with untracked id {id}",
message.msg_type
);
return Err(Error::UnexpectedMessage(message.msg_type));
};

request.send(message);
match message {
Text(text) => {
log::warn!("Received unexpected text message: {text}");
}
Close(close) => {
log::warn!("Received close: {close:?}");
break;
}
Ping(_ping) => {}
Pong(_pong) => {}
Binary(binary) => {
let message = Message::from_bytes(&binary)?;

let Some(id) = message.id else {
sender.send(message).await.map_err(|error| {
log::warn!("Failed to send peer message event: {error}");
Error::EventNotSent
})?;
continue;
};

let Some(request) = requests.remove(id).await else {
log::warn!(
"Received {:?} message with untracked id {id}",
message.msg_type
);
return Err(Error::UnexpectedMessage(message.msg_type));
};

request.send(message);
}
Frame(frame) => {
log::warn!("Received frame: {frame}");
}
}
}
Ok(())
}

0 comments on commit 39b059a

Please sign in to comment.