Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for ReadOnlySequence as payload #2046

Merged
merged 8 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
sign:
needs: build
runs-on: windows-latest # Code signing must run on a Windows agent for Authenticode signing (dll/exe)

if: github.repository == 'dotnet/MQTTnet'
steps:
chkr1011 marked this conversation as resolved.
Show resolved Hide resolved
- name: Setup .NET SDK
uses: actions/setup-dotnet@v4
Expand Down
3 changes: 2 additions & 1 deletion Samples/Server/Server_Retained_Messages_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// ReSharper disable UnusedMember.Global
// ReSharper disable InconsistentNaming

using System.Buffers;
using System.Text.Json;
using MQTTnet.Packets;
using MQTTnet.Protocol;
Expand Down Expand Up @@ -112,7 +113,7 @@ public static MqttRetainedMessageModel Create(MqttApplicationMessage message)

// Create a copy of the buffer from the payload segment because
// it cannot be serialized and deserialized with the JSON serializer.
Payload = message.PayloadSegment.ToArray(),
Payload = message.Payload.ToArray(),
UserProperties = message.UserProperties,
ResponseTopic = message.ResponseTopic,
CorrelationData = message.CorrelationData,
Expand Down
10 changes: 8 additions & 2 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellat
{
var buffer = PacketFormatterAdapter.Encode(packet);

if (buffer.Payload.Count == 0)
if (buffer.Payload.Length == 0)
{
// zero copy
// https://github.com/dotnet/runtime/blob/e31ddfdc4f574b26231233dc10c9a9c402f40590/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs#L279
Expand Down Expand Up @@ -232,7 +232,13 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer)
var span = output.GetSpan(buffer.Length);

buffer.Packet.AsSpan().CopyTo(span);
buffer.Payload.AsSpan().CopyTo(span.Slice(buffer.Packet.Count));

int offset = buffer.Packet.Count;
foreach (var segment in buffer.Payload)
{
segment.Span.CopyTo(span.Slice(offset));
offset += segment.Length;
}

output.Advance(buffer.Length);
}
Expand Down
3 changes: 2 additions & 1 deletion Source/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -78,7 +79,7 @@ async Task WriteAsync(int iterations, int size)
{
await Task.Yield();

var buffer = new ArraySegment<byte>(new byte[size]);
var buffer = new ReadOnlySequence<byte>(new byte[size]);

for (var i = 0; i < iterations; i++)
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/ReaderExtensionsBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void GlobalSetup()
var buffer = mqttPacketFormatter.Encode(packet);
stream = new MemoryStream();
stream.Write(buffer.Packet);
stream.Write(buffer.Payload);
stream.Write(buffer.Payload.ToArray());
mqttPacketFormatter.Cleanup();
}

Expand Down
5 changes: 3 additions & 2 deletions Source/MQTTnet.Benchmarks/SendPacketAsyncBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using BenchmarkDotNet.Jobs;
using MQTTnet.Formatter;
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
Expand Down Expand Up @@ -40,7 +41,7 @@ public async ValueTask After()
stream.Position = 0;
var output = PipeWriter.Create(stream);

if (buffer.Payload.Count == 0)
if (buffer.Payload.Length == 0)
{
await output.WriteAsync(buffer.Packet).ConfigureAwait(false);
}
Expand All @@ -60,7 +61,7 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer)
var span = output.GetSpan(buffer.Length);

buffer.Packet.AsSpan().CopyTo(span);
buffer.Payload.AsSpan().CopyTo(span.Slice(buffer.Packet.Count));
buffer.Payload.CopyTo(span.Slice(buffer.Packet.Count));

output.Advance(buffer.Length);
}
Expand Down
3 changes: 2 additions & 1 deletion Source/MQTTnet.Benchmarks/SerializerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using MQTTnet.Formatter.V3;
using BenchmarkDotNet.Jobs;
using MQTTnet.Diagnostics;
using System.Buffers;

namespace MQTTnet.Benchmarks
{
Expand Down Expand Up @@ -104,7 +105,7 @@ public Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationTok
return Task.FromResult(count);
}

public Task WriteAsync(ArraySegment<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
public Task WriteAsync(ReadOnlySequence<byte> buffer, bool isEndOfPacket, CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
Expand Down
3 changes: 2 additions & 1 deletion Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -132,7 +133,7 @@ Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventAr
return CompletedTask.Instance;
}

var payloadBuffer = eventArgs.ApplicationMessage.PayloadSegment.ToArray();
var payloadBuffer = eventArgs.ApplicationMessage.Payload.ToArray();
awaitable.TrySetResult(payloadBuffer);

// Set this message to handled to that other code can avoid execution etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static MqttPublishPacket Create(MqttApplicationMessage applicationMessage
var packet = new MqttPublishPacket
{
Topic = applicationMessage.Topic,
PayloadSegment = applicationMessage.PayloadSegment,
Payload = applicationMessage.Payload,
QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel,
Retain = applicationMessage.Retain,
Dup = applicationMessage.Dup,
Expand Down
12 changes: 4 additions & 8 deletions Source/MQTTnet.Server/Internal/MqttRetainedMessagesManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public async Task UpdateMessage(string clientId, MqttApplicationMessage applicat

lock (_messages)
{
var payloadSegment = applicationMessage.PayloadSegment;
var hasPayload = payloadSegment.Count > 0;
var payload = applicationMessage.Payload;
var hasPayload = payload.Length > 0;

if (!hasPayload)
{
Expand All @@ -82,7 +82,8 @@ public async Task UpdateMessage(string clientId, MqttApplicationMessage applicat
}
else
{
if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel || !SequenceEqual(existingMessage.PayloadSegment, payloadSegment))
if (existingMessage.QualityOfServiceLevel != applicationMessage.QualityOfServiceLevel ||
!MqttMemoryHelper.SequenceEqual(existingMessage.Payload, payload))
{
_messages[applicationMessage.Topic] = applicationMessage;
saveIsRequired = true;
Expand Down Expand Up @@ -147,10 +148,5 @@ public async Task ClearMessages()
await _eventContainer.RetainedMessagesClearedEvent.InvokeAsync(EventArgs.Empty).ConfigureAwait(false);
}
}

static bool SequenceEqual(ArraySegment<byte> source, ArraySegment<byte> target)
{
return source.AsSpan().SequenceEqual(target);
}
}
}
7 changes: 2 additions & 5 deletions Source/MQTTnet.TestApp/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ public static async Task RunAsync()
client.ApplicationMessageReceivedAsync += e =>
{
var payloadText = string.Empty;
if (e.ApplicationMessage.PayloadSegment.Count > 0)
if (e.ApplicationMessage.Payload.Length > 0)
{
payloadText = Encoding.UTF8.GetString(
e.ApplicationMessage.PayloadSegment.Array,
e.ApplicationMessage.PayloadSegment.Offset,
e.ApplicationMessage.PayloadSegment.Count);
payloadText = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
}

Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Expand Down
7 changes: 2 additions & 5 deletions Source/MQTTnet.TestApp/ServerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,9 @@ public static async Task RunAsync()
mqttServer.InterceptingPublishAsync += e =>
{
var payloadText = string.Empty;
if (e.ApplicationMessage.PayloadSegment.Count > 0)
if (e.ApplicationMessage.Payload.Length > 0)
{
payloadText = Encoding.UTF8.GetString(
e.ApplicationMessage.PayloadSegment.Array,
e.ApplicationMessage.PayloadSegment.Offset,
e.ApplicationMessage.PayloadSegment.Count);
payloadText = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
}

MqttNetConsoleLogger.PrintToConsole($"'{e.ClientId}' reported '{e.ApplicationMessage.Topic}' > '{payloadText}'", ConsoleColor.Magenta);
Expand Down
3 changes: 2 additions & 1 deletion Source/MQTTnet.Tests/ASP/MqttConnectionContextTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
Expand Down Expand Up @@ -99,7 +100,7 @@ public async Task TestLargePacket()
connection.Transport = pipe;
var ctx = new MqttConnectionContext(serializer, connection);

await ctx.SendPacketAsync(new MqttPublishPacket { PayloadSegment = new ArraySegment<byte>(new byte[20_000]) }, CancellationToken.None).ConfigureAwait(false);
await ctx.SendPacketAsync(new MqttPublishPacket { PayloadSegment = new byte[20_000] }, CancellationToken.None).ConfigureAwait(false);

var readResult = await pipe.Send.Reader.ReadAsync();
Assert.IsTrue(readResult.Buffer.Length > 20000);
Expand Down
5 changes: 3 additions & 2 deletions Source/MQTTnet.Tests/Clients/MqttClient/MqttClient_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -297,7 +298,7 @@ await receiver.SubscribeAsync(

Assert.IsNotNull(receivedMessage);
Assert.AreEqual("A", receivedMessage.Topic);
Assert.AreEqual(null, receivedMessage.PayloadSegment.Array);
Assert.AreEqual(0, receivedMessage.Payload.Length);
}
}

Expand Down Expand Up @@ -508,7 +509,7 @@ public async Task Publish_QoS_1_In_ApplicationMessageReceiveHandler()

client2.ApplicationMessageReceivedAsync += e =>
{
client2TopicResults.Add(Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray()));
client2TopicResults.Add(Encoding.UTF8.GetString(e.ApplicationMessage.Payload.ToArray()));
return CompletedTask.Instance;
};

Expand Down
1 change: 1 addition & 0 deletions Source/MQTTnet.Tests/Formatter/MqttBufferReader_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void Read_Remaining_Data_From_Larger_Buffer()
Assert.AreEqual(5, remainingData.Length);
}


[TestMethod]
public void Read_Various_Positions_and_Offsets()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand Down Expand Up @@ -328,7 +329,7 @@ public void Serialize_LargePacket()

Assert.IsNotNull(publishPacketCopy);
Assert.AreEqual(publishPacket.Topic, publishPacketCopy.Topic);
CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), publishPacketCopy.PayloadSegment.ToArray());
CollectionAssert.AreEqual(publishPacket.Payload.ToArray(), publishPacketCopy.Payload.ToArray());

// Now modify the payload and test again.
publishPacket.PayloadSegment = new ArraySegment<byte>(Encoding.UTF8.GetBytes("MQTT"));
Expand All @@ -338,7 +339,7 @@ public void Serialize_LargePacket()

Assert.IsNotNull(publishPacketCopy2);
Assert.AreEqual(publishPacket.Topic, publishPacketCopy2.Topic);
CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), publishPacketCopy2.PayloadSegment.ToArray());
CollectionAssert.AreEqual(publishPacket.Payload.ToArray(), publishPacketCopy2.Payload.ToArray());
}

[TestMethod]
Expand Down Expand Up @@ -587,7 +588,7 @@ MqttProtocolVersion DeserializeAndDetectVersion(MqttPacketFormatterAdapter packe
return packetFormatterAdapter.ProtocolVersion;
}

TPacket Roundtrip<TPacket>(TPacket packet, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, MqttBufferWriter bufferWriter = null) where TPacket : MqttPacket
TPacket Roundtrip<TPacket>(TPacket packet, MqttProtocolVersion protocolVersion = MqttProtocolVersion.V311, MqttBufferReader bufferReader = null, MqttBufferWriter bufferWriter = null) where TPacket : MqttPacket
{
var writer = bufferWriter ?? WriterFactory();
var serializer = MqttPacketFormatterAdapter.GetMqttPacketFormatter(protocolVersion, writer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
Expand Down Expand Up @@ -322,7 +323,7 @@ public void Serialize_Full_MqttPublishPacket_V311()
Assert.AreEqual(publishPacket.PacketIdentifier, deserialized.PacketIdentifier);
Assert.AreEqual(publishPacket.Dup, deserialized.Dup);
Assert.AreEqual(publishPacket.Retain, deserialized.Retain);
CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), deserialized.PayloadSegment.ToArray());
CollectionAssert.AreEqual(publishPacket.Payload.ToArray(), deserialized.Payload.ToArray());
Assert.AreEqual(publishPacket.QualityOfServiceLevel, deserialized.QualityOfServiceLevel);
Assert.AreEqual(publishPacket.Topic, deserialized.Topic);
Assert.AreEqual(null, deserialized.ResponseTopic); // Not supported in v3.1.1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
Expand Down Expand Up @@ -283,7 +284,7 @@ public void Serialize_Full_MqttPublishPacket_V500()
Assert.AreEqual(publishPacket.PacketIdentifier, deserialized.PacketIdentifier);
Assert.AreEqual(publishPacket.Dup, deserialized.Dup);
Assert.AreEqual(publishPacket.Retain, deserialized.Retain);
CollectionAssert.AreEqual(publishPacket.PayloadSegment.ToArray(), deserialized.PayloadSegment.ToArray());
CollectionAssert.AreEqual(publishPacket.Payload.ToArray(), deserialized.Payload.ToArray());
Assert.AreEqual(publishPacket.QualityOfServiceLevel, deserialized.QualityOfServiceLevel);
Assert.AreEqual(publishPacket.Topic, deserialized.Topic);
Assert.AreEqual(publishPacket.ResponseTopic, deserialized.ResponseTopic);
Expand Down
19 changes: 10 additions & 9 deletions Source/MQTTnet.Tests/MQTTnet.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MSTest.TestAdapter" Version="3.3.1"/>
<PackageReference Include="MSTest.TestFramework" Version="3.3.1"/>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0"/>
<FrameworkReference Include="Microsoft.AspNetCore.App"/>
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.1" />
<PackageReference Include="MSTest.TestAdapter" Version="3.3.1" />
<PackageReference Include="MSTest.TestFramework" Version="3.3.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<FrameworkReference Include="Microsoft.AspNetCore.App" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Source\MQTTnet.Server\MQTTnet.Server.csproj"/>
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj"/>
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj"/>
<ProjectReference Include="..\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj"/>
<ProjectReference Include="..\MQTTnet.Extensions.TopicTemplate\MQTTnet.Extensions.TopicTemplate.csproj"/>
<ProjectReference Include="..\..\Source\MQTTnet.Server\MQTTnet.Server.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet.Extensions.Rpc\MQTTnet.Extensions.Rpc.csproj" />
<ProjectReference Include="..\..\Source\MQTTnet\MQTTnet.csproj" />
<ProjectReference Include="..\MQTTnet.AspnetCore\MQTTnet.AspNetCore.csproj" />
<ProjectReference Include="..\MQTTnet.Extensions.TopicTemplate\MQTTnet.Extensions.TopicTemplate.csproj" />
</ItemGroup>

</Project>
Loading
Loading