From 9816f12df6a269ae1656e77737897c0159495ad6 Mon Sep 17 00:00:00 2001 From: Christian Kratky Date: Sun, 9 Apr 2017 15:10:35 +0200 Subject: [PATCH] Performance optimizations --- .../MQTTnet.NetCoreApp.csproj | 6 +- .../Properties/AssemblyInfo.cs | 4 +- .../Properties/AssemblyInfo.cs | 4 +- MQTTnet.Core/Client/MqttClient.cs | 24 ++-- MQTTnet.Core/Internal/AsyncAutoResetEvent.cs | 32 +++++ MQTTnet.Core/MQTTnet.Core.csproj | 6 +- MQTTnet.Core/Serializer/ByteReader.cs | 20 +-- MQTTnet.Core/Serializer/ByteWriter.cs | 2 +- .../DefaultMqttV311PacketSerializer.cs | 117 ++++++++---------- MQTTnet.Core/Serializer/MqttPacketReader.cs | 5 +- MQTTnet.Core/Serializer/MqttPacketWriter.cs | 62 +++++----- MQTTnet.Core/Server/MqttClientSession.cs | 2 +- .../Server/MqttClientSessionManager.cs | 2 +- .../Server/MqttOutgoingPublicationsManager.cs | 14 ++- MQTTnet.nuspec | 2 +- README.md | 6 + 16 files changed, 172 insertions(+), 136 deletions(-) create mode 100644 MQTTnet.Core/Internal/AsyncAutoResetEvent.cs diff --git a/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj b/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj index 0e7cacdbc..ec4520174 100644 --- a/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj +++ b/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj @@ -7,9 +7,9 @@ MQTTnet MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). Copyright © Christian Kratky 2016-2017 - 2.1.0.4 - 2.1.0.11 - 2.1.0.11 + 2.1.1.0 + 2.1.1.0 + 2.1.1.0 False MQTTnet MQTTnet diff --git a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs index f9c21ccbc..24471d2bc 100644 --- a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs @@ -11,5 +11,5 @@ [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] [assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")] -[assembly: AssemblyVersion("2.1.0.11")] -[assembly: AssemblyFileVersion("2.1.0.11")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.1.0")] +[assembly: AssemblyFileVersion("2.1.1.0")] \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs index 86cca2262..7ad7d87a9 100644 --- a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs @@ -10,5 +10,5 @@ [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] [assembly: ComVisible(false)] -[assembly: AssemblyVersion("2.1.0.11")] -[assembly: AssemblyFileVersion("2.1.0.11")] \ No newline at end of file +[assembly: AssemblyVersion("2.1.1.0")] +[assembly: AssemblyFileVersion("2.1.1.0")] \ No newline at end of file diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index c2fe8fc71..5c89a913a 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -88,11 +88,11 @@ public async Task DisconnectAsync() await DisconnectInternalAsync(); } - public async Task> SubscribeAsync(params TopicFilter[] topicFilters) + public Task> SubscribeAsync(params TopicFilter[] topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - return await SubscribeAsync(topicFilters.ToList()); + return SubscribeAsync(topicFilters.ToList()); } public async Task> SubscribeAsync(IList topicFilters) @@ -117,11 +117,11 @@ public async Task> SubscribeAsync(IList return topicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList(); } - public async Task Unsubscribe(params string[] topicFilters) + public Task Unsubscribe(params string[] topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); - await Unsubscribe(topicFilters.ToList()); + return Unsubscribe(topicFilters.ToList()); } public async Task Unsubscribe(IList topicFilters) @@ -274,14 +274,14 @@ private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) FireApplicationMessageReceivedEvent(originalPublishPacket); } - private async Task SendAsync(MqttBasePacket packet) + private Task SendAsync(MqttBasePacket packet) { - await _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout); + return _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout); } private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { - Func responsePacketSelector = p => + bool ResponsePacketSelector(MqttBasePacket p) { var p1 = p as TResponsePacket; if (p1 == null) @@ -301,10 +301,10 @@ private async Task SendAndReceiveAsync(MqttBas } return true; - }; + } await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); - return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout); + return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout); } private ushort GetNewPacketIdentifier() @@ -324,8 +324,9 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke await SendAndReceiveAsync(new MqttPingReqPacket()); } } - catch (MqttCommunicationException) + catch (MqttCommunicationException exception) { + MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); } catch (Exception exception) { @@ -351,8 +352,9 @@ private async Task ReceivePackets(CancellationToken cancellationToken) Task.Run(() => ProcessReceivedPacket(mqttPacket), cancellationToken).Forget(); } } - catch (MqttCommunicationException) + catch (MqttCommunicationException exception) { + MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets."); } catch (Exception exception) { diff --git a/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs b/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs new file mode 100644 index 000000000..c0f4972ed --- /dev/null +++ b/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs @@ -0,0 +1,32 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace MQTTnet.Core.Internal +{ + public class AsyncGate + { + private readonly Queue> _waitingTasks = new Queue>(); + + public Task WaitOneAsync() + { + var tcs = new TaskCompletionSource(); + lock (_waitingTasks) + { + _waitingTasks.Enqueue(tcs); + } + + return tcs.Task; + } + + public void Set() + { + lock (_waitingTasks) + { + if (_waitingTasks.Count > 0) + { + _waitingTasks.Dequeue().SetResult(true); + } + } + } + } +} diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj index c28399884..1459a5dae 100644 --- a/MQTTnet.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -9,15 +9,15 @@ MQTTnet Christian Kratky Christian Kratky - 2.1.0.11 + 2.1.1.0 MQTTnet.Core Copyright © Christian Kratky 2016-2017 https://github.com/chkr1011/MQTTnet https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png https://github.com/chkr1011/MQTTnet MQTT MQTTClient MQTTServer MQTTBroker Broker - 2.1.0.11 - 2.1.0.11 + 2.1.1.0 + 2.1.1.0 https://github.com/chkr1011/MQTTnet/blob/master/LICENSE diff --git a/MQTTnet.Core/Serializer/ByteReader.cs b/MQTTnet.Core/Serializer/ByteReader.cs index 06dae2f8d..a95f056ca 100644 --- a/MQTTnet.Core/Serializer/ByteReader.cs +++ b/MQTTnet.Core/Serializer/ByteReader.cs @@ -4,36 +4,42 @@ namespace MQTTnet.Core.Serializer { public class ByteReader { + private readonly int _source; private int _index; - private readonly int _byte; - public ByteReader(byte @byte) + public ByteReader(int source) { - _byte = @byte; + _source = source; } public bool Read() { if (_index >= 8) { - throw new InvalidOperationException("End of the byte reached."); + throw new InvalidOperationException("End of byte reached."); } - var result = ((1 << _index) & _byte) > 0; + var result = ((1 << _index) & _source) > 0; _index++; - return result; } public byte Read(int count) { + if (_index + count > 8) + { + throw new InvalidOperationException("End of byte will be reached."); + } + var result = 0; for (var i = 0; i < count; i++) { - if (Read()) + if (((1 << _index) & _source) > 0) { result |= 1 << i; } + + _index++; } return (byte)result; diff --git a/MQTTnet.Core/Serializer/ByteWriter.cs b/MQTTnet.Core/Serializer/ByteWriter.cs index cc19ce01a..fbaf17551 100644 --- a/MQTTnet.Core/Serializer/ByteWriter.cs +++ b/MQTTnet.Core/Serializer/ByteWriter.cs @@ -9,7 +9,7 @@ public class ByteWriter public byte Value => (byte)_byte; - public void Write(byte @byte, int count) + public void Write(int @byte, int count) { for (var i = 0; i < count; i++) { diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs index 082fa1410..d7a39243e 100644 --- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs +++ b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs @@ -12,7 +12,7 @@ namespace MQTTnet.Core.Serializer { public class DefaultMqttV311PacketSerializer : IMqttPacketSerializer { - public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) + public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination) { if (packet == null) throw new ArgumentNullException(nameof(packet)); if (destination == null) throw new ArgumentNullException(nameof(destination)); @@ -20,99 +20,85 @@ public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChanne var connectPacket = packet as MqttConnectPacket; if (connectPacket != null) { - await SerializeAsync(connectPacket, destination); - return; + return SerializeAsync(connectPacket, destination); } var connAckPacket = packet as MqttConnAckPacket; if (connAckPacket != null) { - await SerializeAsync(connAckPacket, destination); - return; + return SerializeAsync(connAckPacket, destination); } var disconnectPacket = packet as MqttDisconnectPacket; if (disconnectPacket != null) { - await SerializeAsync(disconnectPacket, destination); - return; + return SerializeAsync(disconnectPacket, destination); } var pingReqPacket = packet as MqttPingReqPacket; if (pingReqPacket != null) { - await SerializeAsync(pingReqPacket, destination); - return; + return SerializeAsync(pingReqPacket, destination); } var pingRespPacket = packet as MqttPingRespPacket; if (pingRespPacket != null) { - await SerializeAsync(pingRespPacket, destination); - return; + return SerializeAsync(pingRespPacket, destination); } var publishPacket = packet as MqttPublishPacket; if (publishPacket != null) { - await SerializeAsync(publishPacket, destination); - return; + return SerializeAsync(publishPacket, destination); } var pubAckPacket = packet as MqttPubAckPacket; if (pubAckPacket != null) { - await SerializeAsync(pubAckPacket, destination); - return; + return SerializeAsync(pubAckPacket, destination); } var pubRecPacket = packet as MqttPubRecPacket; if (pubRecPacket != null) { - await SerializeAsync(pubRecPacket, destination); - return; + return SerializeAsync(pubRecPacket, destination); } var pubRelPacket = packet as MqttPubRelPacket; if (pubRelPacket != null) { - await SerializeAsync(pubRelPacket, destination); - return; + return SerializeAsync(pubRelPacket, destination); } var pubCompPacket = packet as MqttPubCompPacket; if (pubCompPacket != null) { - await SerializeAsync(pubCompPacket, destination); - return; + return SerializeAsync(pubCompPacket, destination); } var subscribePacket = packet as MqttSubscribePacket; if (subscribePacket != null) { - await SerializeAsync(subscribePacket, destination); - return; + return SerializeAsync(subscribePacket, destination); } var subAckPacket = packet as MqttSubAckPacket; if (subAckPacket != null) { - await SerializeAsync(subAckPacket, destination); - return; + return SerializeAsync(subAckPacket, destination); } var unsubscribePacket = packet as MqttUnsubscribePacket; if (unsubscribePacket != null) { - await SerializeAsync(unsubscribePacket, destination); - return; + return SerializeAsync(unsubscribePacket, destination); } var unsubAckPacket = packet as MqttUnsubAckPacket; if (unsubAckPacket != null) { - await SerializeAsync(unsubAckPacket, destination); - return; + return SerializeAsync(unsubAckPacket, destination); } throw new MqttProtocolViolationException("Packet type invalid."); @@ -287,6 +273,7 @@ private async Task DeserializeConnectAsync(MqttPacketReader read await reader.ReadRemainingDataByteAsync(); await reader.ReadRemainingDataByteAsync(); + var protocolName = await reader.ReadRemainingDataAsync(4); if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT") @@ -382,19 +369,17 @@ private void ValidatePublishPacket(MqttPublishPacket packet) } } - private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) + private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT"); + + private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination) { ValidateConnectPacket(packet); using (var output = new MqttPacketWriter()) { // Write variable header - output.Write(0x00); // 3.1.2.1 Protocol Name - output.Write(0x04); // "" - output.Write('M'); - output.Write('Q'); - output.Write('T'); - output.Write('T'); + output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name + output.Write(MqttPrefix); output.Write(0x04); // 3.1.2.2 Protocol Level var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags @@ -404,7 +389,7 @@ private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationCh if (packet.WillMessage != null) { - connectFlags.Write((byte)packet.WillMessage.QualityOfServiceLevel, 2); + connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2); connectFlags.Write(packet.WillMessage.Retain); } else @@ -412,7 +397,7 @@ private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationCh connectFlags.Write(0, 2); connectFlags.Write(false); } - + connectFlags.Write(packet.Password != null); connectFlags.Write(packet.Username != null); @@ -437,11 +422,11 @@ private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationCh } output.InjectFixedHeader(MqttControlPacketType.Connect); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -452,26 +437,26 @@ private async Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationCh output.Write((byte)packet.ConnectReturnCode); output.InjectFixedHeader(MqttControlPacketType.ConnAck); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination) { - await SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); + return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination); } - private async Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination) { - await SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination); + return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination); } - private async Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination) { - await SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination); + return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination); } - private async Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination) { ValidatePublishPacket(packet); @@ -502,29 +487,29 @@ private async Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationCh fixedHeader.Write(packet.Dup); output.InjectFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { output.Write(packet.PacketIdentifier); output.InjectFixedHeader(MqttControlPacketType.PubAck); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { output.Write(packet.PacketIdentifier); output.InjectFixedHeader(MqttControlPacketType.PubRec); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } @@ -539,24 +524,24 @@ private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationCha } } - private async Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { output.Write(packet.PacketIdentifier); output.InjectFixedHeader(MqttControlPacketType.PubComp); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { output.Write(packet.PacketIdentifier); - if (packet.TopicFilters?.Any() == true) + if (packet.TopicFilters?.Count > 0) { foreach (var topicFilter in packet.TopicFilters) { @@ -566,11 +551,11 @@ private async Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunication } output.InjectFixedHeader(MqttControlPacketType.Subscribe, 0x02); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -585,11 +570,11 @@ private async Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationCha } output.InjectFixedHeader(MqttControlPacketType.SubAck); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { @@ -604,27 +589,27 @@ private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicati } output.InjectFixedHeader(MqttControlPacketType.Unsubscibe, 0x02); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination) + private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { output.Write(packet.PacketIdentifier); output.InjectFixedHeader(MqttControlPacketType.UnsubAck); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } - private async Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination) + private Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination) { using (var output = new MqttPacketWriter()) { output.InjectFixedHeader(type); - await output.WriteToAsync(destination); + return output.WriteToAsync(destination); } } } diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 11d4b67a1..bf38bd0e4 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -107,15 +107,16 @@ public async Task ReadRemainingDataWithLengthPrefixAsync() return await ReadRemainingDataAsync(length); } - public async Task ReadRemainingDataAsync() + public Task ReadRemainingDataAsync() { - return await ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position); + return ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position); } public async Task ReadRemainingDataAsync(int length) { var buffer = new byte[length]; await _remainingData.ReadAsync(buffer, 0, buffer.Length); + return buffer; } diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs index ced0c57c3..eb9bfcf07 100644 --- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs +++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs @@ -9,7 +9,7 @@ namespace MQTTnet.Core.Serializer { public sealed class MqttPacketWriter : IDisposable { - private readonly MemoryStream _buffer = new MemoryStream(); + private readonly MemoryStream _buffer = new MemoryStream(512); public void InjectFixedHeader(byte fixedHeader) { @@ -20,31 +20,28 @@ public void InjectFixedHeader(byte fixedHeader) return; } + var backupBuffer = _buffer.ToArray(); var remainingLength = (int)_buffer.Length; - using (var buffer = new MemoryStream()) - { - _buffer.WriteTo(buffer); - _buffer.SetLength(0); - _buffer.WriteByte(fixedHeader); + _buffer.SetLength(0); + + _buffer.WriteByte(fixedHeader); - // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. - var x = remainingLength; - do + // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html. + var x = remainingLength; + do + { + var encodedByte = x % 128; + x = x / 128; + if (x > 0) { - var encodedByte = (byte)(x % 128); - x = x / 128; - if (x > 0) - { - encodedByte = (byte)(encodedByte | 128); - } - - _buffer.WriteByte(encodedByte); - } while (x > 0); - - buffer.Position = 0; - buffer.WriteTo(_buffer); - } + encodedByte = encodedByte | 128; + } + + _buffer.WriteByte((byte)encodedByte); + } while (x > 0); + + _buffer.Write(backupBuffer, 0, backupBuffer.Length); } public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0) @@ -59,11 +56,6 @@ public void Write(byte value) _buffer.WriteByte(value); } - public void Write(char value) - { - _buffer.WriteByte((byte)value); - } - public void Write(ushort value) { var buffer = BitConverter.GetBytes(value); @@ -73,11 +65,15 @@ public void Write(ushort value) public void Write(ByteWriter value) { + if (value == null) throw new ArgumentNullException(nameof(value)); + _buffer.WriteByte(value.Value); } - public void Write(byte[] value) + public void Write(params byte[] value) { + if (value == null) throw new ArgumentNullException(nameof(value)); + _buffer.Write(value, 0, value.Length); } @@ -94,14 +90,16 @@ public void WriteWithLengthPrefix(byte[] value) Write(value); } - public void Dispose() + public Task WriteToAsync(IMqttCommunicationChannel destination) { - _buffer?.Dispose(); + if (destination == null) throw new ArgumentNullException(nameof(destination)); + + return destination.WriteAsync(_buffer.ToArray()); } - public async Task WriteToAsync(IMqttCommunicationChannel destination) + public void Dispose() { - await destination.WriteAsync(_buffer.ToArray()); + _buffer?.Dispose(); } } } diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs index 98ebc4212..0af6e4635 100644 --- a/MQTTnet.Core/Server/MqttClientSession.cs +++ b/MQTTnet.Core/Server/MqttClientSession.cs @@ -74,7 +74,7 @@ public async Task RunAsync(string identifier, MqttApplicationMessage willApplica } } - public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) + public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket) { if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession)); if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket)); diff --git a/MQTTnet.Core/Server/MqttClientSessionManager.cs b/MQTTnet.Core/Server/MqttClientSessionManager.cs index 7a215df72..5f6f63e46 100644 --- a/MQTTnet.Core/Server/MqttClientSessionManager.cs +++ b/MQTTnet.Core/Server/MqttClientSessionManager.cs @@ -86,7 +86,7 @@ private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPu { foreach (var clientSession in _clientSessions.Values.ToList()) { - clientSession.DispatchPublishPacket(senderClientSession, publishPacket); + clientSession.EnqueuePublishPacket(senderClientSession, publishPacket); } } } diff --git a/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs b/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs index a87dfcb99..8e7c67200 100644 --- a/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs +++ b/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs @@ -13,8 +13,8 @@ namespace MQTTnet.Core.Server { public class MqttOutgoingPublicationsManager { - private readonly AutoResetEvent _resetEvent = new AutoResetEvent(false); private readonly List _pendingPublishPackets = new List(); + private readonly AsyncGate _gate = new AsyncGate(); private readonly MqttServerOptions _options; private CancellationTokenSource _cancellationTokenSource; @@ -35,11 +35,12 @@ public void Start(IMqttCommunicationAdapter adapter) _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); _cancellationTokenSource = new CancellationTokenSource(); - Task.Run(async () => await SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget(); + Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget(); } public void Stop() { + _adapter = null; _cancellationTokenSource?.Cancel(); _cancellationTokenSource = null; } @@ -52,7 +53,7 @@ public void Enqueue(MqttClientSession senderClientSession, MqttPublishPacket pub lock (_pendingPublishPackets) { _pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket)); - _resetEvent.Set(); + _gate.Set(); } } @@ -62,12 +63,17 @@ private async Task SendPendingPublishPacketsAsync(CancellationToken cancellation { try { - _resetEvent.WaitOne(); + await _gate.WaitOneAsync(); if (cancellationToken.IsCancellationRequested) { return; } + if (_adapter == null) + { + continue; + } + List pendingPublishPackets; lock (_pendingPublishPackets) { diff --git a/MQTTnet.nuspec b/MQTTnet.nuspec index b7d8b65a4..4eaa76b16 100644 --- a/MQTTnet.nuspec +++ b/MQTTnet.nuspec @@ -2,7 +2,7 @@ MQTTnet - 2.1.0.11 + 2.1.1.0 Christian Kratky Christian Kratky https://github.com/chkr1011/MQTTnet/blob/master/LICENSE diff --git a/README.md b/README.md index d0bad7afd..bd469a9a5 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,12 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien * 3.1.1 +## Nuget +This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/ + +## Contributions +If you want to contribute to this project just create a pull request. + # MqttClient ## Example