Skip to content

Commit

Permalink
Add interfaces, optimize performance, fix QoS level 2
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Aug 5, 2017
1 parent 0b0239b commit 447ca6c
Show file tree
Hide file tree
Showing 27 changed files with 198 additions and 147 deletions.
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttClientFactory
{
public MqttClient CreateMqttClient(MqttClientOptions options)
public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttServerFactory
{
public MqttServer CreateMqttServer(MqttServerOptions options)
public IMqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttClientFactory
{
public MqttClient CreateMqttClient(MqttClientOptions options)
public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.NetStandard/MqttServerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttServerFactory
{
public MqttServer CreateMqttServer(MqttServerOptions options)
public IMqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttClientFactory
{
public MqttClient CreateMqttClient(MqttClientOptions options)
public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace MQTTnet
{
public class MqttServerFactory
{
public MqttServer CreateMqttServer(MqttServerOptions options)
public IMqttServer CreateMqttServer(MqttServerOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

Expand Down
24 changes: 24 additions & 0 deletions MQTTnet.Core/Client/IMqttClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Core.Packets;

namespace MQTTnet.Core.Client
{
public interface IMqttClient
{
bool IsConnected { get; }

event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
event EventHandler Connected;
event EventHandler Disconnected;

Task ConnectAsync(MqttApplicationMessage willApplicationMessage = null);
Task DisconnectAsync();
Task PublishAsync(MqttApplicationMessage applicationMessage);
Task<IList<MqttSubscribeResult>> SubscribeAsync(IList<TopicFilter> topicFilters);
Task<IList<MqttSubscribeResult>> SubscribeAsync(params TopicFilter[] topicFilters);
Task Unsubscribe(IList<string> topicFilters);
Task Unsubscribe(params string[] topicFilters);
}
}
47 changes: 21 additions & 26 deletions MQTTnet.Core/Client/MqttClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -13,10 +12,9 @@

namespace MQTTnet.Core.Client
{
public class MqttClient
public class MqttClient : IMqttClient
{
private readonly ConcurrentDictionary<ushort, MqttPublishPacket> _pendingExactlyOncePublishPackets = new ConcurrentDictionary<ushort, MqttPublishPacket>();
private readonly HashSet<ushort> _processedPublishPackets = new HashSet<ushort>();
private readonly HashSet<ushort> _unacknowledgedPublishPackets = new HashSet<ushort>();

private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher();
private readonly MqttClientOptions _options;
Expand Down Expand Up @@ -63,7 +61,6 @@ public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = n

_cancellationTokenSource = new CancellationTokenSource();
_latestPacketIdentifier = 0;
_processedPublishPackets.Clear();
_packetDispatcher.Reset();
IsConnected = true;

Expand Down Expand Up @@ -105,6 +102,7 @@ public async Task<IList<MqttSubscribeResult>> SubscribeAsync(IList<TopicFilter>
{
if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters));
if (!topicFilters.Any()) throw new MqttProtocolViolationException("At least one topic filter must be set [MQTT-3.8.3-3].");

ThrowIfNotConnected();

var subscribePacket = new MqttSubscribePacket
Expand Down Expand Up @@ -154,6 +152,7 @@ public async Task PublishAsync(MqttApplicationMessage applicationMessage)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
// No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier]
await SendAsync(publishPacket);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
Expand All @@ -164,8 +163,8 @@ public async Task PublishAsync(MqttApplicationMessage applicationMessage)
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
publishPacket.PacketIdentifier = GetNewPacketIdentifier();
await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket);
await SendAsync(publishPacket.CreateResponse<MqttPubCompPacket>());
var pubRecPacket = await SendAndReceiveAsync<MqttPubRecPacket>(publishPacket);
await SendAndReceiveAsync<MqttPubCompPacket>(pubRecPacket.CreateResponse<MqttPubRelPacket>());
}
}

Expand Down Expand Up @@ -208,14 +207,12 @@ private Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)
return DisconnectAsync();
}

var publishPacket = mqttPacket as MqttPublishPacket;
if (publishPacket != null)
if (mqttPacket is MqttPublishPacket publishPacket)
{
return ProcessReceivedPublishPacket(publishPacket);
}

var pubRelPacket = mqttPacket as MqttPubRelPacket;
if (pubRelPacket != null)
if (mqttPacket is MqttPubRelPacket pubRelPacket)
{
return ProcessReceivedPubRelPacket(pubRelPacket);
}
Expand All @@ -232,11 +229,6 @@ private Task ProcessReceivedPacketAsync(MqttBasePacket mqttPacket)

private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket)
{
if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce)
{
_processedPublishPackets.Add(publishPacket.PacketIdentifier);
}

var applicationMessage = publishPacket.ToApplicationMessage();

try
Expand Down Expand Up @@ -265,7 +257,13 @@ private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
_pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket;
// QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
lock (_unacknowledgedPublishPackets)
{
_unacknowledgedPublishPackets.Add(publishPacket.PacketIdentifier);
}

FireApplicationMessageReceivedEvent(publishPacket);
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}

Expand All @@ -274,15 +272,12 @@ private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)

private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
{
MqttPublishPacket originalPublishPacket;
if (!_pendingExactlyOncePublishPackets.TryRemove(pubRelPacket.PacketIdentifier, out originalPublishPacket))
lock (_unacknowledgedPublishPackets)
{
throw new MqttCommunicationException();
_unacknowledgedPublishPackets.Remove(pubRelPacket.PacketIdentifier);
}

await SendAsync(originalPublishPacket.CreateResponse<MqttPubCompPacket>());

FireApplicationMessageReceivedEvent(originalPublishPacket);

await SendAsync(pubRelPacket.CreateResponse<MqttPubCompPacket>());
}

private Task SendAsync(MqttBasePacket packet)
Expand All @@ -300,8 +295,8 @@ bool ResponsePacketSelector(MqttBasePacket p)
return false;
}

var pi1 = requestPacket as IPacketWithIdentifier;
var pi2 = p as IPacketWithIdentifier;
var pi1 = requestPacket as IMqttPacketWithIdentifier;
var pi2 = p as IMqttPacketWithIdentifier;

if (pi1 != null && pi2 != null)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
public interface IPacketWithIdentifier
public interface IMqttPacketWithIdentifier
{
ushort PacketIdentifier { get; set; }
}
Expand Down
21 changes: 1 addition & 20 deletions MQTTnet.Core/Packets/MqttBasePacket.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,6 @@
using System;

namespace MQTTnet.Core.Packets
namespace MQTTnet.Core.Packets
{
public abstract class MqttBasePacket
{
public TResponsePacket CreateResponse<TResponsePacket>()
{
var responsePacket = Activator.CreateInstance<TResponsePacket>();
var responsePacketWithIdentifier = responsePacket as IPacketWithIdentifier;
if (responsePacketWithIdentifier != null)
{
var requestPacketWithIdentifier = this as IPacketWithIdentifier;
if (requestPacketWithIdentifier == null)
{
throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not.");
}

responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier;
}

return responsePacket;
}
}
}
2 changes: 1 addition & 1 deletion MQTTnet.Core/Packets/MqttBasePublishPacket.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
public class MqttBasePublishPacket : MqttBasePacket, IPacketWithIdentifier
public class MqttBasePublishPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
}
Expand Down
27 changes: 27 additions & 0 deletions MQTTnet.Core/Packets/MqttPacketExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;

namespace MQTTnet.Core.Packets
{
public static class MqttPacketExtensions
{
public static TResponsePacket CreateResponse<TResponsePacket>(this MqttBasePacket packet)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));

var responsePacket = Activator.CreateInstance<TResponsePacket>();

if (responsePacket is IMqttPacketWithIdentifier responsePacketWithIdentifier)
{
var requestPacketWithIdentifier = packet as IMqttPacketWithIdentifier;
if (requestPacketWithIdentifier == null)
{
throw new InvalidOperationException("Response packet has PacketIdentifier but request packet does not.");
}

responsePacketWithIdentifier.PacketIdentifier = requestPacketWithIdentifier.PacketIdentifier;
}

return responsePacket;
}
}
}
2 changes: 1 addition & 1 deletion MQTTnet.Core/Packets/MqttSubAckPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace MQTTnet.Core.Packets
{
public sealed class MqttSubAckPacket : MqttBasePacket, IPacketWithIdentifier
public sealed class MqttSubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

Expand Down
2 changes: 1 addition & 1 deletion MQTTnet.Core/Packets/MqttSubscribePacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace MQTTnet.Core.Packets
{
public sealed class MqttSubscribePacket : MqttBasePacket, IPacketWithIdentifier
public sealed class MqttSubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

Expand Down
2 changes: 1 addition & 1 deletion MQTTnet.Core/Packets/MqttUnsubAckPacket.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MQTTnet.Core.Packets
{
public sealed class MqttUnsubAckPacket : MqttBasePacket, IPacketWithIdentifier
public sealed class MqttUnsubAckPacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }
}
Expand Down
2 changes: 1 addition & 1 deletion MQTTnet.Core/Packets/MqttUnsubscribe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace MQTTnet.Core.Packets
{
public sealed class MqttUnsubscribePacket : MqttBasePacket, IPacketWithIdentifier
public sealed class MqttUnsubscribePacket : MqttBasePacket, IMqttPacketWithIdentifier
{
public ushort PacketIdentifier { get; set; }

Expand Down
18 changes: 18 additions & 0 deletions MQTTnet.Core/Server/IMqttServer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using MQTTnet.Core.Adapter;

namespace MQTTnet.Core.Server
{
public interface IMqttServer
{
event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
event EventHandler<MqttClientConnectedEventArgs> ClientConnected;

IList<string> GetConnectedClients();
void InjectClient(string identifier, IMqttCommunicationAdapter adapter);
void Publish(MqttApplicationMessage applicationMessage);
void Start();
void Stop();
}
}
7 changes: 3 additions & 4 deletions MQTTnet.Core/Server/MqttClientMessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void Start(IMqttCommunicationAdapter adapter)
_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource();

Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token));
Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
}

public void Stop()
Expand All @@ -45,14 +45,13 @@ public void Stop()
_cancellationTokenSource = null;
}

public void Enqueue(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
public void Enqueue(MqttPublishPacket publishPacket)
{
if (senderClientSession == null) throw new ArgumentNullException(nameof(senderClientSession));
if (publishPacket == null) throw new ArgumentNullException(nameof(publishPacket));

lock (_pendingPublishPackets)
{
_pendingPublishPackets.Add(new MqttClientPublishPacketContext(senderClientSession, publishPacket));
_pendingPublishPackets.Add(new MqttClientPublishPacketContext(publishPacket));
_gate.Set();
}
}
Expand Down
5 changes: 1 addition & 4 deletions MQTTnet.Core/Server/MqttClientPublishPacketContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ namespace MQTTnet.Core.Server
{
public sealed class MqttClientPublishPacketContext
{
public MqttClientPublishPacketContext(MqttClientSession senderClientSession, MqttPublishPacket publishPacket)
public MqttClientPublishPacketContext(MqttPublishPacket publishPacket)
{
SenderClientSession = senderClientSession ?? throw new ArgumentNullException(nameof(senderClientSession));
PublishPacket = publishPacket ?? throw new ArgumentNullException(nameof(publishPacket));
}

public MqttClientSession SenderClientSession { get; }

public MqttPublishPacket PublishPacket { get; }

public int SendTries { get; set; }
Expand Down
Loading

0 comments on commit 447ca6c

Please sign in to comment.