diff --git a/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj b/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj
index 0e7cacdbc..ec4520174 100644
--- a/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj
+++ b/Frameworks/MQTTnet.NetCoreApp/MQTTnet.NetCoreApp.csproj
@@ -7,9 +7,9 @@
MQTTnet
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).
Copyright © Christian Kratky 2016-2017
- 2.1.0.4
- 2.1.0.11
- 2.1.0.11
+ 2.1.1.0
+ 2.1.1.0
+ 2.1.1.0
False
MQTTnet
MQTTnet
diff --git a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs
index f9c21ccbc..24471d2bc 100644
--- a/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs
+++ b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs
@@ -11,5 +11,5 @@
[assembly: AssemblyCulture("")]
[assembly: ComVisible(false)]
[assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")]
-[assembly: AssemblyVersion("2.1.0.11")]
-[assembly: AssemblyFileVersion("2.1.0.11")]
\ No newline at end of file
+[assembly: AssemblyVersion("2.1.1.0")]
+[assembly: AssemblyFileVersion("2.1.1.0")]
\ No newline at end of file
diff --git a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs
index 86cca2262..7ad7d87a9 100644
--- a/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs
+++ b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs
@@ -10,5 +10,5 @@
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
[assembly: ComVisible(false)]
-[assembly: AssemblyVersion("2.1.0.11")]
-[assembly: AssemblyFileVersion("2.1.0.11")]
\ No newline at end of file
+[assembly: AssemblyVersion("2.1.1.0")]
+[assembly: AssemblyFileVersion("2.1.1.0")]
\ No newline at end of file
diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs
index c2fe8fc71..5c89a913a 100644
--- a/MQTTnet.Core/Client/MqttClient.cs
+++ b/MQTTnet.Core/Client/MqttClient.cs
@@ -88,11 +88,11 @@ public async Task DisconnectAsync()
await DisconnectInternalAsync();
}
- public async Task> SubscribeAsync(params TopicFilter[] topicFilters)
+ public Task> SubscribeAsync(params TopicFilter[] topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
- return await SubscribeAsync(topicFilters.ToList());
+ return SubscribeAsync(topicFilters.ToList());
}
public async Task> SubscribeAsync(IList topicFilters)
@@ -117,11 +117,11 @@ public async Task> SubscribeAsync(IList
return topicFilters.Select((t, i) => new MqttSubscribeResult(t, response.SubscribeReturnCodes[i])).ToList();
}
- public async Task Unsubscribe(params string[] topicFilters)
+ public Task Unsubscribe(params string[] topicFilters)
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
- await Unsubscribe(topicFilters.ToList());
+ return Unsubscribe(topicFilters.ToList());
}
public async Task Unsubscribe(IList topicFilters)
@@ -274,14 +274,14 @@ private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
FireApplicationMessageReceivedEvent(originalPublishPacket);
}
- private async Task SendAsync(MqttBasePacket packet)
+ private Task SendAsync(MqttBasePacket packet)
{
- await _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout);
+ return _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout);
}
private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket
{
- Func responsePacketSelector = p =>
+ bool ResponsePacketSelector(MqttBasePacket p)
{
var p1 = p as TResponsePacket;
if (p1 == null)
@@ -301,10 +301,10 @@ private async Task SendAndReceiveAsync(MqttBas
}
return true;
- };
+ }
await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout);
- return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout);
+ return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(ResponsePacketSelector, _options.DefaultCommunicationTimeout);
}
private ushort GetNewPacketIdentifier()
@@ -324,8 +324,9 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke
await SendAndReceiveAsync(new MqttPingReqPacket());
}
}
- catch (MqttCommunicationException)
+ catch (MqttCommunicationException exception)
{
+ MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
}
catch (Exception exception)
{
@@ -351,8 +352,9 @@ private async Task ReceivePackets(CancellationToken cancellationToken)
Task.Run(() => ProcessReceivedPacket(mqttPacket), cancellationToken).Forget();
}
}
- catch (MqttCommunicationException)
+ catch (MqttCommunicationException exception)
{
+ MqttTrace.Warning(nameof(MqttClient), exception, "MQTT communication error while receiving packets.");
}
catch (Exception exception)
{
diff --git a/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs b/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs
new file mode 100644
index 000000000..c0f4972ed
--- /dev/null
+++ b/MQTTnet.Core/Internal/AsyncAutoResetEvent.cs
@@ -0,0 +1,32 @@
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace MQTTnet.Core.Internal
+{
+ public class AsyncGate
+ {
+ private readonly Queue> _waitingTasks = new Queue>();
+
+ public Task WaitOneAsync()
+ {
+ var tcs = new TaskCompletionSource();
+ lock (_waitingTasks)
+ {
+ _waitingTasks.Enqueue(tcs);
+ }
+
+ return tcs.Task;
+ }
+
+ public void Set()
+ {
+ lock (_waitingTasks)
+ {
+ if (_waitingTasks.Count > 0)
+ {
+ _waitingTasks.Dequeue().SetResult(true);
+ }
+ }
+ }
+ }
+}
diff --git a/MQTTnet.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj
index c28399884..1459a5dae 100644
--- a/MQTTnet.Core/MQTTnet.Core.csproj
+++ b/MQTTnet.Core/MQTTnet.Core.csproj
@@ -9,15 +9,15 @@
MQTTnet
Christian Kratky
Christian Kratky
- 2.1.0.11
+ 2.1.1.0
MQTTnet.Core
Copyright © Christian Kratky 2016-2017
https://github.com/chkr1011/MQTTnet
https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png
https://github.com/chkr1011/MQTTnet
MQTT MQTTClient MQTTServer MQTTBroker Broker
- 2.1.0.11
- 2.1.0.11
+ 2.1.1.0
+ 2.1.1.0
https://github.com/chkr1011/MQTTnet/blob/master/LICENSE
diff --git a/MQTTnet.Core/Serializer/ByteReader.cs b/MQTTnet.Core/Serializer/ByteReader.cs
index 06dae2f8d..a95f056ca 100644
--- a/MQTTnet.Core/Serializer/ByteReader.cs
+++ b/MQTTnet.Core/Serializer/ByteReader.cs
@@ -4,36 +4,42 @@ namespace MQTTnet.Core.Serializer
{
public class ByteReader
{
+ private readonly int _source;
private int _index;
- private readonly int _byte;
- public ByteReader(byte @byte)
+ public ByteReader(int source)
{
- _byte = @byte;
+ _source = source;
}
public bool Read()
{
if (_index >= 8)
{
- throw new InvalidOperationException("End of the byte reached.");
+ throw new InvalidOperationException("End of byte reached.");
}
- var result = ((1 << _index) & _byte) > 0;
+ var result = ((1 << _index) & _source) > 0;
_index++;
-
return result;
}
public byte Read(int count)
{
+ if (_index + count > 8)
+ {
+ throw new InvalidOperationException("End of byte will be reached.");
+ }
+
var result = 0;
for (var i = 0; i < count; i++)
{
- if (Read())
+ if (((1 << _index) & _source) > 0)
{
result |= 1 << i;
}
+
+ _index++;
}
return (byte)result;
diff --git a/MQTTnet.Core/Serializer/ByteWriter.cs b/MQTTnet.Core/Serializer/ByteWriter.cs
index cc19ce01a..fbaf17551 100644
--- a/MQTTnet.Core/Serializer/ByteWriter.cs
+++ b/MQTTnet.Core/Serializer/ByteWriter.cs
@@ -9,7 +9,7 @@ public class ByteWriter
public byte Value => (byte)_byte;
- public void Write(byte @byte, int count)
+ public void Write(int @byte, int count)
{
for (var i = 0; i < count; i++)
{
diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
index 082fa1410..d7a39243e 100644
--- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
+++ b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
@@ -12,7 +12,7 @@ namespace MQTTnet.Core.Serializer
{
public class DefaultMqttV311PacketSerializer : IMqttPacketSerializer
{
- public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
+ public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (destination == null) throw new ArgumentNullException(nameof(destination));
@@ -20,99 +20,85 @@ public async Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChanne
var connectPacket = packet as MqttConnectPacket;
if (connectPacket != null)
{
- await SerializeAsync(connectPacket, destination);
- return;
+ return SerializeAsync(connectPacket, destination);
}
var connAckPacket = packet as MqttConnAckPacket;
if (connAckPacket != null)
{
- await SerializeAsync(connAckPacket, destination);
- return;
+ return SerializeAsync(connAckPacket, destination);
}
var disconnectPacket = packet as MqttDisconnectPacket;
if (disconnectPacket != null)
{
- await SerializeAsync(disconnectPacket, destination);
- return;
+ return SerializeAsync(disconnectPacket, destination);
}
var pingReqPacket = packet as MqttPingReqPacket;
if (pingReqPacket != null)
{
- await SerializeAsync(pingReqPacket, destination);
- return;
+ return SerializeAsync(pingReqPacket, destination);
}
var pingRespPacket = packet as MqttPingRespPacket;
if (pingRespPacket != null)
{
- await SerializeAsync(pingRespPacket, destination);
- return;
+ return SerializeAsync(pingRespPacket, destination);
}
var publishPacket = packet as MqttPublishPacket;
if (publishPacket != null)
{
- await SerializeAsync(publishPacket, destination);
- return;
+ return SerializeAsync(publishPacket, destination);
}
var pubAckPacket = packet as MqttPubAckPacket;
if (pubAckPacket != null)
{
- await SerializeAsync(pubAckPacket, destination);
- return;
+ return SerializeAsync(pubAckPacket, destination);
}
var pubRecPacket = packet as MqttPubRecPacket;
if (pubRecPacket != null)
{
- await SerializeAsync(pubRecPacket, destination);
- return;
+ return SerializeAsync(pubRecPacket, destination);
}
var pubRelPacket = packet as MqttPubRelPacket;
if (pubRelPacket != null)
{
- await SerializeAsync(pubRelPacket, destination);
- return;
+ return SerializeAsync(pubRelPacket, destination);
}
var pubCompPacket = packet as MqttPubCompPacket;
if (pubCompPacket != null)
{
- await SerializeAsync(pubCompPacket, destination);
- return;
+ return SerializeAsync(pubCompPacket, destination);
}
var subscribePacket = packet as MqttSubscribePacket;
if (subscribePacket != null)
{
- await SerializeAsync(subscribePacket, destination);
- return;
+ return SerializeAsync(subscribePacket, destination);
}
var subAckPacket = packet as MqttSubAckPacket;
if (subAckPacket != null)
{
- await SerializeAsync(subAckPacket, destination);
- return;
+ return SerializeAsync(subAckPacket, destination);
}
var unsubscribePacket = packet as MqttUnsubscribePacket;
if (unsubscribePacket != null)
{
- await SerializeAsync(unsubscribePacket, destination);
- return;
+ return SerializeAsync(unsubscribePacket, destination);
}
var unsubAckPacket = packet as MqttUnsubAckPacket;
if (unsubAckPacket != null)
{
- await SerializeAsync(unsubAckPacket, destination);
- return;
+ return SerializeAsync(unsubAckPacket, destination);
}
throw new MqttProtocolViolationException("Packet type invalid.");
@@ -287,6 +273,7 @@ private async Task DeserializeConnectAsync(MqttPacketReader read
await reader.ReadRemainingDataByteAsync();
await reader.ReadRemainingDataByteAsync();
+
var protocolName = await reader.ReadRemainingDataAsync(4);
if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT")
@@ -382,19 +369,17 @@ private void ValidatePublishPacket(MqttPublishPacket packet)
}
}
- private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
+ private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT");
+
+ private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{
ValidateConnectPacket(packet);
using (var output = new MqttPacketWriter())
{
// Write variable header
- output.Write(0x00); // 3.1.2.1 Protocol Name
- output.Write(0x04); // ""
- output.Write('M');
- output.Write('Q');
- output.Write('T');
- output.Write('T');
+ output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
+ output.Write(MqttPrefix);
output.Write(0x04); // 3.1.2.2 Protocol Level
var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
@@ -404,7 +389,7 @@ private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationCh
if (packet.WillMessage != null)
{
- connectFlags.Write((byte)packet.WillMessage.QualityOfServiceLevel, 2);
+ connectFlags.Write((int)packet.WillMessage.QualityOfServiceLevel, 2);
connectFlags.Write(packet.WillMessage.Retain);
}
else
@@ -412,7 +397,7 @@ private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationCh
connectFlags.Write(0, 2);
connectFlags.Write(false);
}
-
+
connectFlags.Write(packet.Password != null);
connectFlags.Write(packet.Username != null);
@@ -437,11 +422,11 @@ private async Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationCh
}
output.InjectFixedHeader(MqttControlPacketType.Connect);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -452,26 +437,26 @@ private async Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationCh
output.Write((byte)packet.ConnectReturnCode);
output.InjectFixedHeader(MqttControlPacketType.ConnAck);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
{
- await SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
+ return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
}
- private async Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
{
- await SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
+ return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
}
- private async Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
{
- await SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
+ return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
}
- private async Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
{
ValidatePublishPacket(packet);
@@ -502,29 +487,29 @@ private async Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationCh
fixedHeader.Write(packet.Dup);
output.InjectFixedHeader(MqttControlPacketType.Publish, fixedHeader.Value);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
output.InjectFixedHeader(MqttControlPacketType.PubAck);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
output.InjectFixedHeader(MqttControlPacketType.PubRec);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
@@ -539,24 +524,24 @@ private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationCha
}
}
- private async Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
output.InjectFixedHeader(MqttControlPacketType.PubComp);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
- if (packet.TopicFilters?.Any() == true)
+ if (packet.TopicFilters?.Count > 0)
{
foreach (var topicFilter in packet.TopicFilters)
{
@@ -566,11 +551,11 @@ private async Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunication
}
output.InjectFixedHeader(MqttControlPacketType.Subscribe, 0x02);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -585,11 +570,11 @@ private async Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationCha
}
output.InjectFixedHeader(MqttControlPacketType.SubAck);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -604,27 +589,27 @@ private async Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicati
}
output.InjectFixedHeader(MqttControlPacketType.Unsubscibe, 0x02);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
+ private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.Write(packet.PacketIdentifier);
output.InjectFixedHeader(MqttControlPacketType.UnsubAck);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
- private async Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
+ private Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
output.InjectFixedHeader(type);
- await output.WriteToAsync(destination);
+ return output.WriteToAsync(destination);
}
}
}
diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs
index 11d4b67a1..bf38bd0e4 100644
--- a/MQTTnet.Core/Serializer/MqttPacketReader.cs
+++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs
@@ -107,15 +107,16 @@ public async Task ReadRemainingDataWithLengthPrefixAsync()
return await ReadRemainingDataAsync(length);
}
- public async Task ReadRemainingDataAsync()
+ public Task ReadRemainingDataAsync()
{
- return await ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position);
+ return ReadRemainingDataAsync(RemainingLength - (int)_remainingData.Position);
}
public async Task ReadRemainingDataAsync(int length)
{
var buffer = new byte[length];
await _remainingData.ReadAsync(buffer, 0, buffer.Length);
+
return buffer;
}
diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs
index ced0c57c3..eb9bfcf07 100644
--- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs
+++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs
@@ -9,7 +9,7 @@ namespace MQTTnet.Core.Serializer
{
public sealed class MqttPacketWriter : IDisposable
{
- private readonly MemoryStream _buffer = new MemoryStream();
+ private readonly MemoryStream _buffer = new MemoryStream(512);
public void InjectFixedHeader(byte fixedHeader)
{
@@ -20,31 +20,28 @@ public void InjectFixedHeader(byte fixedHeader)
return;
}
+ var backupBuffer = _buffer.ToArray();
var remainingLength = (int)_buffer.Length;
- using (var buffer = new MemoryStream())
- {
- _buffer.WriteTo(buffer);
- _buffer.SetLength(0);
- _buffer.WriteByte(fixedHeader);
+ _buffer.SetLength(0);
+
+ _buffer.WriteByte(fixedHeader);
- // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
- var x = remainingLength;
- do
+ // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
+ var x = remainingLength;
+ do
+ {
+ var encodedByte = x % 128;
+ x = x / 128;
+ if (x > 0)
{
- var encodedByte = (byte)(x % 128);
- x = x / 128;
- if (x > 0)
- {
- encodedByte = (byte)(encodedByte | 128);
- }
-
- _buffer.WriteByte(encodedByte);
- } while (x > 0);
-
- buffer.Position = 0;
- buffer.WriteTo(_buffer);
- }
+ encodedByte = encodedByte | 128;
+ }
+
+ _buffer.WriteByte((byte)encodedByte);
+ } while (x > 0);
+
+ _buffer.Write(backupBuffer, 0, backupBuffer.Length);
}
public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0)
@@ -59,11 +56,6 @@ public void Write(byte value)
_buffer.WriteByte(value);
}
- public void Write(char value)
- {
- _buffer.WriteByte((byte)value);
- }
-
public void Write(ushort value)
{
var buffer = BitConverter.GetBytes(value);
@@ -73,11 +65,15 @@ public void Write(ushort value)
public void Write(ByteWriter value)
{
+ if (value == null) throw new ArgumentNullException(nameof(value));
+
_buffer.WriteByte(value.Value);
}
- public void Write(byte[] value)
+ public void Write(params byte[] value)
{
+ if (value == null) throw new ArgumentNullException(nameof(value));
+
_buffer.Write(value, 0, value.Length);
}
@@ -94,14 +90,16 @@ public void WriteWithLengthPrefix(byte[] value)
Write(value);
}
- public void Dispose()
+ public Task WriteToAsync(IMqttCommunicationChannel destination)
{
- _buffer?.Dispose();
+ if (destination == null) throw new ArgumentNullException(nameof(destination));
+
+ return destination.WriteAsync(_buffer.ToArray());
}
- public async Task WriteToAsync(IMqttCommunicationChannel destination)
+ public void Dispose()
{
- await destination.WriteAsync(_buffer.ToArray());
+ _buffer?.Dispose();
}
}
}
diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs
index 98ebc4212..0af6e4635 100644
--- a/MQTTnet.Core/Server/MqttClientSession.cs
+++ b/MQTTnet.Core/Server/MqttClientSession.cs
@@ -74,7 +74,7 @@ public async Task RunAsync(string identifier, MqttApplicationMessage willApplica
}
}
- public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
+ public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{
if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
diff --git a/MQTTnet.Core/Server/MqttClientSessionManager.cs b/MQTTnet.Core/Server/MqttClientSessionManager.cs
index 7a215df72..5f6f63e46 100644
--- a/MQTTnet.Core/Server/MqttClientSessionManager.cs
+++ b/MQTTnet.Core/Server/MqttClientSessionManager.cs
@@ -86,7 +86,7 @@ private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPu
{
foreach (var clientSession in _clientSessions.Values.ToList())
{
- clientSession.DispatchPublishPacket(senderClientSession, publishPacket);
+ clientSession.EnqueuePublishPacket(senderClientSession, publishPacket);
}
}
}
diff --git a/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs b/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs
index a87dfcb99..8e7c67200 100644
--- a/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs
+++ b/MQTTnet.Core/Server/MqttOutgoingPublicationsManager.cs
@@ -13,8 +13,8 @@ namespace MQTTnet.Core.Server
{
public class MqttOutgoingPublicationsManager
{
- private readonly AutoResetEvent _resetEvent = new AutoResetEvent(false);
private readonly List _pendingPublishPackets = new List();
+ private readonly AsyncGate _gate = new AsyncGate();
private readonly MqttServerOptions _options;
private CancellationTokenSource _cancellationTokenSource;
@@ -35,11 +35,12 @@ public void Start(IMqttCommunicationAdapter adapter)
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource();
- Task.Run(async () => await SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget();
+ Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget();
}
public void Stop()
{
+ _adapter = null;
_cancellationTokenSource?.Cancel();
_cancellationTokenSource = null;
}
@@ -52,7 +53,7 @@ public void Enqueue(MqttClientSession senderClientSession, MqttPublishPacket pub
lock (_pendingPublishPackets)
{
_pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket));
- _resetEvent.Set();
+ _gate.Set();
}
}
@@ -62,12 +63,17 @@ private async Task SendPendingPublishPacketsAsync(CancellationToken cancellation
{
try
{
- _resetEvent.WaitOne();
+ await _gate.WaitOneAsync();
if (cancellationToken.IsCancellationRequested)
{
return;
}
+ if (_adapter == null)
+ {
+ continue;
+ }
+
List pendingPublishPackets;
lock (_pendingPublishPackets)
{
diff --git a/MQTTnet.nuspec b/MQTTnet.nuspec
index b7d8b65a4..4eaa76b16 100644
--- a/MQTTnet.nuspec
+++ b/MQTTnet.nuspec
@@ -2,7 +2,7 @@
MQTTnet
- 2.1.0.11
+ 2.1.1.0
Christian Kratky
Christian Kratky
https://github.com/chkr1011/MQTTnet/blob/master/LICENSE
diff --git a/README.md b/README.md
index d0bad7afd..bd469a9a5 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,12 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien
* 3.1.1
+## Nuget
+This library is available as a nuget package: https://www.nuget.org/packages/MQTTnet/
+
+## Contributions
+If you want to contribute to this project just create a pull request.
+
# MqttClient
## Example