diff --git a/src/net/connection.rs b/src/net/connection.rs index 5d5cc62..0212033 100644 --- a/src/net/connection.rs +++ b/src/net/connection.rs @@ -78,16 +78,16 @@ impl Connection { } } - pub struct Sending(FramedWrite); pub struct Receiving(FramedRead); - impl SendBytes for Sending { type SendError = ConnectionError; async fn send_bytes(&mut self, bytes: Bytes) -> Result<(), Self::SendError> { - SinkExt::<_>::send(&mut self.0, bytes).await.map_err(|_| ConnectionError::Closed) + SinkExt::<_>::send(&mut self.0, bytes) + .await + .map_err(|_| ConnectionError::Closed) } } @@ -95,7 +95,8 @@ impl RecvBytes for Receiving { type RecvError = ConnectionError; async fn recv_bytes(&mut self) -> Result { - self.0.next() + self.0 + .next() .await .ok_or(ConnectionError::Closed)? .map_err(|e| ConnectionError::Unknown(Box::new(e))) @@ -117,7 +118,6 @@ impl Connection pub async fn recv(&mut self) -> Result { self.receiver.recv().await } - } impl< @@ -148,7 +148,7 @@ impl< } } -impl Channel for Connection { +impl Channel for Connection { type Error = ConnectionError; } diff --git a/src/net/mod.rs b/src/net/mod.rs index 64b9531..32e1c68 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -8,7 +8,6 @@ pub mod connection; pub mod mux; pub mod network; - pub trait SendBytes: Send { type SendError: Error + Send + Sync + 'static; @@ -39,7 +38,6 @@ impl SendBytes for &mut S { } } - pub trait RecvBytes: Send { type RecvError: Error + Send + Sync + 'static; fn recv_bytes( @@ -64,7 +62,6 @@ impl RecvBytes for &mut R { ) -> impl std::future::Future> + Send { (**self).recv_bytes() } - } /// A communication medium between you and another party. @@ -77,17 +74,14 @@ impl Channel for &mut C { type Error = C::Error; } - - /// A [Channel] which can be split into a sender and receiver. pub trait SplitChannel: Channel + Send { type Sender: SendBytes + Send; type Receiver: RecvBytes + Send; fn split(&mut self) -> (&mut Self::Sender, &mut Self::Receiver); - } -impl<'a, C: SplitChannel> SplitChannel for &'a mut C { +impl<'a, C: SplitChannel> SplitChannel for &'a mut C { type Sender = C::Sender; type Receiver = C::Receiver; @@ -96,7 +90,6 @@ impl<'a, C: SplitChannel> SplitChannel for &'a mut C { } } - /// Tune to a specific channel pub trait Tuneable { type TuningError: Error + Send + 'static; diff --git a/src/net/mux.rs b/src/net/mux.rs index 04bea50..c32846e 100644 --- a/src/net/mux.rs +++ b/src/net/mux.rs @@ -19,9 +19,7 @@ use tokio::join; use crate::{ help, - net::{ - network::Network, Channel, RecvBytes, SendBytes, SplitChannel - }, + net::{network::Network, Channel, RecvBytes, SendBytes, SplitChannel}, }; #[derive(Debug, Error)] diff --git a/src/net/network.rs b/src/net/network.rs index 68fc83f..049608b 100644 --- a/src/net/network.rs +++ b/src/net/network.rs @@ -44,9 +44,9 @@ pub struct Network { #[derive(thiserror::Error, Debug)] #[error("Error communicating with {id}: {source}")] -pub enum NetworkError { - Incoming {id: u32, source: E}, - Outgoing {id: u32, source: U} +pub enum NetworkError { + Incoming { id: u32, source: E }, + Outgoing { id: u32, source: U }, } #[allow(type_alias_bounds)] // It clearly matters, stop complaining @@ -69,22 +69,18 @@ impl Network { } } - /// Broadcast a message to all other parties. /// /// Asymmetric, non-waiting /// /// * `msg`: Message to send - pub async fn broadcast( - &mut self, - msg: &(impl serde::Serialize + Sync), - ) -> NetResult<(), C> { + pub async fn broadcast(&mut self, msg: &(impl serde::Serialize + Sync)) -> NetResult<(), C> { let my_id = self.index; - let packet : Bytes = bincode::serialize(&msg).unwrap().into(); + let packet: Bytes = bincode::serialize(&msg).unwrap().into(); let outgoing = self.connections.iter_mut().enumerate().map(|(i, conn)| { let id = if i < my_id { i } else { i + 1 } as u32; conn.send_bytes(packet.clone()) - .map_err(move |e| NetworkError::Outgoing{id, source: e}) + .map_err(move |e| NetworkError::Outgoing { id, source: e }) }); future::try_join_all(outgoing).await?; Ok(()) @@ -98,10 +94,7 @@ impl Network { /// Asymmetric, non-waiting /// /// * `msgs`: Messages to send - pub async fn unicast( - &mut self, - msgs: &[impl serde::Serialize + Sync], - ) -> NetResult<(), C> { + pub async fn unicast(&mut self, msgs: &[impl serde::Serialize + Sync]) -> NetResult<(), C> { let my_id = self.index; let outgoing = self .connections @@ -122,9 +115,7 @@ impl Network { /// Asymmetric, waiting /// /// Returns: A list sorted by the connections (skipping yourself) - pub async fn receive_all( - &mut self, - ) -> NetResult, C> { + pub async fn receive_all(&mut self) -> NetResult, C> { let my_id = self.index; let messages = self.connections.iter_mut().enumerate().map(|(i, conn)| { let msg = conn.recv::(); @@ -163,7 +154,7 @@ impl Network { let (mut tx, mut rx): (Vec<_>, Vec<_>) = self.connections.iter_mut().map(|c| c.split()).unzip(); - let packet : Bytes = bincode::serialize(&msg).unwrap().into(); + let packet: Bytes = bincode::serialize(&msg).unwrap().into(); let outgoing = tx.iter_mut().enumerate().map(|(id, conn)| { let id = if id < my_id { id } else { id + 1 } as u32; conn.send_bytes(packet.clone()) @@ -209,10 +200,7 @@ impl Network { /// will be send to party `i`. /// /// * `msg`: message to send and receive - pub async fn symmetric_unicast( - &mut self, - mut msgs: Vec, - ) -> NetResult, C> + pub async fn symmetric_unicast(&mut self, mut msgs: Vec) -> NetResult, C> where T: serde::Serialize + serde::de::DeserializeOwned + Sync, { @@ -308,7 +296,6 @@ impl Network { async fn drop_party(_id: usize) -> Result<(), ()> { todo!("Initiate a drop vote"); - } } @@ -317,7 +304,7 @@ impl Network { // // Outline: // Currently we do not handle any unprepared protocols, but only expected 'happy path' behaviour. -// In case of protocols or communication failure we return an error, but we do not provide a solution. +// In case of protocols or communication failure we return an error, but we do not provide a solution. // The current expection is for the downstream user to handle it themselves, instead of doing // something automatic. However, we currently do not have any methods for removing parties, // and if we had we still need all other parties to come to the same conclusion. @@ -341,12 +328,14 @@ impl Network { // // - impl Unicast for Network { type UnicastError = NetworkError; #[tracing::instrument(skip_all)] - async fn unicast(&mut self, msgs: &[impl serde::Serialize + Sync]) -> Result<(), Self::UnicastError> { + async fn unicast( + &mut self, + msgs: &[impl serde::Serialize + Sync], + ) -> Result<(), Self::UnicastError> { self.unicast(msgs).await } @@ -359,7 +348,9 @@ impl Unicast for Network { } #[tracing::instrument(skip_all)] - async fn receive_all(&mut self) -> Result, Self::UnicastError> { + async fn receive_all( + &mut self, + ) -> Result, Self::UnicastError> { self.receive_all().await } @@ -372,7 +363,10 @@ impl Broadcast for Network { type BroadcastError = NetworkError; #[tracing::instrument(skip_all)] - async fn broadcast(&mut self, msg: &(impl serde::Serialize + Sync)) -> Result<(), Self::BroadcastError> { + async fn broadcast( + &mut self, + msg: &(impl serde::Serialize + Sync), + ) -> Result<(), Self::BroadcastError> { self.broadcast(msg).await } @@ -408,7 +402,13 @@ impl Tuneable for Network { idx: usize, ) -> Result { let idx = self.id_to_index(idx); - self.connections[idx].recv().await.map_err(|e | NetworkError::Incoming { id: idx as u32, source: e }) + self.connections[idx] + .recv() + .await + .map_err(|e| NetworkError::Incoming { + id: idx as u32, + source: e, + }) } async fn send_to( @@ -417,7 +417,13 @@ impl Tuneable for Network { msg: &T, ) -> Result<(), Self::TuningError> { let idx = self.id_to_index(idx); - self.connections[idx].send(msg).await.map_err(|e | NetworkError::Outgoing { id: idx as u32, source: e }) + self.connections[idx] + .send(msg) + .await + .map_err(|e| NetworkError::Outgoing { + id: idx as u32, + source: e, + }) } } @@ -474,10 +480,7 @@ impl TcpNetwork { /// /// * `me`: Socket address to open to /// * `peers`: Socket addresses of other peers - pub async fn connect( - me: SocketAddr, - peers: &[SocketAddr], - ) -> NetResult { + pub async fn connect(me: SocketAddr, peers: &[SocketAddr]) -> NetResult { let n = peers.len(); // Connecting to parties diff --git a/src/protocols/cutnchoose.rs b/src/protocols/cutnchoose.rs index 3e782e6..0a1d8c7 100644 --- a/src/protocols/cutnchoose.rs +++ b/src/protocols/cutnchoose.rs @@ -90,7 +90,9 @@ mod test { use crate::{ algebra::element::Element32, net::{ - agency::Broadcast, connection::{self, ConnectionError, DuplexConnection}, Channel, RecvBytes, SendBytes, SplitChannel + agency::Broadcast, + connection::{self, ConnectionError, DuplexConnection}, + Channel, RecvBytes, SendBytes, SplitChannel, }, protocols::cutnchoose::{choose, cut}, schemes::{ @@ -157,7 +159,8 @@ mod test { fn recv_bytes( &mut self, - ) -> impl std::future::Future> + Send { + ) -> impl std::future::Future> + Send + { self.inner.recv_bytes() } } diff --git a/src/protocols/voting.rs b/src/protocols/voting.rs index b6ce2d8..1584c2f 100644 --- a/src/protocols/voting.rs +++ b/src/protocols/voting.rs @@ -10,7 +10,6 @@ pub struct Proposal { creator: usize, } - pub trait Initiative: Serialize + DeserializeOwned + Sync { type Args; fn name() -> &'static str; @@ -18,10 +17,12 @@ pub trait Initiative: Serialize + DeserializeOwned + Sync { } impl Proposal { - - pub async fn initiate(initative: T, mut coms: C) -> Result, C::BroadcastError> { + pub async fn initiate( + initative: T, + mut coms: C, + ) -> Result, C::BroadcastError> { let creator = coms.id(); - let proposal = Self {initative, creator}; + let proposal = Self { initative, creator }; let request = ProposalRequest(proposal); coms.broadcast(&request).await?; @@ -31,7 +32,6 @@ impl Proposal { pub fn execute(self, args: Args) { T::execute(args) } - } #[repr(transparent)] @@ -39,25 +39,29 @@ impl Proposal { pub struct ProposalRequest(Proposal); impl ProposalRequest { - - async fn vote(self, vote: bool, mut coms: C) -> Result>, C::BroadcastError> { + async fn vote( + self, + vote: bool, + mut coms: C, + ) -> Result>, C::BroadcastError> { let votes = coms.symmetric_broadcast(vote).await?; - let n = votes.len(); + let n = votes.len(); let yes_votes = votes.into_iter().filter(|&v| v).count(); - let res = if yes_votes > n { - Some(self.0) - } else { - None - }; + let res = if yes_votes > n { Some(self.0) } else { None }; Ok(res) } - pub async fn accept(self, coms: C) -> Result>, C::BroadcastError> { + pub async fn accept( + self, + coms: C, + ) -> Result>, C::BroadcastError> { self.vote(true, coms).await } - pub async fn reject(self, coms: C) -> Result>, C::BroadcastError>{ + pub async fn reject( + self, + coms: C, + ) -> Result>, C::BroadcastError> { self.vote(false, coms).await - } }