Skip to content
This repository has been archived by the owner on Jul 5, 2024. It is now read-only.

Commit

Permalink
refactor: fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
simbleau committed Mar 24, 2024
1 parent 2d85f72 commit 3eaf371
Show file tree
Hide file tree
Showing 19 changed files with 136 additions and 244 deletions.
11 changes: 3 additions & 8 deletions bevy_rtc/src/client/plugin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::{
systems, AddProtocolExt, ConnectionRequest, RtcClientEvent,
RtcClientStatus, RtcState,
systems, AddProtocolExt, ConnectionRequest, RtcClientEvent, RtcClientStatus, RtcState,
};
use crate::{
events::SocketRecvEvent,
Expand All @@ -21,10 +20,7 @@ impl Plugin for RtcClientPlugin {
.init_state::<RtcClientStatus>()
.add_event::<ConnectionRequest>()
.add_event::<RtcClientEvent>()
.add_systems(
OnEnter(RtcClientStatus::Establishing),
systems::init_socket,
)
.add_systems(OnEnter(RtcClientStatus::Establishing), systems::init_socket)
.add_systems(
OnEnter(RtcClientStatus::Disconnected),
systems::reset_socket,
Expand All @@ -46,8 +42,7 @@ impl Plugin for RtcClientPlugin {
Update,
(
systems::read_latency_tracers,
systems::send_latency_tracers
.run_if(on_timer(Duration::from_millis(100))),
systems::send_latency_tracers.run_if(on_timer(Duration::from_millis(100))),
)
.run_if(in_state(RtcClientStatus::Connected)),
);
Expand Down
16 changes: 4 additions & 12 deletions bevy_rtc/src/client/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ pub trait AddProtocolExt {
fn add_sendonly_protocol<M: Payload>(&mut self) -> &mut Self;
/// Register a protocol that is only read, never sent. Allocate a bounded
/// buffer per peer for receiving, and do not run systems for sending.
fn add_readonly_bounded_protocol<M: Payload>(
&mut self,
bound: usize,
) -> &mut Self;
fn add_readonly_bounded_protocol<M: Payload>(&mut self, bound: usize) -> &mut Self;
/// Register a protocol that is only read, never sent. Use a growable buffer
/// for receiving, and do not run systems for sending.
fn add_readonly_unbounded_protocol<M: Payload>(&mut self) -> &mut Self;
Expand All @@ -43,8 +40,7 @@ impl AddProtocolExt for App {
})
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<RtcSocket>),
OutgoingMessages::<M>::send_payloads.run_if(resource_exists::<RtcSocket>),
);
self
}
Expand All @@ -53,10 +49,7 @@ impl AddProtocolExt for App {
self.add_readonly_bounded_protocol::<M>(usize::MAX)
}

fn add_readonly_bounded_protocol<M: Payload>(
&mut self,
bound: usize,
) -> &mut Self {
fn add_readonly_bounded_protocol<M: Payload>(&mut self, bound: usize) -> &mut Self {
if self.world.contains_resource::<IncomingMessages<M>>() {
panic!("client already contains resource: {}", M::reflect_name());
}
Expand Down Expand Up @@ -102,8 +95,7 @@ impl AddProtocolExt for App {
)
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<RtcSocket>),
OutgoingMessages::<M>::send_payloads.run_if(resource_exists::<RtcSocket>),
);
self
}
Expand Down
5 changes: 1 addition & 4 deletions bevy_rtc/src/client/router/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ pub struct IncomingMessages<M: Payload> {
}

impl<M: Payload> IncomingMessages<M> {
pub fn receive_payloads(
mut incoming: ResMut<Self>,
mut events: EventReader<SocketRecvEvent>,
) {
pub fn receive_payloads(mut incoming: ResMut<Self>, mut events: EventReader<SocketRecvEvent>) {
let bound = incoming.bound;
let packets: Vec<_> = events
.read()
Expand Down
4 changes: 1 addition & 3 deletions bevy_rtc/src/client/router/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ impl<M: Payload> OutgoingMessages<M> {
.try_send(message.to_packet(), host)
.is_err()
{
error!(
"failed to send reliable packet to {host}: {message:?}"
);
error!("failed to send reliable packet to {host}: {message:?}");
}
}
if !queue.reliable_to_host.is_empty() {
Expand Down
15 changes: 4 additions & 11 deletions bevy_rtc/src/client/systems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ pub(crate) fn connection_request_handler(
) {
match cxn_event_reader.read().next() {
Some(ConnectionRequest::Connect { addr }) => {
if let RtcClientStatus::Disconnected =
current_connection_state.get()
{
if let RtcClientStatus::Disconnected = current_connection_state.get() {
debug!(
previous = format!("{current_connection_state:?}"),
"set state: connecting"
Expand Down Expand Up @@ -121,8 +119,7 @@ pub(crate) fn client_event_writer(
event_wtr.send(RtcClientEvent::ConnectedToHost(id));
}
matchbox_socket::PeerState::Disconnected => {
next_connection_state
.set(RtcClientStatus::Disconnected);
next_connection_state.set(RtcClientStatus::Disconnected);
event_wtr.send(RtcClientEvent::DisconnectedFromHost {
reason: Some("Server reset".to_string()),
});
Expand All @@ -143,10 +140,7 @@ pub(crate) fn client_event_writer(
}
}

pub fn send_latency_tracers(
state: Res<RtcState>,
mut writer: NetworkWriter<LatencyTracerPayload>,
) {
pub fn send_latency_tracers(state: Res<RtcState>, mut writer: NetworkWriter<LatencyTracerPayload>) {
let peer_id = state.id.expect("expected peer id");
writer.unreliable_to_host(LatencyTracerPayload::new(peer_id));
}
Expand Down Expand Up @@ -190,8 +184,7 @@ pub fn calculate_latency(
match last_latency {
Some(last_latency) => {
state.latency.replace(last_latency);
let current_smoothed =
state.smoothed_latency.get_or_insert(last_latency);
let current_smoothed = state.smoothed_latency.get_or_insert(last_latency);
const AVG_SECS: f32 = 1.0; // 1 second average
let alpha = 1.0 - f32::exp(-time.delta_seconds() / AVG_SECS);
let current_f32 = current_smoothed.as_secs_f32() * (1.0 - alpha);
Expand Down
3 changes: 1 addition & 2 deletions bevy_rtc/src/latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ impl LatencyTracer {
.unwrap_or(std::cmp::Ordering::Equal)
});
let mid = self.latency_hist.len() / 2;
let median =
self.latency_hist.get(mid).map(|(_, lat)| lat.as_secs_f32());
let median = self.latency_hist.get(mid).map(|(_, lat)| lat.as_secs_f32());
self.last_latency = median;
}
}
4 changes: 1 addition & 3 deletions bevy_rtc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(all(target_arch = "wasm32", feature = "server"))]
compile_error!(
"The 'server' feature is not supported on the wasm32 target architecture."
);
compile_error!("The 'server' feature is not supported on the wasm32 target architecture.");

pub(crate) mod events;
pub(crate) mod latency;
Expand Down
14 changes: 4 additions & 10 deletions bevy_rtc/src/server/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use bevy::{prelude::*, time::common_conditions::on_timer};
use instant::Duration;
use std::net::Ipv4Addr;

use super::{
systems, AddProtocolExt, RtcServerEvent, RtcServerStatus, RtcState,
};
use super::{systems, AddProtocolExt, RtcServerEvent, RtcServerStatus, RtcState};

/// A plugin to serve a WebRTC server.
pub struct RtcServerPlugin {
Expand All @@ -23,15 +21,12 @@ impl Plugin for RtcServerPlugin {
.add_event::<RtcServerEvent>()
.add_bounded_protocol::<LatencyTracerPayload>(2)
.init_state::<RtcServerStatus>()
.insert_resource(RtcState::new(
(Ipv4Addr::UNSPECIFIED, self.port).into(),
))
.insert_resource(RtcState::new((Ipv4Addr::UNSPECIFIED, self.port).into()))
.add_systems(
Startup,
// We start a signaling server on localhost and the first peer
// becomes host
(systems::init_signaling_server, systems::init_server_socket)
.chain(),
(systems::init_signaling_server, systems::init_server_socket).chain(),
)
.add_systems(
First,
Expand All @@ -47,8 +42,7 @@ impl Plugin for RtcServerPlugin {
Update,
(
systems::read_latency_tracers,
systems::send_latency_tracers
.run_if(on_timer(Duration::from_millis(100))),
systems::send_latency_tracers.run_if(on_timer(Duration::from_millis(100))),
)
.run_if(in_state(RtcServerStatus::Ready)),
);
Expand Down
16 changes: 4 additions & 12 deletions bevy_rtc/src/server/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ pub trait AddProtocolExt {
fn add_sendonly_protocol<M: Payload>(&mut self) -> &mut Self;
/// Register a protocol that is only read, never sent. Allocate a bounded
/// buffer per peer for receiving, and do not run systems for sending.
fn add_readonly_bounded_protocol<M: Payload>(
&mut self,
bound: usize,
) -> &mut Self;
fn add_readonly_bounded_protocol<M: Payload>(&mut self, bound: usize) -> &mut Self;
/// Register a protocol that is only read, never sent. Use a growable buffer
/// for receiving, and do not run systems for sending.
fn add_readonly_unbounded_protocol<M: Payload>(&mut self) -> &mut Self;
Expand All @@ -46,8 +43,7 @@ impl AddProtocolExt for App {
})
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<RtcSocket>),
OutgoingMessages::<M>::send_payloads.run_if(resource_exists::<RtcSocket>),
);

self
Expand All @@ -57,10 +53,7 @@ impl AddProtocolExt for App {
self.add_readonly_bounded_protocol::<M>(usize::MAX)
}

fn add_readonly_bounded_protocol<M: Payload>(
&mut self,
bound: usize,
) -> &mut Self {
fn add_readonly_bounded_protocol<M: Payload>(&mut self, bound: usize) -> &mut Self {
if self.world.contains_resource::<IncomingMessages<M>>() {
panic!("server already contains resource: {}", M::reflect_name());
}
Expand Down Expand Up @@ -111,8 +104,7 @@ impl AddProtocolExt for App {
)
.add_systems(
Last,
OutgoingMessages::<M>::send_payloads
.run_if(resource_exists::<RtcSocket>),
OutgoingMessages::<M>::send_payloads.run_if(resource_exists::<RtcSocket>),
);

self
Expand Down
5 changes: 1 addition & 4 deletions bevy_rtc/src/server/router/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ pub struct IncomingMessages<M: Payload> {
}

impl<M: Payload> IncomingMessages<M> {
pub fn receive_payloads(
mut incoming: ResMut<Self>,
mut events: EventReader<SocketRecvEvent>,
) {
pub fn receive_payloads(mut incoming: ResMut<Self>, mut events: EventReader<SocketRecvEvent>) {
let bound = incoming.bound;
let packets: HashMap<PeerId, Vec<M>> = events.read().fold(
HashMap::new(),
Expand Down
51 changes: 22 additions & 29 deletions bevy_rtc/src/server/router/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ impl<M: Payload> OutgoingMessages<M> {
self.unreliable_to_peer.clear();
}

pub(crate) fn send_payloads(
mut queue: ResMut<Self>,
mut socket: ResMut<RtcSocket>,
) {
pub(crate) fn send_payloads(mut queue: ResMut<Self>, mut socket: ResMut<RtcSocket>) {
// Server is sending
for message in queue.reliable_to_all.iter() {
let peers: Vec<PeerId> = socket.connected_peers().collect();
Expand All @@ -40,9 +37,7 @@ impl<M: Payload> OutgoingMessages<M> {
.try_send(message.to_packet(), peer)
.is_err()
{
error!(
"failed to send reliable packet to {peer}: {message:?}"
);
error!("failed to send reliable packet to {peer}: {message:?}");
}
})
}
Expand All @@ -56,12 +51,14 @@ impl<M: Payload> OutgoingMessages<M> {
for message in queue.unreliable_to_all.iter() {
let peers: Vec<PeerId> = socket.connected_peers().collect();
peers.into_iter().for_each(|peer| {
if socket
.channel_mut(UNRELIABLE_CHANNEL_INDEX)
.try_send(message.to_packet(), peer).is_err() {
error!("failed to send unreliable packet to {peer}: {message:?}");
}
})
if socket
.channel_mut(UNRELIABLE_CHANNEL_INDEX)
.try_send(message.to_packet(), peer)
.is_err()
{
error!("failed to send unreliable packet to {peer}: {message:?}");
}
})
}
if !queue.unreliable_to_all.is_empty() {
trace!(
Expand All @@ -71,17 +68,14 @@ impl<M: Payload> OutgoingMessages<M> {
);
}
for (peer, message) in queue.reliable_to_all_except.iter() {
let peers: Vec<PeerId> =
socket.connected_peers().filter(|p| p != peer).collect();
let peers: Vec<PeerId> = socket.connected_peers().filter(|p| p != peer).collect();
peers.into_iter().for_each(|peer| {
if socket
.channel_mut(RELIABLE_CHANNEL_INDEX)
.try_send(message.to_packet(), peer)
.is_err()
{
error!(
"failed to send reliable packet to {peer}: {message:?}"
);
error!("failed to send reliable packet to {peer}: {message:?}");
}
});
}
Expand All @@ -93,15 +87,16 @@ impl<M: Payload> OutgoingMessages<M> {
);
}
for (peer, message) in queue.unreliable_to_all_except.iter() {
let peers: Vec<PeerId> =
socket.connected_peers().filter(|p| p != peer).collect();
let peers: Vec<PeerId> = socket.connected_peers().filter(|p| p != peer).collect();
peers.into_iter().for_each(|peer| {
if socket
.channel_mut(UNRELIABLE_CHANNEL_INDEX)
.try_send(message.to_packet(), peer).is_err() {
error!("failed to send unreliable packet to {peer}: {message:?}");
}
});
if socket
.channel_mut(UNRELIABLE_CHANNEL_INDEX)
.try_send(message.to_packet(), peer)
.is_err()
{
error!("failed to send unreliable packet to {peer}: {message:?}");
}
});
}
if !queue.unreliable_to_all_except.is_empty() {
trace!(
Expand Down Expand Up @@ -132,9 +127,7 @@ impl<M: Payload> OutgoingMessages<M> {
.try_send(message.to_packet(), *peer)
.is_err()
{
error!(
"failed to send unreliable packet to {peer}: {message:?}"
);
error!("failed to send unreliable packet to {peer}: {message:?}");
}
}
if !queue.unreliable_to_peer.is_empty() {
Expand Down
13 changes: 3 additions & 10 deletions bevy_rtc/src/server/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,15 @@ impl RtcState {
}

/// Return the instantaneous latencies for all peers
pub fn iter_latencies(
&self,
) -> impl Iterator<Item = (PeerId, Duration)> + '_ {
pub fn iter_latencies(&self) -> impl Iterator<Item = (PeerId, Duration)> + '_ {
self.latencies
.iter()
.filter_map(|(p, l)| l.map(|l| (p, l)))
.map(|(p, l)| (*p, l))
}

/// Return the smoothed latencies for all peers
pub fn iter_smoothed_latencies(
&self,
) -> impl Iterator<Item = (PeerId, Duration)> + '_ {
pub fn iter_smoothed_latencies(&self) -> impl Iterator<Item = (PeerId, Duration)> + '_ {
self.smoothed_latencies
.iter()
.filter_map(|(p, l)| l.map(|l| (p, l)))
Expand All @@ -77,10 +73,7 @@ impl RtcState {
}

/// Return the smoothed latency for a peer if they exist
pub fn get_smoothed_latency_for(
&self,
peer_id: PeerId,
) -> Option<Duration> {
pub fn get_smoothed_latency_for(&self, peer_id: PeerId) -> Option<Duration> {
*self.smoothed_latencies.get(&peer_id)?
}
}
Loading

0 comments on commit 3eaf371

Please sign in to comment.