Skip to content

Commit

Permalink
Merge pull request #258 from TheNeikos/feature/add_client
Browse files Browse the repository at this point in the history
Add rudimentary client
  • Loading branch information
TheNeikos authored Mar 27, 2024
2 parents 92c031e + 958e026 commit d43a8d3
Show file tree
Hide file tree
Showing 36 changed files with 1,398 additions and 47 deletions.
46 changes: 42 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ categories = ["embedded"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace]
members = ["mqtt-format", "mqtt-tester"]
members = ["cloudmqtt-bin", "mqtt-format", "mqtt-tester"]

[dependencies]
futures = "0.3.30"
mqtt-format = { version = "0.5.0", path = "mqtt-format", features = [
"yoke",
"mqttv5",
] }
paste = "1.0.14"
stable_deref_trait = "1.2.0"
thiserror = "1.0.58"
tokio = { version = "1.36.0", features = ["macros", "full"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
tokio-util = { version = "0.7.10", features = ["codec", "compat"] }
typed-builder = "0.18"
winnow = "0.6.5"
yoke = "0.7.3"
11 changes: 11 additions & 0 deletions cloudmqtt-bin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "cloudmqtt-bin"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
cloudmqtt = { version = "0.5.0", path = ".." }
tokio = { version = "1.36.0", features = ["full"] }
38 changes: 38 additions & 0 deletions cloudmqtt-bin/src/bin/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//

use clap::Parser;
use cloudmqtt::client::MqttClientConnector;
use cloudmqtt::transport::MqttConnectTransport;
use tokio::net::TcpStream;

#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(long)]
hostname: String,
}

#[tokio::main]
async fn main() {
let args = Args::parse();

let socket = TcpStream::connect(args.hostname).await.unwrap();

let connection = MqttConnectTransport::TokioTcp(socket);
let client_id = cloudmqtt::client_identifier::ClientIdentifier::PotentiallyServerProvided;

let connector = MqttClientConnector::new(
connection,
client_id,
cloudmqtt::client::CleanStart::Yes,
cloudmqtt::keep_alive::KeepAlive::Disabled,
);

let _client = connector.connect().await.unwrap();

println!("Yay, we connected! That's all for now");
}
2 changes: 1 addition & 1 deletion mqtt-format/src/v5/integers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn parse_u16(input: &mut &Bytes) -> MResult<u16> {
}

pub fn write_u16<W: WriteMqttPacket>(buffer: &mut W, u: u16) -> WResult<W> {
buffer.write_u16(u.to_be())?;
buffer.write_u16(u)?;
Ok(())
}

Expand Down
12 changes: 12 additions & 0 deletions mqtt-format/src/v5/packets/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,23 @@ impl<'i> Will<'i> {
}

crate::v5::properties::define_properties! {
packet_type: Will,
anker: "_Toc3901060",
pub struct ConnectWillProperties<'i> {
(anker: "_Toc3901062")
will_delay_interval: WillDelayInterval,
(anker: "_Toc3901063")
payload_format_indicator: PayloadFormatIndicator,
(anker: "_Toc3901064")
message_expiry_interval: MessageExpiryInterval,
(anker: "_Toc3901065")
content_type: ContentType<'i>,
(anker: "_Toc3901066")
response_topic: ResponseTopic<'i>,
(anker: "_Toc3901067")
correlation_data: CorrelationData<'i>,
(anker: "_Toc3901068")
user_properties: UserProperties<'i>,
}
}

Expand Down Expand Up @@ -342,6 +352,7 @@ mod test {
content_type: None,
response_topic: None,
correlation_data: None,
user_properties: None,
},
topic: "crazy topic",
payload: &[0xAB, 0xCD, 0xEF],
Expand Down Expand Up @@ -378,6 +389,7 @@ mod test {
content_type: Some(ContentType("json")),
response_topic: Some(ResponseTopic("resp")),
correlation_data: Some(CorrelationData(&[0xFF])),
user_properties: None,
},
topic: "crazy topic",
payload: &[0xAB, 0xCD, 0xEF],
Expand Down
5 changes: 5 additions & 0 deletions mqtt-format/src/v5/packets/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ use crate::v5::write::WriteMqttPacket;
use crate::v5::MResult;

define_properties! {
packet_type: MSubscribe,
anker: "_Toc3901164",
pub struct SubscribeProperties<'i> {
(anker: "_Toc3901166")
subscription_identifier: SubscriptionIdentifier,

(anker: "_Toc3901167")
user_properties: UserProperties<'i>,
}
}
Expand Down
5 changes: 5 additions & 0 deletions mqtt-format/src/v5/packets/unsuback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ crate::v5::reason_code::make_combined_reason_code! {
}

crate::v5::properties::define_properties! {
packet_type: MUnsuback,
anker: "_Toc3901190",
pub struct UnsubackProperties<'i> {
(anker: "_Toc3901192")
reason_string: ReasonString<'i>,

(anker: "_Toc3901193")
user_properties: UserProperties<'i>,
}
}
Expand Down
31 changes: 16 additions & 15 deletions mqtt-format/src/v5/variable_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use super::integers::write_variable_u32;
use super::write::WResult;
use super::write::WriteMqttPacket;
use super::MResult;
use crate::v5::integers::parse_variable_u32;

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PacketIdentifier(pub u16);
Expand Down Expand Up @@ -190,9 +191,9 @@ define_properties! {[
],

SubscriptionIdentifier as 0x0B =>
parse with parse_u32 as u32;
write with super::integers::write_u32;
with size |_| 4;
parse with parse_variable_u32 as u32;
write with super::integers::write_variable_u32;
with size |&v: &u32| super::integers::variable_u32_binary_size(v);
testfnname: test_roundtrip_subscriptionidentifier;
testvalues: [12, 14, 42, 1337],

Expand All @@ -211,9 +212,9 @@ define_properties! {[
testvalues: ["fooobarbar"],

ServerKeepAlive as 0x13 =>
parse with parse_u32 as u32;
write with super::integers::write_u32;
with size |_| 4;
parse with parse_u16 as u16;
write with super::integers::write_u16;
with size |_| 2;
testfnname: test_roundtrip_serverkeepalive;
testvalues: [12, 14, 42, 1337],

Expand Down Expand Up @@ -279,23 +280,23 @@ define_properties! {[
testvalues: ["fooobarbar"],

ReceiveMaximum as 0x21 =>
parse with parse_u32 as u32;
write with super::integers::write_u32;
with size |_| 4;
parse with parse_u16 as u16;
write with super::integers::write_u16;
with size |_| 2;
testfnname: test_roundtrip_receivemaximum;
testvalues: [12, 14, 42, 1337],

TopicAliasMaximum as 0x22 =>
parse with parse_u32 as u32;
write with super::integers::write_u32;
with size |_| 4;
parse with parse_u16 as u16;
write with super::integers::write_u16;
with size |_| 2;
testfnname: test_roundtrip_topicaliasmaximum;
testvalues: [12, 14, 42, 1337],

TopicAlias as 0x23 =>
parse with parse_u32 as u32;
write with super::integers::write_u32;
with size |_| 4;
parse with parse_u16 as u16;
write with super::integers::write_u16;
with size |_| 2;
testfnname: test_roundtrip_topicalias;
testvalues: [12, 14, 42, 1337],

Expand Down
48 changes: 48 additions & 0 deletions src/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//

#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub struct MqttBytes(Vec<u8>);

impl MqttBytes {
pub const MAX_LEN: usize = u16::MAX as usize;
}

#[derive(Debug, thiserror::Error)]
pub enum MqttBytesError {
#[error("A vector/slice of length {} is too long, max length is {}", .0, MqttBytes::MAX_LEN)]
TooLong(usize),
}

impl AsRef<[u8]> for MqttBytes {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}

impl TryFrom<Vec<u8>> for MqttBytes {
type Error = MqttBytesError;

fn try_from(s: Vec<u8>) -> Result<Self, Self::Error> {
if s.len() > Self::MAX_LEN {
Err(MqttBytesError::TooLong(s.len()))
} else {
Ok(Self(s))
}
}
}

impl TryFrom<&[u8]> for MqttBytes {
type Error = MqttBytesError;

fn try_from(s: &[u8]) -> Result<Self, Self::Error> {
if s.len() > Self::MAX_LEN {
Err(MqttBytesError::TooLong(s.len()))
} else {
Ok(Self(s.to_vec()))
}
}
}
Loading

0 comments on commit d43a8d3

Please sign in to comment.