diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec
index 7825ae6d3..368787941 100644
--- a/Build/MQTTnet.nuspec
+++ b/Build/MQTTnet.nuspec
@@ -10,7 +10,11 @@
https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png
false
MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).
- *
+ * [MqttServer] Added support for publishing application messages
+* [Core] Fixed QoS level 2 handling
+* [Core] Performance optimizations
+* [MqttClient/MqttServer] Errors while handline application messages are now catched and traced
+* [MqttClient/MqttServer] Added interfaces
Copyright Christian Kratky 2016-2017
MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware
diff --git a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
index 7b8423935..ac12f97a5 100644
--- a/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
@@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttClientFactory
{
- public MqttClient CreateMqttClient(MqttClientOptions options)
+ public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
diff --git a/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs
index eb7441c30..00da03ae9 100644
--- a/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs
+++ b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs
@@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttServerFactory
{
- public MqttServer CreateMqttServer(MqttServerOptions options)
+ public IMqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
diff --git a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
index ac6f611a9..60735830c 100644
--- a/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
@@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttClientFactory
{
- public MqttClient CreateMqttClient(MqttClientOptions options)
+ public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
diff --git a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs
index eb7441c30..00da03ae9 100644
--- a/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs
+++ b/Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs
@@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttServerFactory
{
- public MqttServer CreateMqttServer(MqttServerOptions options)
+ public IMqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
index 7b8423935..ac12f97a5 100644
--- a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
+++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
@@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttClientFactory
{
- public MqttClient CreateMqttClient(MqttClientOptions options)
+ public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs
index eb7441c30..00da03ae9 100644
--- a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs
+++ b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs
@@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttServerFactory
{
- public MqttServer CreateMqttServer(MqttServerOptions options)
+ public IMqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
diff --git a/MQTTnet.Core/Client/IMqttClient.cs b/MQTTnet.Core/Client/IMqttClient.cs
new file mode 100644
index 000000000..0170adfcd
--- /dev/null
+++ b/MQTTnet.Core/Client/IMqttClient.cs
@@ -0,0 +1,24 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using MQTTnet.Core.Packets;
+
+namespace MQTTnet.Core.Client
+{
+ public interface IMqttClient
+ {
+ bool IsConnected { get; }
+
+ event EventHandler ApplicationMessageReceived;
+ event EventHandler Connected;
+ event EventHandler Disconnected;
+
+ Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null);
+ Task DisconnectAsync();
+ Task PublishAsync(MqttApplicationMessage applicationMessage);
+ Task> SubscribeAsync(IList topicFilters);
+ Task> SubscribeAsync(params TopicFilter[] topicFilters);
+ Task Unsubscribe(IList topicFilters);
+ Task Unsubscribe(params string[] topicFilters);
+ }
+}
\ No newline at end of file
diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs
index df3c61fac..384de1faa 100644
--- a/MQTTnet.Core/Client/MqttClient.cs
+++ b/MQTTnet.Core/Client/MqttClient.cs
@@ -1,5 +1,4 @@
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@@ -13,10 +12,9 @@
namespace MQTTnet.Core.Client
{
- public class MqttClient
+ public class MqttClient : IMqttClient
{
- private readonly ConcurrentDictionary _pendingExactlyOncePublishPackets = new ConcurrentDictionary();
- private readonly HashSet _processedPublishPackets = new HashSet();
+ private readonly HashSet _unacknowledgedPublishPackets = new HashSet();
private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly MqttClientOptions _options;
@@ -63,7 +61,6 @@ public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = n
_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
- _processedPublishPackets.Clear();
_packetDispatcher.Reset();
IsConnected = true;
@@ -105,6 +102,7 @@ public async Task> SubscribeAsync(IList
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");
+
ThrowIfNotConnected();
var subscribePacket = new MqttSubscribePacket
@@ -154,6 +152,7 @@ public async Task PublishAsync(MqttApplicationMessage applicationMessage)
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
+ // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync(publishPacket);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
@@ -164,8 +163,8 @@ public async Task PublishAsync(MqttApplicationMessage applicationMessage)
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
- await SendAndReceiveAsync(publishPacket);
- await SendAsync(publishPacket.CreateResponse());
+ var pubRecPacket = await SendAndReceiveAsync(publishPacket);
+ await SendAndReceiveAsync(pubRecPacket.CreateResponse());
}
}
@@ -208,14 +207,12 @@ private Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)
return DisconnectAsync();
}
- var publishPacket = mqttPacket as MqttPublishPacket;
- if (publishPacket != null)
+ if (mqttPacket is MqttPublishPacket publishPacket)
{
return ProcessReceivedPublishPacket(publishPacket);
}
- var pubRelPacket = mqttPacket as MqttPubRelPacket;
- if (pubRelPacket != null)
+ if (mqttPacket is MqttPubRelPacket pubRelPacket)
{
return ProcessReceivedPubRelPacket(pubRelPacket);
}
@@ -232,13 +229,16 @@ private Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)
private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
{
- if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce)
+ var applicationMessage = publishPacket.ToApplicationMessage();
+
+ try
{
- _processedPublishPackets.Add(publishPacket.PacketIdentifier);
+ ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage));
+ }
+ catch (Exception exception)
+ {
+ MqttTrace.Error(nameof(MqttClient), exception, "Unhandled exception while handling application message.");
}
-
- var applicationMessage = publishPacket.ToApplicationMessage();
- ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage));
}
private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
@@ -257,7 +257,13 @@ private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
- _pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket;
+ // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
+ lock (_unacknowledgedPublishPackets)
+ {
+ _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
+ }
+
+ FireApplicationMessageReceivedEvent(publishPacket);
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}
@@ -266,15 +272,12 @@ private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
{
- MqttPublishPacket originalPublishPacket;
- if (!_pendingExactlyOncePublishPackets.TryRemove(pubRelPacket.PacketIdentifier, out originalPublishPacket))
+ lock (_unacknowledgedPublishPackets)
{
- throw new MqttCommunicationException();
+ _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
}
-
- await SendAsync(originalPublishPacket.CreateResponse());
-
- FireApplicationMessageReceivedEvent(originalPublishPacket);
+
+ await SendAsync(pubRelPacket.CreateResponse());
}
private Task SendAsync(MqttBasePacket packet)
@@ -292,8 +295,8 @@ bool ResponsePacketSelector(MqttBasePacket p)
return false;
}
- var pi1 = requestPacket as IPacketWithIdentifier;
- var pi2 = p as IPacketWithIdentifier;
+ var pi1 = requestPacket as IMqttPacketWithIdentifier;
+ var pi2 = p as IMqttPacketWithIdentifier;
if (pi1 != null && pi2 != null)
{
diff --git a/MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs b/MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs
similarity index 67%
rename from MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs
rename to MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs
index 128f3f56d..420955cdd 100644
--- a/MQTTnet.Core/Packets/IPacketWithPacketIdentifier.cs
+++ b/MQTTnet.Core/Packets/IMqttPacketWithIdentifier.cs
@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
- public interface IPacketWithIdentifier
+ public interface IMqttPacketWithIdentifier
{
ushort PacketIdentifier { get; set; }
}
diff --git a/MQTTnet.Core/Packets/MqttBasePacket.cs b/MQTTnet.Core/Packets/MqttBasePacket.cs
index 2a167cb38..41901e524 100644
--- a/MQTTnet.Core/Packets/MqttBasePacket.cs
+++ b/MQTTnet.Core/Packets/MqttBasePacket.cs
@@ -1,25 +1,6 @@
-using System;
-
-namespace MQTTnet.Core.Packets
+namespace MQTTnet.Core.Packets
{
public abstract class MqttBasePacket
{
- public TResponsePacket CreateResponse()
- {
- var responsePacket = Activator.CreateInstance();
- var responsePacketWithIdentifier = responsePacket as IPacketWithIdentifier;
- if (responsePacketWithIdentifier != null)
- {
- var requestPacketWithIdentifier = this as IPacketWithIdentifier;
- if (requestPacketWithIdentifier == null)
- {
- throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not.");
- }
-
- responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier;
- }
-
- return responsePacket;
- }
}
}
diff --git a/MQTTnet.Core/Packets/MqttBasePublishPacket.cs b/MQTTnet.Core/Packets/MqttBasePublishPacket.cs
index ff57003c2..67d55b90f 100644
--- a/MQTTnet.Core/Packets/MqttBasePublishPacket.cs
+++ b/MQTTnet.Core/Packets/MqttBasePublishPacket.cs
@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
- public class MqttBasePublishPacket : MqttBasePacket, IPacketWithIdentifier
+ public class MqttBasePublishPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
}
diff --git a/MQTTnet.Core/Packets/MqttPacketExtensions.cs b/MQTTnet.Core/Packets/MqttPacketExtensions.cs
new file mode 100644
index 000000000..fef105430
--- /dev/null
+++ b/MQTTnet.Core/Packets/MqttPacketExtensions.cs
@@ -0,0 +1,27 @@
+using System;
+
+namespace MQTTnet.Core.Packets
+{
+ public static class MqttPacketExtensions
+ {
+ public static TResponsePacket CreateResponse(this MqttBasePacket packet)
+ {
+ if (packet == null) throw new ArgumentNullException(nameof(packet));
+
+ var responsePacket = Activator.CreateInstance();
+
+ if (responsePacket is IMqttPacketWithIdentifier responsePacketWithIdentifier)
+ {
+ var requestPacketWithIdentifier = packet as IMqttPacketWithIdentifier;
+ if (requestPacketWithIdentifier == null)
+ {
+ throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not.");
+ }
+
+ responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier;
+ }
+
+ return responsePacket;
+ }
+ }
+}
diff --git a/MQTTnet.Core/Packets/MqttSubAckPacket.cs b/MQTTnet.Core/Packets/MqttSubAckPacket.cs
index 3a7265f47..f63d0ca2e 100644
--- a/MQTTnet.Core/Packets/MqttSubAckPacket.cs
+++ b/MQTTnet.Core/Packets/MqttSubAckPacket.cs
@@ -4,7 +4,7 @@
namespace MQTTnet.Core.Packets
{
- public sealed class MqttSubAckPacket : MqttBasePacket, IPacketWithIdentifier
+ public sealed class MqttSubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
diff --git a/MQTTnet.Core/Packets/MqttSubscribePacket.cs b/MQTTnet.Core/Packets/MqttSubscribePacket.cs
index a0949e8b8..007bde7bd 100644
--- a/MQTTnet.Core/Packets/MqttSubscribePacket.cs
+++ b/MQTTnet.Core/Packets/MqttSubscribePacket.cs
@@ -3,7 +3,7 @@
namespace MQTTnet.Core.Packets
{
- public sealed class MqttSubscribePacket : MqttBasePacket, IPacketWithIdentifier
+ public sealed class MqttSubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
diff --git a/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs b/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs
index 5f330955d..57a5a7dd8 100644
--- a/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs
+++ b/MQTTnet.Core/Packets/MqttUnsubAckPacket.cs
@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
- public sealed class MqttUnsubAckPacket : MqttBasePacket, IPacketWithIdentifier
+ public sealed class MqttUnsubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
}
diff --git a/MQTTnet.Core/Packets/MqttUnsubscribe.cs b/MQTTnet.Core/Packets/MqttUnsubscribe.cs
index 3e3f36f88..b6cfab63c 100644
--- a/MQTTnet.Core/Packets/MqttUnsubscribe.cs
+++ b/MQTTnet.Core/Packets/MqttUnsubscribe.cs
@@ -2,7 +2,7 @@
namespace MQTTnet.Core.Packets
{
- public sealed class MqttUnsubscribePacket : MqttBasePacket, IPacketWithIdentifier
+ public sealed class MqttUnsubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
diff --git a/MQTTnet.Core/Serializer/ByteReader.cs b/MQTTnet.Core/Serializer/ByteReader.cs
index 935c777a9..ce7f709d5 100644
--- a/MQTTnet.Core/Serializer/ByteReader.cs
+++ b/MQTTnet.Core/Serializer/ByteReader.cs
@@ -24,7 +24,7 @@ public bool Read()
return result;
}
- public byte Read(int count)
+ public int Read(int count)
{
if (_index + count > 8)
{
@@ -42,7 +42,7 @@ public byte Read(int count)
_index++;
}
- return (byte)result;
+ return result;
}
}
}
diff --git a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
index ffac41b91..4100b6cb5 100644
--- a/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
+++ b/MQTTnet.Core/Serializer/DefaultMqttV311PacketSerializer.cs
@@ -16,86 +16,72 @@ public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel dest
if (packet == null) throw new ArgumentNullException(nameof(packet));
if (destination == null) throw new ArgumentNullException(nameof(destination));
- var connectPacket = packet as MqttConnectPacket;
- if (connectPacket != null)
+ if (packet is MqttConnectPacket connectPacket)
{
return SerializeAsync(connectPacket, destination);
}
- var connAckPacket = packet as MqttConnAckPacket;
- if (connAckPacket != null)
+ if (packet is MqttConnAckPacket connAckPacket)
{
return SerializeAsync(connAckPacket, destination);
}
- var disconnectPacket = packet as MqttDisconnectPacket;
- if (disconnectPacket != null)
+ if (packet is MqttDisconnectPacket disconnectPacket)
{
return SerializeAsync(disconnectPacket, destination);
}
- var pingReqPacket = packet as MqttPingReqPacket;
- if (pingReqPacket != null)
+ if (packet is MqttPingReqPacket pingReqPacket)
{
return SerializeAsync(pingReqPacket, destination);
}
- var pingRespPacket = packet as MqttPingRespPacket;
- if (pingRespPacket != null)
+ if (packet is MqttPingRespPacket pingRespPacket)
{
return SerializeAsync(pingRespPacket, destination);
}
- var publishPacket = packet as MqttPublishPacket;
- if (publishPacket != null)
+ if (packet is MqttPublishPacket publishPacket)
{
return SerializeAsync(publishPacket, destination);
}
- var pubAckPacket = packet as MqttPubAckPacket;
- if (pubAckPacket != null)
+ if (packet is MqttPubAckPacket pubAckPacket)
{
return SerializeAsync(pubAckPacket, destination);
}
- var pubRecPacket = packet as MqttPubRecPacket;
- if (pubRecPacket != null)
+ if (packet is MqttPubRecPacket pubRecPacket)
{
return SerializeAsync(pubRecPacket, destination);
}
- var pubRelPacket = packet as MqttPubRelPacket;
- if (pubRelPacket != null)
+ if (packet is MqttPubRelPacket pubRelPacket)
{
return SerializeAsync(pubRelPacket, destination);
}
- var pubCompPacket = packet as MqttPubCompPacket;
- if (pubCompPacket != null)
+ if (packet is MqttPubCompPacket pubCompPacket)
{
return SerializeAsync(pubCompPacket, destination);
}
- var subscribePacket = packet as MqttSubscribePacket;
- if (subscribePacket != null)
+ if (packet is MqttSubscribePacket subscribePacket)
{
return SerializeAsync(subscribePacket, destination);
}
- var subAckPacket = packet as MqttSubAckPacket;
- if (subAckPacket != null)
+ if (packet is MqttSubAckPacket subAckPacket)
{
return SerializeAsync(subAckPacket, destination);
}
- var unsubscribePacket = packet as MqttUnsubscribePacket;
- if (unsubscribePacket != null)
+ if (packet is MqttUnsubscribePacket unsubscribePacket)
{
return SerializeAsync(unsubscribePacket, destination);
}
- var unsubAckPacket = packet as MqttUnsubAckPacket;
- if (unsubAckPacket != null)
+ if (packet is MqttUnsubAckPacket unsubAckPacket)
{
return SerializeAsync(unsubAckPacket, destination);
}
@@ -206,7 +192,7 @@ public async Task DeserializeAsync(IMqttCommunicationChannel sou
}
}
- private async Task DeserializeUnsubscribeAsync(MqttPacketReader reader)
+ private static async Task DeserializeUnsubscribeAsync(MqttPacketReader reader)
{
var packet = new MqttUnsubscribePacket
{
@@ -221,7 +207,7 @@ private async Task DeserializeUnsubscribeAsync(MqttPacketReader
return packet;
}
- private async Task DeserializeSubscribeAsync(MqttPacketReader reader)
+ private static async Task DeserializeSubscribeAsync(MqttPacketReader reader)
{
var packet = new MqttSubscribePacket
{
@@ -238,7 +224,7 @@ await reader.ReadRemainingDataStringWithLengthPrefixAsync(),
return packet;
}
- private async Task DeserializePublishAsync(MqttPacketReader reader)
+ private static async Task DeserializePublishAsync(MqttPacketReader reader)
{
var fixedHeader = new ByteReader(reader.FixedHeader);
var retain = fixedHeader.Read();
@@ -266,13 +252,10 @@ private async Task DeserializePublishAsync(MqttPacketReader read
return packet;
}
- private async Task DeserializeConnectAsync(MqttPacketReader reader)
+ private static async Task DeserializeConnectAsync(MqttPacketReader reader)
{
- var packet = new MqttConnectPacket();
-
- await reader.ReadRemainingDataByteAsync();
- await reader.ReadRemainingDataByteAsync();
-
+ await reader.ReadRemainingDataAsync(2); // Skip 2 bytes
+
var protocolName = await reader.ReadRemainingDataAsync(4);
if (Encoding.UTF8.GetString(protocolName, 0, protocolName.Length) != "MQTT")
@@ -285,7 +268,12 @@ private async Task DeserializeConnectAsync(MqttPacketReader read
var connectFlagsReader = new ByteReader(connectFlags);
connectFlagsReader.Read(); // Reserved.
- packet.CleanSession = connectFlagsReader.Read();
+
+ var packet = new MqttConnectPacket
+ {
+ CleanSession = connectFlagsReader.Read()
+ };
+
var willFlag = connectFlagsReader.Read();
var willQoS = connectFlagsReader.Read(2);
var willRetain = connectFlagsReader.Read();
@@ -318,7 +306,7 @@ await reader.ReadRemainingDataWithLengthPrefixAsync(),
return packet;
}
- private async Task DeserializeSubAck(MqttPacketReader reader)
+ private static async Task DeserializeSubAck(MqttPacketReader reader)
{
var packet = new MqttSubAckPacket
{
@@ -333,7 +321,7 @@ private async Task DeserializeSubAck(MqttPacketReader reader)
return packet;
}
- private async Task DeserializeConnAck(MqttPacketReader reader)
+ private static async Task DeserializeConnAck(MqttPacketReader reader)
{
var variableHeader1 = await reader.ReadRemainingDataByteAsync();
var variableHeader2 = await reader.ReadRemainingDataByteAsync();
@@ -347,7 +335,7 @@ private async Task DeserializeConnAck(MqttPacketReader reader)
return packet;
}
- private void ValidateConnectPacket(MqttConnectPacket packet)
+ private static void ValidateConnectPacket(MqttConnectPacket packet)
{
if (string.IsNullOrEmpty(packet.ClientId) && !packet.CleanSession)
{
@@ -355,7 +343,7 @@ private void ValidateConnectPacket(MqttConnectPacket packet)
}
}
- private void ValidatePublishPacket(MqttPublishPacket packet)
+ private static void ValidatePublishPacket(MqttPublishPacket packet)
{
if (packet.QualityOfServiceLevel == 0 && packet.Dup)
{
@@ -365,7 +353,7 @@ private void ValidatePublishPacket(MqttPublishPacket packet)
private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT");
- private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{
ValidateConnectPacket(packet);
@@ -420,7 +408,7 @@ private Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel
}
}
- private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -435,22 +423,22 @@ private Task SerializeAsync(MqttConnAckPacket packet, IMqttCommunicationChannel
}
}
- private Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttDisconnectPacket packet, IMqttCommunicationChannel destination)
{
return SerializeEmptyPacketAsync(MqttControlPacketType.Disconnect, destination);
}
- private Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttPingReqPacket packet, IMqttCommunicationChannel destination)
{
return SerializeEmptyPacketAsync(MqttControlPacketType.PingReq, destination);
}
- private Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttPingRespPacket packet, IMqttCommunicationChannel destination)
{
return SerializeEmptyPacketAsync(MqttControlPacketType.PingResp, destination);
}
- private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel destination)
{
ValidatePublishPacket(packet);
@@ -485,7 +473,7 @@ private Task SerializeAsync(MqttPublishPacket packet, IMqttCommunicationChannel
}
}
- private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -496,7 +484,7 @@ private Task SerializeAsync(MqttPubAckPacket packet, IMqttCommunicationChannel d
}
}
- private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -507,7 +495,7 @@ private Task SerializeAsync(MqttPubRecPacket packet, IMqttCommunicationChannel d
}
}
- private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
+ private static async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -518,7 +506,7 @@ private async Task SerializeAsync(MqttPubRelPacket packet, IMqttCommunicationCha
}
}
- private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -529,7 +517,7 @@ private Task SerializeAsync(MqttPubCompPacket packet, IMqttCommunicationChannel
}
}
- private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -549,7 +537,7 @@ private Task SerializeAsync(MqttSubscribePacket packet, IMqttCommunicationChanne
}
}
- private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -568,7 +556,7 @@ private Task SerializeAsync(MqttSubAckPacket packet, IMqttCommunicationChannel d
}
}
- private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -587,7 +575,7 @@ private Task SerializeAsync(MqttUnsubscribePacket packet, IMqttCommunicationChan
}
}
- private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
+ private static Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
@@ -598,7 +586,7 @@ private Task SerializeAsync(MqttUnsubAckPacket packet, IMqttCommunicationChannel
}
}
- private Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
+ private static Task SerializeEmptyPacketAsync(MqttControlPacketType type, IMqttCommunicationChannel destination)
{
using (var output = new MqttPacketWriter())
{
diff --git a/MQTTnet.Core/Serializer/MqttPacketWriter.cs b/MQTTnet.Core/Serializer/MqttPacketWriter.cs
index eb9bfcf07..4e2a3cb7f 100644
--- a/MQTTnet.Core/Serializer/MqttPacketWriter.cs
+++ b/MQTTnet.Core/Serializer/MqttPacketWriter.cs
@@ -11,39 +11,6 @@ public sealed class MqttPacketWriter : IDisposable
{
private readonly MemoryStream _buffer = new MemoryStream(512);
- public void InjectFixedHeader(byte fixedHeader)
- {
- if (_buffer.Length == 0)
- {
- Write(fixedHeader);
- Write(0);
- return;
- }
-
- var backupBuffer = _buffer.ToArray();
- var remainingLength = (int)_buffer.Length;
-
- _buffer.SetLength(0);
-
- _buffer.WriteByte(fixedHeader);
-
- // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
- var x = remainingLength;
- do
- {
- var encodedByte = x % 128;
- x = x / 128;
- if (x > 0)
- {
- encodedByte = encodedByte | 128;
- }
-
- _buffer.WriteByte((byte)encodedByte);
- } while (x > 0);
-
- _buffer.Write(backupBuffer, 0, backupBuffer.Length);
- }
-
public void InjectFixedHeader(MqttControlPacketType packetType, byte flags = 0)
{
var fixedHeader = (byte)((byte)packetType << 4);
@@ -101,5 +68,38 @@ public void Dispose()
{
_buffer?.Dispose();
}
+
+ private void InjectFixedHeader(byte fixedHeader)
+ {
+ if (_buffer.Length == 0)
+ {
+ Write(fixedHeader);
+ Write(0);
+ return;
+ }
+
+ var backupBuffer = _buffer.ToArray();
+ var remainingLength = (int)_buffer.Length;
+
+ _buffer.SetLength(0);
+
+ _buffer.WriteByte(fixedHeader);
+
+ // Alorithm taken from http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html.
+ var x = remainingLength;
+ do
+ {
+ var encodedByte = x % 128;
+ x = x / 128;
+ if (x > 0)
+ {
+ encodedByte = encodedByte | 128;
+ }
+
+ _buffer.WriteByte((byte)encodedByte);
+ } while (x > 0);
+
+ _buffer.Write(backupBuffer, 0, backupBuffer.Length);
+ }
}
}
diff --git a/MQTTnet.Core/Server/IMqttServer.cs b/MQTTnet.Core/Server/IMqttServer.cs
new file mode 100644
index 000000000..0b91c264a
--- /dev/null
+++ b/MQTTnet.Core/Server/IMqttServer.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using MQTTnet.Core.Adapter;
+
+namespace MQTTnet.Core.Server
+{
+ public interface IMqttServer
+ {
+ event EventHandler ApplicationMessageReceived;
+ event EventHandler ClientConnected;
+
+ IList GetConnectedClients();
+ void InjectClient(string identifier, IMqttCommunicationAdapter adapter);
+ void Publish(MqttApplicationMessage applicationMessage);
+ void Start();
+ void Stop();
+ }
+}
\ No newline at end of file
diff --git a/MQTTnet.Core/Server/MqttClientMessageQueue.cs b/MQTTnet.Core/Server/MqttClientMessageQueue.cs
index 926e5817c..07c6ea59c 100644
--- a/MQTTnet.Core/Server/MqttClientMessageQueue.cs
+++ b/MQTTnet.Core/Server/MqttClientMessageQueue.cs
@@ -35,7 +35,7 @@ public void Start(IMqttCommunicationAdapter adapter)
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource();
- Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token));
+ Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
}
public void Stop()
@@ -45,14 +45,13 @@ public void Stop()
_cancellationTokenSource = null;
}
- public void Enqueue(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
+ public void Enqueue(MqttPublishPacket publishPacket)
{
- if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
lock (_pendingPublishPackets)
{
- _pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket));
+ _pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket));
_gate.Set();
}
}
diff --git a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs b/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs
index 8847c7086..98553906a 100644
--- a/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs
+++ b/MQTTnet.Core/Server/MqttClientPublishPacketContext.cs
@@ -5,14 +5,11 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttClientPublishPacketContext
{
- public MqttClientPublishPacketContext(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
+ public MqttClientPublishPacketContext(MqttPublishPacket publishPacket)
{
- SenderClientSession = senderClientSession ?? throw new ArgumentNullException(nameof(senderClientSession));
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
}
- public MqttClientSession SenderClientSession { get; }
-
public MqttPublishPacket PublishPacket { get; }
public int SendTries { get; set; }
diff --git a/MQTTnet.Core/Server/MqttClientSession.cs b/MQTTnet.Core/Server/MqttClientSession.cs
index b89e73e42..a3919f9e9 100644
--- a/MQTTnet.Core/Server/MqttClientSession.cs
+++ b/MQTTnet.Core/Server/MqttClientSession.cs
@@ -1,5 +1,5 @@
using System;
-using System.Collections.Concurrent;
+using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
@@ -13,13 +13,13 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttClientSession : IDisposable
{
- private readonly ConcurrentDictionary _pendingIncomingPublications = new ConcurrentDictionary();
+ private readonly HashSet _unacknowledgedPublishPackets = new HashSet();
private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
private readonly MqttClientMessageQueue _messageQueue;
private readonly Action _publishPacketReceivedCallback;
private readonly MqttServerOptions _options;
-
+
private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;
private string _identifier;
@@ -79,17 +79,16 @@ public async Task RunAsync(string identifier, MqttApplicationMessage willApplica
}
}
- public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
+ public void EnqueuePublishPacket(MqttPublishPacket publishPacket)
{
- if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
- if (!_subscriptionsManager.IsTopicSubscribed(publishPacket))
+ if (!_subscriptionsManager.IsSubscribed(publishPacket))
{
return;
}
- _messageQueue.Enqueue(senderClientSession, publishPacket);
+ _messageQueue.Enqueue(publishPacket);
MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet.");
}
@@ -101,34 +100,35 @@ public void Dispose()
private Task HandleIncomingPacketAsync(MqttBasePacket packet)
{
- var subscribePacket = packet as MqttSubscribePacket;
- if (subscribePacket != null)
+ if (packet is MqttSubscribePacket subscribePacket)
{
return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
}
- var unsubscribePacket = packet as MqttUnsubscribePacket;
- if (unsubscribePacket != null)
+ if (packet is MqttUnsubscribePacket unsubscribePacket)
{
return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
}
- var publishPacket = packet as MqttPublishPacket;
- if (publishPacket != null)
+ if (packet is MqttPublishPacket publishPacket)
{
return HandleIncomingPublishPacketAsync(publishPacket);
}
- var pubRelPacket = packet as MqttPubRelPacket;
- if (pubRelPacket != null)
+ if (packet is MqttPubRelPacket pubRelPacket)
{
return HandleIncomingPubRelPacketAsync(pubRelPacket);
}
- var pubAckPacket = packet as MqttPubAckPacket;
- if (pubAckPacket != null)
+ if (packet is MqttPubRecPacket pubRecPacket)
+ {
+ return _adapter.SendPacketAsync(pubRecPacket.CreateResponse(), _options.DefaultCommunicationTimeout);
+ }
+
+ if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
{
- return HandleIncomingPubAckPacketAsync(pubAckPacket);
+ // Discard message.
+ return Task.FromResult((object)null);
}
if (packet is MqttPingReqPacket)
@@ -148,12 +148,7 @@ private Task HandleIncomingPacketAsync(MqttBasePacket packet)
return Task.FromResult((object)null);
}
- private async Task HandleIncomingPubAckPacketAsync(MqttPubAckPacket pubAckPacket)
- {
- await Task.FromResult((object)null);
- }
-
- private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
+ private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
{
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
@@ -161,26 +156,33 @@ private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPac
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
- await _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
_publishPacketReceivedCallback(this, publishPacket);
+ return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
- _pendingIncomingPublications[publishPacket.PacketIdentifier] = publishPacket;
- await _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
+ // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
+ lock (_unacknowledgedPublishPackets)
+ {
+ _unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
+ }
+
+ _publishPacketReceivedCallback(this, publishPacket);
+
+ return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
}
+
+ throw new MqttCommunicationException("Received not supported QoS level.");
}
- private async Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
+ private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
{
- MqttPublishPacket publishPacket;
- if (!_pendingIncomingPublications.TryRemove(pubRelPacket.PacketIdentifier, out publishPacket))
+ lock (_unacknowledgedPublishPackets)
{
- return;
+ _unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
}
- await _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
- _publishPacketReceivedCallback(this, publishPacket);
+ return _adapter.SendPacketAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
}
}
}
diff --git a/MQTTnet.Core/Server/MqttClientSessionsManager.cs b/MQTTnet.Core/Server/MqttClientSessionsManager.cs
index ae4d8651a..f78be0ac3 100644
--- a/MQTTnet.Core/Server/MqttClientSessionsManager.cs
+++ b/MQTTnet.Core/Server/MqttClientSessionsManager.cs
@@ -22,7 +22,7 @@ public MqttClientSessionsManager(MqttServerOptions options)
_options = options ?? throw new ArgumentNullException(nameof(options));
}
- public event EventHandler ApplicationMessageReceived;
+ public event EventHandler ApplicationMessageReceived;
public async Task RunClientSessionAsync(MqttClientConnectedEventArgs eventArgs)
{
@@ -127,14 +127,24 @@ private GetOrCreateClientSessionResult GetOrCreateClientSession(MqttConnectPacke
}
}
- private void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
+ public void DispatchPublishPacket(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
{
- var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession.ClientId, publishPacket.ToApplicationMessage());
- ApplicationMessageReceived?.Invoke(this, eventArgs);
+ try
+ {
+ var eventArgs = new MqttApplicationMessageReceivedEventArgs(senderClientSession?.ClientId, publishPacket.ToApplicationMessage());
+ ApplicationMessageReceived?.Invoke(this, eventArgs);
+ }
+ catch (Exception exception)
+ {
+ MqttTrace.Error(nameof(MqttClientSessionsManager), exception, "Error while processing application message");
+ }
- foreach (var clientSession in _clientSessions.Values.ToList())
+ lock (_syncRoot)
{
- clientSession.EnqueuePublishPacket(senderClientSession, publishPacket);
+ foreach (var clientSession in _clientSessions.Values.ToList())
+ {
+ clientSession.EnqueuePublishPacket(publishPacket);
+ }
}
}
}
diff --git a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
index 8cb95634d..390f8f35f 100644
--- a/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
+++ b/MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
@@ -1,5 +1,5 @@
using System;
-using System.Collections.Concurrent;
+using System.Collections.Generic;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
@@ -7,17 +7,21 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttClientSubscriptionsManager
{
- private readonly ConcurrentDictionary _subscribedTopics = new ConcurrentDictionary();
+ private readonly Dictionary _subscribedTopics = new Dictionary();
public MqttSubAckPacket Subscribe(MqttSubscribePacket subscribePacket)
{
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));
var responsePacket = subscribePacket.CreateResponse();
- foreach (var topicFilter in subscribePacket.TopicFilters)
+
+ lock (_subscribedTopics)
{
- _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
- responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2.
+ foreach (var topicFilter in subscribePacket.TopicFilters)
+ {
+ _subscribedTopics[topicFilter.Topic] = topicFilter.QualityOfServiceLevel;
+ responsePacket.SubscribeReturnCodes.Add(MqttSubscribeReturnCode.SuccessMaximumQoS1); // TODO: Add support for QoS 2.
+ }
}
return responsePacket;
@@ -27,32 +31,37 @@ public MqttUnsubAckPacket Unsubscribe(MqttUnsubscribePacket unsubscribePacket)
{
if (unsubscribePacket == null) throw new ArgumentNullException(nameof(unsubscribePacket));
- foreach (var topicFilter in unsubscribePacket.TopicFilters)
+ lock (_subscribedTopics)
{
- MqttQualityOfServiceLevel _;
- _subscribedTopics.TryRemove(topicFilter, out _);
+ foreach (var topicFilter in unsubscribePacket.TopicFilters)
+ {
+ _subscribedTopics.Remove(topicFilter);
+ }
}
return unsubscribePacket.CreateResponse();
}
- public bool IsTopicSubscribed(MqttPublishPacket publishPacket)
+ public bool IsSubscribed(MqttPublishPacket publishPacket)
{
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));
- foreach (var subscribedTopic in _subscribedTopics)
+ lock (_subscribedTopics)
{
- if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key))
+ foreach (var subscribedTopic in _subscribedTopics)
{
- continue;
- }
+ if (publishPacket.QualityOfServiceLevel > subscribedTopic.Value)
+ {
+ continue;
+ }
- if (subscribedTopic.Value < publishPacket.QualityOfServiceLevel)
- {
- continue;
- }
+ if (!MqttTopicFilterComparer.IsMatch(publishPacket.Topic, subscribedTopic.Key))
+ {
+ continue;
+ }
- return true;
+ return true;
+ }
}
return false;
diff --git a/MQTTnet.Core/Server/MqttServer.cs b/MQTTnet.Core/Server/MqttServer.cs
index ca55d7a53..98543b6e3 100644
--- a/MQTTnet.Core/Server/MqttServer.cs
+++ b/MQTTnet.Core/Server/MqttServer.cs
@@ -4,10 +4,11 @@
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
+using MQTTnet.Core.Internal;
namespace MQTTnet.Core.Server
{
- public sealed class MqttServer
+ public sealed class MqttServer : IMqttServer
{
private readonly MqttClientSessionsManager _clientSessionsManager;
private readonly ICollection _adapters;
@@ -33,6 +34,13 @@ public IList GetConnectedClients()
public event EventHandler ApplicationMessageReceived;
+ public void Publish(MqttApplicationMessage applicationMessage)
+ {
+ if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));
+
+ _clientSessionsManager.DispatchPublishPacket(null, applicationMessage.ToPublishPacket());
+ }
+
public void InjectClient(string identifier, IMqttCommunicationAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
diff --git a/MQTTnet.Core/Server/DefaultEndpointOptions.cs b/MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs
similarity index 71%
rename from MQTTnet.Core/Server/DefaultEndpointOptions.cs
rename to MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs
index 203ed2895..04c4d7325 100644
--- a/MQTTnet.Core/Server/DefaultEndpointOptions.cs
+++ b/MQTTnet.Core/Server/MqttServerDefaultEndpointOptions.cs
@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Server
{
- public sealed class DefaultEndpointOptions
+ public sealed class MqttServerDefaultEndpointOptions
{
public bool IsEnabled { get; set; } = true;
diff --git a/MQTTnet.Core/Server/MqttServerOptions.cs b/MQTTnet.Core/Server/MqttServerOptions.cs
index 18e284d9d..5edf50018 100644
--- a/MQTTnet.Core/Server/MqttServerOptions.cs
+++ b/MQTTnet.Core/Server/MqttServerOptions.cs
@@ -6,13 +6,13 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttServerOptions
{
- public DefaultEndpointOptions DefaultEndpointOptions { get; } = new DefaultEndpointOptions();
+ public MqttServerDefaultEndpointOptions DefaultEndpointOptions { get; } = new MqttServerDefaultEndpointOptions();
public MqttServerTlsEndpointOptions TlsEndpointOptions { get; } = new MqttServerTlsEndpointOptions();
public int ConnectionBacklog { get; set; } = 10;
- public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);
+ public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(15);
public Func ConnectionValidator { get; set; }
}
diff --git a/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs b/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs
deleted file mode 100644
index d790526c1..000000000
--- a/MQTTnet.Core/Server/MqttServerTlsEndpointOptionsExtensions.cs
+++ /dev/null
@@ -1,19 +0,0 @@
-using System;
-
-namespace MQTTnet.Core.Server
-{
- public static class MqttServerTlsEndpointOptionsExtensions
- {
- public static int GetPort(this DefaultEndpointOptions options)
- {
- if (options == null) throw new ArgumentNullException(nameof(options));
-
- if (!options.Port.HasValue)
- {
- return 1883;
- }
-
- return options.Port.Value;
- }
- }
-}
diff --git a/README.md b/README.md
index 379df3bbf..5df9d0f5b 100644
--- a/README.md
+++ b/README.md
@@ -15,10 +15,12 @@ MQTTnet is a .NET library for MQTT based communication. It provides a MQTT clien
* Rx support (via another project)
* List of connected clients available (server only)
* Extensible communication channels (i.e. In-Memory, TCP, TCP+SSL, WebSockets (not included in this project))
+* Server is able to publish its own messages (no loopback client required)
* Access to internal trace messages
* Extensible client credential validation (server only)
-* Unit tested (48+ tests)
+* Unit tested (50+ tests)
* Lightweight (only the low level implementation of MQTT, no overhead)
+* Interfaces included for mocking and testing
## Supported frameworks
* .NET Standard 1.3+
diff --git a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
index 1ca8f82af..8454c76be 100644
--- a/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttServerTests.cs
@@ -68,7 +68,64 @@ public async Task MqttServer_WillMessage()
Assert.AreEqual(1, receivedMessagesCount);
}
- private MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server)
+ [TestMethod]
+ public async Task MqttServer_Unsubscribe()
+ {
+ var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() });
+ s.Start();
+
+ var c1 = ConnectTestClient("c1", null, s);
+ var c2 = ConnectTestClient("c2", null, s);
+
+ var receivedMessagesCount = 0;
+ c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
+
+ var message = new MqttApplicationMessage("a", new byte[0], MqttQualityOfServiceLevel.AtLeastOnce, false);
+
+ await c2.PublishAsync(message);
+ Assert.AreEqual(0, receivedMessagesCount);
+
+ await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
+ await c2.PublishAsync(message);
+
+ await Task.Delay(500);
+ Assert.AreEqual(1, receivedMessagesCount);
+
+ await c1.Unsubscribe("a");
+ await c2.PublishAsync(message);
+
+ await Task.Delay(500);
+ Assert.AreEqual(1, receivedMessagesCount);
+
+ s.Stop();
+ await Task.Delay(500);
+
+ Assert.AreEqual(1, receivedMessagesCount);
+ }
+
+ [TestMethod]
+ public async Task MqttServer_Publish()
+ {
+ var s = new MqttServer(new MqttServerOptions(), new List { new TestMqttServerAdapter() });
+ s.Start();
+
+ var c1 = ConnectTestClient("c1", null, s);
+
+ var receivedMessagesCount = 0;
+ c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
+
+ var message = new MqttApplicationMessage("a", new byte[0], MqttQualityOfServiceLevel.AtLeastOnce, false);
+ await c1.SubscribeAsync(new TopicFilter("a", MqttQualityOfServiceLevel.AtLeastOnce));
+
+ s.Publish(message);
+ await Task.Delay(500);
+
+ s.Stop();
+
+ Assert.AreEqual(1, receivedMessagesCount);
+ }
+
+ private static MqttClient ConnectTestClient(string clientId, MqttApplicationMessage willMessage, MqttServer server)
{
var adapterA = new TestMqttCommunicationAdapter();
var adapterB = new TestMqttCommunicationAdapter();
diff --git a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
index 9452b10d7..e47773177 100644
--- a/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
+++ b/Tests/MQTTnet.Core.Tests/MqttSubscriptionsManagerTests.cs
@@ -24,7 +24,7 @@ public void MqttSubscriptionsManager_SubscribeSingleSuccess()
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};
- Assert.IsTrue(sm.IsTopicSubscribed(pp));
+ Assert.IsTrue(sm.IsSubscribed(pp));
}
[TestMethod]
@@ -43,7 +43,7 @@ public void MqttSubscriptionsManager_SubscribeSingleNoSuccess()
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};
- Assert.IsFalse(sm.IsTopicSubscribed(pp));
+ Assert.IsFalse(sm.IsSubscribed(pp));
}
[TestMethod]
@@ -62,13 +62,13 @@ public void MqttSubscriptionsManager_SubscribeAndUnsubscribeSingle()
QualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce
};
- Assert.IsTrue(sm.IsTopicSubscribed(pp));
+ Assert.IsTrue(sm.IsSubscribed(pp));
var up = new MqttUnsubscribePacket();
up.TopicFilters.Add("A/B/C");
sm.Unsubscribe(up);
- Assert.IsFalse(sm.IsTopicSubscribed(pp));
+ Assert.IsFalse(sm.IsSubscribed(pp));
}
}
}
diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
index 1104a33ba..aa40c8b2b 100644
--- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
+++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs
@@ -8,7 +8,7 @@ namespace MQTTnet.TestApp.UniversalWindows
{
public sealed partial class MainPage
{
- private MqttClient _mqttClient;
+ private IMqttClient _mqttClient;
public MainPage()
{