Skip to content

Commit

Permalink
use memory pool for special case
Browse files Browse the repository at this point in the history
  • Loading branch information
mregen committed Jun 23, 2024
1 parent 27f8486 commit 08b23b9
Show file tree
Hide file tree
Showing 18 changed files with 64 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/AsyncLockBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class AsyncLockBenchmark : BaseBenchmark
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/LoggerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter]
[MemoryDiagnoser]
public class LoggerBenchmark : BaseBenchmark
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MemoryCopyBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class MemoryCopyBenchmark
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MessageProcessingBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace MQTTnet.Benchmarks;

[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter]
[RankColumn]
[MemoryDiagnoser]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MessageProcessingMqttConnectionContextBenchmark : BaseBenchmark
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MqttBufferReaderBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MqttBufferReaderBenchmark
{
Expand Down
11 changes: 5 additions & 6 deletions Source/MQTTnet.Benchmarks/MqttPacketReaderWriterBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using MQTTnet.Buffers;
using MQTTnet.Formatter;
using MQTTnet.Tests.Mockups;
using System;

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MqttPacketReaderWriterBenchmark : BaseBenchmark
{
readonly byte[] _demoPayload = new byte[1024];

byte[] _readPayload;

[GlobalCleanup]
Expand All @@ -28,7 +27,7 @@ public void GlobalCleanup()
public void GlobalSetup()
{
TestEnvironment.EnableLogger = false;

var writer = new MqttBufferWriter(4096, 65535);
writer.WriteString("A relative short string.");
writer.WriteBinary(_demoPayload);
Expand Down Expand Up @@ -70,7 +69,7 @@ public void Read_100_000_Messages()
reader.ReadBinaryData();
}
}

[Benchmark]
public void Write_100_000_Messages()
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/MqttTcpChannelBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

namespace MQTTnet.Benchmarks;

[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class MqttTcpChannelBenchmark : BaseBenchmark
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/RoundtripProcessingBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class RoundtripProcessingBenchmark : BaseBenchmark
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/SendPacketAsyncBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class SendPacketAsyncBenchmark : BaseBenchmark
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/SerializerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter]
[MemoryDiagnoser]
public class SerializerBenchmark : BaseBenchmark
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/ServerProcessingBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter, RankColumn]
[MemoryDiagnoser]
public class ServerProcessingBenchmark : BaseBenchmark
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/TcpPipesBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[MemoryDiagnoser]
public class TcpPipesBenchmark : BaseBenchmark
{
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.Benchmarks/TopicFilterComparerBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace MQTTnet.Benchmarks
{
[SimpleJob(RuntimeMoniker.Net60)]
[SimpleJob(RuntimeMoniker.Net80)]
[RPlotExporter]
[MemoryDiagnoser]
public class TopicFilterComparerBenchmark : BaseBenchmark
Expand Down
12 changes: 9 additions & 3 deletions Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,20 @@ public async Task SendPacketAsync(MqttPacket packet, CancellationToken cancellat

_logger.Verbose("TX ({0} bytes) >>> {1}", packetBuffer.Length, packet);

if (packetBuffer.Payload.Length == 0 || !AllowPacketFragmentation)
if (!AllowPacketFragmentation)
{
await _channel.WriteAsync(new ReadOnlySequence<byte>(packetBuffer.ToArray()), true, cancellationToken).ConfigureAwait(false);
using (var memoryOwner = packetBuffer.ToMemoryOwner())
{
await _channel.WriteAsync(new ReadOnlySequence<byte>(memoryOwner.Memory), true, cancellationToken).ConfigureAwait(false);
}
}
else
{
await _channel.WriteAsync(packetBuffer.Packet, false, cancellationToken).ConfigureAwait(false);
await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false);
if (packetBuffer.Payload.Length > 0)
{
await _channel.WriteAsync(packetBuffer.Payload, true, cancellationToken).ConfigureAwait(false);
}
}

Interlocked.Add(ref _statistics._bytesReceived, packetBuffer.Length);
Expand Down
9 changes: 8 additions & 1 deletion Source/MQTTnet/Buffers/MqttMemoryHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ public static void Copy(byte[] source, int sourceIndex, byte[] destination, int

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Copy(ReadOnlySequence<byte> sequence, int sourceIndex, byte[] destination, int destinationIndex, int length)
{
sequence.Slice(sourceIndex).CopyTo(destination.AsSpan(destinationIndex, length));
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Copy(ReadOnlySequence<byte> sequence, int sourceIndex, Memory<byte> destination, int destinationIndex, int length)
{
var offset = destinationIndex;
foreach (var segment in sequence)
Expand All @@ -23,8 +29,9 @@ public static void Copy(ReadOnlySequence<byte> sequence, int sourceIndex, byte[]
sourceIndex -= segment.Length;
continue;
}

var targetLength = Math.Min(segment.Length - sourceIndex, length);
segment.Span.Slice(sourceIndex, targetLength).CopyTo(destination.AsSpan(offset));
segment.Span.Slice(sourceIndex, targetLength).CopyTo(destination.Span.Slice(offset));
offset += targetLength;
length -= targetLength;
if (length == 0)
Expand Down
28 changes: 18 additions & 10 deletions Source/MQTTnet/Formatter/MqttPacketBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,30 @@ public MqttPacketBuffer(ReadOnlySequence<byte> packet) : this(packet, ReadOnlySe

public byte[] ToArray()
{
if (Payload.Length == 0)
{
var packetBuffer = GC.AllocateUninitializedArray<byte>((int)Packet.Length);
Packet.CopyTo(packetBuffer);
return packetBuffer;
}

var buffer = GC.AllocateUninitializedArray<byte>(Length);
int packetLength = (int)Packet.Length;
int payloadLength = (int)Payload.Length;
MqttMemoryHelper.Copy(Packet, 0, buffer, 0, packetLength);
MqttMemoryHelper.Copy(Payload, 0, buffer, packetLength, payloadLength);

if (Payload.Length > 0)
{
int payloadLength = (int)Payload.Length;
MqttMemoryHelper.Copy(Payload, 0, buffer, packetLength, payloadLength);
}
return buffer;
}

public IMemoryOwner<byte> ToMemoryOwner()
{
var memoryOwner = MemoryPool<byte>.Shared.Rent(Length);
int packetLength = (int)Packet.Length;
MqttMemoryHelper.Copy(Packet, 0, memoryOwner.Memory, 0, packetLength);
if (Payload.Length > 0)
{
int payloadLength = (int)Payload.Length;
MqttMemoryHelper.Copy(Payload, 0, memoryOwner.Memory, packetLength, payloadLength);
}
return memoryOwner;
}

public ArraySegment<byte> Join()
{
return new ArraySegment<byte>(ToArray());
Expand Down
31 changes: 11 additions & 20 deletions Source/MQTTnet/MqttApplicationMessageBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public sealed class MqttApplicationMessageBuilder

MqttPayloadFormatIndicator _payloadFormatIndicator;
ReadOnlySequence<byte> _payload;
IDisposable _payloadOwner;
MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
string _responseTopic;
bool _retain;
Expand All @@ -42,6 +43,7 @@ public MqttApplicationMessage Build()
{
Topic = _topic,
Payload = _payload,
PayloadOwner = _payloadOwner,
QualityOfServiceLevel = _qualityOfServiceLevel,
Retain = _retain,
ContentType = _contentType,
Expand Down Expand Up @@ -99,9 +101,16 @@ public MqttApplicationMessageBuilder WithPayload(ArraySegment<byte> payloadSegme
return this;
}

public MqttApplicationMessageBuilder WithPayload(ReadOnlySequence<byte> payload)
public MqttApplicationMessageBuilder WithPayload(ReadOnlyMemory<byte> payload)
{
_payload = new ReadOnlySequence<byte>(payload);
return this;
}

public MqttApplicationMessageBuilder WithPayload(ReadOnlySequence<byte> payload, IDisposable payloadOwner = null)
{
_payload = payload;
_payloadOwner = payloadOwner;
return this;
}

Expand All @@ -119,7 +128,7 @@ public MqttApplicationMessageBuilder WithPayload(IEnumerable<byte> payload)

if (payload is ArraySegment<byte> arraySegment)
{
return WithPayloadSegment(arraySegment);
return WithPayload(arraySegment);
}

return WithPayload(payload.ToArray());
Expand Down Expand Up @@ -174,24 +183,6 @@ public MqttApplicationMessageBuilder WithPayloadFormatIndicator(MqttPayloadForma
return this;
}

public MqttApplicationMessageBuilder WithPayloadSegment(ArraySegment<byte> payloadSegment)
{
_payload = new ReadOnlySequence<byte>(payloadSegment);
return this;
}

public MqttApplicationMessageBuilder WithPayloadSegment(ReadOnlyMemory<byte> payloadSegment)
{
_payload = new ReadOnlySequence<byte>(payloadSegment);
return this;
}

public MqttApplicationMessageBuilder WithPayloadSequence(ReadOnlySequence<byte> payload)
{
_payload = payload;
return this;
}

/// <summary>
/// The quality of service level.
/// The Quality of Service (QoS) level is an agreement between the sender of a message and the receiver of a message
Expand Down

0 comments on commit 08b23b9

Please sign in to comment.