Skip to content

Commit

Permalink
Refactor network
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed May 29, 2024
1 parent 46d6010 commit 33d4b3d
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 196 deletions.
120 changes: 36 additions & 84 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

use std::error::Error;

use futures::{Future, SinkExt, StreamExt};
use serde::{de::DeserializeOwned, Serialize};
use futures::{SinkExt, StreamExt};
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncWrite, DuplexStream, ReadHalf, WriteHalf},
Expand All @@ -35,7 +34,7 @@ use tokio_util::{
codec::{FramedRead, FramedWrite, LengthDelimitedCodec},
};

use crate::net::SplitChannel;
use crate::net::{Channel, RecvBytes, SendBytes, SplitChannel};

pub struct Connection<R: AsyncRead, W: AsyncWrite> {
sender: Sending<W>,
Expand Down Expand Up @@ -79,125 +78,78 @@ impl<R: AsyncRead, W: AsyncWrite> Connection<R, W> {
}
}

pub trait SendBytes: Send {
type SendError: Error + Send;

fn send_bytes(
&mut self,
bytes: Bytes,
) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send;
pub struct Sending<W: AsyncWrite>(FramedWrite<W, LengthDelimitedCodec>);
pub struct Receiving<R: AsyncRead>(FramedRead<R, LengthDelimitedCodec>);

fn send_thing<T: Serialize + Sync>(
&mut self,
msg: &T,
) -> impl Future<Output = Result<(), Self::SendError>> + Send {
async {
let msg = bincode::serialize(msg).unwrap();
self.send_bytes(msg.into()).await
}
}
}

impl<W: AsyncWrite + Unpin + Send> SendBytes for FramedWrite<W, LengthDelimitedCodec> {
impl<W: AsyncWrite + Unpin + Send> SendBytes for Sending<W> {
type SendError = ConnectionError;
async fn send_bytes(&mut self, bytes: Bytes) -> Result<(), Self::SendError> {
self.send(bytes).await.map_err(|_| ConnectionError::Closed)
}
}

pub trait RecvBytes: Send {
type RecvError: Error + Send;
fn recv_bytes(
&mut self,
) -> impl std::future::Future<Output = Result<BytesMut, Self::RecvError>> + Send;

fn recv_thing<T: DeserializeOwned>(
&mut self,
) -> impl Future<Output = Result<T, Self::RecvError>> + Send {
async {
let msg = self.recv_bytes().await?;
Ok(bincode::deserialize(&msg).unwrap())
}
async fn send_bytes(&mut self, bytes: Bytes) -> Result<(), Self::SendError> {
SinkExt::<_>::send(&mut self.0, bytes).await.map_err(|_| ConnectionError::Closed)
}
}

impl<R: AsyncRead + Unpin + Send> RecvBytes for FramedRead<R, LengthDelimitedCodec> {
impl<R: AsyncRead + Unpin + Send> RecvBytes for Receiving<R> {
type RecvError = ConnectionError;

async fn recv_bytes(&mut self) -> Result<BytesMut, Self::RecvError> {
self.next()
self.0.next()
.await
.ok_or(ConnectionError::Closed)?
.map_err(|e| ConnectionError::Unknown(Box::new(e)))
}
}

pub struct Sending<W: AsyncWrite>(FramedWrite<W, LengthDelimitedCodec>);
pub struct Receiving<R: AsyncRead>(FramedRead<R, LengthDelimitedCodec>);

impl<R: AsyncRead + Unpin + Send> Receiving<R> {
pub async fn recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<T, ConnectionError> {
self.0.recv_thing().await
}

pub async fn recv_bytes(&mut self) -> Result<BytesMut, ConnectionError> {
self.0.recv_bytes().await
impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Connection<R, W> {
/// Send a message, waiting until receival
///
/// * `msg`: Message to send
pub async fn send(
&mut self,
msg: &(impl serde::Serialize + Sync),
) -> Result<(), ConnectionError> {
self.sender.send(msg).await
}
}

impl<W: AsyncWrite + Unpin + Send> Sending<W> {
pub async fn send(&mut self, msg: &(impl Serialize + Sync)) -> Result<(), ConnectionError> {
self.0.send_thing(msg).await
/// Receive a message waiting for arrival
pub async fn recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<T, ConnectionError> {
self.receiver.recv().await
}

pub async fn send_bytes(&mut self, msg: Bytes) -> Result<(), ConnectionError> {
self.0.send_bytes(msg).await
}
}

impl<W: AsyncWrite + Unpin + Send> SendBytes for Sending<W> {
impl<
R: tokio::io::AsyncRead + std::marker::Unpin + Send,
W: tokio::io::AsyncWrite + std::marker::Unpin + Send,
> SendBytes for Connection<R, W>
{
type SendError = ConnectionError;

fn send_bytes(
&mut self,
bytes: Bytes,
) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send {
self.0.send_bytes(bytes)
self.sender.send_bytes(bytes)
}
}

impl<R: AsyncRead + Unpin + Send> RecvBytes for Receiving<R> {
impl<
R: tokio::io::AsyncRead + std::marker::Unpin + Send,
W: tokio::io::AsyncWrite + std::marker::Unpin + Send,
> RecvBytes for Connection<R, W>
{
type RecvError = ConnectionError;

fn recv_bytes(
&mut self,
) -> impl std::future::Future<Output = Result<BytesMut, Self::RecvError>> + Send {
self.0.recv_bytes()
self.receiver.recv_bytes()
}
}

impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Connection<R, W> {
/// Send a message, waiting until receival
///
/// * `msg`: Message to send
pub async fn send(
&mut self,
msg: &(impl serde::Serialize + Sync),
) -> Result<(), ConnectionError> {
self.sender.send(msg).await
}

/// Receive a message waiting for arrival
pub async fn recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<T, ConnectionError> {
self.receiver.recv().await
}

pub async fn recv_bytes(&mut self) -> Result<BytesMut, ConnectionError> {
self.receiver.recv_bytes().await
}

pub async fn send_bytes(&mut self, bytes: Bytes) -> Result<(), ConnectionError> {
self.sender.send_bytes(bytes).await
}
impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Channel for Connection<R,W> {
type Error = ConnectionError;
}

impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> SplitChannel for Connection<R, W> {
Expand Down
117 changes: 72 additions & 45 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,102 @@
use std::error::Error;
use std::{error::Error, future::Future};

use futures::Future;

use crate::net::connection::{Connection, ConnectionError, RecvBytes, SendBytes};
use serde::{de::DeserializeOwned, Serialize};
use tokio_util::bytes::{Bytes, BytesMut};

pub mod agency;
pub mod connection;
pub mod mux;
pub mod network;

/// A communication medium between you and another party.
///
/// Allows you to send and receive arbitrary messages.
pub trait Channel {
type Error: Error + Send + Sync + 'static;

/// Send a message over the channel
///
/// * `msg`: message to serialize and send
fn send<T: serde::Serialize + Sync>(
pub trait SendBytes: Send {
type SendError: Error + Send + Sync + 'static;

fn send_bytes(
&mut self,
bytes: Bytes,
) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send;

fn send<T: Serialize + Sync>(
&mut self,
msg: &T,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
) -> impl Future<Output = Result<(), Self::SendError>> + Send {
async {
let msg = bincode::serialize(msg).unwrap();
self.send_bytes(msg.into()).await
}
}
}

fn recv<T: serde::de::DeserializeOwned>(
impl<S: SendBytes> SendBytes for &mut S {
type SendError = S::SendError;

fn send_bytes(
&mut self,
) -> impl Future<Output = Result<T, Self::Error>> + Send;
bytes: Bytes,
) -> impl std::future::Future<Output = Result<(), Self::SendError>> + Send {
(**self).send_bytes(bytes)
}
}

impl<
R: tokio::io::AsyncRead + std::marker::Unpin + Send,
W: tokio::io::AsyncWrite + std::marker::Unpin + Send,
> Channel for Connection<R, W>
{
type Error = ConnectionError;

async fn send<T: serde::Serialize + Sync>(&mut self, msg: &T) -> Result<(), Self::Error> {
Connection::send(self, &msg).await
pub trait RecvBytes: Send {
type RecvError: Error + Send + Sync + 'static;
fn recv_bytes(
&mut self,
) -> impl std::future::Future<Output = Result<BytesMut, Self::RecvError>> + Send;

fn recv<T: DeserializeOwned>(
&mut self,
) -> impl Future<Output = Result<T, Self::RecvError>> + Send {
async {
let msg = self.recv_bytes().await?;
Ok(bincode::deserialize(&msg).unwrap())
}
}
}

fn recv<T: serde::de::DeserializeOwned>(
impl<R: RecvBytes> RecvBytes for &mut R {
type RecvError = R::RecvError;

fn recv_bytes(
&mut self,
) -> impl Future<Output = Result<T, Self::Error>> {
Connection::recv(self)
) -> impl std::future::Future<Output = Result<BytesMut, Self::RecvError>> + Send {
(**self).recv_bytes()
}

}

/// A communication medium between you and another party.
///
/// Allows you to send and receive arbitrary messages.
pub trait Channel: SendBytes + RecvBytes {
type Error: Error + Send;
}
impl<C: Channel> Channel for &mut C {
type Error = C::Error;
}



/// A [Channel] which can be split into a sender and receiver.
pub trait SplitChannel: Channel {
type Sender: SendBytes<SendError = Self::Error> + Send;
type Receiver: RecvBytes<RecvError = Self::Error> + Send;
pub trait SplitChannel: Channel + Send {
type Sender: SendBytes<SendError = Self::SendError> + Send;
type Receiver: RecvBytes<RecvError = Self::RecvError> + Send;
fn split(&mut self) -> (&mut Self::Sender, &mut Self::Receiver);
}

impl<'a, C: Channel> Channel for &'a mut C {
type Error = C::Error;
}

fn send<T: serde::Serialize + Sync>(
&mut self,
msg: &T,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
(**self).send(msg)
}
impl<'a, C: SplitChannel> SplitChannel for &'a mut C {
type Sender = C::Sender;
type Receiver = C::Receiver;

fn recv<T: serde::de::DeserializeOwned>(
&mut self,
) -> impl Future<Output = Result<T, Self::Error>> + Send {
(**self).recv()
fn split(&mut self) -> (&mut Self::Sender, &mut Self::Receiver) {
(**self).split()
}
}


/// Tune to a specific channel
pub trait Tuneable {
type TuningError: Error + Send + 'static;
Expand All @@ -85,7 +112,7 @@ pub trait Tuneable {
&mut self,
idx: usize,
msg: &T,
) -> impl std::future::Future<Output = Result<(), Self::TuningError>>;
) -> impl Future<Output = Result<(), Self::TuningError>>;
}

impl<'a, R: Tuneable + ?Sized> Tuneable for &'a mut R {
Expand All @@ -106,7 +133,7 @@ impl<'a, R: Tuneable + ?Sized> Tuneable for &'a mut R {
&mut self,
idx: usize,
msg: &T,
) -> impl std::future::Future<Output = Result<(), Self::TuningError>> {
) -> impl Future<Output = Result<(), Self::TuningError>> {
(**self).send_to(idx, msg)
}
}
Expand Down
Loading

0 comments on commit 33d4b3d

Please sign in to comment.