Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client support for ReadOnlySequence as payload #2017

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Samples/Diagnostics/PackageInspection_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ public static async Task Inspect_Outgoing_Package()
/*
* This sample covers the inspection of outgoing packages from the client.
*/

var mqttFactory = new MqttClientFactory();

using (var mqttClient = mqttFactory.CreateMqttClient())
{
var mqttClientOptions = mqttFactory.CreateClientOptionsBuilder()
.WithTcpServer("broker.hivemq.com")
.Build();

mqttClient.InspectPacketAsync += OnInspectPacket;

await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

Console.WriteLine("MQTT client is connected.");

var mqttClientDisconnectOptions = mqttFactory.CreateClientDisconnectOptionsBuilder()
Expand Down
5 changes: 3 additions & 2 deletions Samples/Server/Server_Retained_Messages_Samples.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// ReSharper disable UnusedMember.Global
// ReSharper disable InconsistentNaming

using System.Buffers;
using System.Text.Json;
using MQTTnet.Packets;
using MQTTnet.Protocol;
Expand Down Expand Up @@ -112,7 +113,7 @@ public static MqttRetainedMessageModel Create(MqttApplicationMessage message)

// Create a copy of the buffer from the payload segment because
// it cannot be serialized and deserialized with the JSON serializer.
Payload = message.PayloadSegment.ToArray(),
Payload = message.Payload.Sequence.ToArray(),
UserProperties = message.UserProperties,
ResponseTopic = message.ResponseTopic,
CorrelationData = message.CorrelationData,
Expand All @@ -130,7 +131,7 @@ public MqttApplicationMessage ToApplicationMessage()
return new MqttApplicationMessage
{
Topic = Topic,
PayloadSegment = new ArraySegment<byte>(Payload ?? Array.Empty<byte>()),
Payload = Payload != null ? new ReadOnlySequence<byte>(Payload) : ReadOnlySequence<byte>.Empty,
PayloadFormatIndicator = PayloadFormatIndicator,
ResponseTopic = ResponseTopic,
CorrelationData = CorrelationData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Buffers;

namespace MQTTnet.AspNetCore.Client
{
Expand Down
18 changes: 14 additions & 4 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,11 @@ public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellat
{
var buffer = PacketFormatterAdapter.Encode(packet);

if (buffer.Payload.Count == 0)
if (buffer.Packet.IsSingleSegment && buffer.Payload.Length == 0)
{
// zero copy
// https://github.com/dotnet/runtime/blob/e31ddfdc4f574b26231233dc10c9a9c402f40590/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs#L279
await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false);
await _output.WriteAsync(buffer.Packet.First, cancellationToken).ConfigureAwait(false);
}
else
{
Expand All @@ -231,8 +231,18 @@ static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer)

var span = output.GetSpan(buffer.Length);

buffer.Packet.AsSpan().CopyTo(span);
buffer.Payload.AsSpan().CopyTo(span.Slice(buffer.Packet.Count));
int offset = 0;
foreach (var segment in buffer.Packet)
{
segment.Span.CopyTo(span.Slice(offset));
offset += segment.Length;
}

foreach (var segment in buffer.Payload)
{
segment.Span.CopyTo(span.Slice(offset));
offset += segment.Length;
}

output.Advance(buffer.Length);
}
Expand Down
1 change: 1 addition & 0 deletions Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using MQTTnet.Adapter;
using MQTTnet.Buffers;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Server;
Expand Down
1 change: 1 addition & 0 deletions Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using MQTTnet.Adapter;
using MQTTnet.Buffers;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Implementations;
Expand Down
4 changes: 2 additions & 2 deletions Source/MQTTnet.AspnetCore/ReaderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static bool TryDecode(
return false;
}

var fixedHeader = copy.First.Span[0];
var fixedHeader = copy.FirstSpan[0];
copy = copy.Slice(headerLength);
if (copy.Length < bodyLength)
{
Expand All @@ -53,7 +53,7 @@ public static bool TryDecode(
var bodySlice = copy.Slice(0, bodyLength);
var bodySegment = GetArraySegment(ref bodySlice);

var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, bodySegment, headerLength + bodyLength);
using var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, bodySegment, headerLength + bodyLength);
if (formatter.ProtocolVersion == MqttProtocolVersion.Unknown)
{
formatter.DetectProtocolVersion(receivedMqttPacket);
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class AsyncLockBenchmark : BaseBenchmark
{
Expand Down
20 changes: 5 additions & 15 deletions Source/MQTTnet.Benchmarks/ChannelAdapterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.IO;
using System.Threading;
using BenchmarkDotNet.Attributes;
using MQTTnet.Adapter;
using MQTTnet.Buffers;
using MQTTnet.Diagnostics;
using MQTTnet.Formatter;
using MQTTnet.Packets;
using MQTTnet.Tests.Mockups;
using System.Buffers;
using System.IO;
using System.Threading;

namespace MQTTnet.Benchmarks
{
Expand Down Expand Up @@ -58,7 +59,7 @@ public void Setup()

var serializer = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311, new MqttBufferWriter(4096, 65535));

var serializedPacket = Join(serializer.Encode(_packet).Join());
var serializedPacket = serializer.Encode(_packet).ToArray();

_iterations = 10000;

Expand All @@ -75,16 +76,5 @@ public void Setup()

_channelAdapter = new MqttChannelAdapter(channel, serializer, new MqttNetEventLogger());
}

static byte[] Join(params ArraySegment<byte>[] chunks)
{
var buffer = new MemoryStream();
foreach (var chunk in chunks)
{
buffer.Write(chunk.Array, chunk.Offset, chunk.Count);
}

return buffer.ToArray();
}
}
}
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/LoggerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter]
[MemoryDiagnoser]
public class LoggerBenchmark : BaseBenchmark
Expand Down
4 changes: 2 additions & 2 deletions Source/MQTTnet.Benchmarks/MemoryCopyBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class MemoryCopyBenchmark
Expand Down Expand Up @@ -33,7 +33,7 @@ public void Array_Copy()
[Benchmark]
public void Memory_Copy()
{
MQTTnet.Internal.MqttMemoryHelper.Copy(source, 0, target, 0, Length);
MQTTnet.Buffers.MqttMemoryHelper.Copy(source, 0, target, 0, Length);
}

}
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace MQTTnet.Benchmarks;

[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter]
[RankColumn]
[MemoryDiagnoser]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MessageProcessingMqttConnectionContextBenchmark : BaseBenchmark
{
Expand Down
3 changes: 2 additions & 1 deletion Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
using System.Text;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Buffers;
using MQTTnet.Formatter;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MqttBufferReaderBenchmark
{
Expand Down
12 changes: 6 additions & 6 deletions Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Formatter;
using MQTTnet.Buffers;
using MQTTnet.Tests.Mockups;
using System;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MqttPacketReaderWriterBenchmark : BaseBenchmark
{
readonly byte[] _demoPayload = new byte[1024];

byte[] _readPayload;

[GlobalCleanup]
Expand All @@ -27,7 +27,7 @@ public void GlobalCleanup()
public void GlobalSetup()
{
TestEnvironment.EnableLogger = false;

var writer = new MqttBufferWriter(4096, 65535);
writer.WriteString("A relative short string.");
writer.WriteBinary(_demoPayload);
Expand Down Expand Up @@ -69,7 +69,7 @@ public void Read_100_000_Messages()
reader.ReadBinaryData();
}
}

[Benchmark]
public void Write_100_000_Messages()
{
Expand Down
15 changes: 8 additions & 7 deletions Source/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Channel;
Expand All @@ -14,10 +10,14 @@
using MQTTnet.Implementations;
using MQTTnet.Server;
using MQTTnet.Server.Internal.Adapter;
using System.Buffers;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

namespace MQTTnet.Benchmarks;

[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MqttTcpChannelBenchmark : BaseBenchmark
{
Expand Down Expand Up @@ -64,12 +64,13 @@ async Task ReadAsync(int iterations, int size)
{
await Task.Yield();

var buffer = new byte[size];
var expected = iterations * size;
long read = 0;

while (read < expected)
{
var readResult = await _clientChannel.ReadAsync(new byte[size], 0, size, CancellationToken.None).ConfigureAwait(false);
var readResult = await _clientChannel.ReadAsync(buffer, 0, size, CancellationToken.None).ConfigureAwait(false);
read += readResult;
}
}
Expand All @@ -78,7 +79,7 @@ async Task WriteAsync(int iterations, int size)
{
await Task.Yield();

var buffer = new ArraySegment<byte>(new byte[size]);
var buffer = new ReadOnlySequence<byte>(new byte[size]);

for (var i = 0; i < iterations; i++)
{
Expand Down
15 changes: 11 additions & 4 deletions Source/MQTTnet.Benchmarks/ReaderExtensionsBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using BenchmarkDotNet.Jobs;
using MQTTnet.Adapter;
using MQTTnet.AspNetCore;
using MQTTnet.Buffers;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Packets;
Expand All @@ -14,7 +15,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class ReaderExtensionsBenchmark
Expand All @@ -35,8 +36,14 @@ public void GlobalSetup()

var buffer = mqttPacketFormatter.Encode(packet);
stream = new MemoryStream();
stream.Write(buffer.Packet);
stream.Write(buffer.Payload);
foreach (var segment in buffer.Packet)
{
stream.Write(segment.Span);
}
foreach (var segment in buffer.Payload)
{
stream.Write(segment.Span);
}
mqttPacketFormatter.Cleanup();
}

Expand Down Expand Up @@ -172,7 +179,7 @@ public static bool TryDecode(MqttPacketFormatterAdapter formatter,
var bodySlice = copy.Slice(0, bodyLength);
var buffer = GetMemory(bodySlice).ToArray();

var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, new ArraySegment<byte>(buffer, 0, buffer.Length), buffer.Length + 2);
using var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, new ArraySegment<byte>(buffer, 0, buffer.Length), buffer.Length + 2);

if (formatter.ProtocolVersion == MqttProtocolVersion.Unknown)
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/RoundtripProcessingBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class RoundtripProcessingBenchmark : BaseBenchmark
Expand Down
Loading