Skip to content

Commit

Permalink
tests and extensions
Browse files Browse the repository at this point in the history
  • Loading branch information
mregen committed Jul 13, 2024
1 parent e25569b commit 603fbec
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 21 deletions.
21 changes: 20 additions & 1 deletion Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,26 @@ public void Read_Remaining_Data_From_Larger_Buffer()
Assert.AreEqual(5, remainingData.Length);
}

[TestMethod]
[TestMethod]
public void Read_Remaining_Payload_From_Larger_Buffer()
{
var buffer = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };

var reader = new MqttBufferReader();

// The used buffer contains more data than used!
reader.SetBuffer(buffer, 0, 5);

// This should only read 5 bytes even if more data is in the buffer
// due to custom bounds.
using var remainingData = reader.ReadBufferedPayload();

Assert.IsTrue(reader.EndOfStream);
Assert.AreEqual(0, reader.BytesLeft);
Assert.AreEqual(5, remainingData.Sequence.Length);
}

[TestMethod]
public void Read_Various_Positions_and_Offsets()
{
const int NumBufferElements = 1000;
Expand Down
19 changes: 10 additions & 9 deletions Source/MQTTnet.Tests/MQTTnet.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MSTest.TestAdapter" Version="3.3.1"/>
<PackageReference Include="MSTest.TestFramework" Version="3.3.1"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0"/>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.1" />
<PackageReference Include="MSTest.TestAdapter" Version="3.3.1" />
<PackageReference Include="MSTest.TestFramework" Version="3.3.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Source\MQTTnet.Server\MQTTnet.Server.csproj"/>
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj"/>
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj"/>
<ProjectReference Include="..\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj"/>
<ProjectReference Include="..\MQTTnet.Extensions.TopicTemplate\MQTTnet.Extensions.TopicTemplate.csproj"/>
<ProjectReference Include="..\..\Source\MQTTnet.Server\MQTTnet.Server.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj" />
<ProjectReference Include="..\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj" />
<ProjectReference Include="..\MQTTnet.Extensions.TopicTemplate\MQTTnet.Extensions.TopicTemplate.csproj" />
</ItemGroup>

</Project>
105 changes: 103 additions & 2 deletions Source/MQTTnet.Tests/MQTTv5/Client_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.IO;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Formatter;
Expand Down Expand Up @@ -62,7 +63,7 @@ await client.PublishAsync(new MqttApplicationMessageBuilder()
Assert.AreEqual(2, receivedMessage.UserProperties.Count);
}
}

[TestMethod]
public async Task Connect()
{
Expand Down Expand Up @@ -129,6 +130,54 @@ public async Task Unsubscribe()
}
}

[TestMethod]
public async Task Publish_QoS_0_LargeBuffer()
{
using var recyclableMemoryStream = GetLargePayload();
using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

var client = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V500));
var result = await client.PublishSequenceAsync("a", recyclableMemoryStream.GetReadOnlySequence());
await client.DisconnectAsync();

Assert.AreEqual(MqttClientPublishReasonCode.Success, result.ReasonCode);
}
}

[TestMethod]
public async Task Publish_QoS_1_LargeBuffer()
{
using var recyclableMemoryStream = GetLargePayload();
using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

var client = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V500));
var result = await client.PublishSequenceAsync("a", recyclableMemoryStream.GetReadOnlySequence(), MqttQualityOfServiceLevel.AtLeastOnce);
await client.DisconnectAsync();

Assert.AreEqual(MqttClientPublishReasonCode.NoMatchingSubscribers, result.ReasonCode);
}
}

[TestMethod]
public async Task Publish_QoS_2_LargeBuffer()
{
using var recyclableMemoryStream = GetLargePayload();
using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

var client = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V500));
var result = await client.PublishSequenceAsync("a", recyclableMemoryStream.GetReadOnlySequence(), MqttQualityOfServiceLevel.ExactlyOnce);
await client.DisconnectAsync();

Assert.AreEqual(MqttClientPublishReasonCode.NoMatchingSubscribers, result.ReasonCode);
}
}

[TestMethod]
public async Task Publish_QoS_0()
{
Expand Down Expand Up @@ -174,6 +223,45 @@ public async Task Publish_QoS_2()
}
}

[TestMethod]
public async Task Publish_With_RecyclableMemoryStream()
{
var memoryManager = new RecyclableMemoryStreamManager(options: new RecyclableMemoryStreamManager.Options { BlockSize = 4096 });
using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

var client = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V500));

const int payloadSize = 100000;
using var memoryStream = memoryManager.GetStream();

byte testValue = 0;
while (memoryStream.Length < payloadSize)
{
memoryStream.WriteByte(testValue++);
}

var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("Hello")
.WithPayload(memoryStream.GetReadOnlySequence())
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce)
.WithUserProperty("x", "1")
.WithUserProperty("y", "2")
.WithResponseTopic("response")
.WithContentType("text")
.WithMessageExpiryInterval(50)
.WithCorrelationData(new byte[12])
.WithTopicAlias(2)
.Build();

var result = await client.PublishAsync(applicationMessage);
await client.DisconnectAsync();

Assert.AreEqual(MqttClientPublishReasonCode.Success, result.ReasonCode);
}
}

[TestMethod]
public async Task Publish_With_Properties()
{
Expand Down Expand Up @@ -209,7 +297,7 @@ public async Task Subscribe_And_Publish()
using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

var client1 = await testEnvironment.ConnectClient(o => o.WithProtocolVersion(MqttProtocolVersion.V500).WithClientId("client1"));

var testMessageHandler = testEnvironment.CreateApplicationMessageHandler(client1);
Expand Down Expand Up @@ -278,5 +366,18 @@ public async Task Publish_And_Receive_New_Properties()
CollectionAssert.AreEqual(applicationMessage.UserProperties, receivedMessage.UserProperties);
}
}

private RecyclableMemoryStream GetLargePayload()
{
var memoryManager = new RecyclableMemoryStreamManager();
var memoryStream = memoryManager.GetStream();
for (var i = 0; i < 100000; i++)
{
memoryStream.WriteByte((byte)i);
}

memoryStream.Position = 0;
return memoryStream;
}
}
}
2 changes: 0 additions & 2 deletions Source/MQTTnet.Tests/MqttApplicationMessageBuilder_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.IO;
using System.Linq;
using System.Text;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Protocol;
Expand Down
28 changes: 28 additions & 0 deletions Source/MQTTnet/Client/MqttClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -65,6 +66,33 @@ public static Task<MqttClientPublishResult> PublishBinaryAsync(
return mqttClient.PublishAsync(applicationMessage, cancellationToken);
}

public static Task<MqttClientPublishResult> PublishSequenceAsync(
this IMqttClient mqttClient,
string topic,
ReadOnlySequence<byte> payload,
MqttQualityOfServiceLevel qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce,
bool retain = false,
CancellationToken cancellationToken = default)
{
if (mqttClient == null)
{
throw new ArgumentNullException(nameof(mqttClient));
}

if (topic == null)
{
throw new ArgumentNullException(nameof(topic));
}

var applicationMessage = new MqttApplicationMessageBuilder().WithTopic(topic)
.WithPayload(payload)
.WithRetainFlag(retain)
.WithQualityOfServiceLevel(qualityOfServiceLevel)
.Build();

return mqttClient.PublishAsync(applicationMessage, cancellationToken);
}

public static Task<MqttClientPublishResult> PublishStringAsync(
this IMqttClient mqttClient,
string topic,
Expand Down
7 changes: 5 additions & 2 deletions Source/MQTTnet/MqttApplicationMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,18 @@ public sealed class MqttApplicationMessage
public uint MessageExpiryInterval { get; set; }

/// <summary>
/// Set an ArraySegment as Payload.
/// Set an ArraySegment as Payload.
/// </summary>
public ArraySegment<byte> PayloadSegment
{
set { Payload = new ReadOnlySequence<byte>(value); }
}

/// <summary>
/// Get or set ArraySegment style of Payload.
/// Get or set ReadOnlySequence style of Payload.
/// This payload type is used internally and is recommended for public use.
/// It can be used in combination with a RecyclableMemoryStream to publish
/// large buffered messages without allocating large chunks of memory.
/// </summary>
public ReadOnlySequence<byte> Payload { get; set; } = EmptyBuffer.ReadOnlySequence;

Expand Down
17 changes: 12 additions & 5 deletions Source/MQTTnet/MqttApplicationMessageBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand All @@ -22,7 +23,7 @@ public sealed class MqttApplicationMessageBuilder
uint _messageExpiryInterval;

MqttPayloadFormatIndicator _payloadFormatIndicator;
ArraySegment<byte> _payloadSegment;
ReadOnlySequence<byte> _payload;
MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
string _responseTopic;
bool _retain;
Expand All @@ -41,7 +42,7 @@ public MqttApplicationMessage Build()
var applicationMessage = new MqttApplicationMessage
{
Topic = _topic,
PayloadSegment = _payloadSegment,
Payload = _payload,
QualityOfServiceLevel = _qualityOfServiceLevel,
Retain = _retain,
ContentType = _contentType,
Expand Down Expand Up @@ -89,13 +90,13 @@ public MqttApplicationMessageBuilder WithMessageExpiryInterval(uint messageExpir

public MqttApplicationMessageBuilder WithPayload(byte[] payload)
{
_payloadSegment = payload == null || payload.Length == 0 ? EmptyBuffer.ArraySegment : new ArraySegment<byte>(payload);
_payload = payload == null || payload.Length == 0 ? EmptyBuffer.ReadOnlySequence : new ReadOnlySequence<byte>(payload);
return this;
}

public MqttApplicationMessageBuilder WithPayload(ArraySegment<byte> payloadSegment)
{
_payloadSegment = payloadSegment;
_payload = new ReadOnlySequence<byte>(payloadSegment);
return this;
}

Expand Down Expand Up @@ -158,6 +159,12 @@ public MqttApplicationMessageBuilder WithPayload(string payload)
return WithPayload(payloadBuffer);
}

public MqttApplicationMessageBuilder WithPayload(ReadOnlySequence<byte> payload)
{
_payload = payload;
return this;
}

/// <summary>
/// Adds the payload format indicator to the message.
/// <remarks>MQTT 5.0.0+ feature.</remarks>
Expand All @@ -170,7 +177,7 @@ public MqttApplicationMessageBuilder WithPayloadFormatIndicator(MqttPayloadForma

public MqttApplicationMessageBuilder WithPayloadSegment(ArraySegment<byte> payloadSegment)
{
_payloadSegment = payloadSegment;
_payload = new ReadOnlySequence<byte>(payloadSegment);
return this;
}

Expand Down

0 comments on commit 603fbec

Please sign in to comment.