Skip to content

Commit

Permalink
Merge pull request #254 from matthiasbeyer/no-async
Browse files Browse the repository at this point in the history
Remove all async from mqtt-format v5
  • Loading branch information
TheNeikos authored Mar 26, 2024
2 parents 55843b6 + d98e1d4 commit 37358a1
Show file tree
Hide file tree
Showing 25 changed files with 290 additions and 300 deletions.
12 changes: 6 additions & 6 deletions mqtt-format/src/v5/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ pub fn parse_binary_data<'i>(input: &mut &'i Bytes) -> MResult<&'i [u8]> {
.parse_next(input)
}

pub async fn write_binary_data<W: WriteMqttPacket>(buffer: &mut W, slice: &[u8]) -> WResult<W> {
pub fn write_binary_data<W: WriteMqttPacket>(buffer: &mut W, slice: &[u8]) -> WResult<W> {
let slice_len = slice
.len()
.try_into()
.map_err(|_| W::Error::from(super::write::MqttWriteError::Invariant))?;

buffer.write_u16(slice_len).await?;
buffer.write_slice(slice).await
buffer.write_u16(slice_len)?;
buffer.write_slice(slice)
}

#[inline]
Expand All @@ -53,12 +53,12 @@ mod tests {
);
}

#[tokio::test]
async fn test_write_binary_data() {
#[test]
fn test_write_binary_data() {
let mut writer = TestWriter { buffer: Vec::new() };
let data = &[0xFF, 0xAB, 0x42, 0x13, 0x37, 0x69];

write_binary_data(&mut writer, data).await.unwrap();
write_binary_data(&mut writer, data).unwrap();
let out = parse_binary_data(&mut Bytes::new(&writer.buffer)).unwrap();

assert_eq!(out, data);
Expand Down
4 changes: 2 additions & 2 deletions mqtt-format/src/v5/fixed_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl MFixedHeader {
Ok(MFixedHeader { packet_type })
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
pub fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
#[allow(clippy::identity_op)]
let byte = match self.packet_type {
PacketType::Connect => (1 << 4) | 0,
Expand Down Expand Up @@ -135,7 +135,7 @@ impl MFixedHeader {
PacketType::Auth => (15 << 4) | 0,
};

buffer.write_byte(byte).await
buffer.write_byte(byte)
}
}

Expand Down
60 changes: 30 additions & 30 deletions mqtt-format/src/v5/integers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ pub fn parse_u16(input: &mut &Bytes) -> MResult<u16> {
.parse_next(input)
}

pub async fn write_u16<W: WriteMqttPacket>(buffer: &mut W, u: u16) -> WResult<W> {
buffer.write_u16(u.to_be()).await?;
pub fn write_u16<W: WriteMqttPacket>(buffer: &mut W, u: u16) -> WResult<W> {
buffer.write_u16(u.to_be())?;
Ok(())
}

Expand All @@ -45,8 +45,8 @@ pub fn parse_u32(input: &mut &Bytes) -> MResult<u32> {
.parse_next(input)
}

pub async fn write_u32<W: WriteMqttPacket>(buffer: &mut W, u: u32) -> WResult<W> {
buffer.write_u32(u).await?;
pub fn write_u32<W: WriteMqttPacket>(buffer: &mut W, u: u32) -> WResult<W> {
buffer.write_u32(u)?;
Ok(())
}

Expand Down Expand Up @@ -86,36 +86,36 @@ pub const fn variable_u32_binary_size(u: u32) -> u32 {
}
}

pub async fn write_variable_u32<W: WriteMqttPacket>(buffer: &mut W, u: u32) -> WResult<W> {
pub fn write_variable_u32<W: WriteMqttPacket>(buffer: &mut W, u: u32) -> WResult<W> {
match u {
0..=127 => {
buffer.write_byte(u as u8).await?;
buffer.write_byte(u as u8)?;
}
len @ 128..=16383 => {
let first = (len % 128) | 0b1000_0000;
let second = len / 128;
buffer.write_byte(first as u8).await?;
buffer.write_byte(second as u8).await?;
buffer.write_byte(first as u8)?;
buffer.write_byte(second as u8)?;
}
len @ 16384..=2_097_151 => {
let first = (len % 128) | 0b1000_0000;
let second = ((len / 128) % 128) | 0b1000_0000;
let third = len / (128 * 128);

buffer.write_byte(first as u8).await?;
buffer.write_byte(second as u8).await?;
buffer.write_byte(third as u8).await?;
buffer.write_byte(first as u8)?;
buffer.write_byte(second as u8)?;
buffer.write_byte(third as u8)?;
}
len @ 2_097_152..=268_435_455 => {
let first = (len % 128) | 0b1000_0000;
let second = ((len / 128) % 128) | 0b1000_0000;
let third = ((len / (128 * 128)) % 128) | 0b1000_0000;
let fourth = len / (128 * 128 * 128);

buffer.write_byte(first as u8).await?;
buffer.write_byte(second as u8).await?;
buffer.write_byte(third as u8).await?;
buffer.write_byte(fourth as u8).await?;
buffer.write_byte(first as u8)?;
buffer.write_byte(second as u8)?;
buffer.write_byte(third as u8)?;
buffer.write_byte(fourth as u8)?;
}
_size => {
return Err(super::write::MqttWriteError::Invariant.into());
Expand Down Expand Up @@ -183,53 +183,53 @@ mod tests {
parse_variable_u32(&mut Bytes::new(&input)).unwrap_err();
}

#[tokio::test]
async fn test_write_byte() {
#[test]
fn test_write_byte() {
let mut writer = TestWriter { buffer: Vec::new() };

writer.write_byte(1).await.unwrap();
writer.write_byte(1).unwrap();

assert_eq!(writer.buffer, &[1]);
}

#[tokio::test]
async fn test_write_two_bytes() {
#[test]
fn test_write_two_bytes() {
let mut writer = TestWriter { buffer: Vec::new() };

writer.write_u16(1).await.unwrap();
writer.write_u16(1).unwrap();

assert_eq!(writer.buffer, &[0x00, 0x01]);
}

#[tokio::test]
async fn test_write_four_bytes() {
#[test]
fn test_write_four_bytes() {
let mut writer = TestWriter { buffer: Vec::new() };

writer.write_u32(1).await.unwrap();
writer.write_u32(1).unwrap();

assert_eq!(writer.buffer, &[0x00, 0x00, 0x00, 0x01]);
}

#[tokio::test]
async fn test_write_u32() {
#[test]
fn test_write_u32() {
// step by some prime number
for i in (0..268_435_455).step_by(271) {
let mut writer = TestWriter { buffer: Vec::new() };

writer.write_u32(i).await.unwrap();
writer.write_u32(i).unwrap();

let out = parse_u32(&mut Bytes::new(&writer.buffer)).unwrap();
assert_eq!(out, i);
}
}

#[tokio::test]
async fn test_write_variable_u32() {
#[test]
fn test_write_variable_u32() {
// step by some prime number
for i in (0..268_435_455).step_by(271) {
let mut writer = TestWriter { buffer: Vec::new() };

write_variable_u32(&mut writer, i).await.unwrap();
write_variable_u32(&mut writer, i).unwrap();

let out = parse_variable_u32(&mut Bytes::new(&writer.buffer)).unwrap();
assert_eq!(out, i);
Expand Down
14 changes: 7 additions & 7 deletions mqtt-format/src/v5/packets/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ impl<'i> MAuth<'i> {
self.reason.binary_size() + self.properties.binary_size()
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.reason.write(buffer).await?;
self.properties.write(buffer).await
pub fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.reason.write(buffer)?;
self.properties.write(buffer)
}
}

Expand All @@ -80,8 +80,8 @@ mod test {
use crate::v5::variable_header::ReasonString;
use crate::v5::variable_header::UserProperties;

#[tokio::test]
async fn test_roundtrip_mauth_no_props() {
#[test]
fn test_roundtrip_mauth_no_props() {
crate::v5::test::make_roundtrip_test!(MAuth {
reason: AuthReasonCode::ContinueAuthentication,
properties: AuthProperties {
Expand All @@ -93,8 +93,8 @@ mod test {
});
}

#[tokio::test]
async fn test_roundtrip_mauth_props() {
#[test]
fn test_roundtrip_mauth_props() {
crate::v5::test::make_roundtrip_test!(MAuth {
reason: AuthReasonCode::ContinueAuthentication,
properties: AuthProperties {
Expand Down
16 changes: 8 additions & 8 deletions mqtt-format/src/v5/packets/connack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ impl<'i> MConnack<'i> {
+ self.properties.binary_size()
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
pub fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
let byte = (self.session_present as u8) << 7;
buffer.write_byte(byte).await?;
self.reason_code.write(buffer).await?;
self.properties.write(buffer).await
buffer.write_byte(byte)?;
self.reason_code.write(buffer)?;
self.properties.write(buffer)
}
}

Expand All @@ -186,8 +186,8 @@ mod test {
use crate::v5::variable_header::UserProperties;
use crate::v5::variable_header::WildcardSubscriptionAvailable;

#[tokio::test]
async fn test_roundtrip_connack_no_props() {
#[test]
fn test_roundtrip_connack_no_props() {
crate::v5::test::make_roundtrip_test!(MConnack {
session_present: true,
reason_code: ConnackReasonCode::Success,
Expand All @@ -213,8 +213,8 @@ mod test {
});
}

#[tokio::test]
async fn test_roundtrip_connack_with_props() {
#[test]
fn test_roundtrip_connack_with_props() {
crate::v5::test::make_roundtrip_test!(MConnack {
session_present: true,
reason_code: ConnackReasonCode::Success,
Expand Down
46 changes: 23 additions & 23 deletions mqtt-format/src/v5/packets/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ impl<'i> MConnect<'i> {
+ self.password.as_ref().map(|p| crate::v5::bytes::binary_data_binary_size(p)).unwrap_or(0)
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
crate::v5::strings::write_string(buffer, "MQTT").await?;
ProtocolLevel::V5.write(buffer).await?;
pub fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
crate::v5::strings::write_string(buffer, "MQTT")?;
ProtocolLevel::V5.write(buffer)?;

let flags = {
let reserved = 0;
Expand All @@ -203,18 +203,18 @@ impl<'i> MConnect<'i> {
reserved | clean_start | will | password | username
};

buffer.write_byte(flags).await?;
buffer.write_u16(self.keep_alive).await?;
self.properties.write(buffer).await?;
crate::v5::strings::write_string(buffer, self.client_identifier).await?;
buffer.write_byte(flags)?;
buffer.write_u16(self.keep_alive)?;
self.properties.write(buffer)?;
crate::v5::strings::write_string(buffer, self.client_identifier)?;
if let Some(will) = self.will.as_ref() {
will.write(buffer).await?;
will.write(buffer)?;
}
if let Some(username) = self.username.as_ref() {
crate::v5::strings::write_string(buffer, username).await?;
crate::v5::strings::write_string(buffer, username)?;
}
if let Some(password) = self.password.as_ref() {
crate::v5::bytes::write_binary_data(buffer, password).await?;
crate::v5::bytes::write_binary_data(buffer, password)?;
}

Ok(())
Expand All @@ -231,10 +231,10 @@ pub struct Will<'i> {
}

impl<'i> Will<'i> {
pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.properties.write(buffer).await?;
crate::v5::strings::write_string(buffer, self.topic).await?;
crate::v5::bytes::write_binary_data(buffer, self.payload).await?;
pub fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.properties.write(buffer)?;
crate::v5::strings::write_string(buffer, self.topic)?;
crate::v5::bytes::write_binary_data(buffer, self.payload)?;
Ok(())
}

Expand Down Expand Up @@ -274,10 +274,10 @@ impl ProtocolLevel {
}
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
pub fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
match self {
ProtocolLevel::V3 => buffer.write_byte(3).await,
ProtocolLevel::V5 => buffer.write_byte(5).await,
ProtocolLevel::V3 => buffer.write_byte(3),
ProtocolLevel::V5 => buffer.write_byte(5),
}
}
}
Expand All @@ -304,8 +304,8 @@ mod test {
use crate::v5::variable_header::UserProperties;
use crate::v5::variable_header::WillDelayInterval;

#[tokio::test]
async fn test_roundtrip_connect_empty() {
#[test]
fn test_roundtrip_connect_empty() {
crate::v5::test::make_roundtrip_test!(MConnect {
client_identifier: "i am so cool",
username: None,
Expand All @@ -327,8 +327,8 @@ mod test {
});
}

#[tokio::test]
async fn test_roundtrip_connect_no_props() {
#[test]
fn test_roundtrip_connect_no_props() {
crate::v5::test::make_roundtrip_test!(MConnect {
client_identifier: "i am so cool",
username: Some("user"),
Expand Down Expand Up @@ -363,8 +363,8 @@ mod test {
});
}

#[tokio::test]
async fn test_roundtrip_connect_with_props() {
#[test]
fn test_roundtrip_connect_with_props() {
crate::v5::test::make_roundtrip_test!(MConnect {
client_identifier: "i am so cool",
username: Some("user"),
Expand Down
14 changes: 7 additions & 7 deletions mqtt-format/src/v5/packets/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl<'i> MDisconnect<'i> {
self.reason_code.binary_size() + self.properties.binary_size()
}

pub async fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.reason_code.write(buffer).await?;
self.properties.write(buffer).await
pub fn write<W: WriteMqttPacket>(&self, buffer: &mut W) -> WResult<W> {
self.reason_code.write(buffer)?;
self.properties.write(buffer)
}
}

Expand All @@ -111,8 +111,8 @@ mod test {
use crate::v5::variable_header::SessionExpiryInterval;
use crate::v5::variable_header::UserProperties;

#[tokio::test]
async fn test_roundtrip_disconnect_no_props() {
#[test]
fn test_roundtrip_disconnect_no_props() {
crate::v5::test::make_roundtrip_test!(MDisconnect {
reason_code: DisconnectReasonCode::NormalDisconnection,
properties: DisconnectProperties {
Expand All @@ -124,8 +124,8 @@ mod test {
});
}

#[tokio::test]
async fn test_roundtrip_disconnect_with_props() {
#[test]
fn test_roundtrip_disconnect_with_props() {
crate::v5::test::make_roundtrip_test!(MDisconnect {
reason_code: DisconnectReasonCode::NormalDisconnection,
properties: DisconnectProperties {
Expand Down
Loading

0 comments on commit 37358a1

Please sign in to comment.