Skip to content

Commit

Permalink
Cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed May 29, 2024
1 parent 4da77c1 commit 142cc69
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 69 deletions.
12 changes: 6 additions & 6 deletions src/net/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,25 @@ impl<R: AsyncRead, W: AsyncWrite> Connection<R, W> {
}
}


pub struct Sending<W: AsyncWrite>(FramedWrite<W, LengthDelimitedCodec>);
pub struct Receiving<R: AsyncRead>(FramedRead<R, LengthDelimitedCodec>);


impl<W: AsyncWrite + Unpin + Send> SendBytes for Sending<W> {
type SendError = ConnectionError;

async fn send_bytes(&mut self, bytes: Bytes) -> Result<(), Self::SendError> {
SinkExt::<_>::send(&mut self.0, bytes).await.map_err(|_| ConnectionError::Closed)
SinkExt::<_>::send(&mut self.0, bytes)
.await
.map_err(|_| ConnectionError::Closed)
}
}

impl<R: AsyncRead + Unpin + Send> RecvBytes for Receiving<R> {
type RecvError = ConnectionError;

async fn recv_bytes(&mut self) -> Result<BytesMut, Self::RecvError> {
self.0.next()
self.0
.next()
.await
.ok_or(ConnectionError::Closed)?
.map_err(|e| ConnectionError::Unknown(Box::new(e)))
Expand All @@ -117,7 +118,6 @@ impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Connection<R, W>
pub async fn recv<T: serde::de::DeserializeOwned>(&mut self) -> Result<T, ConnectionError> {
self.receiver.recv().await
}

}

impl<
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<
}
}

impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Channel for Connection<R,W> {
impl<R: AsyncRead + Unpin + Send, W: AsyncWrite + Unpin + Send> Channel for Connection<R, W> {
type Error = ConnectionError;
}

Expand Down
9 changes: 1 addition & 8 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod connection;
pub mod mux;
pub mod network;


pub trait SendBytes: Send {
type SendError: Error + Send + Sync + 'static;

Expand Down Expand Up @@ -39,7 +38,6 @@ impl<S: SendBytes> SendBytes for &mut S {
}
}


pub trait RecvBytes: Send {
type RecvError: Error + Send + Sync + 'static;
fn recv_bytes(
Expand All @@ -64,7 +62,6 @@ impl<R: RecvBytes> RecvBytes for &mut R {
) -> impl std::future::Future<Output = Result<BytesMut, Self::RecvError>> + Send {
(**self).recv_bytes()
}

}

/// A communication medium between you and another party.
Expand All @@ -77,17 +74,14 @@ impl<C: Channel> Channel for &mut C {
type Error = C::Error;
}



/// A [Channel] which can be split into a sender and receiver.
pub trait SplitChannel: Channel + Send {
type Sender: SendBytes<SendError = Self::SendError> + Send;
type Receiver: RecvBytes<RecvError = Self::RecvError> + Send;
fn split(&mut self) -> (&mut Self::Sender, &mut Self::Receiver);

}

impl<'a, C: SplitChannel> SplitChannel for &'a mut C {
impl<'a, C: SplitChannel> SplitChannel for &'a mut C {
type Sender = C::Sender;
type Receiver = C::Receiver;

Expand All @@ -96,7 +90,6 @@ impl<'a, C: SplitChannel> SplitChannel for &'a mut C {
}
}


/// Tune to a specific channel
pub trait Tuneable {
type TuningError: Error + Send + 'static;
Expand Down
4 changes: 1 addition & 3 deletions src/net/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ use tokio::join;

use crate::{
help,
net::{
network::Network, Channel, RecvBytes, SendBytes, SplitChannel
},
net::{network::Network, Channel, RecvBytes, SendBytes, SplitChannel},
};

#[derive(Debug, Error)]
Expand Down
71 changes: 37 additions & 34 deletions src/net/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ pub struct Network<C: SplitChannel> {

#[derive(thiserror::Error, Debug)]
#[error("Error communicating with {id}: {source}")]
pub enum NetworkError<E,U> {
Incoming {id: u32, source: E},
Outgoing {id: u32, source: U}
pub enum NetworkError<E, U> {
Incoming { id: u32, source: E },
Outgoing { id: u32, source: U },
}

#[allow(type_alias_bounds)] // It clearly matters, stop complaining
Expand All @@ -69,22 +69,18 @@ impl<C: SplitChannel> Network<C> {
}
}


/// Broadcast a message to all other parties.
///
/// Asymmetric, non-waiting
///
/// * `msg`: Message to send
pub async fn broadcast(
&mut self,
msg: &(impl serde::Serialize + Sync),
) -> NetResult<(), C> {
pub async fn broadcast(&mut self, msg: &(impl serde::Serialize + Sync)) -> NetResult<(), C> {
let my_id = self.index;
let packet : Bytes = bincode::serialize(&msg).unwrap().into();
let packet: Bytes = bincode::serialize(&msg).unwrap().into();
let outgoing = self.connections.iter_mut().enumerate().map(|(i, conn)| {
let id = if i < my_id { i } else { i + 1 } as u32;
conn.send_bytes(packet.clone())
.map_err(move |e| NetworkError::Outgoing{id, source: e})
.map_err(move |e| NetworkError::Outgoing { id, source: e })
});
future::try_join_all(outgoing).await?;
Ok(())
Expand All @@ -98,10 +94,7 @@ impl<C: SplitChannel> Network<C> {
/// Asymmetric, non-waiting
///
/// * `msgs`: Messages to send
pub async fn unicast(
&mut self,
msgs: &[impl serde::Serialize + Sync],
) -> NetResult<(), C> {
pub async fn unicast(&mut self, msgs: &[impl serde::Serialize + Sync]) -> NetResult<(), C> {
let my_id = self.index;
let outgoing = self
.connections
Expand All @@ -122,9 +115,7 @@ impl<C: SplitChannel> Network<C> {
/// Asymmetric, waiting
///
/// Returns: A list sorted by the connections (skipping yourself)
pub async fn receive_all<T: serde::de::DeserializeOwned>(
&mut self,
) -> NetResult<Vec<T>, C> {
pub async fn receive_all<T: serde::de::DeserializeOwned>(&mut self) -> NetResult<Vec<T>, C> {
let my_id = self.index;
let messages = self.connections.iter_mut().enumerate().map(|(i, conn)| {
let msg = conn.recv::<T>();
Expand Down Expand Up @@ -163,7 +154,7 @@ impl<C: SplitChannel> Network<C> {
let (mut tx, mut rx): (Vec<_>, Vec<_>) =
self.connections.iter_mut().map(|c| c.split()).unzip();

let packet : Bytes = bincode::serialize(&msg).unwrap().into();
let packet: Bytes = bincode::serialize(&msg).unwrap().into();
let outgoing = tx.iter_mut().enumerate().map(|(id, conn)| {
let id = if id < my_id { id } else { id + 1 } as u32;
conn.send_bytes(packet.clone())
Expand Down Expand Up @@ -209,10 +200,7 @@ impl<C: SplitChannel> Network<C> {
/// will be send to party `i`.
///
/// * `msg`: message to send and receive
pub async fn symmetric_unicast<T>(
&mut self,
mut msgs: Vec<T>,
) -> NetResult<Vec<T>, C>
pub async fn symmetric_unicast<T>(&mut self, mut msgs: Vec<T>) -> NetResult<Vec<T>, C>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync,
{
Expand Down Expand Up @@ -308,7 +296,6 @@ impl<C: SplitChannel> Network<C> {

async fn drop_party(_id: usize) -> Result<(), ()> {
todo!("Initiate a drop vote");

}
}

Expand All @@ -317,7 +304,7 @@ impl<C: SplitChannel> Network<C> {
//
// Outline:
// Currently we do not handle any unprepared protocols, but only expected 'happy path' behaviour.
// In case of protocols or communication failure we return an error, but we do not provide a solution.
// In case of protocols or communication failure we return an error, but we do not provide a solution.
// The current expection is for the downstream user to handle it themselves, instead of doing
// something automatic. However, we currently do not have any methods for removing parties,
// and if we had we still need all other parties to come to the same conclusion.
Expand All @@ -341,12 +328,14 @@ impl<C: SplitChannel> Network<C> {
//
//


impl<C: SplitChannel> Unicast for Network<C> {
type UnicastError = NetworkError<C::RecvError, C::SendError>;

#[tracing::instrument(skip_all)]
async fn unicast(&mut self, msgs: &[impl serde::Serialize + Sync]) -> Result<(), Self::UnicastError> {
async fn unicast(
&mut self,
msgs: &[impl serde::Serialize + Sync],
) -> Result<(), Self::UnicastError> {
self.unicast(msgs).await
}

Expand All @@ -359,7 +348,9 @@ impl<C: SplitChannel> Unicast for Network<C> {
}

#[tracing::instrument(skip_all)]
async fn receive_all<T: serde::de::DeserializeOwned>(&mut self) -> Result<Vec<T>, Self::UnicastError> {
async fn receive_all<T: serde::de::DeserializeOwned>(
&mut self,
) -> Result<Vec<T>, Self::UnicastError> {
self.receive_all().await
}

Expand All @@ -372,7 +363,10 @@ impl<C: SplitChannel> Broadcast for Network<C> {
type BroadcastError = NetworkError<C::RecvError, C::SendError>;

#[tracing::instrument(skip_all)]
async fn broadcast(&mut self, msg: &(impl serde::Serialize + Sync)) -> Result<(), Self::BroadcastError> {
async fn broadcast(
&mut self,
msg: &(impl serde::Serialize + Sync),
) -> Result<(), Self::BroadcastError> {
self.broadcast(msg).await
}

Expand Down Expand Up @@ -408,7 +402,13 @@ impl<C: SplitChannel> Tuneable for Network<C> {
idx: usize,
) -> Result<T, Self::TuningError> {
let idx = self.id_to_index(idx);
self.connections[idx].recv().await.map_err(|e | NetworkError::Incoming { id: idx as u32, source: e })
self.connections[idx]
.recv()
.await
.map_err(|e| NetworkError::Incoming {
id: idx as u32,
source: e,
})
}

async fn send_to<T: serde::Serialize + Sync>(
Expand All @@ -417,7 +417,13 @@ impl<C: SplitChannel> Tuneable for Network<C> {
msg: &T,
) -> Result<(), Self::TuningError> {
let idx = self.id_to_index(idx);
self.connections[idx].send(msg).await.map_err(|e | NetworkError::Outgoing { id: idx as u32, source: e })
self.connections[idx]
.send(msg)
.await
.map_err(|e| NetworkError::Outgoing {
id: idx as u32,
source: e,
})
}
}

Expand Down Expand Up @@ -474,10 +480,7 @@ impl TcpNetwork {
///
/// * `me`: Socket address to open to
/// * `peers`: Socket addresses of other peers
pub async fn connect(
me: SocketAddr,
peers: &[SocketAddr],
) -> NetResult<Self, TcpConnection> {
pub async fn connect(me: SocketAddr, peers: &[SocketAddr]) -> NetResult<Self, TcpConnection> {
let n = peers.len();

// Connecting to parties
Expand Down
7 changes: 5 additions & 2 deletions src/protocols/cutnchoose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ mod test {
use crate::{
algebra::element::Element32,
net::{
agency::Broadcast, connection::{self, ConnectionError, DuplexConnection}, Channel, RecvBytes, SendBytes, SplitChannel
agency::Broadcast,
connection::{self, ConnectionError, DuplexConnection},
Channel, RecvBytes, SendBytes, SplitChannel,
},
protocols::cutnchoose::{choose, cut},
schemes::{
Expand Down Expand Up @@ -157,7 +159,8 @@ mod test {

fn recv_bytes(
&mut self,
) -> impl std::future::Future<Output = Result<tokio_util::bytes::BytesMut, Self::RecvError>> + Send {
) -> impl std::future::Future<Output = Result<tokio_util::bytes::BytesMut, Self::RecvError>> + Send
{
self.inner.recv_bytes()
}
}
Expand Down
36 changes: 20 additions & 16 deletions src/protocols/voting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ pub struct Proposal<T> {
creator: usize,
}


pub trait Initiative: Serialize + DeserializeOwned + Sync {
type Args;
fn name() -> &'static str;
fn execute(args: Args);
}

impl<T: Initiative> Proposal<T> {

pub async fn initiate<C: Communicate>(initative: T, mut coms: C) -> Result<Option<Self>, C::BroadcastError> {
pub async fn initiate<C: Communicate>(
initative: T,
mut coms: C,
) -> Result<Option<Self>, C::BroadcastError> {
let creator = coms.id();
let proposal = Self {initative, creator};
let proposal = Self { initative, creator };
let request = ProposalRequest(proposal);

coms.broadcast(&request).await?;
Expand All @@ -31,33 +32,36 @@ impl<T: Initiative> Proposal<T> {
pub fn execute(self, args: Args) {
T::execute(args)
}

}

#[repr(transparent)]
#[derive(Clone, Serialize, Deserialize)]
pub struct ProposalRequest<T>(Proposal<T>);

impl<T: Initiative> ProposalRequest<T> {

async fn vote<C: Communicate>(self, vote: bool, mut coms: C) -> Result<Option<Proposal<T>>, C::BroadcastError> {
async fn vote<C: Communicate>(
self,
vote: bool,
mut coms: C,
) -> Result<Option<Proposal<T>>, C::BroadcastError> {
let votes = coms.symmetric_broadcast(vote).await?;
let n = votes.len();
let n = votes.len();
let yes_votes = votes.into_iter().filter(|&v| v).count();
let res = if yes_votes > n {
Some(self.0)
} else {
None
};
let res = if yes_votes > n { Some(self.0) } else { None };
Ok(res)
}

pub async fn accept<C: Communicate>(self, coms: C) -> Result<Option<Proposal<T>>, C::BroadcastError> {
pub async fn accept<C: Communicate>(
self,
coms: C,
) -> Result<Option<Proposal<T>>, C::BroadcastError> {
self.vote(true, coms).await
}

pub async fn reject<C: Communicate>(self, coms: C) -> Result<Option<Proposal<T>>, C::BroadcastError>{
pub async fn reject<C: Communicate>(
self,
coms: C,
) -> Result<Option<Proposal<T>>, C::BroadcastError> {
self.vote(false, coms).await

}
}

0 comments on commit 142cc69

Please sign in to comment.