Skip to content

Commit

Permalink
Add documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed Jun 14, 2024
1 parent bb81a97 commit 2f74eac
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 24 deletions.
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# vim: ft=toml
[toolchain]
channel = "nightly-2024-04-17"
channel = "nightly-2024-05-15"
2 changes: 1 addition & 1 deletion src/algebra/element.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ overload!((a: &mut Mod11) *= (b: ?Mod11) {

impl ConstantTimeEq for Mod11 {
fn ct_eq(&self, other: &Self) -> Choice {
((self == other) as u8).into()
u8::from(self == other).into()
}
}

Expand Down
64 changes: 42 additions & 22 deletions src/net/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,14 @@ impl RecvBytes for MuxedReceiver {
}
}

// NOTE: This can still be axed in favor of revamping Connection with
// generic parameters using Send/RecvBytes instead of AsyncWrite/AsyncRead.
// (FramedRead is made compatible with the Send/RecvBytes traits)
//
/// Multiplexed Connection
///
/// Aqquirred by constructing a [Gateway] using [``Gateway::multiplex``]
/// Aqquirred by constructing a [Gateway] using [`Gateway::single`], [`Gateway::multiplex`],
/// [`Gateway::multiplex_array`] or [`Gateway::muxify`] on an existing gateway.
///
/// Errors are propogated from the underlying connection inside the gateway.
///
/// The gateway needs to be driven for the muxed connection to function using [`Gateway::drive`].
pub struct MuxConn(MuxedSender, MuxedReceiver);

impl Channel for MuxConn {
Expand Down Expand Up @@ -200,9 +201,9 @@ impl<C: SplitChannel + Send> Gateway<C> {
///
/// # Errors
///
/// - [``GatewayError::MailboxNotFound``] if a given multiplexed connections has been
/// - [`GatewayError::MailboxNotFound`] if a given multiplexed connections has been
/// dropped and is receiving messages.
/// - [``GatewayError::DeadConnection``] if the underlying connection have failed.
/// - [`GatewayError::DeadConnection`] if the underlying connection have failed.
///
pub async fn drive(mut self) -> Result<Self, GatewayError<C::Error>> {
// TODO: maybe have this be nonconsuming so it can be resumed after new muxes are added?
Expand Down Expand Up @@ -269,19 +270,27 @@ impl<C: SplitChannel + Send> Gateway<C> {
GatewayError::DeadConnection(err)
}

pub fn single(channel: C) -> (Self, MuxConn) {
fn new(channel: C) -> (Self, UnboundedSender<MultiplexedMessage>) {
let (outbox, inbox) = unbounded_channel();
let gateway = outbox.clone();
let link = outbox.clone(); // needs to kept alive
let outbox = outbox.downgrade();
let mut new = Self {
let gateway = Self {
channel,
mailboxes: vec![],
errors: vec![],
inbox,
outbox,
};
let con = new.add_mux(gateway);
(new, con)
(gateway, link)
}

/// Multiplex a channel to a single new muxed connection.
///
/// New muxed connections can be constructed using [`Gateway::muxify`].
pub fn single(channel: C) -> (Self, MuxConn) {
let (mut gateway, link) = Self::new(channel);
let con = gateway.add_mux(link);
(gateway, con)
}

pub fn destroy(self) -> C {
Expand All @@ -290,31 +299,42 @@ impl<C: SplitChannel + Send> Gateway<C> {

/// Multiplex a channel to share it into `n` new connections.
///
/// * `net`: Connection to use as a gateway for multiplexing
/// * `con`: Connection to use as a gateway for multiplexing
/// * `n`: Number of new connections to multiplex into
///
/// Returns a gateway which the ``MuxConn`` communicate through, along with the MuxConn
/// Returns a gateway which the [`MuxConn`] communicate through, along with the [`MuxConn`]'s
#[must_use]

Check warning on line 306 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs

Check warning on line 306 in src/net/mux.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/caring/caring/src/net/mux.rs
pub fn multiplex(con: C, n: usize) -> (Self, Vec<MuxConn>) {
let (mut gateway, con) = Self::single(con);
let mut muxes = vec![con];
for _ in 1..n {
muxes.push(gateway.muxify());
}
let (mut gateway, link) = Self::new(con);
let muxes : Vec<_> = (0..n).map(|_| gateway.add_mux(link.clone())).collect();
(gateway, muxes)
}

/// Multiplex a channel to share into `N` new connections
///
/// * `con`: connection to use
///
/// Returns a gateway which the [`MuxConn`] communicate through, along with the [`MuxConn`]'s
#[must_use]
pub fn multiplex_array<const N: usize>(con: C) -> (Self, [MuxConn; N]) {
let (mut gateway, link) = Self::new(con);
let muxes = std::array::from_fn(|_| gateway.add_mux(link.clone()));
(gateway, muxes)
}

fn add_mux(&mut self, gateway: UnboundedSender<MultiplexedMessage>) -> MuxConn {
let id = self.mailboxes.len();
let (errors_coms1, error) = oneshot::channel();
let mx_sender = MuxedSender { id, gateway, error };
let sender = MuxedSender { id, gateway, error };
let (outbox, mailbox) = tokio::sync::mpsc::unbounded_channel();
let (errors_coms2, error) = oneshot::channel();
let mx_receiver = MuxedReceiver { id, mailbox, error };
let receiver = MuxedReceiver { id, mailbox, error };
self.errors.push([errors_coms1, errors_coms2]);
self.mailboxes.push(outbox);
MuxConn(mx_sender, mx_receiver)
MuxConn(sender, receiver)
}

/// Add a new muxed connection
pub fn muxify(&mut self) -> MuxConn {
let gateway = self
.outbox
Expand Down

0 comments on commit 2f74eac

Please sign in to comment.