diff --git a/Cargo.lock b/Cargo.lock index 2b66ad47..64ea5de9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,9 +171,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.5.3" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "949626d00e063efc93b6dca932419ceb5432f99769911c0b995f7e884c778813" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" dependencies = [ "clap_builder", "clap_derive", @@ -193,9 +193,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.3" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90239a040c80f5e14809ca132ddc4176ab33d5e17e49691793296e3fcb34d72f" +checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ "heck", "proc-macro2", @@ -215,14 +215,25 @@ version = "0.5.0" dependencies = [ "futures", "mqtt-format", + "paste", "stable_deref_trait", "thiserror", "tokio", "tokio-util", + "typed-builder", "winnow 0.6.5", "yoke", ] +[[package]] +name = "cloudmqtt-bin" +version = "0.1.0" +dependencies = [ + "clap", + "cloudmqtt", + "tokio", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -646,6 +657,12 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -992,6 +1009,7 @@ checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -1076,6 +1094,26 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "typed-builder" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "444d8748011b93cb168770e8092458cb0f8854f931ff82fdf6ddfbd72a9c933e" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "563b3b88238ec95680aef36bdece66896eaa7ce3c0f1b4f39d38fb2435261352" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "unicode-ident" version = "1.0.12" diff --git a/Cargo.toml b/Cargo.toml index 09ef8ad2..27c147ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ categories = ["embedded"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [workspace] -members = ["mqtt-format", "mqtt-tester"] +members = ["cloudmqtt-bin", "mqtt-format", "mqtt-tester"] [dependencies] futures = "0.3.30" @@ -21,9 +21,11 @@ mqtt-format = { version = "0.5.0", path = "mqtt-format", features = [ "yoke", "mqttv5", ] } +paste = "1.0.14" stable_deref_trait = "1.2.0" thiserror = "1.0.58" tokio = { version = "1.36.0", features = ["macros", "full"] } -tokio-util = { version = "0.7.10", features = ["codec"] } +tokio-util = { version = "0.7.10", features = ["codec", "compat"] } +typed-builder = "0.18" winnow = "0.6.5" yoke = "0.7.3" diff --git a/cloudmqtt-bin/Cargo.toml b/cloudmqtt-bin/Cargo.toml new file mode 100644 index 00000000..6c766564 --- /dev/null +++ b/cloudmqtt-bin/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "cloudmqtt-bin" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +clap = { version = "4.5.4", features = ["derive"] } +cloudmqtt = { version = "0.5.0", path = ".." } +tokio = { version = "1.36.0", features = ["full"] } diff --git a/cloudmqtt-bin/src/bin/client.rs b/cloudmqtt-bin/src/bin/client.rs new file mode 100644 index 00000000..5188edcf --- /dev/null +++ b/cloudmqtt-bin/src/bin/client.rs @@ -0,0 +1,38 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +use clap::Parser; +use cloudmqtt::client::MqttClientConnector; +use cloudmqtt::transport::MqttConnectTransport; +use tokio::net::TcpStream; + +#[derive(Debug, Parser)] +#[command(version, about, long_about = None)] +struct Args { + #[arg(long)] + hostname: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + + let socket = TcpStream::connect(args.hostname).await.unwrap(); + + let connection = MqttConnectTransport::TokioTcp(socket); + let client_id = cloudmqtt::client_identifier::ClientIdentifier::PotentiallyServerProvided; + + let connector = MqttClientConnector::new( + connection, + client_id, + cloudmqtt::client::CleanStart::Yes, + cloudmqtt::keep_alive::KeepAlive::Disabled, + ); + + let _client = connector.connect().await.unwrap(); + + println!("Yay, we connected! That's all for now"); +} diff --git a/mqtt-format/src/v5/integers.rs b/mqtt-format/src/v5/integers.rs index eb5c0949..58d26f69 100644 --- a/mqtt-format/src/v5/integers.rs +++ b/mqtt-format/src/v5/integers.rs @@ -29,7 +29,7 @@ pub fn parse_u16(input: &mut &Bytes) -> MResult { } pub fn write_u16(buffer: &mut W, u: u16) -> WResult { - buffer.write_u16(u.to_be())?; + buffer.write_u16(u)?; Ok(()) } diff --git a/mqtt-format/src/v5/packets/connect.rs b/mqtt-format/src/v5/packets/connect.rs index 8e9be321..fe5bfa22 100644 --- a/mqtt-format/src/v5/packets/connect.rs +++ b/mqtt-format/src/v5/packets/connect.rs @@ -246,13 +246,23 @@ impl<'i> Will<'i> { } crate::v5::properties::define_properties! { + packet_type: Will, + anker: "_Toc3901060", pub struct ConnectWillProperties<'i> { + (anker: "_Toc3901062") will_delay_interval: WillDelayInterval, + (anker: "_Toc3901063") payload_format_indicator: PayloadFormatIndicator, + (anker: "_Toc3901064") message_expiry_interval: MessageExpiryInterval, + (anker: "_Toc3901065") content_type: ContentType<'i>, + (anker: "_Toc3901066") response_topic: ResponseTopic<'i>, + (anker: "_Toc3901067") correlation_data: CorrelationData<'i>, + (anker: "_Toc3901068") + user_properties: UserProperties<'i>, } } @@ -342,6 +352,7 @@ mod test { content_type: None, response_topic: None, correlation_data: None, + user_properties: None, }, topic: "crazy topic", payload: &[0xAB, 0xCD, 0xEF], @@ -378,6 +389,7 @@ mod test { content_type: Some(ContentType("json")), response_topic: Some(ResponseTopic("resp")), correlation_data: Some(CorrelationData(&[0xFF])), + user_properties: None, }, topic: "crazy topic", payload: &[0xAB, 0xCD, 0xEF], diff --git a/mqtt-format/src/v5/packets/subscribe.rs b/mqtt-format/src/v5/packets/subscribe.rs index 6f68386a..803d73f3 100644 --- a/mqtt-format/src/v5/packets/subscribe.rs +++ b/mqtt-format/src/v5/packets/subscribe.rs @@ -24,8 +24,13 @@ use crate::v5::write::WriteMqttPacket; use crate::v5::MResult; define_properties! { + packet_type: MSubscribe, + anker: "_Toc3901164", pub struct SubscribeProperties<'i> { + (anker: "_Toc3901166") subscription_identifier: SubscriptionIdentifier, + + (anker: "_Toc3901167") user_properties: UserProperties<'i>, } } diff --git a/mqtt-format/src/v5/packets/unsuback.rs b/mqtt-format/src/v5/packets/unsuback.rs index cc74131c..120ba490 100644 --- a/mqtt-format/src/v5/packets/unsuback.rs +++ b/mqtt-format/src/v5/packets/unsuback.rs @@ -27,8 +27,13 @@ crate::v5::reason_code::make_combined_reason_code! { } crate::v5::properties::define_properties! { + packet_type: MUnsuback, + anker: "_Toc3901190", pub struct UnsubackProperties<'i> { + (anker: "_Toc3901192") reason_string: ReasonString<'i>, + + (anker: "_Toc3901193") user_properties: UserProperties<'i>, } } diff --git a/mqtt-format/src/v5/variable_header.rs b/mqtt-format/src/v5/variable_header.rs index 9e8fd7d9..0fdfcaa4 100644 --- a/mqtt-format/src/v5/variable_header.rs +++ b/mqtt-format/src/v5/variable_header.rs @@ -14,6 +14,7 @@ use super::integers::write_variable_u32; use super::write::WResult; use super::write::WriteMqttPacket; use super::MResult; +use crate::v5::integers::parse_variable_u32; #[derive(Debug, Clone, Copy, PartialEq)] pub struct PacketIdentifier(pub u16); @@ -190,9 +191,9 @@ define_properties! {[ ], SubscriptionIdentifier as 0x0B => - parse with parse_u32 as u32; - write with super::integers::write_u32; - with size |_| 4; + parse with parse_variable_u32 as u32; + write with super::integers::write_variable_u32; + with size |&v: &u32| super::integers::variable_u32_binary_size(v); testfnname: test_roundtrip_subscriptionidentifier; testvalues: [12, 14, 42, 1337], @@ -211,9 +212,9 @@ define_properties! {[ testvalues: ["fooobarbar"], ServerKeepAlive as 0x13 => - parse with parse_u32 as u32; - write with super::integers::write_u32; - with size |_| 4; + parse with parse_u16 as u16; + write with super::integers::write_u16; + with size |_| 2; testfnname: test_roundtrip_serverkeepalive; testvalues: [12, 14, 42, 1337], @@ -279,23 +280,23 @@ define_properties! {[ testvalues: ["fooobarbar"], ReceiveMaximum as 0x21 => - parse with parse_u32 as u32; - write with super::integers::write_u32; - with size |_| 4; + parse with parse_u16 as u16; + write with super::integers::write_u16; + with size |_| 2; testfnname: test_roundtrip_receivemaximum; testvalues: [12, 14, 42, 1337], TopicAliasMaximum as 0x22 => - parse with parse_u32 as u32; - write with super::integers::write_u32; - with size |_| 4; + parse with parse_u16 as u16; + write with super::integers::write_u16; + with size |_| 2; testfnname: test_roundtrip_topicaliasmaximum; testvalues: [12, 14, 42, 1337], TopicAlias as 0x23 => - parse with parse_u32 as u32; - write with super::integers::write_u32; - with size |_| 4; + parse with parse_u16 as u16; + write with super::integers::write_u16; + with size |_| 2; testfnname: test_roundtrip_topicalias; testvalues: [12, 14, 42, 1337], diff --git a/src/bytes.rs b/src/bytes.rs new file mode 100644 index 00000000..f2f02461 --- /dev/null +++ b/src/bytes.rs @@ -0,0 +1,48 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] +pub struct MqttBytes(Vec); + +impl MqttBytes { + pub const MAX_LEN: usize = u16::MAX as usize; +} + +#[derive(Debug, thiserror::Error)] +pub enum MqttBytesError { + #[error("A vector/slice of length {} is too long, max length is {}", .0, MqttBytes::MAX_LEN)] + TooLong(usize), +} + +impl AsRef<[u8]> for MqttBytes { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + +impl TryFrom> for MqttBytes { + type Error = MqttBytesError; + + fn try_from(s: Vec) -> Result { + if s.len() > Self::MAX_LEN { + Err(MqttBytesError::TooLong(s.len())) + } else { + Ok(Self(s)) + } + } +} + +impl TryFrom<&[u8]> for MqttBytes { + type Error = MqttBytesError; + + fn try_from(s: &[u8]) -> Result { + if s.len() > Self::MAX_LEN { + Err(MqttBytesError::TooLong(s.len())) + } else { + Ok(Self(s.to_vec())) + } + } +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 00000000..f31369db --- /dev/null +++ b/src/client.rs @@ -0,0 +1,200 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +use futures::SinkExt; +use futures::StreamExt; +use tokio_util::codec::Framed; + +use crate::bytes::MqttBytes; +use crate::client_identifier::ClientIdentifier; +use crate::codecs::MqttPacketCodec; +use crate::codecs::MqttPacketCodecError; +use crate::keep_alive::KeepAlive; +use crate::string::MqttString; +use crate::transport::MqttConnectTransport; +use crate::transport::MqttConnection; + +pub enum CleanStart { + No, + Yes, +} + +impl CleanStart { + pub fn as_bool(&self) -> bool { + match self { + CleanStart::No => false, + CleanStart::Yes => true, + } + } +} + +#[derive(typed_builder::TypedBuilder)] +pub struct MqttWill { + #[builder(default = crate::packets::connect::ConnectWillProperties::new())] + properties: crate::packets::connect::ConnectWillProperties, + topic: MqttString, + payload: MqttBytes, + qos: mqtt_format::v5::fixed_header::QualityOfService, + retain: bool, +} + +impl MqttWill { + pub fn get_properties_mut(&mut self) -> &mut crate::packets::connect::ConnectWillProperties { + &mut self.properties + } +} + +impl MqttWill { + fn as_ref(&self) -> mqtt_format::v5::packets::connect::Will<'_> { + mqtt_format::v5::packets::connect::Will { + properties: self.properties.as_ref(), + topic: self.topic.as_ref(), + payload: self.payload.as_ref(), + will_qos: self.qos, + will_retain: self.retain, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum MqttClientConnectError { + #[error("An error occured while encoding or sending an MQTT Packet")] + Send(#[source] MqttPacketCodecError), + + #[error("An error occured while decoding or receiving an MQTT Packet")] + Receive(#[source] MqttPacketCodecError), + + #[error("The transport unexpectedly closed")] + TransportUnexpectedlyClosed, + + #[error("The server sent a response with a protocol error: {reason}")] + ServerProtocolError { reason: &'static str }, +} + +pub struct MqttClientConnector { + transport: MqttConnectTransport, + client_identifier: ClientIdentifier, + clean_start: CleanStart, + keep_alive: KeepAlive, + properties: crate::packets::connect::ConnectProperties, + username: Option, + password: Option, + will: Option, +} + +impl MqttClientConnector { + pub fn new( + transport: MqttConnectTransport, + client_identifier: ClientIdentifier, + clean_start: CleanStart, + keep_alive: KeepAlive, + ) -> MqttClientConnector { + MqttClientConnector { + transport, + client_identifier, + clean_start, + keep_alive, + properties: crate::packets::connect::ConnectProperties::new(), + username: None, + password: None, + will: None, + } + } + + pub fn with_username(&mut self, username: MqttString) -> &mut Self { + self.username = Some(username); + self + } + + pub fn with_password(&mut self, password: MqttBytes) -> &mut Self { + self.password = Some(password); + self + } + + pub fn with_will(&mut self, will: MqttWill) -> &mut Self { + self.will = Some(will); + self + } + + pub async fn connect(self) -> Result { + type Mcce = MqttClientConnectError; + let mut conn = + tokio_util::codec::Framed::new(MqttConnection::from(self.transport), MqttPacketCodec); + + let conn_packet = mqtt_format::v5::packets::connect::MConnect { + client_identifier: self.client_identifier.as_str(), + username: self.username.as_ref().map(AsRef::as_ref), + password: self.password.as_ref().map(AsRef::as_ref), + clean_start: self.clean_start.as_bool(), + will: self.will.as_ref().map(|w| w.as_ref()), + properties: self.properties.as_ref(), + keep_alive: self.keep_alive.as_u16(), + }; + + conn.send(mqtt_format::v5::packets::MqttPacket::Connect(conn_packet)) + .await + .map_err(Mcce::Send)?; + + let Some(maybe_connack) = conn.next().await else { + return Err(Mcce::TransportUnexpectedlyClosed); + }; + + let maybe_connack = match maybe_connack { + Ok(maybe_connack) => maybe_connack, + Err(e) => { + return Err(Mcce::Receive(e)); + } + }; + + let connack = loop { + let can_use_auth = self.properties.authentication_data.is_some(); + let _auth = match maybe_connack.get() { + mqtt_format::v5::packets::MqttPacket::Connack(connack) => break connack, + mqtt_format::v5::packets::MqttPacket::Auth(auth) => { + if can_use_auth { + auth + } else { + // MQTT-4.12.0-6 + return Err(Mcce::ServerProtocolError { + reason: "MQTT-4.12.0-6", + }); + } + } + _ => { + return Err(MqttClientConnectError::ServerProtocolError { + reason: "MQTT-3.1.4-5", + }); + } + }; + + // TODO: Use user-provided method to authenticate further + + todo!() + }; + + // TODO: Timeout here if the server doesn't respond + + if connack.reason_code == mqtt_format::v5::packets::connack::ConnackReasonCode::Success { + // TODO: Read properties, configure client + + return Ok(MqttClient { _conn: conn }); + } + + // TODO: Do something with error code + + todo!() + } + + pub fn properties_mut(&mut self) -> &mut crate::packets::connect::ConnectProperties { + &mut self.properties + } +} + +pub struct MqttClient { + _conn: Framed, +} + +impl MqttClient {} diff --git a/src/client_identifier.rs b/src/client_identifier.rs new file mode 100644 index 00000000..bce8d405 --- /dev/null +++ b/src/client_identifier.rs @@ -0,0 +1,84 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +pub enum ClientIdentifier { + MinimalRequired(MinimalRequiredClientIdentifier), + PotentiallyServerProvided, + PotentiallyAccepted(PotentiallyAcceptedClientIdentifier), +} + +impl ClientIdentifier { + pub fn new_minimal_required( + s: impl Into, + ) -> Result { + const ALLOWED_CHARS: &str = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + let s = s.into(); + + let disallowed_chars = s + .chars() + .filter(|c| !ALLOWED_CHARS.chars().any(|allowed| allowed == *c)) + .collect::>(); + + if !disallowed_chars.is_empty() { + return Err(ClientIdentifierError::MinimalNotAllowedChar( + disallowed_chars, + )); + } + + if s.len() > 23 { + return Err(ClientIdentifierError::MinimalTooLong(s.len())); + } + + Ok(ClientIdentifier::MinimalRequired( + MinimalRequiredClientIdentifier(s), + )) + } + + pub fn new_potentially_server_provided() -> ClientIdentifier { + ClientIdentifier::PotentiallyServerProvided + } + + pub fn new_potetially_accepted( + s: impl Into, + ) -> Result { + let s = s.into(); + if s.is_empty() { + return Err(ClientIdentifierError::Zero); + } + crate::string::MqttString::try_from(s) + .map(PotentiallyAcceptedClientIdentifier) + .map(ClientIdentifier::PotentiallyAccepted) + .map_err(ClientIdentifierError::from) + } + + pub fn as_str(&self) -> &str { + match self { + ClientIdentifier::MinimalRequired(s) => s.0.as_ref(), + ClientIdentifier::PotentiallyServerProvided => "", + ClientIdentifier::PotentiallyAccepted(s) => s.0.as_ref(), + } + } +} + +pub struct MinimalRequiredClientIdentifier(String); +pub struct PotentiallyAcceptedClientIdentifier(crate::string::MqttString); + +#[derive(Debug, thiserror::Error)] +pub enum ClientIdentifierError { + // I am ugly + #[error("Minimal client identifier contains disallowed characters: {}", .0.iter().copied().map(String::from).collect::>().join(", "))] + MinimalNotAllowedChar(Vec), + + #[error("Minimal client identifier contains more characters than allowed: {}", .0)] + MinimalTooLong(usize), + + #[error("Client identifier is not allowed to be empty")] + Zero, + + #[error(transparent)] + String(#[from] crate::string::MqttStringError), +} diff --git a/src/codecs.rs b/src/codecs.rs index 8f582f4f..a5020b6a 100644 --- a/src/codecs.rs +++ b/src/codecs.rs @@ -5,14 +5,13 @@ // use mqtt_format::v5::packets::MqttPacket as FormatMqttPacket; -use tokio_util::bytes::Bytes; use tokio_util::codec::Decoder; use tokio_util::codec::Encoder; use winnow::Partial; use yoke::Yoke; -use crate::packet::MqttPacket; -use crate::packet::MqttWriterError; +use crate::packets::MqttPacket; +use crate::packets::MqttWriterError; #[derive(Debug, thiserror::Error)] pub enum MqttPacketCodecError { @@ -24,6 +23,9 @@ pub enum MqttPacketCodecError { #[error("A protocol error occurred")] Protocol, + + #[error("Could not parse during decoding due to: {:?}", .0)] + Parsing(winnow::error::ErrMode), } pub(crate) struct MqttPacketCodec; @@ -45,22 +47,24 @@ impl Decoder for MqttPacketCodec { return Ok(None); } - let packet_size = + let remaining_length = match mqtt_format::v5::integers::parse_variable_u32(&mut Partial::new(&src[1..])) { - Ok(size) => size, + Ok(size) => size as usize, Err(winnow::error::ErrMode::Incomplete(winnow::error::Needed::Size(needed))) => { src.reserve(needed.into()); return Ok(None); } + Err(winnow::error::ErrMode::Incomplete(winnow::error::Needed::Unknown)) => { + src.reserve(1); + return Ok(None); + } _ => { return Err(MqttPacketCodecError::Protocol); } }; - let remaining_length = packet_size as usize; - let total_packet_length = 1 - + mqtt_format::v5::integers::variable_u32_binary_size(packet_size) as usize + + mqtt_format::v5::integers::variable_u32_binary_size(remaining_length as u32) as usize + remaining_length; if src.len() < total_packet_length { @@ -71,9 +75,9 @@ impl Decoder for MqttPacketCodec { let cart = src.split_to(total_packet_length).freeze(); let packet = Yoke::try_attach_to_cart( - crate::packet::StableBytes(cart), + crate::packets::StableBytes(cart), |data| -> Result<_, MqttPacketCodecError> { - FormatMqttPacket::parse_complete(data).map_err(|_| MqttPacketCodecError::Protocol) + FormatMqttPacket::parse_complete(data).map_err(MqttPacketCodecError::Parsing) }, )?; @@ -81,16 +85,15 @@ impl Decoder for MqttPacketCodec { } } -impl Encoder for MqttPacketCodec { +impl Encoder> for MqttPacketCodec { type Error = MqttPacketCodecError; fn encode( &mut self, - packet: Bytes, + packet: FormatMqttPacket<'_>, dst: &mut tokio_util::bytes::BytesMut, ) -> Result<(), Self::Error> { - dst.extend_from_slice(&packet); - + packet.write(&mut crate::packets::MqttWriter(dst))?; Ok(()) } } @@ -99,31 +102,57 @@ impl Encoder for MqttPacketCodec { mod tests { use futures::SinkExt; use futures::StreamExt; + use mqtt_format::v5::packets::connect::MConnect; use mqtt_format::v5::packets::pingreq::MPingreq; use mqtt_format::v5::packets::MqttPacket as FormatMqttPacket; - use tokio_util::bytes::BytesMut; use tokio_util::codec::Framed; + use tokio_util::compat::TokioAsyncReadCompatExt; use super::MqttPacketCodec; - use crate::packet::MqttWriter; + use crate::transport::MqttConnection; #[tokio::test] async fn simple_test_codec() { let (client, server) = tokio::io::duplex(100); - let mut framed_client = Framed::new(client, MqttPacketCodec); - let mut framed_server = Framed::new(server, MqttPacketCodec); - - let mut data = BytesMut::new(); + let mut framed_client = + Framed::new(MqttConnection::Duplex(client.compat()), MqttPacketCodec); + let mut framed_server = + Framed::new(MqttConnection::Duplex(server.compat()), MqttPacketCodec); let packet = FormatMqttPacket::Pingreq(MPingreq); - packet.write(&mut MqttWriter(&mut data)).unwrap(); - - let send_data = data.clone().freeze(); + let sent_packet = packet.clone(); tokio::spawn(async move { - framed_client.send(send_data).await.unwrap(); + framed_client.send(sent_packet).await.unwrap(); }); + let recv_packet = framed_server.next().await.unwrap().unwrap(); + assert_eq!(packet, *recv_packet.get()); + } + + #[tokio::test] + async fn test_connect_codec() { + let (client, server) = tokio::io::duplex(100); + let mut framed_client = + Framed::new(MqttConnection::Duplex(client.compat()), MqttPacketCodec); + let mut framed_server = + Framed::new(MqttConnection::Duplex(server.compat()), MqttPacketCodec); + + let packet = FormatMqttPacket::Connect(MConnect { + client_identifier: "test", + username: None, + password: None, + clean_start: false, + will: None, + properties: mqtt_format::v5::packets::connect::ConnectProperties::new(), + keep_alive: 0, + }); + + let sent_packet = packet.clone(); + tokio::spawn(async move { + framed_client.send(sent_packet.clone()).await.unwrap(); + framed_client.send(sent_packet).await.unwrap(); + }); let recv_packet = framed_server.next().await.unwrap().unwrap(); assert_eq!(packet, *recv_packet.get()); diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 00000000..687a1ec2 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,8 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +#[derive(Debug, thiserror::Error)] +pub enum MqttError {} diff --git a/src/keep_alive.rs b/src/keep_alive.rs new file mode 100644 index 00000000..12cf6e9b --- /dev/null +++ b/src/keep_alive.rs @@ -0,0 +1,45 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +use std::num::NonZeroU16; +use std::time::Duration; + +pub enum KeepAlive { + Disabled, + Seconds(NonZeroU16), +} + +impl KeepAlive { + pub(crate) fn as_u16(&self) -> u16 { + match self { + KeepAlive::Disabled => 0, + KeepAlive::Seconds(s) => s.get(), + } + } +} + +impl TryFrom for KeepAlive { + type Error = KeepAliveError; + + fn try_from(value: Duration) -> Result { + let secs = value.as_secs(); + if secs > u16::MAX.into() { + return Err(KeepAliveError::OutOfBounds); + } + let secs = secs as u16; + + Ok(KeepAlive::Seconds(NonZeroU16::try_from(secs)?)) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum KeepAliveError { + #[error("KeepAlive cannot be of zero duration")] + KeepAliveZero(#[from] std::num::TryFromIntError), + + #[error("KeepAlive out of bounds, maximum is {} seconds", u16::MAX)] + OutOfBounds, +} diff --git a/src/lib.rs b/src/lib.rs index 5ee766e8..6f2b685c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,5 +4,14 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // +pub mod bytes; +pub mod client; +pub mod client_identifier; mod codecs; -pub mod packet; +mod error; +pub mod keep_alive; +mod packets; +mod properties; +pub mod string; +pub mod transport; +mod util; diff --git a/src/packets/auth.rs b/src/packets/auth.rs new file mode 100644 index 00000000..828c8063 --- /dev/null +++ b/src/packets/auth.rs @@ -0,0 +1,23 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::auth::AuthProperties, + anker: "_Toc3901221", + pub struct AuthProperties { + (anker: "_Toc3901223") + authentication_method: AuthenticationMethod<'a> with setter = String, + + (anker: "_Toc3901224") + authentication_data: AuthenticationData<'a> with setter = Vec, + + (anker: "_Toc3901225") + reason_string: ReasonString<'a> with setter = String, + + (anker: "_Toc3901226") + user_properties: UserProperties<'a> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/connack.rs b/src/packets/connack.rs new file mode 100644 index 00000000..75035a09 --- /dev/null +++ b/src/packets/connack.rs @@ -0,0 +1,62 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::connack::ConnackProperties, + anker: "_Toc3901080", + pub struct ConnackProperties { + (anker: "_Toc3901082") + session_expiry_interval: SessionExpiryInterval with setter = u32, + + (anker: "_Toc3901083") + receive_maximum: ReceiveMaximum with setter = u16, + + (anker: "_Toc3901084") + maximum_qos: MaximumQoS with setter = u8, + + (anker: "_Toc3901085") + retain_available: RetainAvailable with setter = u8, + + (anker: "_Toc3901086") + maximum_packet_size: MaximumPacketSize with setter = u32, + + (anker: "_Toc3901087") + assigned_client_identifier: AssignedClientIdentifier<'i> with setter = String, + + (anker: "_Toc3901088") + topic_alias_maximum: TopicAliasMaximum with setter = u16, + + (anker: "_Toc3901089") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901090") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + + (anker: "_Toc3901091") + wildcard_subscription_available: WildcardSubscriptionAvailable with setter = u8, + + (anker: "_Toc3901092") + subscription_identifiers_available: SubscriptionIdentifiersAvailable with setter = u8, + + (anker: "_Toc3901093") + shared_scubscription_available: SharedSubscriptionAvailable with setter = u8, + + (anker: "_Toc3901094") + server_keep_alive: ServerKeepAlive with setter = u16, + + (anker: "_Toc3901095") + response_information: ResponseInformation<'i> with setter = String, + + (anker: "_Toc3901096") + server_reference: ServerReference<'i> with setter = String, + + (anker: "_Toc3901097") + authentication_method: AuthenticationMethod<'i> with setter = String, + + (anker: "_Toc3901098") + authentication_data: AuthenticationData<'i> with setter = Vec, + } +} diff --git a/src/packets/connect.rs b/src/packets/connect.rs new file mode 100644 index 00000000..ea0ac8ce --- /dev/null +++ b/src/packets/connect.rs @@ -0,0 +1,65 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::connect::ConnectProperties, + anker: "_Toc3901046", + pub struct ConnectProperties { + (anker: "_Toc3901048") + session_expiry_interval: SessionExpiryInterval with setter = u32, + + (anker: "_Toc3901049") + receive_maximum: ReceiveMaximum with setter = u16, + + (anker: "_Toc3901050") + maximum_packet_size: MaximumPacketSize with setter = u32, + + (anker: "_Toc3901051") + topic_alias_maximum: TopicAliasMaximum with setter = u16, + + (anker: "_Toc3901052") + request_response_information: RequestResponseInformation with setter = u8, + + (anker: "_Toc3901053") + request_problem_information: RequestProblemInformation with setter = u8, + + (anker: "_Toc3901054") + user_properties: UserProperties<'a> with setter = crate::properties::UserProperty, + + (anker: "_Toc3901055") + authentication_method: AuthenticationMethod<'a> with setter = String, + + (anker: "_Toc3901056") + authentication_data: AuthenticationData<'a> with setter = Vec, + } +} + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::connect::ConnectWillProperties, + anker: "_Toc3901060", + pub struct ConnectWillProperties { + (anker: "_Toc3901062") + will_delay_interval: WillDelayInterval with setter = u16, + + (anker: "_Toc3901063") + payload_format_indicator: PayloadFormatIndicator with setter = u8, + + (anker: "_Toc3901064") + message_expiry_interval: MessageExpiryInterval with setter = u32, + + (anker: "_Toc3901065") + content_type: ContentType<'i> with setter = String, + + (anker: "_Toc3901066") + response_topic: ResponseTopic<'i> with setter = String, + + (anker: "_Toc3901067") + correlation_data: CorrelationData<'i> with setter = Vec, + + (anker: "_Toc3901068") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/disconnect.rs b/src/packets/disconnect.rs new file mode 100644 index 00000000..d588363c --- /dev/null +++ b/src/packets/disconnect.rs @@ -0,0 +1,23 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::disconnect::DisconnectProperties, + anker: "_Toc3901209", + pub struct DisconnectProperties { + (anker: "_Toc3901211") + session_expiry_interval: SessionExpiryInterval with setter = u32, + + (anker: "_Toc3901212") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901213") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + + (anker: "_Toc3901214") + server_reference: ServerReference<'i> with setter = String, + } +} diff --git a/src/packet.rs b/src/packets/mod.rs similarity index 74% rename from src/packet.rs rename to src/packets/mod.rs index 500baa9e..268d31c8 100644 --- a/src/packet.rs +++ b/src/packets/mod.rs @@ -7,6 +7,7 @@ use std::ops::Deref; use mqtt_format::v5::packets::MqttPacket as FormatMqttPacket; +use mqtt_format::v5::write::MqttWriteError; use mqtt_format::v5::write::WriteMqttPacket; use stable_deref_trait::StableDeref; use tokio_util::bytes::BufMut; @@ -15,6 +16,22 @@ use tokio_util::bytes::BytesMut; use yoke::CloneableCart; use yoke::Yoke; +pub mod auth; +pub mod connack; +pub mod connect; +pub mod disconnect; +pub mod pingreq; +pub mod pingresp; +pub mod puback; +pub mod pubcomp; +pub mod publish; +pub mod pubrec; +pub mod pubrel; +pub mod suback; +pub mod subscribe; +pub mod unsuback; +pub mod unsubscribe; + #[derive(Debug, Clone)] pub(crate) struct StableBytes(pub(crate) Bytes); @@ -61,6 +78,23 @@ impl From for MqttWriterError { } } +pub struct VecWriter<'a>(pub &'a mut Vec); + +impl<'a> WriteMqttPacket for VecWriter<'a> { + type Error = MqttWriteError; + + fn write_byte(&mut self, u: u8) -> mqtt_format::v5::write::WResult { + self.0.push(u); + + Ok(()) + } + + fn write_slice(&mut self, u: &[u8]) -> mqtt_format::v5::write::WResult { + self.0.extend_from_slice(u); + Ok(()) + } +} + pub struct MqttWriter<'a>(pub &'a mut BytesMut); impl<'a> WriteMqttPacket for MqttWriter<'a> { diff --git a/src/packets/pingreq.rs b/src/packets/pingreq.rs new file mode 100644 index 00000000..75e7fb2d --- /dev/null +++ b/src/packets/pingreq.rs @@ -0,0 +1,5 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// diff --git a/src/packets/pingresp.rs b/src/packets/pingresp.rs new file mode 100644 index 00000000..75e7fb2d --- /dev/null +++ b/src/packets/pingresp.rs @@ -0,0 +1,5 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// diff --git a/src/packets/puback.rs b/src/packets/puback.rs new file mode 100644 index 00000000..cc001cf0 --- /dev/null +++ b/src/packets/puback.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::puback::PubackProperties, + anker: "_Toc3901125", + pub struct PubackProperties { + (anker: "_Toc3901127") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901128") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/pubcomp.rs b/src/packets/pubcomp.rs new file mode 100644 index 00000000..4ec170e3 --- /dev/null +++ b/src/packets/pubcomp.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::pubcomp::PubcompProperties, + anker: "_Toc3901153", + pub struct PubcompProperties { + (anker: "_Toc3901154") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901155") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/publish.rs b/src/packets/publish.rs new file mode 100644 index 00000000..5b3a42a0 --- /dev/null +++ b/src/packets/publish.rs @@ -0,0 +1,35 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::publish::PublishProperties, + anker: "_Toc3901109", + pub struct PublishProperties { + (anker: "_Toc3901111") + payload_format_indicator: PayloadFormatIndicator with setter = u8, + + (anker: "_Toc3901112") + message_expiry_interval: MessageExpiryInterval with setter = u32, + + (anker: "_Toc3901113") + topic_alias: TopicAlias with setter = u16, + + (anker: "_Toc3901114") + response_topic: ResponseTopic<'i> with setter = String, + + (anker: "_Toc3901115") + correlation_data: CorrelationData<'i> with setter = Vec, + + (anker: "_Toc3901116") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + + (anker: "_Toc3901117") + subscription_identifier: SubscriptionIdentifier with setter = u32, + + (anker: "_Toc3901118") + content_type: ContentType<'i> with setter = String, + } +} diff --git a/src/packets/pubrec.rs b/src/packets/pubrec.rs new file mode 100644 index 00000000..c6f9e82e --- /dev/null +++ b/src/packets/pubrec.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::pubrec::PubrecProperties, + anker: "_Toc3901135", + pub struct PubrecProperties { + (anker: "_Toc3901137") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901138") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/pubrel.rs b/src/packets/pubrel.rs new file mode 100644 index 00000000..09b44943 --- /dev/null +++ b/src/packets/pubrel.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties!( + properties_type: mqtt_format::v5::packets::pubrel::PubrelProperties, + anker: "_Toc3901145", + pub struct PubrelProperties { + (anker: "_Toc3901147") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901148") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +); diff --git a/src/packets/suback.rs b/src/packets/suback.rs new file mode 100644 index 00000000..042b3be8 --- /dev/null +++ b/src/packets/suback.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::suback::SubackProperties, + anker: "_Toc3901174", + pub struct SubackProperties { + (anker: "_Toc3901175") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901176") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/subscribe.rs b/src/packets/subscribe.rs new file mode 100644 index 00000000..c40efaec --- /dev/null +++ b/src/packets/subscribe.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::subscribe::SubscribeProperties, + anker: "_Toc3901164", + pub struct SubscribeProperties { + (anker: "_Toc3901166") + subscription_identifier: SubscriptionIdentifier with setter = u32, + + (anker: "_Toc3901167") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/unsuback.rs b/src/packets/unsuback.rs new file mode 100644 index 00000000..5913cbc5 --- /dev/null +++ b/src/packets/unsuback.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::unsuback::UnsubackProperties, + anker: "_Toc3901190", + pub struct UnsubackProperties { + (anker: "_Toc3901192") + reason_string: ReasonString<'i> with setter = String, + + (anker: "_Toc3901193") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/packets/unsubscribe.rs b/src/packets/unsubscribe.rs new file mode 100644 index 00000000..f8ef53e5 --- /dev/null +++ b/src/packets/unsubscribe.rs @@ -0,0 +1,17 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +crate::properties::define_properties! { + properties_type: mqtt_format::v5::packets::unsubscribe::UnsubscribeProperties, + anker: "_Toc3901182", + pub struct UnsubscribeProperties { + (anker: "_Toc3901183") + subscription_identifier: SubscriptionIdentifier with setter = u32, + + (anker: "_Toc3901183") + user_properties: UserProperties<'i> with setter = crate::properties::UserProperty, + } +} diff --git a/src/properties.rs b/src/properties.rs new file mode 100644 index 00000000..6a7ff0b6 --- /dev/null +++ b/src/properties.rs @@ -0,0 +1,227 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +pub type TypeOfProperty

=

::Inner; + +macro_rules! define_properties { + (@no_lt $name:ident $pat:ident $lt:lifetime) => { + type $name<'a> = mqtt_format::v5::variable_header:: $pat <'a>; + }; + (@no_lt $name:ident $pat:ident) => { + type $name = mqtt_format::v5::variable_header:: $pat; + }; + + (@statify $pat:ident $lt:lifetime) => { + mqtt_format::v5::variable_header:: $pat <'static> + }; + (@statify $pat:ident) => { + mqtt_format::v5::variable_header:: $pat + }; + ( + properties_type: $packettypename:ty, + $( anker: $anker:literal $(,)?)? + pub struct $name:ident { + $( $((anker: $prop_anker:literal ))? $prop_name:ident : $prop:ident $(<$prop_lt:lifetime>)? with setter = $setter:ty),* $(,)? + } + ) => { + #[derive(Clone, Debug, PartialEq)] + pub struct $name { + $( + pub(crate) $prop_name: Option> + ),* + } + + paste::paste! { + #[allow(dead_code)] + impl $name { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + $name { + $($prop_name: None),* + } + } + + $( + #[doc = core::concat!("Set the ", stringify!($prop_name), " property.") ] + $( #[doc = core::concat!("See also: ", crate::util::md_speclink!($prop_anker)) ] )? + pub fn [](&mut self, value: $setter) -> &mut Self { + ::apply(&mut self.$prop_name, value); + + self + } + )* + + pub fn as_ref(&self) -> $packettypename { + $packettypename { + $($prop_name: { + self.$prop_name.as_ref().map(|v| { + crate::properties::define_properties!(@no_lt Prop $prop $($prop_lt)?); + Prop { + 0: ::get(v) + } + }) + }),* + } + } + } + } + }; +} +pub(crate) use define_properties; + +use crate::packets::VecWriter; +use crate::string::MqttString; + +pub struct UserProperty { + key: MqttString, + value: MqttString, +} + +pub(crate) trait FormatProperty { + type Inner; + type Setter; + type Outer<'a>; + + fn apply(inner: &mut Option, value: impl Into); + + fn get(inner: &Self::Inner) -> Self::Outer<'_>; +} + +impl<'i> FormatProperty for mqtt_format::v5::variable_header::UserProperties<'i> { + type Inner = Vec; + type Setter = UserProperty; + type Outer<'a> = &'a [u8]; + + fn apply(inner: &mut Option, key_value: impl Into) { + let key_value = key_value.into(); + let user_prop = mqtt_format::v5::variable_header::UserProperty { + key: key_value.key.as_ref(), + value: key_value.value.as_ref(), + }; + let inner = inner.get_or_insert_with(Default::default); + if !inner.is_empty() { + mqtt_format::v5::integers::write_variable_u32( + &mut VecWriter(inner), + ::IDENTIFIER, + ) + .expect("Writing a u32 should not fail") + } + user_prop + .write(&mut VecWriter(inner)) + .expect("Writing MqttStrings should never be invalid"); + } + + fn get(inner: &Self::Inner) -> Self::Outer<'_> { + inner.as_ref() + } +} + +macro_rules! define_property_types { + (@access_pattern ref $value:ident) => { + $value.as_ref() + }; + (@access_pattern deref $value:ident) => { + *$value + }; + ([ $( $prop:ty => inner = $inner:ty; setter = $setter:ty; outer $mode:tt = $outer:ty ),* $(,)? ]) => { + $( + impl<'i> FormatProperty for $prop { + type Inner = $inner; + type Setter = $setter; + type Outer<'a> = $outer; + + fn apply(inner: &mut Option, value: impl Into) { + *inner = Some(value.into()); + } + + fn get(inner: &Self::Inner) -> Self::Outer<'_> { + define_property_types!(@access_pattern $mode inner) + } + } + )* + }; +} + +define_property_types! {[ + mqtt_format::v5::variable_header::PayloadFormatIndicator => inner = u8; setter = u8; outer deref = u8, + mqtt_format::v5::variable_header::MessageExpiryInterval => inner = u32; setter = u32; outer deref = u32, + mqtt_format::v5::variable_header::ContentType<'i> => inner = String; setter = String; outer ref = &'a str, + mqtt_format::v5::variable_header::ResponseTopic<'i> => inner = String; setter = String; outer ref = &'a str, + mqtt_format::v5::variable_header::CorrelationData<'i> => inner = Vec; setter = Vec; outer ref = &'a [u8], + mqtt_format::v5::variable_header::SubscriptionIdentifier => inner = u32; setter = u32; outer deref = u32, + mqtt_format::v5::variable_header::SessionExpiryInterval => inner = u32; setter = u32; outer deref = u32, + mqtt_format::v5::variable_header::AssignedClientIdentifier<'i> => inner = String; setter = String; outer ref = &'a str, + mqtt_format::v5::variable_header::ServerKeepAlive => inner = u16; setter = u16; outer deref = u16, + mqtt_format::v5::variable_header::AuthenticationMethod<'i> => inner = String; setter = String; outer ref = &'a str, + mqtt_format::v5::variable_header::AuthenticationData<'i> => inner = Vec; setter = Vec; outer ref = &'a [u8], + mqtt_format::v5::variable_header::RequestProblemInformation => inner = u8; setter = u8; outer deref = u8, + mqtt_format::v5::variable_header::WillDelayInterval => inner = u32; setter = u32; outer deref = u32, + mqtt_format::v5::variable_header::RequestResponseInformation => inner = u8; setter = u8; outer deref = u8, + mqtt_format::v5::variable_header::ResponseInformation<'i> => inner = String; setter = String; outer ref = &'a str, + mqtt_format::v5::variable_header::ServerReference<'i> => inner = String; setter = String; outer ref = &'a str, + mqtt_format::v5::variable_header::ReasonString<'i> => inner = String; setter = String; outer ref = &'a str, + mqtt_format::v5::variable_header::ReceiveMaximum => inner = u16; setter = u16; outer deref = u16, + mqtt_format::v5::variable_header::TopicAliasMaximum => inner = u16; setter = u16; outer deref = u16, + mqtt_format::v5::variable_header::TopicAlias => inner = u16; setter = u16; outer deref = u16, + mqtt_format::v5::variable_header::MaximumQoS => inner = u8; setter = u8; outer deref = u8, + mqtt_format::v5::variable_header::RetainAvailable => inner = u8; setter = u8; outer deref = u8, + mqtt_format::v5::variable_header::MaximumPacketSize => inner = u32; setter = u32; outer deref = u32, + mqtt_format::v5::variable_header::WildcardSubscriptionAvailable => inner = u8; setter = u8; outer deref = u8, + mqtt_format::v5::variable_header::SubscriptionIdentifiersAvailable => inner = u8; setter = u8; outer deref = u8, + mqtt_format::v5::variable_header::SharedSubscriptionAvailable => inner = u8; setter = u8; outer deref = u8, +]} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use super::UserProperty; + use crate::packets::connect::ConnectProperties; + use crate::packets::VecWriter; + use crate::string::MqttString; + + #[test] + fn check_properties() { + let mut props = ConnectProperties::new(); + + props.with_session_expiry_interval(16u32); + props.with_user_properties(UserProperty { + key: MqttString::from_str("foo").unwrap(), + value: MqttString::from_str("bar").unwrap(), + }); + for _ in 0..5 { + props.with_user_properties(UserProperty { + key: MqttString::from_str("foo").unwrap(), + value: MqttString::from_str("bar").unwrap(), + }); + } + props.with_receive_maximum(4); + + let conn_props = props.as_ref(); + + assert_eq!( + conn_props + .user_properties() + .as_ref() + .unwrap() + .iter() + .count(), + 6 + ); + + let mut buffer = vec![]; + let mut buffer = VecWriter(&mut buffer); + + conn_props.write(&mut buffer).unwrap(); + + let new_props = mqtt_format::v5::packets::connect::ConnectProperties::parse( + &mut winnow::Bytes::new(&buffer.0), + ) + .unwrap(); + + assert_eq!(conn_props, new_props); + } +} diff --git a/src/string.rs b/src/string.rs new file mode 100644 index 00000000..67c4777f --- /dev/null +++ b/src/string.rs @@ -0,0 +1,54 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] +pub struct MqttString(String); + +impl MqttString { + pub const MAX_LEN: usize = u16::MAX as usize; +} + +impl std::str::FromStr for MqttString { + type Err = MqttStringError; + + fn from_str(s: &str) -> Result { + if s.len() > Self::MAX_LEN { + Err(MqttStringError::TooLong(s.len())) + } else { + Ok(Self(s.to_string())) + } + } +} + +impl TryFrom for MqttString { + type Error = MqttStringError; + + fn try_from(s: String) -> Result { + if s.len() > Self::MAX_LEN { + Err(MqttStringError::TooLong(s.len())) + } else { + Ok(Self(s.to_string())) + } + } +} + +impl AsRef for MqttString { + fn as_ref(&self) -> &str { + self.0.as_ref() + } +} + +impl std::fmt::Debug for MqttString { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum MqttStringError { + #[error("String of length {} is too long, max length is {}", .0, MqttString::MAX_LEN)] + TooLong(usize), +} diff --git a/src/transport.rs b/src/transport.rs new file mode 100644 index 00000000..cbed93ed --- /dev/null +++ b/src/transport.rs @@ -0,0 +1,125 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +use futures::AsyncRead as FuturesAsyncRead; +use futures::AsyncWrite as FuturesAsyncWrite; +use tokio::io::AsyncRead as TokioAsyncRead; +use tokio::io::AsyncWrite as TokioAsyncWrite; +use tokio::io::DuplexStream; +use tokio::net::TcpStream; +use tokio_util::compat::Compat as TokioCompat; +use tokio_util::compat::TokioAsyncReadCompatExt; + +pub(crate) enum MqttConnection { + Tokio(TokioCompat), + Duplex(TokioCompat), +} + +impl TokioAsyncRead for MqttConnection { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t.get_mut()).poll_read(cx, buf), + MqttConnection::Duplex(d) => std::pin::pin!(d.get_mut()).poll_read(cx, buf), + } + } +} + +impl TokioAsyncWrite for MqttConnection { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t.get_mut()).poll_write(cx, buf), + MqttConnection::Duplex(d) => std::pin::pin!(d.get_mut()).poll_write(cx, buf), + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t.get_mut()).poll_flush(cx), + MqttConnection::Duplex(d) => std::pin::pin!(d.get_mut()).poll_flush(cx), + } + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t.get_mut()).poll_shutdown(cx), + MqttConnection::Duplex(d) => std::pin::pin!(d.get_mut()).poll_shutdown(cx), + } + } +} + +impl FuturesAsyncRead for MqttConnection { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t).poll_read(cx, buf), + MqttConnection::Duplex(d) => std::pin::pin!(d).poll_read(cx, buf), + } + } +} + +impl FuturesAsyncWrite for MqttConnection { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t).poll_write(cx, buf), + MqttConnection::Duplex(d) => std::pin::pin!(d).poll_write(cx, buf), + } + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t).poll_flush(cx), + MqttConnection::Duplex(d) => std::pin::pin!(d).poll_flush(cx), + } + } + + fn poll_close( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut *self { + MqttConnection::Tokio(t) => std::pin::pin!(t).poll_close(cx), + MqttConnection::Duplex(d) => std::pin::pin!(d).poll_close(cx), + } + } +} + +pub enum MqttConnectTransport { + TokioTcp(TcpStream), + TokioDuplex(DuplexStream), +} + +impl From for MqttConnection { + fn from(value: MqttConnectTransport) -> Self { + match value { + MqttConnectTransport::TokioTcp(t) => MqttConnection::Tokio(t.compat()), + MqttConnectTransport::TokioDuplex(d) => MqttConnection::Duplex(d.compat()), + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 00000000..ba17ab35 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,22 @@ +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. +// + +macro_rules! speclink { + ($anker:literal) => { + core::concat!( + "https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#", + $anker + ) + }; +} +pub(crate) use speclink; + +macro_rules! md_speclink { + ($anker:literal) => { + core::concat!("[📖 Specification](", $crate::util::speclink!($anker), ")") + }; +} +pub(crate) use md_speclink;